Add start job + get schedulable jobs functions + API.
Some checks failed
CI / tests (postgres) (push) Has been cancelled
CI / tests (sqlite) (push) Has been cancelled
CI / dist (postgres) (push) Has been cancelled
CI / dist (sqlite) (push) Has been cancelled

This commit is contained in:
Orne Brocaar 2025-01-30 10:10:33 +00:00
parent 021bec07e5
commit d6839e2d6c
6 changed files with 143 additions and 4 deletions

View File

@ -30,6 +30,9 @@ service FuotaService {
// Delete the FUOTA deployment for the given ID.
rpc DeleteDeployment(DeleteFuotaDeploymentRequest) returns (google.protobuf.Empty) {}
// Start the FUOTA deployment.
rpc StartDeployment(StartFuotaDeploymentRequest) returns (google.protobuf.Empty) {}
// List the FUOTA deployments.
rpc ListDeployments(ListFuotaDeploymentsRequest) returns (ListFuotaDeploymentsResponse) {}
@ -237,6 +240,11 @@ message DeleteFuotaDeploymentRequest {
string id = 1;
}
message StartFuotaDeploymentRequest {
// FUOTA deployment ID.
string id = 1;
}
message ListFuotaDeploymentsRequest {
// Max number of FUOTA deployments to return in the result-set.
// If not set, it will be treated as 0, and the response will only return the total_count.

View File

@ -30,6 +30,9 @@ service FuotaService {
// Delete the FUOTA deployment for the given ID.
rpc DeleteDeployment(DeleteFuotaDeploymentRequest) returns (google.protobuf.Empty) {}
// Start the FUOTA deployment.
rpc StartDeployment(StartFuotaDeploymentRequest) returns (google.protobuf.Empty) {}
// List the FUOTA deployments.
rpc ListDeployments(ListFuotaDeploymentsRequest) returns (ListFuotaDeploymentsResponse) {}
@ -237,6 +240,11 @@ message DeleteFuotaDeploymentRequest {
string id = 1;
}
message StartFuotaDeploymentRequest {
// FUOTA deployment ID.
string id = 1;
}
message ListFuotaDeploymentsRequest {
// Max number of FUOTA deployments to return in the result-set.
// If not set, it will be treated as 0, and the response will only return the total_count.

View File

@ -58,4 +58,5 @@ create table fuota_deployment_job (
primary key (fuota_deployment_id, job)
);
create index idx_fuota_deployment_job_completed_at on fuota_deployment_job(completed_at);
create index idx_fuota_deployment_job_scheduler_run_after on fuota_deployment_job(scheduler_run_after);

View File

@ -58,4 +58,5 @@ create table fuota_deployment_job (
primary key (fuota_deployment_id, job)
);
create index idx_fuota_deployment_job_completed_at on fuota_deployment_job(completed_at);
create index idx_fuota_deployment_job_scheduler_run_after on fuota_deployment_job(scheduler_run_after);

View File

@ -10,7 +10,7 @@ use lrwn::EUI64;
use crate::api::auth::validator;
use crate::api::error::ToStatus;
use crate::api::helpers::{self, FromProto, ToProto};
use crate::storage::fuota;
use crate::storage::{fields, fuota};
pub struct Fuota {
validator: validator::RequestValidator,
@ -228,6 +228,37 @@ impl FuotaService for Fuota {
Ok(resp)
}
async fn start_deployment(
&self,
request: Request<api::StartFuotaDeploymentRequest>,
) -> Result<Response<()>, Status> {
let req = request.get_ref();
let id = Uuid::from_str(&req.id).map_err(|e| e.status())?;
self.validator
.validate(
request.extensions(),
validator::ValidateFuotaDeploymentAccess::new(validator::Flag::Update, id),
)
.await?;
let d = fuota::get_deployment(id).await.map_err(|e| e.status())?;
fuota::create_job(fuota::FuotaDeploymentJob {
fuota_deployment_id: d.id,
job: fields::FuotaJob::McGroupSetup,
max_attempt_count: d.unicast_attempt_count,
..Default::default()
})
.await
.map_err(|e| e.status())?;
let mut resp = Response::new(());
resp.metadata_mut()
.insert("x-log-fuota_deployment_id", req.id.parse().unwrap());
Ok(resp)
}
async fn list_deployments(
&self,
request: Request<api::ListFuotaDeploymentsRequest>,
@ -651,6 +682,21 @@ mod test {
assert_eq!(1, list_resp.result.len());
assert_eq!(create_resp.id, list_resp.result[0].id);
// start deployment
let start_req = get_request(
&u.id,
api::StartFuotaDeploymentRequest {
id: create_resp.id.clone(),
},
);
service.start_deployment(start_req).await.unwrap();
let jobs = fuota::list_jobs(Uuid::from_str(&create_resp.id).unwrap())
.await
.unwrap();
assert_eq!(1, jobs.len());
assert_eq!(create_resp.id, jobs[0].fuota_deployment_id.to_string());
assert_eq!(fields::FuotaJob::McGroupSetup, jobs[0].job);
// add device
let add_dev_req = get_request(
&u.id,

View File

@ -1,11 +1,12 @@
use anyhow::Result;
use chrono::{DateTime, Utc};
use anyhow::{Context, Result};
use chrono::{DateTime, Duration, Utc};
use diesel::{dsl, prelude::*};
use diesel_async::RunQueryDsl;
use tracing::info;
use uuid::Uuid;
use validator::Validate;
use crate::config;
use crate::storage::error::Error;
use crate::storage::schema::{
application, device, fuota_deployment, fuota_deployment_device, fuota_deployment_gateway,
@ -133,7 +134,7 @@ impl Default for FuotaDeploymentGateway {
}
}
#[derive(Clone, Queryable, Insertable, Debug, PartialEq, Eq)]
#[derive(Clone, Queryable, QueryableByName, Insertable, Debug, PartialEq, Eq)]
#[diesel(table_name = fuota_deployment_job)]
pub struct FuotaDeploymentJob {
pub fuota_deployment_id: fields::Uuid,
@ -520,6 +521,72 @@ pub async fn list_jobs(fuota_deployment_id: Uuid) -> Result<Vec<FuotaDeploymentJ
.map_err(|e| Error::from_diesel(e, fuota_deployment_id.to_string()))
}
// Selected jobs will automatically have their scheduler_run_after column updated to now + 2 x scheduler interval value.
// This is such that concurrent queries will not result in the same job being executed twice.
pub async fn get_schedulable_jobs(limit: usize) -> Result<Vec<FuotaDeploymentJob>> {
let mut c = get_async_db_conn().await?;
db_transaction::<Vec<FuotaDeploymentJob>, Error, _>(&mut c, |c| {
Box::pin(async move {
let conf = config::get();
diesel::sql_query(if cfg!(feature = "sqlite") {
r#"
update
fuota_deployment_job
set
scheduler_run_after = ?3
where
(fuota_deployment_id, job) in (
select
fuota_deployment_id,
job
from
fuota_deployment_job
where
completed_at is null
and scheduler_run_after <= ?2
order by
created_at
limit ?1
)
returning *
"#
} else {
r#"
update
fuota_deployment_job
set
scheduler_run_after = $3
where
(fuota_deployment_id, job) in (
select
fuota_deployment_id,
job
from
fuota_deployment_job
where
completed_at is null
and scheduler_run_after <= $2
order by
created_at
limit $1
)
returning *
"#
})
.bind::<diesel::sql_types::Integer, _>(limit as i32)
.bind::<fields::sql_types::Timestamptz, _>(Utc::now())
.bind::<fields::sql_types::Timestamptz, _>(
Utc::now() + Duration::from_std(2 * conf.network.scheduler.interval).unwrap(),
)
.load(c)
.await
.map_err(|e| Error::from_diesel(e, "".into()))
})
})
.await
.context("Get FUOTA jobs")
}
#[cfg(test)]
mod test {
use super::*;
@ -861,5 +928,13 @@ mod test {
assert!(jobs[0].completed_at.is_some());
assert_eq!(job2.job, jobs[1].job);
assert!(jobs[1].completed_at.is_none());
// get schedulable jobs
let jobs = get_schedulable_jobs(10).await.unwrap();
assert_eq!(1, jobs.len());
assert_eq!(job2.job, jobs[0].job);
let jobs = get_schedulable_jobs(10).await.unwrap();
assert_eq!(0, jobs.len());
}
}