diff --git a/api/proto/api/fuota.proto b/api/proto/api/fuota.proto index ca47896d..c96e2f8e 100644 --- a/api/proto/api/fuota.proto +++ b/api/proto/api/fuota.proto @@ -30,6 +30,9 @@ service FuotaService { // Delete the FUOTA deployment for the given ID. rpc DeleteDeployment(DeleteFuotaDeploymentRequest) returns (google.protobuf.Empty) {} + // Start the FUOTA deployment. + rpc StartDeployment(StartFuotaDeploymentRequest) returns (google.protobuf.Empty) {} + // List the FUOTA deployments. rpc ListDeployments(ListFuotaDeploymentsRequest) returns (ListFuotaDeploymentsResponse) {} @@ -237,6 +240,11 @@ message DeleteFuotaDeploymentRequest { string id = 1; } +message StartFuotaDeploymentRequest { + // FUOTA deployment ID. + string id = 1; +} + message ListFuotaDeploymentsRequest { // Max number of FUOTA deployments to return in the result-set. // If not set, it will be treated as 0, and the response will only return the total_count. diff --git a/api/rust/proto/chirpstack/api/fuota.proto b/api/rust/proto/chirpstack/api/fuota.proto index ca47896d..c96e2f8e 100644 --- a/api/rust/proto/chirpstack/api/fuota.proto +++ b/api/rust/proto/chirpstack/api/fuota.proto @@ -30,6 +30,9 @@ service FuotaService { // Delete the FUOTA deployment for the given ID. rpc DeleteDeployment(DeleteFuotaDeploymentRequest) returns (google.protobuf.Empty) {} + // Start the FUOTA deployment. + rpc StartDeployment(StartFuotaDeploymentRequest) returns (google.protobuf.Empty) {} + // List the FUOTA deployments. rpc ListDeployments(ListFuotaDeploymentsRequest) returns (ListFuotaDeploymentsResponse) {} @@ -237,6 +240,11 @@ message DeleteFuotaDeploymentRequest { string id = 1; } +message StartFuotaDeploymentRequest { + // FUOTA deployment ID. + string id = 1; +} + message ListFuotaDeploymentsRequest { // Max number of FUOTA deployments to return in the result-set. // If not set, it will be treated as 0, and the response will only return the total_count. diff --git a/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/up.sql b/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/up.sql index e42d4fa8..555a60f6 100644 --- a/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/up.sql +++ b/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/up.sql @@ -58,4 +58,5 @@ create table fuota_deployment_job ( primary key (fuota_deployment_id, job) ); +create index idx_fuota_deployment_job_completed_at on fuota_deployment_job(completed_at); create index idx_fuota_deployment_job_scheduler_run_after on fuota_deployment_job(scheduler_run_after); diff --git a/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/up.sql b/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/up.sql index 32c231dc..ec72c889 100644 --- a/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/up.sql +++ b/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/up.sql @@ -58,4 +58,5 @@ create table fuota_deployment_job ( primary key (fuota_deployment_id, job) ); +create index idx_fuota_deployment_job_completed_at on fuota_deployment_job(completed_at); create index idx_fuota_deployment_job_scheduler_run_after on fuota_deployment_job(scheduler_run_after); diff --git a/chirpstack/src/api/fuota.rs b/chirpstack/src/api/fuota.rs index f0318850..39f5bde9 100644 --- a/chirpstack/src/api/fuota.rs +++ b/chirpstack/src/api/fuota.rs @@ -10,7 +10,7 @@ use lrwn::EUI64; use crate::api::auth::validator; use crate::api::error::ToStatus; use crate::api::helpers::{self, FromProto, ToProto}; -use crate::storage::fuota; +use crate::storage::{fields, fuota}; pub struct Fuota { validator: validator::RequestValidator, @@ -228,6 +228,37 @@ impl FuotaService for Fuota { Ok(resp) } + async fn start_deployment( + &self, + request: Request, + ) -> Result, Status> { + let req = request.get_ref(); + let id = Uuid::from_str(&req.id).map_err(|e| e.status())?; + + self.validator + .validate( + request.extensions(), + validator::ValidateFuotaDeploymentAccess::new(validator::Flag::Update, id), + ) + .await?; + + let d = fuota::get_deployment(id).await.map_err(|e| e.status())?; + + fuota::create_job(fuota::FuotaDeploymentJob { + fuota_deployment_id: d.id, + job: fields::FuotaJob::McGroupSetup, + max_attempt_count: d.unicast_attempt_count, + ..Default::default() + }) + .await + .map_err(|e| e.status())?; + + let mut resp = Response::new(()); + resp.metadata_mut() + .insert("x-log-fuota_deployment_id", req.id.parse().unwrap()); + Ok(resp) + } + async fn list_deployments( &self, request: Request, @@ -651,6 +682,21 @@ mod test { assert_eq!(1, list_resp.result.len()); assert_eq!(create_resp.id, list_resp.result[0].id); + // start deployment + let start_req = get_request( + &u.id, + api::StartFuotaDeploymentRequest { + id: create_resp.id.clone(), + }, + ); + service.start_deployment(start_req).await.unwrap(); + let jobs = fuota::list_jobs(Uuid::from_str(&create_resp.id).unwrap()) + .await + .unwrap(); + assert_eq!(1, jobs.len()); + assert_eq!(create_resp.id, jobs[0].fuota_deployment_id.to_string()); + assert_eq!(fields::FuotaJob::McGroupSetup, jobs[0].job); + // add device let add_dev_req = get_request( &u.id, diff --git a/chirpstack/src/storage/fuota.rs b/chirpstack/src/storage/fuota.rs index db96bdc9..31b4c36d 100644 --- a/chirpstack/src/storage/fuota.rs +++ b/chirpstack/src/storage/fuota.rs @@ -1,11 +1,12 @@ -use anyhow::Result; -use chrono::{DateTime, Utc}; +use anyhow::{Context, Result}; +use chrono::{DateTime, Duration, Utc}; use diesel::{dsl, prelude::*}; use diesel_async::RunQueryDsl; use tracing::info; use uuid::Uuid; use validator::Validate; +use crate::config; use crate::storage::error::Error; use crate::storage::schema::{ application, device, fuota_deployment, fuota_deployment_device, fuota_deployment_gateway, @@ -133,7 +134,7 @@ impl Default for FuotaDeploymentGateway { } } -#[derive(Clone, Queryable, Insertable, Debug, PartialEq, Eq)] +#[derive(Clone, Queryable, QueryableByName, Insertable, Debug, PartialEq, Eq)] #[diesel(table_name = fuota_deployment_job)] pub struct FuotaDeploymentJob { pub fuota_deployment_id: fields::Uuid, @@ -520,6 +521,72 @@ pub async fn list_jobs(fuota_deployment_id: Uuid) -> Result Result> { + let mut c = get_async_db_conn().await?; + db_transaction::, Error, _>(&mut c, |c| { + Box::pin(async move { + let conf = config::get(); + diesel::sql_query(if cfg!(feature = "sqlite") { + r#" + update + fuota_deployment_job + set + scheduler_run_after = ?3 + where + (fuota_deployment_id, job) in ( + select + fuota_deployment_id, + job + from + fuota_deployment_job + where + completed_at is null + and scheduler_run_after <= ?2 + order by + created_at + limit ?1 + ) + returning * + "# + } else { + r#" + update + fuota_deployment_job + set + scheduler_run_after = $3 + where + (fuota_deployment_id, job) in ( + select + fuota_deployment_id, + job + from + fuota_deployment_job + where + completed_at is null + and scheduler_run_after <= $2 + order by + created_at + limit $1 + ) + returning * + "# + }) + .bind::(limit as i32) + .bind::(Utc::now()) + .bind::( + Utc::now() + Duration::from_std(2 * conf.network.scheduler.interval).unwrap(), + ) + .load(c) + .await + .map_err(|e| Error::from_diesel(e, "".into())) + }) + }) + .await + .context("Get FUOTA jobs") +} + #[cfg(test)] mod test { use super::*; @@ -861,5 +928,13 @@ mod test { assert!(jobs[0].completed_at.is_some()); assert_eq!(job2.job, jobs[1].job); assert!(jobs[1].completed_at.is_none()); + + // get schedulable jobs + let jobs = get_schedulable_jobs(10).await.unwrap(); + assert_eq!(1, jobs.len()); + assert_eq!(job2.job, jobs[0].job); + + let jobs = get_schedulable_jobs(10).await.unwrap(); + assert_eq!(0, jobs.len()); } }