mirror of
https://github.com/chirpstack/chirpstack.git
synced 2025-02-21 17:26:37 +00:00
WIP.
This commit is contained in:
parent
8dfaefb10c
commit
a7280d9c21
66
chirpstack/src/applayer/fragmentation.rs
Normal file
66
chirpstack/src/applayer/fragmentation.rs
Normal file
@ -0,0 +1,66 @@
|
||||
use anyhow::Result;
|
||||
use chrono::Utc;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::storage::fields::device_profile::Ts004Version;
|
||||
use crate::storage::{device, device_profile, fuota};
|
||||
use lrwn::applayer::fragmentation;
|
||||
|
||||
pub async fn handle_uplink(
|
||||
dev: &device::Device,
|
||||
dp: &device_profile::DeviceProfile,
|
||||
data: &[u8],
|
||||
) -> Result<()> {
|
||||
let version = dp
|
||||
.app_layer_params
|
||||
.ts004_version
|
||||
.ok_or_else(|| anyhow!("Device does not support TS004"))?;
|
||||
|
||||
match version {
|
||||
Ts004Version::V100 => handle_uplink_v100(dev, data).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_uplink_v100(dev: &device::Device, data: &[u8]) -> Result<()> {
|
||||
let pl = fragmentation::v1::Payload::from_slice(true, data)?;
|
||||
|
||||
match pl {
|
||||
fragmentation::v1::Payload::FragSessionSetupAns(pl) => {
|
||||
handle_v1_frag_session_setup_ans(dev, pl).await?
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_v1_frag_session_setup_ans(
|
||||
dev: &device::Device,
|
||||
pl: fragmentation::v1::FragSessionSetupAnsPayload,
|
||||
) -> Result<()> {
|
||||
info!("Handling FragSessionSetupAns");
|
||||
|
||||
let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?;
|
||||
|
||||
if pl.encoding_unsupported
|
||||
| pl.not_enough_memory
|
||||
| pl.frag_session_index_not_supported
|
||||
| pl.wrong_descriptor
|
||||
{
|
||||
warn!(
|
||||
frag_index = pl.frag_index,
|
||||
encoding_unsupported = pl.encoding_unsupported,
|
||||
not_enough_memory = pl.not_enough_memory,
|
||||
frag_session_index_not_supported = pl.frag_session_index_not_supported,
|
||||
wrong_descriptor = pl.wrong_descriptor,
|
||||
"FragSessionAns contains errors"
|
||||
);
|
||||
fuota_dev.return_msg = format!("Error: FragSessionAns response encoding_unsupported={}, not_enough_memory={}, frag_session_index_not_supported={}, wrong_descriptor={}", pl.encoding_unsupported, pl.not_enough_memory, pl.frag_session_index_not_supported, pl.wrong_descriptor);
|
||||
} else {
|
||||
fuota_dev.frag_session_setup_completed_at = Some(Utc::now());
|
||||
}
|
||||
|
||||
let _ = fuota::update_device(fuota_dev).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
420
chirpstack/src/applayer/fuota/flow.rs
Normal file
420
chirpstack/src/applayer/fuota/flow.rs
Normal file
@ -0,0 +1,420 @@
|
||||
use anyhow::Result;
|
||||
use chrono::{TimeDelta, Utc};
|
||||
use tracing::info;
|
||||
|
||||
use lrwn::applayer::{fragmentation, multicastsetup};
|
||||
use lrwn::region::MacVersion;
|
||||
|
||||
use crate::config;
|
||||
use crate::downlink;
|
||||
use crate::gpstime::ToGpsTime;
|
||||
use crate::storage::fields::FuotaJob;
|
||||
use crate::storage::{device_keys, device_profile, device_queue, fuota, multicast};
|
||||
|
||||
pub struct Flow {
|
||||
job: fuota::FuotaDeploymentJob,
|
||||
fuota_deployment: fuota::FuotaDeployment,
|
||||
device_profile: device_profile::DeviceProfile,
|
||||
}
|
||||
|
||||
impl Flow {
|
||||
pub async fn handle_job(job: fuota::FuotaDeploymentJob) -> Result<()> {
|
||||
let fuota_deployment = fuota::get_deployment(job.fuota_deployment_id.into()).await?;
|
||||
let device_profile =
|
||||
device_profile::get(&fuota_deployment.device_profile_id.into()).await?;
|
||||
|
||||
let mut flow = Flow {
|
||||
job,
|
||||
fuota_deployment,
|
||||
device_profile,
|
||||
};
|
||||
flow.dispatch().await
|
||||
}
|
||||
|
||||
async fn dispatch(&mut self) -> Result<()> {
|
||||
let conf = config::get();
|
||||
self.job.attempt_count += 1;
|
||||
|
||||
info!(attempt_count = self.job.attempt_count, "Starting job");
|
||||
|
||||
let resp = match self.job.job {
|
||||
FuotaJob::CreateMcGroup => self.create_mc_group().await,
|
||||
FuotaJob::AddDevsToMcGroup => self.add_devices_to_multicast_group().await,
|
||||
FuotaJob::AddGwsToMcGroup => self.add_gateways_to_multicast_group().await,
|
||||
FuotaJob::McGroupSetup => self.multicast_group_setup().await,
|
||||
FuotaJob::FragSessionSetup => self.fragmentation_session_setup().await,
|
||||
FuotaJob::McSession => self.multicast_session_setup().await,
|
||||
FuotaJob::Enqueue => self.enqueue().await,
|
||||
FuotaJob::FragStatus => self.fragmentation_status().await,
|
||||
};
|
||||
|
||||
match resp {
|
||||
Ok(Some(next_job)) => {
|
||||
if self.job.job == next_job {
|
||||
// Re-run the same job in the future.
|
||||
let job = self.job.clone();
|
||||
let _ = fuota::update_job(job).await?;
|
||||
} else {
|
||||
// Create the next job (which automatically sets the current job to completed).
|
||||
let _ = fuota::create_job(fuota::FuotaDeploymentJob {
|
||||
fuota_deployment_id: self.job.fuota_deployment_id,
|
||||
job: next_job,
|
||||
max_retry_count: match next_job {
|
||||
FuotaJob::FragSessionSetup | FuotaJob::McGroupSetup => {
|
||||
self.fuota_deployment.unicast_max_retry_count
|
||||
}
|
||||
_ => 0,
|
||||
},
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
// No further jobs to execute, set the current job to completed.
|
||||
let mut job = self.job.clone();
|
||||
job.completed_at = Some(Utc::now());
|
||||
let _ = fuota::update_job(job).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
// Re-run the same job in the future.
|
||||
let mut job = self.job.clone();
|
||||
job.scheduler_run_after = Utc::now() + conf.network.scheduler.interval;
|
||||
job.return_msg = format!("Error: {}", e);
|
||||
let _ = fuota::update_job(job).await?;
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_mc_group(&mut self) -> Result<Option<FuotaJob>> {
|
||||
// If this job fails, then there is no need to execute the others.
|
||||
if self.job.attempt_count - 1 > self.job.max_retry_count {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
info!("Creating multicast-group for FUOTA deployment");
|
||||
|
||||
// Get McAppSKey + McNwkSKey.
|
||||
let mc_app_s_key = multicastsetup::v1::get_mc_app_s_key(
|
||||
self.fuota_deployment.multicast_key,
|
||||
self.fuota_deployment.multicast_addr,
|
||||
)?;
|
||||
let mc_nwk_s_key = multicastsetup::v1::get_mc_net_s_key(
|
||||
self.fuota_deployment.multicast_key,
|
||||
self.fuota_deployment.multicast_addr,
|
||||
)?;
|
||||
|
||||
let _ = multicast::create(multicast::MulticastGroup {
|
||||
id: self.fuota_deployment.id,
|
||||
application_id: self.fuota_deployment.application_id,
|
||||
name: format!("fuota-{}", self.fuota_deployment.id),
|
||||
region: self.device_profile.region,
|
||||
mc_addr: self.fuota_deployment.multicast_addr,
|
||||
mc_nwk_s_key,
|
||||
mc_app_s_key,
|
||||
f_cnt: 0,
|
||||
group_type: self.fuota_deployment.multicast_group_type.clone(),
|
||||
frequency: self.fuota_deployment.multicast_frequency,
|
||||
dr: self.fuota_deployment.multicast_dr,
|
||||
class_b_ping_slot_nb_k: self.fuota_deployment.multicast_class_b_ping_slot_nb_k,
|
||||
class_c_scheduling_type: self.fuota_deployment.multicast_class_c_scheduling_type,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(Some(FuotaJob::AddDevsToMcGroup))
|
||||
}
|
||||
|
||||
async fn add_devices_to_multicast_group(&mut self) -> Result<Option<FuotaJob>> {
|
||||
// If this job fails, then there is no need to execute the others.
|
||||
if self.job.attempt_count - 1 > self.job.max_retry_count {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
info!("Adding devices to multicast-group");
|
||||
|
||||
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
|
||||
for fuota_d in fuota_devices {
|
||||
multicast::add_device(&fuota_d.fuota_deployment_id, &fuota_d.dev_eui).await?;
|
||||
}
|
||||
|
||||
Ok(Some(FuotaJob::AddGwsToMcGroup))
|
||||
}
|
||||
|
||||
async fn add_gateways_to_multicast_group(&mut self) -> Result<Option<FuotaJob>> {
|
||||
// If this job fails, then there is no need to execute the others.
|
||||
if self.job.attempt_count - 1 > self.job.max_retry_count {
|
||||
return Ok(None);
|
||||
}
|
||||
|
||||
info!("Adding gateways to multicast-group (if any)");
|
||||
|
||||
let fuota_gws = fuota::get_gateways(self.job.fuota_deployment_id.into(), -1, 0).await?;
|
||||
for fuota_gw in fuota_gws {
|
||||
multicast::add_gateway(&fuota_gw.fuota_deployment_id, &fuota_gw.gateway_id).await?;
|
||||
}
|
||||
|
||||
Ok(Some(FuotaJob::McGroupSetup))
|
||||
}
|
||||
|
||||
async fn multicast_group_setup(&mut self) -> Result<Option<FuotaJob>> {
|
||||
// Proceed with next step after reaching the max attempts.
|
||||
if self.job.attempt_count - 1 > self.job.max_retry_count {
|
||||
return Ok(Some(FuotaJob::FragSessionSetup));
|
||||
}
|
||||
|
||||
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
|
||||
|
||||
// Filter on devices that have not completed the McGroupSetup.
|
||||
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
|
||||
.into_iter()
|
||||
.filter(|d| d.mc_group_setup_completed_at.is_none())
|
||||
.collect();
|
||||
|
||||
for fuota_dev in &fuota_devices {
|
||||
let dev_keys = device_keys::get(&fuota_dev.dev_eui).await?;
|
||||
let mc_root_key = match self.device_profile.mac_version {
|
||||
MacVersion::LORAWAN_1_0_0
|
||||
| MacVersion::LORAWAN_1_0_1
|
||||
| MacVersion::LORAWAN_1_0_2
|
||||
| MacVersion::LORAWAN_1_0_3
|
||||
| MacVersion::LORAWAN_1_0_4 => {
|
||||
multicastsetup::v1::get_mc_root_key_for_gen_app_key(dev_keys.gen_app_key)?
|
||||
}
|
||||
MacVersion::LORAWAN_1_1_0 | MacVersion::Latest => {
|
||||
multicastsetup::v1::get_mc_root_key_for_app_key(dev_keys.app_key)?
|
||||
}
|
||||
};
|
||||
let mc_ke_key = multicastsetup::v1::get_mc_ke_key(mc_root_key)?;
|
||||
let mc_key_encrypted =
|
||||
multicastsetup::v1::encrypt_mc_key(mc_ke_key, self.fuota_deployment.multicast_key);
|
||||
|
||||
let pl = multicastsetup::v1::Payload::McGroupSetupReq(
|
||||
multicastsetup::v1::McGroupSetupReqPayload {
|
||||
mc_group_id_header: multicastsetup::v1::McGroupSetupReqPayloadMcGroupIdHeader {
|
||||
mc_group_id: 0,
|
||||
},
|
||||
mc_addr: self.fuota_deployment.multicast_addr,
|
||||
mc_key_encrypted,
|
||||
min_mc_f_count: 0,
|
||||
max_mc_f_count: u32::MAX,
|
||||
},
|
||||
);
|
||||
|
||||
device_queue::enqueue_item(device_queue::DeviceQueueItem {
|
||||
dev_eui: fuota_dev.dev_eui,
|
||||
f_port: self.device_profile.app_layer_params.ts005_f_port.into(),
|
||||
data: pl.to_vec()?,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
if !fuota_devices.is_empty() {
|
||||
// There are devices pending setup, we need to re-run this job.
|
||||
self.job.scheduler_run_after =
|
||||
Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64);
|
||||
Ok(Some(FuotaJob::McGroupSetup))
|
||||
} else {
|
||||
// All devices have completed the setup, move on to next job.
|
||||
Ok(Some(FuotaJob::FragSessionSetup))
|
||||
}
|
||||
}
|
||||
|
||||
async fn fragmentation_session_setup(&mut self) -> Result<Option<FuotaJob>> {
|
||||
// Proceed with next step after reaching the max attempts.
|
||||
if self.job.attempt_count - 1 > self.job.max_retry_count {
|
||||
return Ok(Some(FuotaJob::McSession));
|
||||
}
|
||||
|
||||
let fragment_size = self.fuota_deployment.fragmentation_fragment_size as usize;
|
||||
let fragments =
|
||||
(self.fuota_deployment.payload.len() as f32 / fragment_size as f32).ceil() as usize;
|
||||
let padding =
|
||||
(fragment_size - (self.fuota_deployment.payload.len() % fragment_size)) % fragment_size;
|
||||
|
||||
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
|
||||
|
||||
// Filter on devices that have completed the previous step, but not yet the FragSessionSetup.
|
||||
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
|
||||
.into_iter()
|
||||
.filter(|d| {
|
||||
d.mc_group_setup_completed_at.is_some()
|
||||
&& d.frag_session_setup_completed_at.is_none()
|
||||
})
|
||||
.collect();
|
||||
|
||||
for fuota_dev in &fuota_devices {
|
||||
let pl = fragmentation::v1::Payload::FragSessionSetupReq(
|
||||
fragmentation::v1::FragSessionSetupReqPayload {
|
||||
frag_session: fragmentation::v1::FragSessionSetuReqPayloadFragSession {
|
||||
mc_group_bit_mask: [true, false, false, false],
|
||||
frag_index: 0,
|
||||
},
|
||||
nb_frag: fragments as u16,
|
||||
frag_size: fragment_size as u8,
|
||||
padding: padding as u8,
|
||||
control: fragmentation::v1::FragSessionSetuReqPayloadControl {
|
||||
block_ack_delay: 0,
|
||||
fragmentation_matrix: 0,
|
||||
},
|
||||
descriptor: [0, 0, 0, 0],
|
||||
},
|
||||
);
|
||||
|
||||
device_queue::enqueue_item(device_queue::DeviceQueueItem {
|
||||
dev_eui: fuota_dev.dev_eui,
|
||||
f_port: self.device_profile.app_layer_params.ts004_f_port.into(),
|
||||
data: pl.to_vec()?,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
if !fuota_devices.is_empty() {
|
||||
// There are devices pending setup, we need to re-run this job.
|
||||
self.job.scheduler_run_after =
|
||||
Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64);
|
||||
Ok(Some(FuotaJob::FragSessionSetup))
|
||||
} else {
|
||||
// All devices have completed the setup, move on to next job.
|
||||
Ok(Some(FuotaJob::McSession))
|
||||
}
|
||||
}
|
||||
|
||||
async fn multicast_session_setup(&mut self) -> Result<Option<FuotaJob>> {
|
||||
// Proceed with next step after reaching the max attempts.
|
||||
if self.job.attempt_count - 1 > self.job.max_retry_count {
|
||||
return Ok(Some(FuotaJob::Enqueue));
|
||||
}
|
||||
|
||||
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
|
||||
|
||||
// Filter on devices that have completed the previous step, but not yet the McSession.
|
||||
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
|
||||
.into_iter()
|
||||
.filter(|d| {
|
||||
d.frag_session_setup_completed_at.is_some() && d.mc_session_completed_at.is_none()
|
||||
})
|
||||
.collect();
|
||||
|
||||
for fuota_dev in &fuota_devices {
|
||||
// We want to start the session (retry_count + 1) x the uplink_interval.
|
||||
// Note that retry_count=0 means only one attempt.
|
||||
let session_start = (Utc::now()
|
||||
+ TimeDelta::seconds(
|
||||
(self.job.max_retry_count as i64 + 1)
|
||||
* self.device_profile.uplink_interval as i64,
|
||||
))
|
||||
.to_gps_time()
|
||||
.num_seconds()
|
||||
% (1 << 32);
|
||||
|
||||
let pl = match self.fuota_deployment.multicast_group_type.as_ref() {
|
||||
"B" => multicastsetup::v1::Payload::McClassBSessionReq(
|
||||
multicastsetup::v1::McClassBSessionReqPayload {
|
||||
mc_group_id_header:
|
||||
multicastsetup::v1::McClassBSessionReqPayloadMcGroupIdHeader {
|
||||
mc_group_id: 0,
|
||||
},
|
||||
session_time: (session_start - (session_start % 128)) as u32,
|
||||
time_out_periodicity:
|
||||
multicastsetup::v1::McClassBSessionReqPayloadTimeOutPeriodicity {
|
||||
time_out: self.fuota_deployment.multicast_timeout as u8,
|
||||
periodicity: self.fuota_deployment.multicast_class_b_ping_slot_nb_k
|
||||
as u8,
|
||||
},
|
||||
dl_frequ: self.fuota_deployment.multicast_frequency as u32,
|
||||
dr: self.fuota_deployment.multicast_dr as u8,
|
||||
},
|
||||
),
|
||||
"C" => multicastsetup::v1::Payload::McClassCSessionReq(
|
||||
multicastsetup::v1::McClassCSessionReqPayload {
|
||||
mc_group_id_header:
|
||||
multicastsetup::v1::McClassCSessionReqPayloadMcGroupIdHeader {
|
||||
mc_group_id: 0,
|
||||
},
|
||||
session_time: session_start as u32,
|
||||
session_time_out:
|
||||
multicastsetup::v1::McClassCSessionReqPayloadSessionTimeOut {
|
||||
time_out: self.fuota_deployment.multicast_timeout as u8,
|
||||
},
|
||||
dl_frequ: self.fuota_deployment.multicast_frequency as u32,
|
||||
dr: self.fuota_deployment.multicast_dr as u8,
|
||||
},
|
||||
),
|
||||
_ => {
|
||||
return Err(anyhow!(
|
||||
"Unsupported group-type: {}",
|
||||
self.fuota_deployment.multicast_group_type
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
device_queue::enqueue_item(device_queue::DeviceQueueItem {
|
||||
dev_eui: fuota_dev.dev_eui,
|
||||
f_port: self.device_profile.app_layer_params.ts005_f_port.into(),
|
||||
data: pl.to_vec()?,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
// In this case we need to exactly try the max. attempts, because this is what the
|
||||
// session-start time calculation is based on. If we continue with enqueueing too
|
||||
// early, the multicast-session hasn't started yet.
|
||||
self.job.scheduler_run_after =
|
||||
Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64);
|
||||
Ok(Some(FuotaJob::McSession))
|
||||
}
|
||||
|
||||
async fn enqueue(&mut self) -> Result<Option<FuotaJob>> {
|
||||
// Proceed with next step after reaching the max attempts.
|
||||
if self.job.attempt_count - 1 > self.job.max_retry_count {
|
||||
return Ok(Some(FuotaJob::FragStatus));
|
||||
}
|
||||
|
||||
let payload_length = self.fuota_deployment.payload.len();
|
||||
let fragment_size = self.fuota_deployment.fragmentation_fragment_size as usize;
|
||||
let padding = (fragment_size - (payload_length % fragment_size)) % fragment_size;
|
||||
|
||||
let fragments = (payload_length as f32 / fragment_size as f32).ceil() as usize;
|
||||
let redundancy = (fragments as f32
|
||||
* self.fuota_deployment.fragmentation_redundancy_percentage as f32
|
||||
/ 100.0)
|
||||
.ceil() as usize;
|
||||
|
||||
let mut payload = self.fuota_deployment.payload.clone();
|
||||
payload.extend_from_slice(&vec![0; padding]);
|
||||
|
||||
let encoded_fragments = fragmentation::v1::encode(&payload, fragment_size, redundancy)?;
|
||||
|
||||
for (i, frag) in encoded_fragments.iter().enumerate() {
|
||||
let pl =
|
||||
fragmentation::v1::Payload::DataFragment(fragmentation::v1::DataFragmentPayload {
|
||||
index_and_n: fragmentation::v1::DataFragmentPayloadIndexAndN {
|
||||
frag_index: 0,
|
||||
n: (i + 1) as u16,
|
||||
},
|
||||
data: frag.clone(),
|
||||
});
|
||||
|
||||
let _ = downlink::multicast::enqueue(multicast::MulticastGroupQueueItem {
|
||||
multicast_group_id: self.fuota_deployment.id,
|
||||
f_port: self.device_profile.app_layer_params.ts004_f_port as i16,
|
||||
data: pl.to_vec()?,
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(Some(FuotaJob::FragStatus))
|
||||
}
|
||||
|
||||
async fn fragmentation_status(&mut self) -> Result<Option<FuotaJob>> {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
9
chirpstack/src/applayer/fuota/mod.rs
Normal file
9
chirpstack/src/applayer/fuota/mod.rs
Normal file
@ -0,0 +1,9 @@
|
||||
use tracing::info;
|
||||
|
||||
pub mod flow;
|
||||
pub mod scheduler;
|
||||
|
||||
pub async fn setup() {
|
||||
info!("Setting up FUOTA scheduler loop");
|
||||
tokio::spawn(scheduler::scheduler_loop());
|
||||
}
|
45
chirpstack/src/applayer/fuota/scheduler.rs
Normal file
45
chirpstack/src/applayer/fuota/scheduler.rs
Normal file
@ -0,0 +1,45 @@
|
||||
use anyhow::Result;
|
||||
use tokio::time::sleep;
|
||||
use tracing::{error, span, trace, Instrument, Level};
|
||||
|
||||
use crate::applayer::fuota::flow;
|
||||
use crate::config;
|
||||
use crate::storage::fuota;
|
||||
|
||||
pub async fn scheduler_loop() {
|
||||
let conf = config::get();
|
||||
|
||||
loop {
|
||||
trace!("Starting fuota scheduler_loop run");
|
||||
if let Err(err) = schedule_batch(conf.network.scheduler.batch_size).await {
|
||||
error!(error = %err, "Scheduling FUOTA batch error");
|
||||
} else {
|
||||
trace!("schedule_batch completed without error");
|
||||
}
|
||||
sleep(conf.network.scheduler.interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn schedule_batch(size: usize) -> Result<()> {
|
||||
trace!("Get schedulable fuota jobs");
|
||||
let jobs = fuota::get_schedulable_jobs(size).await?;
|
||||
trace!(job_count = jobs.len(), "Got this number of fuota jobs");
|
||||
|
||||
let mut handles = vec![];
|
||||
|
||||
for job in jobs {
|
||||
// Spawn the batch as async tasks.
|
||||
let handle = tokio::spawn(async move {
|
||||
let span = span!(Level::INFO, "job", fuota_deployment_id = %job.fuota_deployment_id, job = %job.job);
|
||||
|
||||
if let Err(e) = flow::Flow::handle_job(job).instrument(span).await {
|
||||
error!(error = %e, "Handle FUOTA job error");
|
||||
}
|
||||
});
|
||||
handles.push(handle);
|
||||
}
|
||||
|
||||
futures::future::join_all(handles).await;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -5,6 +5,9 @@ use crate::storage::{device, device_profile};
|
||||
use chirpstack_api::gw;
|
||||
|
||||
pub mod clocksync;
|
||||
pub mod fragmentation;
|
||||
pub mod fuota;
|
||||
pub mod multicastsetup;
|
||||
|
||||
pub async fn handle_uplink(
|
||||
dev: &device::Device,
|
||||
@ -31,9 +34,15 @@ async fn _handle_uplink(
|
||||
.instrument(span)
|
||||
.await
|
||||
} else if dp.app_layer_params.ts004_f_port == f_port {
|
||||
unimplemented!()
|
||||
let span = span!(Level::INFO, "ts004");
|
||||
fragmentation::handle_uplink(dev, dp, data)
|
||||
.instrument(span)
|
||||
.await
|
||||
} else if dp.app_layer_params.ts005_f_port == f_port {
|
||||
unimplemented!()
|
||||
let span = span!(Level::INFO, "ts005");
|
||||
multicastsetup::handle_uplink(dev, dp, data)
|
||||
.instrument(span)
|
||||
.await
|
||||
} else {
|
||||
return Err(anyhow!("Unexpected f_port {}", f_port));
|
||||
}
|
||||
|
139
chirpstack/src/applayer/multicastsetup.rs
Normal file
139
chirpstack/src/applayer/multicastsetup.rs
Normal file
@ -0,0 +1,139 @@
|
||||
use anyhow::Result;
|
||||
use chrono::Utc;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::storage::fields::device_profile::Ts005Version;
|
||||
use crate::storage::{device, device_profile, fuota};
|
||||
use lrwn::applayer::multicastsetup;
|
||||
|
||||
pub async fn handle_uplink(
|
||||
dev: &device::Device,
|
||||
dp: &device_profile::DeviceProfile,
|
||||
data: &[u8],
|
||||
) -> Result<()> {
|
||||
let version = dp
|
||||
.app_layer_params
|
||||
.ts005_version
|
||||
.ok_or_else(|| anyhow!("Device does not support TS005"))?;
|
||||
|
||||
match version {
|
||||
Ts005Version::V100 => handle_uplink_v100(dev, data).await,
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_uplink_v100(dev: &device::Device, data: &[u8]) -> Result<()> {
|
||||
let pl = multicastsetup::v1::Payload::from_slice(true, data)?;
|
||||
|
||||
match pl {
|
||||
multicastsetup::v1::Payload::McGroupSetupAns(pl) => {
|
||||
handle_v1_mc_group_setup_ans(dev, pl).await?
|
||||
}
|
||||
multicastsetup::v1::Payload::McClassBSessionAns(pl) => {
|
||||
handle_v1_mc_class_b_session_ans(dev, pl).await?
|
||||
}
|
||||
multicastsetup::v1::Payload::McClassCSessionAns(pl) => {
|
||||
handle_v1_mc_class_c_session_ans(dev, pl).await?
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_v1_mc_group_setup_ans(
|
||||
dev: &device::Device,
|
||||
pl: multicastsetup::v1::McGroupSetupAnsPayload,
|
||||
) -> Result<()> {
|
||||
info!("Handling McGroupSetupAns");
|
||||
|
||||
let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?;
|
||||
|
||||
if pl.mc_group_id_header.id_error {
|
||||
warn!(
|
||||
mc_group_id = pl.mc_group_id_header.mc_group_id,
|
||||
id_error = true,
|
||||
"McGroupSetupAns contains errors"
|
||||
);
|
||||
fuota_dev.return_msg = "Error: McGroupSetupAns response id_error=true".into();
|
||||
} else {
|
||||
fuota_dev.mc_group_setup_completed_at = Some(Utc::now());
|
||||
}
|
||||
|
||||
let _ = fuota::update_device(fuota_dev).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_v1_mc_class_b_session_ans(
|
||||
dev: &device::Device,
|
||||
pl: multicastsetup::v1::McClassBSessionAnsPayload,
|
||||
) -> Result<()> {
|
||||
info!("Handling McClassBSessionAns");
|
||||
|
||||
let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?;
|
||||
|
||||
if pl.status_and_mc_group_id.dr_error
|
||||
| pl.status_and_mc_group_id.freq_error
|
||||
| pl.status_and_mc_group_id.mc_group_undefined
|
||||
{
|
||||
warn!(
|
||||
dr_error = pl.status_and_mc_group_id.dr_error,
|
||||
freq_error = pl.status_and_mc_group_id.freq_error,
|
||||
mc_group_undefined = pl.status_and_mc_group_id.mc_group_undefined,
|
||||
"McClassBSessionAns contains errors"
|
||||
);
|
||||
|
||||
fuota_dev.return_msg = format!("Error: McClassBSessionAns response dr_error: {}, freq_error: {}, mc_group_undefined: {}",
|
||||
pl.status_and_mc_group_id.dr_error,
|
||||
pl.status_and_mc_group_id.freq_error,
|
||||
pl.status_and_mc_group_id.mc_group_undefined,
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
time_to_start = pl.time_to_start.unwrap_or_default(),
|
||||
"McClassBSessionAns OK"
|
||||
);
|
||||
fuota_dev.mc_session_completed_at = Some(Utc::now());
|
||||
}
|
||||
|
||||
let _ = fuota::update_device(fuota_dev).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_v1_mc_class_c_session_ans(
|
||||
dev: &device::Device,
|
||||
pl: multicastsetup::v1::McClassCSessionAnsPayload,
|
||||
) -> Result<()> {
|
||||
info!("Handling McClassCSessionAns");
|
||||
|
||||
let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?;
|
||||
|
||||
if pl.status_and_mc_group_id.dr_error
|
||||
| pl.status_and_mc_group_id.freq_error
|
||||
| pl.status_and_mc_group_id.mc_group_undefined
|
||||
{
|
||||
warn!(
|
||||
dr_error = pl.status_and_mc_group_id.dr_error,
|
||||
freq_error = pl.status_and_mc_group_id.freq_error,
|
||||
mc_group_undefined = pl.status_and_mc_group_id.mc_group_undefined,
|
||||
"McClassCSessionAns contains errors"
|
||||
);
|
||||
|
||||
fuota_dev.return_msg = format!("Error: McClassCSessionAns response dr_error: {}, freq_error: {}, mc_group_undefined: {}",
|
||||
pl.status_and_mc_group_id.dr_error,
|
||||
pl.status_and_mc_group_id.freq_error,
|
||||
pl.status_and_mc_group_id.mc_group_undefined,
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
time_to_start = pl.time_to_start.unwrap_or_default(),
|
||||
"McClassCSessionAns OK"
|
||||
);
|
||||
fuota_dev.mc_session_completed_at = Some(Utc::now());
|
||||
}
|
||||
|
||||
let _ = fuota::update_device(fuota_dev).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
@ -5,7 +5,7 @@ use signal_hook_tokio::Signals;
|
||||
use tracing::{info, warn};
|
||||
|
||||
use crate::gateway;
|
||||
use crate::{adr, api, backend, downlink, integration, region, storage};
|
||||
use crate::{adr, api, applayer::fuota, backend, downlink, integration, region, storage};
|
||||
|
||||
pub async fn run() -> Result<()> {
|
||||
info!(
|
||||
@ -21,6 +21,7 @@ pub async fn run() -> Result<()> {
|
||||
integration::setup().await?;
|
||||
gateway::backend::setup().await?;
|
||||
downlink::setup().await;
|
||||
fuota::setup().await;
|
||||
api::setup().await?;
|
||||
|
||||
let mut signals = Signals::new([SIGINT, SIGTERM]).unwrap();
|
||||
|
@ -10,10 +10,7 @@ mod uuid;
|
||||
|
||||
pub use big_decimal::BigDecimal;
|
||||
pub use dev_nonces::DevNonces;
|
||||
pub use device_profile::{
|
||||
AbpParams, AppLayerParams, ClassBParams, ClassCParams, RelayParams, Ts003Version, Ts004Version,
|
||||
Ts005Version,
|
||||
};
|
||||
pub use device_profile::{AbpParams, AppLayerParams, ClassBParams, ClassCParams, RelayParams};
|
||||
pub use device_session::DeviceSession;
|
||||
pub use fuota::{FuotaJob, RequestFragmentationSessionStatus};
|
||||
pub use key_value::KeyValue;
|
||||
|
@ -362,6 +362,15 @@ pub async fn get_device(
|
||||
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))
|
||||
}
|
||||
|
||||
pub async fn get_latest_device_by_dev_eui(dev_eui: EUI64) -> Result<FuotaDeploymentDevice, Error> {
|
||||
fuota_deployment_device::dsl::fuota_deployment_device
|
||||
.filter(fuota_deployment_device::dsl::dev_eui.eq(&dev_eui))
|
||||
.order_by(fuota_deployment_device::created_at.desc())
|
||||
.first(&mut get_async_db_conn().await?)
|
||||
.await
|
||||
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))
|
||||
}
|
||||
|
||||
pub async fn update_device(d: FuotaDeploymentDevice) -> Result<FuotaDeploymentDevice, Error> {
|
||||
let d: FuotaDeploymentDevice = diesel::update(
|
||||
fuota_deployment_device::dsl::fuota_deployment_device
|
||||
@ -650,7 +659,11 @@ pub async fn get_max_fragment_size(d: &FuotaDeployment) -> Result<usize> {
|
||||
.n
|
||||
- 3;
|
||||
|
||||
Ok(max_pl_size)
|
||||
Ok(if max_pl_size > d.payload.len() {
|
||||
d.payload.len()
|
||||
} else {
|
||||
max_pl_size
|
||||
})
|
||||
}
|
||||
|
||||
pub fn get_multicast_timeout(d: &FuotaDeployment) -> Result<usize> {
|
||||
|
Loading…
x
Reference in New Issue
Block a user