From a3e27d8b65c9abe203b066eaa858ea6fec2d8423 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Tue, 28 Jan 2025 14:09:12 +0000 Subject: [PATCH] Add tests + add fuota jobs functions. --- .../down.sql | 1 + .../up.sql | 14 + .../down.sql | 1 + .../up.sql | 22 +- chirpstack/src/api/fuota.rs | 278 ++++++++++- chirpstack/src/storage/fields/fuota.rs | 81 ++++ chirpstack/src/storage/fields/mod.rs | 2 +- chirpstack/src/storage/fuota.rs | 440 +++++++++++++++++- chirpstack/src/storage/schema_postgres.rs | 15 + chirpstack/src/storage/schema_sqlite.rs | 14 + 10 files changed, 859 insertions(+), 9 deletions(-) diff --git a/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/down.sql b/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/down.sql index f90d66e8..21bf56cb 100644 --- a/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/down.sql +++ b/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/down.sql @@ -1,3 +1,4 @@ +drop table fuota_deployment_job; drop table fuota_deployment_gateway; drop table fuota_deployment_device; drop table fuota_deployment; diff --git a/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/up.sql b/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/up.sql index 3a8935f3..e42d4fa8 100644 --- a/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/up.sql +++ b/chirpstack/migrations_postgres/2025-01-21-093745_add_fuota_support/up.sql @@ -45,3 +45,17 @@ create table fuota_deployment_gateway ( primary key (fuota_deployment_id, gateway_id) ); + +create table fuota_deployment_job ( + fuota_deployment_id uuid not null references fuota_deployment on delete cascade, + job varchar(20) not null, + created_at timestamp with time zone not null, + completed_at timestamp with time zone null, + max_attempt_count smallint not null, + attempt_count smallint not null, + scheduler_run_after timestamp with time zone not null, + + primary key (fuota_deployment_id, job) +); + +create index idx_fuota_deployment_job_scheduler_run_after on fuota_deployment_job(scheduler_run_after); diff --git a/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/down.sql b/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/down.sql index f90d66e8..21bf56cb 100644 --- a/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/down.sql +++ b/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/down.sql @@ -1,3 +1,4 @@ +drop table fuota_deployment_job; drop table fuota_deployment_gateway; drop table fuota_deployment_device; drop table fuota_deployment; diff --git a/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/up.sql b/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/up.sql index af414964..32c231dc 100644 --- a/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/up.sql +++ b/chirpstack/migrations_sqlite/2025-01-27-100007_add_fuota_support/up.sql @@ -39,9 +39,23 @@ create table fuota_deployment_device ( ); create table fuota_deployment_gateway ( - fuota_deployment_id text not null references fuota_deployment on delete cascade, - gateway_id blob not null references gateway on delete cascade, - created_at datetime not null, + fuota_deployment_id text not null references fuota_deployment on delete cascade, + gateway_id blob not null references gateway on delete cascade, + created_at datetime not null, - primary key (fuota_deployment_id, gateway_id) + primary key (fuota_deployment_id, gateway_id) ); + +create table fuota_deployment_job ( + fuota_deployment_id text not null references fuota_deployment on delete cascade, + job varchar(20) not null, + created_at datetime not null, + completed_at datetime null, + max_attempt_count smallint not null, + attempt_count smallint not null, + scheduler_run_after datetime not null, + + primary key (fuota_deployment_id, job) +); + +create index idx_fuota_deployment_job_scheduler_run_after on fuota_deployment_job(scheduler_run_after); diff --git a/chirpstack/src/api/fuota.rs b/chirpstack/src/api/fuota.rs index 522dc365..f0318850 100644 --- a/chirpstack/src/api/fuota.rs +++ b/chirpstack/src/api/fuota.rs @@ -473,7 +473,7 @@ impl FuotaService for Fuota { let count = fuota::get_gateway_count(dp_id) .await .map_err(|e| e.status())?; - let items = fuota::get_gateway(dp_id, req.limit as i64, req.offset as i64) + let items = fuota::get_gateways(dp_id, req.limit as i64, req.offset as i64) .await .map_err(|e| e.status())?; @@ -495,3 +495,279 @@ impl FuotaService for Fuota { Ok(resp) } } + +#[cfg(test)] +mod test { + use super::*; + use crate::api::auth::validator::RequestValidator; + use crate::api::auth::AuthID; + use crate::storage::{application, device, device_profile, gateway, tenant, user}; + use crate::test; + + #[tokio::test] + async fn test_fuota() { + let _guard = test::prepare().await; + + // setup admin user + let u = user::User { + is_admin: true, + is_active: true, + email: "admin@admin".into(), + email_verified: true, + ..Default::default() + }; + let u = user::create(u).await.unwrap(); + + // create tenant + let t = tenant::create(tenant::Tenant { + name: "test-tenant".into(), + can_have_gateways: true, + ..Default::default() + }) + .await + .unwrap(); + + // create app + let app = application::create(application::Application { + tenant_id: t.id, + name: "test-app".into(), + ..Default::default() + }) + .await + .unwrap(); + + // create dp + let dp = device_profile::create(device_profile::DeviceProfile { + tenant_id: t.id, + name: "test-dp".into(), + ..Default::default() + }) + .await + .unwrap(); + + // create device + let dev = device::create(device::Device { + dev_eui: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]), + application_id: app.id, + device_profile_id: dp.id, + ..Default::default() + }) + .await + .unwrap(); + + // create gateway + let gw = gateway::create(gateway::Gateway { + gateway_id: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]), + tenant_id: t.id, + name: "test-gw".into(), + ..Default::default() + }) + .await + .unwrap(); + + // setup api + let service = Fuota::new(RequestValidator::new()); + + // create deployment + let create_req = get_request( + &u.id, + api::CreateFuotaDeploymentRequest { + deployment: Some(api::FuotaDeployment { + application_id: app.id.to_string(), + device_profile_id: dp.id.to_string(), + name: "test-fuota".into(), + ..Default::default() + }), + }, + ); + let create_resp = service.create_deployment(create_req).await.unwrap(); + let create_resp = create_resp.get_ref(); + + // get deployment + let get_req = get_request( + &u.id, + api::GetFuotaDeploymentRequest { + id: create_resp.id.clone(), + }, + ); + let get_resp = service.get_deployment(get_req).await.unwrap(); + let get_resp = get_resp.get_ref(); + assert_eq!( + Some(api::FuotaDeployment { + id: create_resp.id.clone(), + application_id: app.id.to_string(), + device_profile_id: dp.id.to_string(), + name: "test-fuota".into(), + ..Default::default() + }), + get_resp.deployment + ); + + // update deployment + let update_req = get_request( + &u.id, + api::UpdateFuotaDeploymentRequest { + deployment: Some(api::FuotaDeployment { + id: create_resp.id.clone(), + application_id: app.id.to_string(), + device_profile_id: dp.id.to_string(), + name: "updated-test-fuota".into(), + ..Default::default() + }), + }, + ); + service.update_deployment(update_req).await.unwrap(); + let get_req = get_request( + &u.id, + api::GetFuotaDeploymentRequest { + id: create_resp.id.clone(), + }, + ); + let get_resp = service.get_deployment(get_req).await.unwrap(); + let get_resp = get_resp.get_ref(); + assert_eq!( + Some(api::FuotaDeployment { + id: create_resp.id.clone(), + application_id: app.id.to_string(), + device_profile_id: dp.id.to_string(), + name: "updated-test-fuota".into(), + ..Default::default() + }), + get_resp.deployment + ); + + // list deployments + let list_req = get_request( + &u.id, + api::ListFuotaDeploymentsRequest { + application_id: app.id.to_string(), + limit: 10, + offset: 0, + }, + ); + let list_resp = service.list_deployments(list_req).await.unwrap(); + let list_resp = list_resp.get_ref(); + assert_eq!(1, list_resp.total_count); + assert_eq!(1, list_resp.result.len()); + assert_eq!(create_resp.id, list_resp.result[0].id); + + // add device + let add_dev_req = get_request( + &u.id, + api::AddDevicesToFuotaDeploymentRequest { + fuota_deployment_id: create_resp.id.clone(), + dev_euis: vec![dev.dev_eui.to_string()], + }, + ); + service.add_devices(add_dev_req).await.unwrap(); + + // list devices + let list_devs_req = get_request( + &u.id, + api::ListFuotaDeploymentDevicesRequest { + fuota_deployment_id: create_resp.id.clone(), + limit: 10, + offset: 0, + }, + ); + let list_devs_resp = service.list_devices(list_devs_req).await.unwrap(); + let list_devs_resp = list_devs_resp.get_ref(); + assert_eq!(1, list_devs_resp.total_count); + assert_eq!(1, list_devs_resp.result.len()); + assert_eq!(dev.dev_eui.to_string(), list_devs_resp.result[0].dev_eui); + + // remove devices + let remove_devs_req = get_request( + &u.id, + api::RemoveDevicesFromFuotaDeploymentRequest { + fuota_deployment_id: create_resp.id.clone(), + dev_euis: vec![dev.dev_eui.to_string()], + }, + ); + service.remove_devices(remove_devs_req).await.unwrap(); + let list_devs_req = get_request( + &u.id, + api::ListFuotaDeploymentDevicesRequest { + fuota_deployment_id: create_resp.id.clone(), + limit: 10, + offset: 0, + }, + ); + let list_devs_resp = service.list_devices(list_devs_req).await.unwrap(); + let list_devs_resp = list_devs_resp.get_ref(); + assert_eq!(0, list_devs_resp.total_count); + assert_eq!(0, list_devs_resp.result.len()); + + // add gateway + let add_gws_req = get_request( + &u.id, + api::AddGatewaysToFuotaDeploymentRequest { + fuota_deployment_id: create_resp.id.clone(), + gateway_ids: vec![gw.gateway_id.to_string()], + }, + ); + service.add_gateways(add_gws_req).await.unwrap(); + + // list gateways + let list_gws_req = get_request( + &u.id, + api::ListFuotaDeploymentGatewaysRequest { + fuota_deployment_id: create_resp.id.clone(), + limit: 10, + offset: 0, + }, + ); + let list_gws_resp = service.list_gateways(list_gws_req).await.unwrap(); + let list_gws_resp = list_gws_resp.get_ref(); + assert_eq!(1, list_gws_resp.total_count); + assert_eq!(1, list_gws_resp.result.len()); + assert_eq!( + gw.gateway_id.to_string(), + list_gws_resp.result[0].gateway_id + ); + + // remove gateways + let remove_gws_req = get_request( + &u.id, + api::RemoveGatewaysFromFuotaDeploymentRequest { + fuota_deployment_id: create_resp.id.clone(), + gateway_ids: vec![gw.gateway_id.to_string()], + }, + ); + service.remove_gateways(remove_gws_req).await.unwrap(); + let list_gws_req = get_request( + &u.id, + api::ListFuotaDeploymentGatewaysRequest { + fuota_deployment_id: create_resp.id.clone(), + limit: 10, + offset: 0, + }, + ); + let list_gws_resp = service.list_gateways(list_gws_req).await.unwrap(); + let list_gws_resp = list_gws_resp.get_ref(); + assert_eq!(0, list_gws_resp.total_count); + assert_eq!(0, list_gws_resp.result.len()); + + // delete deployment + let delete_req = get_request( + &u.id, + api::DeleteFuotaDeploymentRequest { + id: create_resp.id.clone(), + }, + ); + service.delete_deployment(delete_req).await.unwrap(); + let delete_req = get_request( + &u.id, + api::DeleteFuotaDeploymentRequest { + id: create_resp.id.clone(), + }, + ); + assert!(service.delete_deployment(delete_req).await.is_err()); + } + + fn get_request(user_id: &Uuid, req: T) -> Request { + let mut req = Request::new(req); + req.extensions_mut().insert(AuthID::User(*user_id)); + req + } +} diff --git a/chirpstack/src/storage/fields/fuota.rs b/chirpstack/src/storage/fields/fuota.rs index 8852ea54..1cab5f82 100644 --- a/chirpstack/src/storage/fields/fuota.rs +++ b/chirpstack/src/storage/fields/fuota.rs @@ -1,3 +1,5 @@ +use std::fmt; + use anyhow::Error; use diesel::backend::Backend; use diesel::sql_types::Text; @@ -77,3 +79,82 @@ impl serialize::ToSql for RequestFragmentationSessionStatus { Ok(serialize::IsNull::No) } } + +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, AsExpression, FromSqlRow)] +#[diesel(sql_type = Text)] +pub enum FuotaJob { + McGroupSetup, + McSession, + FragSessionSetup, + Enqueue, + FragStatus, +} + +impl fmt::Display for FuotaJob { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "{}", String::from(self)) + } +} + +impl From<&FuotaJob> for String { + fn from(value: &FuotaJob) -> Self { + match value { + FuotaJob::McGroupSetup => "MC_GROUP_SETUP", + FuotaJob::McSession => "MC_SESSION", + FuotaJob::FragSessionSetup => "FRAG_SESSION_SETUP", + FuotaJob::Enqueue => "ENQUEUE", + FuotaJob::FragStatus => "FRAG_STATUS", + } + .to_string() + } +} + +impl TryFrom<&str> for FuotaJob { + type Error = Error; + + fn try_from(value: &str) -> Result { + Ok(match value { + "MC_GROUP_SETUP" => Self::McGroupSetup, + "MC_SESSION" => Self::McSession, + "FRAG_SESSION_SETUP" => Self::FragSessionSetup, + "ENQUEUE" => Self::Enqueue, + "FRAG_STATUS" => Self::FragStatus, + _ => return Err(anyhow!("Invalid FuotaJob value: {}", value)), + }) + } +} + +impl deserialize::FromSql for FuotaJob +where + DB: Backend, + *const str: deserialize::FromSql, +{ + fn from_sql(value: ::RawValue<'_>) -> deserialize::Result { + let string = <*const str>::from_sql(value)?; + Ok(Self::try_from(unsafe { &*string })?) + } +} + +#[cfg(feature = "postgres")] +impl serialize::ToSql for FuotaJob +where + str: serialize::ToSql, +{ + fn to_sql<'b>( + &'b self, + out: &mut serialize::Output<'b, '_, diesel::pg::Pg>, + ) -> serialize::Result { + >::to_sql( + &String::from(self), + &mut out.reborrow(), + ) + } +} + +#[cfg(feature = "sqlite")] +impl serialize::ToSql for FuotaJob { + fn to_sql(&self, out: &mut serialize::Output<'_, '_, Sqlite>) -> serialize::Result { + out.set_value(String::from(self)); + Ok(serialize::IsNull::No) + } +} diff --git a/chirpstack/src/storage/fields/mod.rs b/chirpstack/src/storage/fields/mod.rs index a436d574..77ff8a85 100644 --- a/chirpstack/src/storage/fields/mod.rs +++ b/chirpstack/src/storage/fields/mod.rs @@ -12,7 +12,7 @@ pub use big_decimal::BigDecimal; pub use dev_nonces::DevNonces; pub use device_profile::{AbpParams, AppLayerParams, ClassBParams, ClassCParams, RelayParams}; pub use device_session::DeviceSession; -pub use fuota::RequestFragmentationSessionStatus; +pub use fuota::{FuotaJob, RequestFragmentationSessionStatus}; pub use key_value::KeyValue; pub use measurements::*; pub use multicast_group_scheduling_type::MulticastGroupSchedulingType; diff --git a/chirpstack/src/storage/fuota.rs b/chirpstack/src/storage/fuota.rs index 3ac0f83a..db96bdc9 100644 --- a/chirpstack/src/storage/fuota.rs +++ b/chirpstack/src/storage/fuota.rs @@ -9,9 +9,9 @@ use validator::Validate; use crate::storage::error::Error; use crate::storage::schema::{ application, device, fuota_deployment, fuota_deployment_device, fuota_deployment_gateway, - gateway, tenant, + fuota_deployment_job, gateway, tenant, }; -use crate::storage::{self, device_profile, fields, get_async_db_conn}; +use crate::storage::{self, db_transaction, device_profile, fields, get_async_db_conn}; use lrwn::EUI64; #[derive(Clone, Queryable, Insertable, Debug, PartialEq, Eq, Validate)] @@ -133,6 +133,34 @@ impl Default for FuotaDeploymentGateway { } } +#[derive(Clone, Queryable, Insertable, Debug, PartialEq, Eq)] +#[diesel(table_name = fuota_deployment_job)] +pub struct FuotaDeploymentJob { + pub fuota_deployment_id: fields::Uuid, + pub job: fields::FuotaJob, + pub created_at: DateTime, + pub completed_at: Option>, + pub max_attempt_count: i16, + pub attempt_count: i16, + pub scheduler_run_after: DateTime, +} + +impl Default for FuotaDeploymentJob { + fn default() -> Self { + let now = Utc::now(); + + Self { + fuota_deployment_id: Uuid::nil().into(), + job: fields::FuotaJob::McGroupSetup, + created_at: now, + completed_at: None, + max_attempt_count: 0, + attempt_count: 0, + scheduler_run_after: now, + } + } +} + pub async fn create_deployment(d: FuotaDeployment) -> Result { d.validate()?; @@ -250,6 +278,8 @@ pub async fn add_devices(fuota_deployment_id: Uuid, dev_euis: Vec) -> Res fuota_deployment::table .on(fuota_deployment::dsl::device_profile_id.eq(device::dsl::device_profile_id)), ) + .inner_join(application::table.on(application::dsl::id.eq(device::dsl::application_id))) + .filter(application::dsl::id.eq(fuota_deployment::dsl::application_id)) .filter(fuota_deployment::dsl::id.eq(fields::Uuid::from(fuota_deployment_id))) .filter(device::dsl::dev_eui.eq_any(&dev_euis)) .load(&mut get_async_db_conn().await?) @@ -412,7 +442,7 @@ pub async fn get_gateway_count(fuota_deployment_id: Uuid) -> Result .map_err(|e| Error::from_diesel(e, "".into())) } -pub async fn get_gateway( +pub async fn get_gateways( fuota_deployment_id: Uuid, limit: i64, offset: i64, @@ -429,3 +459,407 @@ pub async fn get_gateway( .await .map_err(|e| Error::from_diesel(e, "".into())) } + +// Creating a new job, will set any pending job(s) to completed within the same transaction. +pub async fn create_job(j: FuotaDeploymentJob) -> Result { + let mut c = get_async_db_conn().await?; + let j: FuotaDeploymentJob = db_transaction::(&mut c, |c| { + Box::pin(async move { + // set pending job(s) to completed + diesel::update( + fuota_deployment_job::dsl::fuota_deployment_job + .filter( + fuota_deployment_job::dsl::fuota_deployment_id.eq(&j.fuota_deployment_id), + ) + .filter(fuota_deployment_job::dsl::completed_at.is_null()), + ) + .set(fuota_deployment_job::dsl::completed_at.eq(Utc::now())) + .execute(c) + .await?; + + // create new job + diesel::insert_into(fuota_deployment_job::table) + .values(&j) + .get_result(c) + .await + .map_err(|e| Error::from_diesel(e, j.fuota_deployment_id.to_string())) + }) + }) + .await?; + + info!(fuota_deployment_id = %j.fuota_deployment_id, job = %j.job, "FUOTA deployment job created"); + Ok(j) +} + +pub async fn update_job(j: FuotaDeploymentJob) -> Result { + let j: FuotaDeploymentJob = diesel::update( + fuota_deployment_job::dsl::fuota_deployment_job.find((&j.fuota_deployment_id, &j.job)), + ) + .set(( + fuota_deployment_job::completed_at.eq(&j.completed_at), + fuota_deployment_job::attempt_count.eq(&j.attempt_count), + fuota_deployment_job::scheduler_run_after.eq(&j.scheduler_run_after), + )) + .get_result(&mut get_async_db_conn().await?) + .await + .map_err(|e| Error::from_diesel(e, j.fuota_deployment_id.to_string()))?; + + info!(fuota_deployment_id = %j.fuota_deployment_id, job = %j.job, "FUOTA deployment job updated"); + Ok(j) +} + +pub async fn list_jobs(fuota_deployment_id: Uuid) -> Result, Error> { + fuota_deployment_job::dsl::fuota_deployment_job + .filter( + fuota_deployment_job::dsl::fuota_deployment_id + .eq(fields::Uuid::from(fuota_deployment_id)), + ) + .order_by(fuota_deployment_job::dsl::scheduler_run_after) + .load(&mut get_async_db_conn().await?) + .await + .map_err(|e| Error::from_diesel(e, fuota_deployment_id.to_string())) +} + +#[cfg(test)] +mod test { + use super::*; + use crate::storage::{application, device, device_profile, gateway, tenant}; + use crate::test; + + #[tokio::test] + async fn test_fuota() { + let _guard = test::prepare().await; + + let t = tenant::create(tenant::Tenant { + name: "test-tenant".into(), + ..Default::default() + }) + .await + .unwrap(); + + let app = application::create(application::Application { + name: "test-app".into(), + tenant_id: t.id, + ..Default::default() + }) + .await + .unwrap(); + + let dp = device_profile::create(device_profile::DeviceProfile { + tenant_id: t.id, + name: "test-dp".into(), + ..Default::default() + }) + .await + .unwrap(); + + // create + let mut d = create_deployment(FuotaDeployment { + application_id: app.id, + device_profile_id: dp.id, + name: "test-fuota-deployment".into(), + ..Default::default() + }) + .await + .unwrap(); + + let d_get = get_deployment(d.id.into()).await.unwrap(); + assert_eq!(d, d_get); + + // update + d.name = "updated-test-fuota-deployment".into(); + let d = update_deployment(d).await.unwrap(); + + // count + assert_eq!(1, get_deployment_count(app.id.into()).await.unwrap()); + + // list + assert_eq!( + vec![FuotaDeploymentListItem { + id: d.id, + created_at: d.created_at, + updated_at: d.updated_at, + started_at: None, + completed_at: None, + name: d.name.clone(), + }], + list_deployments(app.id.into(), 10, 0).await.unwrap() + ); + + // delete + delete_deployment(d.id.into()).await.unwrap(); + assert!(delete_deployment(d.id.into()).await.is_err()); + } + + #[tokio::test] + async fn test_fuota_devices() { + let _guard = test::prepare().await; + + let t = tenant::create(tenant::Tenant { + name: "test-tenant".into(), + ..Default::default() + }) + .await + .unwrap(); + + let app = application::create(application::Application { + name: "test-app".into(), + tenant_id: t.id, + ..Default::default() + }) + .await + .unwrap(); + + let app2 = application::create(application::Application { + name: "test-app".into(), + tenant_id: t.id, + ..Default::default() + }) + .await + .unwrap(); + + let dp = device_profile::create(device_profile::DeviceProfile { + tenant_id: t.id, + name: "test-dp".into(), + ..Default::default() + }) + .await + .unwrap(); + + let dp2 = device_profile::create(device_profile::DeviceProfile { + tenant_id: t.id, + name: "test-dp".into(), + ..Default::default() + }) + .await + .unwrap(); + + let dev = device::create(device::Device { + application_id: app.id, + device_profile_id: dp.id, + name: "test-device".into(), + dev_eui: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]), + ..Default::default() + }) + .await + .unwrap(); + + let dev2 = device::create(device::Device { + application_id: app.id, + device_profile_id: dp2.id, + name: "test-device".into(), + dev_eui: EUI64::from_be_bytes([2, 2, 3, 4, 5, 6, 7, 8]), + ..Default::default() + }) + .await + .unwrap(); + + let dev3 = device::create(device::Device { + application_id: app2.id, + device_profile_id: dp.id, + name: "test-device".into(), + dev_eui: EUI64::from_be_bytes([3, 2, 3, 4, 5, 6, 7, 8]), + ..Default::default() + }) + .await + .unwrap(); + + // create + let d = create_deployment(FuotaDeployment { + application_id: app.id, + device_profile_id: dp.id, + name: "test-fuota-deployment".into(), + ..Default::default() + }) + .await + .unwrap(); + + // can't add devices from multiple device-profiles + assert!(add_devices(d.id.into(), vec![dev2.dev_eui]).await.is_err()); + + // can't add devices from other applications + assert!(add_devices(d.id.into(), vec![dev3.dev_eui]).await.is_err()); + + // add devices + add_devices(d.id.into(), vec![dev.dev_eui]).await.unwrap(); + + // get device count + assert_eq!(1, get_device_count(d.id.into()).await.unwrap()); + + // get devices + let devices = get_devices(d.id.into(), 10, 0).await.unwrap(); + assert_eq!(1, devices.len()); + assert_eq!(dev.dev_eui, devices[0].dev_eui); + assert_eq!(d.id, devices[0].fuota_deployment_id); + + // remove devices + remove_devices(d.id.into(), vec![dev.dev_eui]) + .await + .unwrap(); + assert_eq!(0, get_device_count(d.id.into()).await.unwrap()); + } + + #[tokio::test] + async fn test_fuota_gateways() { + let _guard = test::prepare().await; + + let t = tenant::create(tenant::Tenant { + name: "test-tenant".into(), + can_have_gateways: true, + ..Default::default() + }) + .await + .unwrap(); + + let t2 = tenant::create(tenant::Tenant { + name: "test-tenant-2".into(), + can_have_gateways: true, + ..Default::default() + }) + .await + .unwrap(); + + let app = application::create(application::Application { + name: "test-app".into(), + tenant_id: t.id, + ..Default::default() + }) + .await + .unwrap(); + + let dp = device_profile::create(device_profile::DeviceProfile { + tenant_id: t.id, + name: "test-dp".into(), + ..Default::default() + }) + .await + .unwrap(); + + let d = create_deployment(FuotaDeployment { + application_id: app.id, + device_profile_id: dp.id, + name: "test-fuota-deployment".into(), + ..Default::default() + }) + .await + .unwrap(); + + let gw = gateway::create(gateway::Gateway { + gateway_id: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]), + name: "gw-1".into(), + tenant_id: t.id, + ..Default::default() + }) + .await + .unwrap(); + + let gw2 = gateway::create(gateway::Gateway { + gateway_id: EUI64::from_be_bytes([2, 2, 3, 4, 5, 6, 7, 8]), + name: "gw-2".into(), + tenant_id: t2.id, + ..Default::default() + }) + .await + .unwrap(); + + // adding gateteway from other tenant fails + assert!(add_gateways(d.id.into(), vec![gw2.gateway_id]) + .await + .is_err()); + + // add gateway + add_gateways(d.id.into(), vec![gw.gateway_id]) + .await + .unwrap(); + + // get count + assert_eq!(1, get_gateway_count(d.id.into()).await.unwrap()); + + // get gateways + let gateways = get_gateways(d.id.into(), 10, 0).await.unwrap(); + assert_eq!(1, gateways.len()); + + // remove gateways + remove_gateways(d.id.into(), vec![gw.gateway_id]) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_jobs() { + let _guard = test::prepare().await; + + let t = tenant::create(tenant::Tenant { + name: "test-tenant".into(), + ..Default::default() + }) + .await + .unwrap(); + + let app = application::create(application::Application { + name: "test-app".into(), + tenant_id: t.id, + ..Default::default() + }) + .await + .unwrap(); + + let dp = device_profile::create(device_profile::DeviceProfile { + tenant_id: t.id, + name: "test-dp".into(), + ..Default::default() + }) + .await + .unwrap(); + + // create + let d = create_deployment(FuotaDeployment { + application_id: app.id, + device_profile_id: dp.id, + name: "test-fuota-deployment".into(), + ..Default::default() + }) + .await + .unwrap(); + + // create job + let mut job = create_job(FuotaDeploymentJob { + fuota_deployment_id: d.id, + job: fields::FuotaJob::McGroupSetup, + max_attempt_count: 3, + attempt_count: 1, + ..Default::default() + }) + .await + .unwrap(); + + // list jobs + let jobs = list_jobs(d.id.into()).await.unwrap(); + assert_eq!(vec![job.clone()], jobs); + + // update job + job.attempt_count = 2; + let job = update_job(job).await.unwrap(); + let jobs = list_jobs(d.id.into()).await.unwrap(); + assert_eq!(vec![job.clone()], jobs); + + // create new job + // we expect that this sets the previous one as completed + let job2 = create_job(FuotaDeploymentJob { + fuota_deployment_id: d.id, + job: fields::FuotaJob::FragStatus, + max_attempt_count: 3, + attempt_count: 1, + ..Default::default() + }) + .await + .unwrap(); + + let jobs = list_jobs(d.id.into()).await.unwrap(); + assert_eq!(2, jobs.len()); + assert_eq!(job.job, jobs[0].job); + assert!(jobs[0].completed_at.is_some()); + assert_eq!(job2.job, jobs[1].job); + assert!(jobs[1].completed_at.is_none()); + } +} diff --git a/chirpstack/src/storage/schema_postgres.rs b/chirpstack/src/storage/schema_postgres.rs index 58f6036f..a0c7e85f 100644 --- a/chirpstack/src/storage/schema_postgres.rs +++ b/chirpstack/src/storage/schema_postgres.rs @@ -234,6 +234,19 @@ diesel::table! { } } +diesel::table! { + fuota_deployment_job (fuota_deployment_id, job) { + fuota_deployment_id -> Uuid, + #[max_length = 20] + job -> Varchar, + created_at -> Timestamptz, + completed_at -> Nullable, + max_attempt_count -> Int2, + attempt_count -> Int2, + scheduler_run_after -> Timestamptz, + } +} + diesel::table! { gateway (gateway_id) { gateway_id -> Bytea, @@ -392,6 +405,7 @@ diesel::joinable!(fuota_deployment_device -> device (dev_eui)); diesel::joinable!(fuota_deployment_device -> fuota_deployment (fuota_deployment_id)); diesel::joinable!(fuota_deployment_gateway -> fuota_deployment (fuota_deployment_id)); diesel::joinable!(fuota_deployment_gateway -> gateway (gateway_id)); +diesel::joinable!(fuota_deployment_job -> fuota_deployment (fuota_deployment_id)); diesel::joinable!(gateway -> tenant (tenant_id)); diesel::joinable!(multicast_group -> application (application_id)); diesel::joinable!(multicast_group_device -> device (dev_eui)); @@ -416,6 +430,7 @@ diesel::allow_tables_to_appear_in_same_query!( fuota_deployment, fuota_deployment_device, fuota_deployment_gateway, + fuota_deployment_job, gateway, multicast_group, multicast_group_device, diff --git a/chirpstack/src/storage/schema_sqlite.rs b/chirpstack/src/storage/schema_sqlite.rs index 9a9ed0a4..8e16f068 100644 --- a/chirpstack/src/storage/schema_sqlite.rs +++ b/chirpstack/src/storage/schema_sqlite.rs @@ -210,6 +210,18 @@ diesel::table! { } } +diesel::table! { + fuota_deployment_job (fuota_deployment_id, job) { + fuota_deployment_id -> Text, + job -> Text, + created_at -> TimestamptzSqlite, + completed_at -> Nullable, + max_attempt_count -> SmallInt, + attempt_count -> SmallInt, + scheduler_run_after -> TimestamptzSqlite, + } +} + diesel::table! { gateway (gateway_id) { gateway_id -> Binary, @@ -359,6 +371,7 @@ diesel::joinable!(fuota_deployment_device -> device (dev_eui)); diesel::joinable!(fuota_deployment_device -> fuota_deployment (fuota_deployment_id)); diesel::joinable!(fuota_deployment_gateway -> fuota_deployment (fuota_deployment_id)); diesel::joinable!(fuota_deployment_gateway -> gateway (gateway_id)); +diesel::joinable!(fuota_deployment_job -> fuota_deployment (fuota_deployment_id)); diesel::joinable!(gateway -> tenant (tenant_id)); diesel::joinable!(multicast_group -> application (application_id)); diesel::joinable!(multicast_group_device -> device (dev_eui)); @@ -383,6 +396,7 @@ diesel::allow_tables_to_appear_in_same_query!( fuota_deployment, fuota_deployment_device, fuota_deployment_gateway, + fuota_deployment_job, gateway, multicast_group, multicast_group_device,