Add warnings to fuota deployment job + UI.

In case some devices do not complete a job, this makes it possible
to show a warning in the UI with the number of devices that did not
complete the job.
Orne Brocaar 2025-03-19 14:47:47 +00:00
parent f02256245c
commit 5bbd71ab3a
10 changed files with 232 additions and 95 deletions
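
Across the flow changes below, the new warning logic follows one pattern: once a step has used up its retries, the devices that never completed it are flagged with a timeout error and their count is recorded in the job's new warning_msg column, which the UI renders as an orange warning tag. A condensed, self-contained sketch of that pattern (the function name and values are illustrative, not code from this commit):

```rust
// Illustrative sketch only: mirrors the warning logic added in the flow below.
fn step_warning(step_name: &str, devices_pending: usize) -> String {
    if devices_pending == 0 {
        // Every device completed the step; warning_msg stays empty and the UI shows "ok".
        String::new()
    } else {
        // Same message format as used by the FUOTA job steps in this commit.
        format!("{} devices did not complete the {}", devices_pending, step_name)
    }
}

fn main() {
    // E.g. 3 devices never answered McGroupSetupReq before the retries ran out:
    assert_eq!(
        step_warning("multicast group setup", 3),
        "3 devices did not complete the multicast group setup"
    );
}
```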

View File

@ -393,6 +393,9 @@ message FuotaDeploymentJob {
// Scheduler run after.
google.protobuf.Timestamp scheduler_run_after = 6;
// Warning message.
string warning_msg = 7;
// Error message.
-string error_msg = 7;
+string error_msg = 8;
}

View File

@ -393,6 +393,9 @@ message FuotaDeploymentJob {
// Scheduler run after.
google.protobuf.Timestamp scheduler_run_after = 6;
// Warning message.
string warning_msg = 7;
// Error message.
-string error_msg = 7;
+string error_msg = 8;
}

View File

@ -15,6 +15,8 @@ create table fuota_deployment (
multicast_class_b_ping_slot_nb_k smallint not null,
multicast_frequency bigint not null,
multicast_timeout smallint not null,
multicast_session_start timestamp with time zone null,
multicast_session_end timestamp with time zone null,
unicast_max_retry_count smallint not null,
fragmentation_fragment_size smallint not null,
fragmentation_redundancy_percentage smallint not null,
@ -57,6 +59,7 @@ create table fuota_deployment_job (
max_retry_count smallint not null,
attempt_count smallint not null,
scheduler_run_after timestamp with time zone not null,
warning_msg text not null,
error_msg text not null,
primary key (fuota_deployment_id, job)

View File

@ -15,6 +15,8 @@ create table fuota_deployment (
multicast_class_b_ping_slot_nb_k smallint not null,
multicast_frequency bigint not null,
multicast_timeout smallint not null,
multicast_session_start datetime null,
multicast_session_end datetime null,
unicast_max_retry_count smallint not null,
fragmentation_fragment_size smallint not null,
fragmentation_redundancy_percentage smallint not null,
@ -57,6 +59,7 @@ create table fuota_deployment_job (
max_retry_count smallint not null,
attempt_count smallint not null,
scheduler_run_after datetime not null,
warning_msg text not null,
error_msg text not null,
primary key (fuota_deployment_id, job)

View File

@ -630,6 +630,7 @@ impl FuotaService for Fuota {
scheduler_run_after: Some(helpers::datetime_to_prost_timestamp(
&j.scheduler_run_after,
)),
warning_msg: j.warning_msg.clone(),
error_msg: j.error_msg.clone(),
})
.collect(),

View File

@ -197,27 +197,38 @@ impl Flow {
}
async fn multicast_group_setup(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
return Ok(Some((FuotaJob::FragSessionSetup, Utc::now())));
}
info!("Sending McGroupSetupReq commands to devices");
self.job.attempt_count += 1;
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
// Filter on devices that have not completed the McGroupSetup.
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
.into_iter()
.filter(|d| d.mc_group_setup_completed_at.is_none())
.collect();
if fuota_devices.is_empty() {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(None);
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
info!("Set timeout error to devices that did not respond to McGroupSetupReq");
fuota::set_device_timeout_error(
self.fuota_deployment.id.into(),
true,
false,
false,
false,
)
.await?;
if !fuota_devices.is_empty() {
self.job.warning_msg = format!(
"{} devices did not complete the multicast group setup",
fuota_devices.len()
);
}
return Ok(Some((FuotaJob::FragSessionSetup, Utc::now())));
}
info!("Sending McGroupSetupReq commands to devices");
self.job.attempt_count += 1;
for fuota_dev in &fuota_devices {
let dev_keys = device_keys::get(&fuota_dev.dev_eui).await?;
@ -316,25 +327,11 @@ impl Flow {
}
async fn fragmentation_session_setup(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
return Ok(Some((FuotaJob::McSession, Utc::now())));
}
info!("Set timeout error to devices that did not respond to McGroupSetupReq");
fuota::set_device_timeout_error(self.fuota_deployment.id.into(), true, false, false, false)
.await?;
info!("Sending FragSessionSetupReq commands to devices");
self.job.attempt_count += 1;
let fragment_size = self.fuota_deployment.fragmentation_fragment_size as usize;
let fragments =
(self.fuota_deployment.payload.len() as f32 / fragment_size as f32).ceil() as usize;
let padding =
(fragment_size - (self.fuota_deployment.payload.len() % fragment_size)) % fragment_size;
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
let fuota_devices_completed_mc_group_setup_count = fuota_devices
.iter()
.filter(|d| d.mc_group_setup_completed_at.is_some())
.count();
// Filter on devices that have completed the previous step, but not yet the FragSessionSetup.
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
@ -345,11 +342,41 @@ impl Flow {
})
.collect();
if fuota_devices.is_empty() {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
info!("Set timeout error to devices that did not respond to FragSessionSetupReq");
fuota::set_device_timeout_error(
self.fuota_deployment.id.into(),
false,
false,
true,
false,
)
.await?;
if !fuota_devices.is_empty() {
self.job.warning_msg = format!(
"{} devices did not complete the fragmentation session setup",
fuota_devices.len()
);
}
return Ok(Some((FuotaJob::McSession, Utc::now())));
}
info!("Sending FragSessionSetupReq commands to devices");
self.job.attempt_count += 1;
if fuota_devices_completed_mc_group_setup_count == 0 {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
let fragment_size = self.fuota_deployment.fragmentation_fragment_size as usize;
let fragments =
(self.fuota_deployment.payload.len() as f32 / fragment_size as f32).ceil() as usize;
let padding =
(fragment_size - (self.fuota_deployment.payload.len() % fragment_size)) % fragment_size;
for fuota_dev in &fuota_devices {
let pl = match self.device_profile.app_layer_params.ts004_version {
Some(Ts004Version::V100) => fragmentation::v1::Payload::FragSessionSetupReq(
@ -436,19 +463,11 @@ impl Flow {
}
async fn multicast_session_setup(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
return Ok(Some((FuotaJob::Enqueue, Utc::now())));
}
info!("Set timeout error to devices that did not respond to FragSessionSetupReq");
fuota::set_device_timeout_error(self.fuota_deployment.id.into(), false, false, true, false)
.await?;
info!("Sending McClassB/McClassCSessionReq commands to devices");
self.job.attempt_count += 1;
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
let fuota_devices_completed_frag_session_setup_count = fuota_devices
.iter()
.filter(|d| d.frag_session_setup_completed_at.is_some())
.count();
// Filter on devices that have completed the previous step, but not yet the McSession.
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
@ -458,23 +477,79 @@ impl Flow {
})
.collect();
if fuota_devices.is_empty() {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
info!("Set timeout error to devices that did not respond to McSessionReq");
fuota::set_device_timeout_error(
self.fuota_deployment.id.into(),
false,
true,
false,
false,
)
.await?;
if !fuota_devices.is_empty() {
self.job.warning_msg = format!(
"{} devices did not complete the multicast session setup",
fuota_devices.len()
);
}
return Ok(Some((
FuotaJob::Enqueue,
self.fuota_deployment
.multicast_session_start
.unwrap_or_else(|| Utc::now()),
)));
}
info!("Sending McClassB/McClassCSessionReq commands to devices");
self.job.attempt_count += 1;
if fuota_devices_completed_frag_session_setup_count == 0 {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
for fuota_dev in &fuota_devices {
// Calculate the session start and end dates the first time this job is executed.
if self.fuota_deployment.multicast_session_start.is_none()
&& self.fuota_deployment.multicast_session_end.is_none()
{
// We want to start the session (retry_count + 1) x the uplink_interval.
// Note that retry_count=0 means only one attempt.
-let session_start = (Utc::now()
+let session_start = Utc::now()
+ TimeDelta::seconds(
(self.job.max_retry_count as i64 + 1)
* self.device_profile.uplink_interval as i64,
-))
+);
let session_end = {
let timeout = match self.fuota_deployment.multicast_group_type.as_ref() {
"B" => Duration::from_secs(
128 * (1 << self.fuota_deployment.multicast_timeout as u64),
),
"C" => Duration::from_secs(1 << self.fuota_deployment.multicast_timeout as u64),
_ => return Err(anyhow!("Invalid multicast-group type")),
};
session_start + timeout
};
self.fuota_deployment.multicast_session_start = Some(session_start);
self.fuota_deployment.multicast_session_end = Some(session_end);
self.fuota_deployment = fuota::update_deployment(self.fuota_deployment.clone()).await?;
}
let session_start = self
.fuota_deployment
.multicast_session_start
.ok_or_else(|| anyhow!("multicast_session_start is None"))?
.to_gps_time()
.num_seconds()
% (1 << 32);
for fuota_dev in &fuota_devices {
let pl = match self.device_profile.app_layer_params.ts005_version {
Some(Ts005Version::V100) => {
match self.fuota_deployment.multicast_group_type.as_ref() {
@ -572,12 +647,19 @@ impl Flow {
.await?;
}
// In this case we need to exactly try the max. attempts, because this is what the
// session-start time calculation is based on. If we continue with enqueueing too
// early, the multicast-session hasn't started yet.
let scheduler_run_after =
Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64);
Ok(Some((FuotaJob::McSession, scheduler_run_after)))
if !fuota_devices.is_empty() {
// There are devices pending setup, we need to re-run this job.
let scheduler_run_after =
Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64);
Ok(Some((FuotaJob::McSession, scheduler_run_after)))
} else {
Ok(Some((
FuotaJob::Enqueue,
self.fuota_deployment
.multicast_session_start
.unwrap_or_else(|| Utc::now()),
)))
}
}
async fn enqueue(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
@ -586,16 +668,12 @@ impl Flow {
return Ok(Some((FuotaJob::FragStatus, Utc::now())));
}
info!("Set timeout error to devices that did not respond to McSessionReq");
fuota::set_device_timeout_error(self.fuota_deployment.id.into(), false, true, false, false)
.await?;
info!("Enqueueing fragmented payload to multicast group");
self.job.attempt_count += 1;
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
// Filter on devices that have completed the previous step, but not yet the McSession.
// Filter on devices that have completed the previous step.
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
.into_iter()
.filter(|d| d.mc_session_completed_at.is_some())
@ -674,35 +752,30 @@ impl Flow {
}
match self.fuota_deployment.request_fragmentation_session_status {
RequestFragmentationSessionStatus::NoRequest => {
Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())))
}
RequestFragmentationSessionStatus::NoRequest => Ok(Some((
FuotaJob::DeleteMcGroup,
self.fuota_deployment
.multicast_session_end
.unwrap_or_else(|| Utc::now()),
))),
RequestFragmentationSessionStatus::AfterFragEnqueue => {
Ok(Some((FuotaJob::FragStatus, Utc::now())))
}
RequestFragmentationSessionStatus::AfterSessTimeout => {
let timeout = match self.fuota_deployment.multicast_group_type.as_ref() {
"B" => Duration::from_secs(
128 * (1 << self.fuota_deployment.multicast_timeout as u64),
),
"C" => Duration::from_secs(1 << self.fuota_deployment.multicast_timeout as u64),
_ => return Err(anyhow!("Invalid multicast-group type")),
};
Ok(Some((FuotaJob::FragStatus, Utc::now() + timeout)))
}
RequestFragmentationSessionStatus::AfterSessTimeout => Ok(Some((
FuotaJob::FragStatus,
self.fuota_deployment
.multicast_session_end
.unwrap_or_else(|| Utc::now()),
))),
}
}
async fn fragmentation_status(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
info!("Enqueue FragSessionStatusReq");
self.job.attempt_count += 1;
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
let fuota_devices_completed_mc_session_count = fuota_devices
.iter()
.filter(|d| d.mc_session_completed_at.is_some())
.count();
// Filter on devices that have completed the multicast-session setup but
// not yet responded to the FragSessionStatusReq.
@ -711,7 +784,32 @@ impl Flow {
.filter(|d| d.mc_session_completed_at.is_some() && d.frag_status_completed_at.is_none())
.collect();
if fuota_devices.is_empty() {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
info!("Set timeout error to devices that did not respond to FragSessionStatusReq");
fuota::set_device_timeout_error(
self.fuota_deployment.id.into(),
false,
false,
false,
true,
)
.await?;
if !fuota_devices.is_empty() {
self.job.warning_msg = format!(
"{} devices did not complete the fragmentation status",
fuota_devices.len()
);
}
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
info!("Enqueue FragSessionStatusReq");
self.job.attempt_count += 1;
if fuota_devices_completed_mc_session_count == 0 {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
@ -750,7 +848,12 @@ impl Flow {
Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64);
Ok(Some((FuotaJob::FragStatus, scheduler_run_after)))
} else {
Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())))
Ok(Some((
FuotaJob::DeleteMcGroup,
self.fuota_deployment
.multicast_session_end
.unwrap_or_else(|| Utc::now()),
)))
}
}
@ -785,21 +888,15 @@ impl Flow {
} else {
fuota::set_device_completed(self.fuota_deployment.id.into(), true, true, true, true)
.await?;
fuota::set_device_timeout_error(
self.fuota_deployment.id.into(),
false,
false,
false,
true,
)
.await?;
}
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
let fuota_devices_count = fuota_devices.len();
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
.into_iter()
.filter(|d| d.completed_at.is_some() && d.error_msg.is_empty())
.collect();
let fuota_devices_completed_count = fuota_devices.len();
for fuota_device in &fuota_devices {
let mut d = device::get(&fuota_device.dev_eui).await?;
@ -809,9 +906,15 @@ impl Flow {
let _ = device::update(d).await?;
}
let mut d = self.fuota_deployment.clone();
d.completed_at = Some(Utc::now());
let _ = fuota::update_deployment(d).await?;
if fuota_devices_count != fuota_devices_completed_count {
self.job.warning_msg = format!(
"{} devices did not complete the FUOTA deployment",
fuota_devices_count - fuota_devices_completed_count
);
}
self.fuota_deployment.completed_at = Some(Utc::now());
self.fuota_deployment = fuota::update_deployment(self.fuota_deployment.clone()).await?;
Ok(None)
}
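
For context on the scheduling changes above: the multicast session window is computed once, stored on the deployment (multicast_session_start / multicast_session_end), and reused by the Enqueue, FragStatus and DeleteMcGroup steps. A standalone sketch of that calculation, assuming the same inputs as the flow code (the function name is illustrative):

```rust
use anyhow::bail;
use chrono::{DateTime, TimeDelta, Utc};
use std::time::Duration;

// Sketch of the session window calculation used by multicast_session_setup() above.
fn multicast_session_window(
    max_retry_count: i64,       // job retries; 0 means a single attempt
    uplink_interval: i64,       // device-profile uplink interval, in seconds
    multicast_group_type: &str, // "B" or "C"
    multicast_timeout: u64,     // encoded timeout from the deployment
) -> anyhow::Result<(DateTime<Utc>, DateTime<Utc>)> {
    // The session starts after (max_retry_count + 1) x uplink_interval, leaving room
    // for every McClassB/McClassCSessionReq attempt before the session begins.
    let session_start = Utc::now() + TimeDelta::seconds((max_retry_count + 1) * uplink_interval);

    // Class B counts beacon periods of 128 s, class C plain seconds, both scaled by
    // 2^multicast_timeout. E.g. multicast_timeout = 8: class C = 256 s, class B = 32768 s.
    let timeout = match multicast_group_type {
        "B" => Duration::from_secs(128 * (1 << multicast_timeout)),
        "C" => Duration::from_secs(1 << multicast_timeout),
        _ => bail!("Invalid multicast-group type"),
    };

    Ok((session_start, session_start + timeout))
}
```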

View File

@ -36,6 +36,8 @@ pub struct FuotaDeployment {
pub multicast_class_b_ping_slot_nb_k: i16,
pub multicast_frequency: i64,
pub multicast_timeout: i16,
pub multicast_session_start: Option<DateTime<Utc>>,
pub multicast_session_end: Option<DateTime<Utc>>,
pub unicast_max_retry_count: i16,
pub fragmentation_fragment_size: i16,
pub fragmentation_redundancy_percentage: i16,
@ -69,6 +71,8 @@ impl Default for FuotaDeployment {
multicast_class_b_ping_slot_nb_k: 0,
multicast_frequency: 0,
multicast_timeout: 0,
multicast_session_start: None,
multicast_session_end: None,
unicast_max_retry_count: 0,
fragmentation_fragment_size: 0,
fragmentation_redundancy_percentage: 0,
@ -154,6 +158,7 @@ pub struct FuotaDeploymentJob {
pub max_retry_count: i16,
pub attempt_count: i16,
pub scheduler_run_after: DateTime<Utc>,
pub warning_msg: String,
pub error_msg: String,
}
@ -169,6 +174,7 @@ impl Default for FuotaDeploymentJob {
max_retry_count: 0,
attempt_count: 0,
scheduler_run_after: now,
warning_msg: "".into(),
error_msg: "".into(),
}
}
@ -220,6 +226,8 @@ pub async fn update_deployment(d: FuotaDeployment) -> Result<FuotaDeployment, Er
.eq(&d.multicast_class_b_ping_slot_nb_k),
fuota_deployment::multicast_frequency.eq(&d.multicast_frequency),
fuota_deployment::multicast_timeout.eq(&d.multicast_timeout),
fuota_deployment::multicast_session_start.eq(&d.multicast_session_start),
fuota_deployment::multicast_session_end.eq(&d.multicast_session_end),
fuota_deployment::unicast_max_retry_count.eq(&d.unicast_max_retry_count),
fuota_deployment::fragmentation_fragment_size.eq(&d.fragmentation_fragment_size),
fuota_deployment::fragmentation_redundancy_percentage
@ -642,6 +650,7 @@ pub async fn update_job(j: FuotaDeploymentJob) -> Result<FuotaDeploymentJob, Err
fuota_deployment_job::completed_at.eq(&j.completed_at),
fuota_deployment_job::attempt_count.eq(&j.attempt_count),
fuota_deployment_job::scheduler_run_after.eq(&j.scheduler_run_after),
fuota_deployment_job::warning_msg.eq(&j.warning_msg),
fuota_deployment_job::error_msg.eq(&j.error_msg),
))
.get_result(&mut get_async_db_conn().await?)
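
As a usage note, a caller that wants to persist such a warning could go through update_job() above; a hypothetical sketch (record_step_warning and the way the job is obtained are not part of this commit):

```rust
// Hypothetical helper: record a warning on a job loaded elsewhere and persist it.
async fn record_step_warning(mut job: FuotaDeploymentJob, pending: usize) -> Result<(), Error> {
    job.warning_msg = format!("{} devices did not complete the multicast group setup", pending);
    update_job(job).await?;
    Ok(())
}
```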

View File

@ -203,6 +203,8 @@ diesel::table! {
multicast_class_b_ping_slot_nb_k -> Int2,
multicast_frequency -> Int8,
multicast_timeout -> Int2,
multicast_session_start -> Nullable<Timestamptz>,
multicast_session_end -> Nullable<Timestamptz>,
unicast_max_retry_count -> Int2,
fragmentation_fragment_size -> Int2,
fragmentation_redundancy_percentage -> Int2,
@ -249,6 +251,7 @@ diesel::table! {
max_retry_count -> Int2,
attempt_count -> Int2,
scheduler_run_after -> Timestamptz,
warning_msg -> Text,
error_msg -> Text,
}
}

View File

@ -180,6 +180,8 @@ diesel::table! {
multicast_class_b_ping_slot_nb_k -> SmallInt,
multicast_frequency -> BigInt,
multicast_timeout -> SmallInt,
multicast_session_start -> Nullable<TimestamptzSqlite>,
multicast_session_end -> Nullable<TimestamptzSqlite>,
unicast_max_retry_count -> SmallInt,
fragmentation_fragment_size -> SmallInt,
fragmentation_redundancy_percentage -> SmallInt,
@ -224,6 +226,7 @@ diesel::table! {
max_retry_count -> SmallInt,
attempt_count -> SmallInt,
scheduler_run_after -> TimestamptzSqlite,
warning_msg -> Text,
error_msg -> Text,
}
}

View File

@ -63,6 +63,12 @@ function FuotaDeploymentDashboard(props: IProps) {
);
} else if (!record.completedAt) {
return <Spin indicator={<LoadingOutlined spin />} size="small" />;
} else if (record.warningMsg !== "") {
return (
<Popover content={record.warningMsg} placement="right">
<Tag color="orange">warning</Tag>
</Popover>
);
} else {
return <Tag color="green">ok</Tag>;
}
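
For reference, the resulting status cell reads roughly as below (a simplified TypeScript sketch; the error branch and the record shape are assumptions, only the warning branch is taken from this commit):

```tsx
import { Popover, Spin, Tag } from "antd";
import { LoadingOutlined } from "@ant-design/icons";

interface JobRecord {
  completedAt?: string; // assumed shape; the generated API object may differ
  errorMsg: string;
  warningMsg: string;
}

function renderJobStatus(record: JobRecord) {
  if (record.errorMsg !== "") {
    // Assumed pre-existing branch: the job failed.
    return (
      <Popover content={record.errorMsg} placement="right">
        <Tag color="red">error</Tag>
      </Popover>
    );
  } else if (!record.completedAt) {
    // Still running.
    return <Spin indicator={<LoadingOutlined spin />} size="small" />;
  } else if (record.warningMsg !== "") {
    // New in this commit: completed, but some devices did not finish every step.
    return (
      <Popover content={record.warningMsg} placement="right">
        <Tag color="orange">warning</Tag>
      </Popover>
    );
  } else {
    return <Tag color="green">ok</Tag>;
  }
}
```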