mirror of
https://github.com/chirpstack/chirpstack.git
synced 2025-03-14 00:06:39 +00:00
Add tests + add fuota jobs functions.
This commit is contained in:
parent
dae5ba6802
commit
021bec07e5
@ -1,3 +1,4 @@
|
||||
drop table fuota_deployment_job;
|
||||
drop table fuota_deployment_gateway;
|
||||
drop table fuota_deployment_device;
|
||||
drop table fuota_deployment;
|
||||
|
@ -45,3 +45,17 @@ create table fuota_deployment_gateway (
|
||||
|
||||
primary key (fuota_deployment_id, gateway_id)
|
||||
);
|
||||
|
||||
create table fuota_deployment_job (
|
||||
fuota_deployment_id uuid not null references fuota_deployment on delete cascade,
|
||||
job varchar(20) not null,
|
||||
created_at timestamp with time zone not null,
|
||||
completed_at timestamp with time zone null,
|
||||
max_attempt_count smallint not null,
|
||||
attempt_count smallint not null,
|
||||
scheduler_run_after timestamp with time zone not null,
|
||||
|
||||
primary key (fuota_deployment_id, job)
|
||||
);
|
||||
|
||||
create index idx_fuota_deployment_job_scheduler_run_after on fuota_deployment_job(scheduler_run_after);
|
||||
|
@ -1,3 +1,4 @@
|
||||
drop table fuota_deployment_job;
|
||||
drop table fuota_deployment_gateway;
|
||||
drop table fuota_deployment_device;
|
||||
drop table fuota_deployment;
|
||||
|
@ -39,9 +39,23 @@ create table fuota_deployment_device (
|
||||
);
|
||||
|
||||
create table fuota_deployment_gateway (
|
||||
fuota_deployment_id text not null references fuota_deployment on delete cascade,
|
||||
gateway_id blob not null references gateway on delete cascade,
|
||||
created_at datetime not null,
|
||||
fuota_deployment_id text not null references fuota_deployment on delete cascade,
|
||||
gateway_id blob not null references gateway on delete cascade,
|
||||
created_at datetime not null,
|
||||
|
||||
primary key (fuota_deployment_id, gateway_id)
|
||||
primary key (fuota_deployment_id, gateway_id)
|
||||
);
|
||||
|
||||
create table fuota_deployment_job (
|
||||
fuota_deployment_id text not null references fuota_deployment on delete cascade,
|
||||
job varchar(20) not null,
|
||||
created_at datetime not null,
|
||||
completed_at datetime null,
|
||||
max_attempt_count smallint not null,
|
||||
attempt_count smallint not null,
|
||||
scheduler_run_after datetime not null,
|
||||
|
||||
primary key (fuota_deployment_id, job)
|
||||
);
|
||||
|
||||
create index idx_fuota_deployment_job_scheduler_run_after on fuota_deployment_job(scheduler_run_after);
|
||||
|
@ -473,7 +473,7 @@ impl FuotaService for Fuota {
|
||||
let count = fuota::get_gateway_count(dp_id)
|
||||
.await
|
||||
.map_err(|e| e.status())?;
|
||||
let items = fuota::get_gateway(dp_id, req.limit as i64, req.offset as i64)
|
||||
let items = fuota::get_gateways(dp_id, req.limit as i64, req.offset as i64)
|
||||
.await
|
||||
.map_err(|e| e.status())?;
|
||||
|
||||
@ -495,3 +495,279 @@ impl FuotaService for Fuota {
|
||||
Ok(resp)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::api::auth::validator::RequestValidator;
|
||||
use crate::api::auth::AuthID;
|
||||
use crate::storage::{application, device, device_profile, gateway, tenant, user};
|
||||
use crate::test;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fuota() {
|
||||
let _guard = test::prepare().await;
|
||||
|
||||
// setup admin user
|
||||
let u = user::User {
|
||||
is_admin: true,
|
||||
is_active: true,
|
||||
email: "admin@admin".into(),
|
||||
email_verified: true,
|
||||
..Default::default()
|
||||
};
|
||||
let u = user::create(u).await.unwrap();
|
||||
|
||||
// create tenant
|
||||
let t = tenant::create(tenant::Tenant {
|
||||
name: "test-tenant".into(),
|
||||
can_have_gateways: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// create app
|
||||
let app = application::create(application::Application {
|
||||
tenant_id: t.id,
|
||||
name: "test-app".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// create dp
|
||||
let dp = device_profile::create(device_profile::DeviceProfile {
|
||||
tenant_id: t.id,
|
||||
name: "test-dp".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// create device
|
||||
let dev = device::create(device::Device {
|
||||
dev_eui: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]),
|
||||
application_id: app.id,
|
||||
device_profile_id: dp.id,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// create gateway
|
||||
let gw = gateway::create(gateway::Gateway {
|
||||
gateway_id: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]),
|
||||
tenant_id: t.id,
|
||||
name: "test-gw".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// setup api
|
||||
let service = Fuota::new(RequestValidator::new());
|
||||
|
||||
// create deployment
|
||||
let create_req = get_request(
|
||||
&u.id,
|
||||
api::CreateFuotaDeploymentRequest {
|
||||
deployment: Some(api::FuotaDeployment {
|
||||
application_id: app.id.to_string(),
|
||||
device_profile_id: dp.id.to_string(),
|
||||
name: "test-fuota".into(),
|
||||
..Default::default()
|
||||
}),
|
||||
},
|
||||
);
|
||||
let create_resp = service.create_deployment(create_req).await.unwrap();
|
||||
let create_resp = create_resp.get_ref();
|
||||
|
||||
// get deployment
|
||||
let get_req = get_request(
|
||||
&u.id,
|
||||
api::GetFuotaDeploymentRequest {
|
||||
id: create_resp.id.clone(),
|
||||
},
|
||||
);
|
||||
let get_resp = service.get_deployment(get_req).await.unwrap();
|
||||
let get_resp = get_resp.get_ref();
|
||||
assert_eq!(
|
||||
Some(api::FuotaDeployment {
|
||||
id: create_resp.id.clone(),
|
||||
application_id: app.id.to_string(),
|
||||
device_profile_id: dp.id.to_string(),
|
||||
name: "test-fuota".into(),
|
||||
..Default::default()
|
||||
}),
|
||||
get_resp.deployment
|
||||
);
|
||||
|
||||
// update deployment
|
||||
let update_req = get_request(
|
||||
&u.id,
|
||||
api::UpdateFuotaDeploymentRequest {
|
||||
deployment: Some(api::FuotaDeployment {
|
||||
id: create_resp.id.clone(),
|
||||
application_id: app.id.to_string(),
|
||||
device_profile_id: dp.id.to_string(),
|
||||
name: "updated-test-fuota".into(),
|
||||
..Default::default()
|
||||
}),
|
||||
},
|
||||
);
|
||||
service.update_deployment(update_req).await.unwrap();
|
||||
let get_req = get_request(
|
||||
&u.id,
|
||||
api::GetFuotaDeploymentRequest {
|
||||
id: create_resp.id.clone(),
|
||||
},
|
||||
);
|
||||
let get_resp = service.get_deployment(get_req).await.unwrap();
|
||||
let get_resp = get_resp.get_ref();
|
||||
assert_eq!(
|
||||
Some(api::FuotaDeployment {
|
||||
id: create_resp.id.clone(),
|
||||
application_id: app.id.to_string(),
|
||||
device_profile_id: dp.id.to_string(),
|
||||
name: "updated-test-fuota".into(),
|
||||
..Default::default()
|
||||
}),
|
||||
get_resp.deployment
|
||||
);
|
||||
|
||||
// list deployments
|
||||
let list_req = get_request(
|
||||
&u.id,
|
||||
api::ListFuotaDeploymentsRequest {
|
||||
application_id: app.id.to_string(),
|
||||
limit: 10,
|
||||
offset: 0,
|
||||
},
|
||||
);
|
||||
let list_resp = service.list_deployments(list_req).await.unwrap();
|
||||
let list_resp = list_resp.get_ref();
|
||||
assert_eq!(1, list_resp.total_count);
|
||||
assert_eq!(1, list_resp.result.len());
|
||||
assert_eq!(create_resp.id, list_resp.result[0].id);
|
||||
|
||||
// add device
|
||||
let add_dev_req = get_request(
|
||||
&u.id,
|
||||
api::AddDevicesToFuotaDeploymentRequest {
|
||||
fuota_deployment_id: create_resp.id.clone(),
|
||||
dev_euis: vec![dev.dev_eui.to_string()],
|
||||
},
|
||||
);
|
||||
service.add_devices(add_dev_req).await.unwrap();
|
||||
|
||||
// list devices
|
||||
let list_devs_req = get_request(
|
||||
&u.id,
|
||||
api::ListFuotaDeploymentDevicesRequest {
|
||||
fuota_deployment_id: create_resp.id.clone(),
|
||||
limit: 10,
|
||||
offset: 0,
|
||||
},
|
||||
);
|
||||
let list_devs_resp = service.list_devices(list_devs_req).await.unwrap();
|
||||
let list_devs_resp = list_devs_resp.get_ref();
|
||||
assert_eq!(1, list_devs_resp.total_count);
|
||||
assert_eq!(1, list_devs_resp.result.len());
|
||||
assert_eq!(dev.dev_eui.to_string(), list_devs_resp.result[0].dev_eui);
|
||||
|
||||
// remove devices
|
||||
let remove_devs_req = get_request(
|
||||
&u.id,
|
||||
api::RemoveDevicesFromFuotaDeploymentRequest {
|
||||
fuota_deployment_id: create_resp.id.clone(),
|
||||
dev_euis: vec![dev.dev_eui.to_string()],
|
||||
},
|
||||
);
|
||||
service.remove_devices(remove_devs_req).await.unwrap();
|
||||
let list_devs_req = get_request(
|
||||
&u.id,
|
||||
api::ListFuotaDeploymentDevicesRequest {
|
||||
fuota_deployment_id: create_resp.id.clone(),
|
||||
limit: 10,
|
||||
offset: 0,
|
||||
},
|
||||
);
|
||||
let list_devs_resp = service.list_devices(list_devs_req).await.unwrap();
|
||||
let list_devs_resp = list_devs_resp.get_ref();
|
||||
assert_eq!(0, list_devs_resp.total_count);
|
||||
assert_eq!(0, list_devs_resp.result.len());
|
||||
|
||||
// add gateway
|
||||
let add_gws_req = get_request(
|
||||
&u.id,
|
||||
api::AddGatewaysToFuotaDeploymentRequest {
|
||||
fuota_deployment_id: create_resp.id.clone(),
|
||||
gateway_ids: vec![gw.gateway_id.to_string()],
|
||||
},
|
||||
);
|
||||
service.add_gateways(add_gws_req).await.unwrap();
|
||||
|
||||
// list gateways
|
||||
let list_gws_req = get_request(
|
||||
&u.id,
|
||||
api::ListFuotaDeploymentGatewaysRequest {
|
||||
fuota_deployment_id: create_resp.id.clone(),
|
||||
limit: 10,
|
||||
offset: 0,
|
||||
},
|
||||
);
|
||||
let list_gws_resp = service.list_gateways(list_gws_req).await.unwrap();
|
||||
let list_gws_resp = list_gws_resp.get_ref();
|
||||
assert_eq!(1, list_gws_resp.total_count);
|
||||
assert_eq!(1, list_gws_resp.result.len());
|
||||
assert_eq!(
|
||||
gw.gateway_id.to_string(),
|
||||
list_gws_resp.result[0].gateway_id
|
||||
);
|
||||
|
||||
// remove gateways
|
||||
let remove_gws_req = get_request(
|
||||
&u.id,
|
||||
api::RemoveGatewaysFromFuotaDeploymentRequest {
|
||||
fuota_deployment_id: create_resp.id.clone(),
|
||||
gateway_ids: vec![gw.gateway_id.to_string()],
|
||||
},
|
||||
);
|
||||
service.remove_gateways(remove_gws_req).await.unwrap();
|
||||
let list_gws_req = get_request(
|
||||
&u.id,
|
||||
api::ListFuotaDeploymentGatewaysRequest {
|
||||
fuota_deployment_id: create_resp.id.clone(),
|
||||
limit: 10,
|
||||
offset: 0,
|
||||
},
|
||||
);
|
||||
let list_gws_resp = service.list_gateways(list_gws_req).await.unwrap();
|
||||
let list_gws_resp = list_gws_resp.get_ref();
|
||||
assert_eq!(0, list_gws_resp.total_count);
|
||||
assert_eq!(0, list_gws_resp.result.len());
|
||||
|
||||
// delete deployment
|
||||
let delete_req = get_request(
|
||||
&u.id,
|
||||
api::DeleteFuotaDeploymentRequest {
|
||||
id: create_resp.id.clone(),
|
||||
},
|
||||
);
|
||||
service.delete_deployment(delete_req).await.unwrap();
|
||||
let delete_req = get_request(
|
||||
&u.id,
|
||||
api::DeleteFuotaDeploymentRequest {
|
||||
id: create_resp.id.clone(),
|
||||
},
|
||||
);
|
||||
assert!(service.delete_deployment(delete_req).await.is_err());
|
||||
}
|
||||
|
||||
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
|
||||
let mut req = Request::new(req);
|
||||
req.extensions_mut().insert(AuthID::User(*user_id));
|
||||
req
|
||||
}
|
||||
}
|
||||
|
@ -1,3 +1,5 @@
|
||||
use std::fmt;
|
||||
|
||||
use anyhow::Error;
|
||||
use diesel::backend::Backend;
|
||||
use diesel::sql_types::Text;
|
||||
@ -77,3 +79,82 @@ impl serialize::ToSql<Text, Sqlite> for RequestFragmentationSessionStatus {
|
||||
Ok(serialize::IsNull::No)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, AsExpression, FromSqlRow)]
|
||||
#[diesel(sql_type = Text)]
|
||||
pub enum FuotaJob {
|
||||
McGroupSetup,
|
||||
McSession,
|
||||
FragSessionSetup,
|
||||
Enqueue,
|
||||
FragStatus,
|
||||
}
|
||||
|
||||
impl fmt::Display for FuotaJob {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
write!(f, "{}", String::from(self))
|
||||
}
|
||||
}
|
||||
|
||||
impl From<&FuotaJob> for String {
|
||||
fn from(value: &FuotaJob) -> Self {
|
||||
match value {
|
||||
FuotaJob::McGroupSetup => "MC_GROUP_SETUP",
|
||||
FuotaJob::McSession => "MC_SESSION",
|
||||
FuotaJob::FragSessionSetup => "FRAG_SESSION_SETUP",
|
||||
FuotaJob::Enqueue => "ENQUEUE",
|
||||
FuotaJob::FragStatus => "FRAG_STATUS",
|
||||
}
|
||||
.to_string()
|
||||
}
|
||||
}
|
||||
|
||||
impl TryFrom<&str> for FuotaJob {
|
||||
type Error = Error;
|
||||
|
||||
fn try_from(value: &str) -> Result<Self, Self::Error> {
|
||||
Ok(match value {
|
||||
"MC_GROUP_SETUP" => Self::McGroupSetup,
|
||||
"MC_SESSION" => Self::McSession,
|
||||
"FRAG_SESSION_SETUP" => Self::FragSessionSetup,
|
||||
"ENQUEUE" => Self::Enqueue,
|
||||
"FRAG_STATUS" => Self::FragStatus,
|
||||
_ => return Err(anyhow!("Invalid FuotaJob value: {}", value)),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
impl<DB> deserialize::FromSql<Text, DB> for FuotaJob
|
||||
where
|
||||
DB: Backend,
|
||||
*const str: deserialize::FromSql<Text, DB>,
|
||||
{
|
||||
fn from_sql(value: <DB as Backend>::RawValue<'_>) -> deserialize::Result<Self> {
|
||||
let string = <*const str>::from_sql(value)?;
|
||||
Ok(Self::try_from(unsafe { &*string })?)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "postgres")]
|
||||
impl serialize::ToSql<Text, diesel::pg::Pg> for FuotaJob
|
||||
where
|
||||
str: serialize::ToSql<Text, diesel::pg::Pg>,
|
||||
{
|
||||
fn to_sql<'b>(
|
||||
&'b self,
|
||||
out: &mut serialize::Output<'b, '_, diesel::pg::Pg>,
|
||||
) -> serialize::Result {
|
||||
<str as serialize::ToSql<Text, diesel::pg::Pg>>::to_sql(
|
||||
&String::from(self),
|
||||
&mut out.reborrow(),
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "sqlite")]
|
||||
impl serialize::ToSql<Text, Sqlite> for FuotaJob {
|
||||
fn to_sql(&self, out: &mut serialize::Output<'_, '_, Sqlite>) -> serialize::Result {
|
||||
out.set_value(String::from(self));
|
||||
Ok(serialize::IsNull::No)
|
||||
}
|
||||
}
|
||||
|
@ -12,7 +12,7 @@ pub use big_decimal::BigDecimal;
|
||||
pub use dev_nonces::DevNonces;
|
||||
pub use device_profile::{AbpParams, AppLayerParams, ClassBParams, ClassCParams, RelayParams};
|
||||
pub use device_session::DeviceSession;
|
||||
pub use fuota::RequestFragmentationSessionStatus;
|
||||
pub use fuota::{FuotaJob, RequestFragmentationSessionStatus};
|
||||
pub use key_value::KeyValue;
|
||||
pub use measurements::*;
|
||||
pub use multicast_group_scheduling_type::MulticastGroupSchedulingType;
|
||||
|
@ -9,9 +9,9 @@ use validator::Validate;
|
||||
use crate::storage::error::Error;
|
||||
use crate::storage::schema::{
|
||||
application, device, fuota_deployment, fuota_deployment_device, fuota_deployment_gateway,
|
||||
gateway, tenant,
|
||||
fuota_deployment_job, gateway, tenant,
|
||||
};
|
||||
use crate::storage::{self, device_profile, fields, get_async_db_conn};
|
||||
use crate::storage::{self, db_transaction, device_profile, fields, get_async_db_conn};
|
||||
use lrwn::EUI64;
|
||||
|
||||
#[derive(Clone, Queryable, Insertable, Debug, PartialEq, Eq, Validate)]
|
||||
@ -133,6 +133,34 @@ impl Default for FuotaDeploymentGateway {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Queryable, Insertable, Debug, PartialEq, Eq)]
|
||||
#[diesel(table_name = fuota_deployment_job)]
|
||||
pub struct FuotaDeploymentJob {
|
||||
pub fuota_deployment_id: fields::Uuid,
|
||||
pub job: fields::FuotaJob,
|
||||
pub created_at: DateTime<Utc>,
|
||||
pub completed_at: Option<DateTime<Utc>>,
|
||||
pub max_attempt_count: i16,
|
||||
pub attempt_count: i16,
|
||||
pub scheduler_run_after: DateTime<Utc>,
|
||||
}
|
||||
|
||||
impl Default for FuotaDeploymentJob {
|
||||
fn default() -> Self {
|
||||
let now = Utc::now();
|
||||
|
||||
Self {
|
||||
fuota_deployment_id: Uuid::nil().into(),
|
||||
job: fields::FuotaJob::McGroupSetup,
|
||||
created_at: now,
|
||||
completed_at: None,
|
||||
max_attempt_count: 0,
|
||||
attempt_count: 0,
|
||||
scheduler_run_after: now,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn create_deployment(d: FuotaDeployment) -> Result<FuotaDeployment, Error> {
|
||||
d.validate()?;
|
||||
|
||||
@ -250,6 +278,8 @@ pub async fn add_devices(fuota_deployment_id: Uuid, dev_euis: Vec<EUI64>) -> Res
|
||||
fuota_deployment::table
|
||||
.on(fuota_deployment::dsl::device_profile_id.eq(device::dsl::device_profile_id)),
|
||||
)
|
||||
.inner_join(application::table.on(application::dsl::id.eq(device::dsl::application_id)))
|
||||
.filter(application::dsl::id.eq(fuota_deployment::dsl::application_id))
|
||||
.filter(fuota_deployment::dsl::id.eq(fields::Uuid::from(fuota_deployment_id)))
|
||||
.filter(device::dsl::dev_eui.eq_any(&dev_euis))
|
||||
.load(&mut get_async_db_conn().await?)
|
||||
@ -412,7 +442,7 @@ pub async fn get_gateway_count(fuota_deployment_id: Uuid) -> Result<i64, Error>
|
||||
.map_err(|e| Error::from_diesel(e, "".into()))
|
||||
}
|
||||
|
||||
pub async fn get_gateway(
|
||||
pub async fn get_gateways(
|
||||
fuota_deployment_id: Uuid,
|
||||
limit: i64,
|
||||
offset: i64,
|
||||
@ -429,3 +459,407 @@ pub async fn get_gateway(
|
||||
.await
|
||||
.map_err(|e| Error::from_diesel(e, "".into()))
|
||||
}
|
||||
|
||||
// Creating a new job, will set any pending job(s) to completed within the same transaction.
|
||||
pub async fn create_job(j: FuotaDeploymentJob) -> Result<FuotaDeploymentJob, Error> {
|
||||
let mut c = get_async_db_conn().await?;
|
||||
let j: FuotaDeploymentJob = db_transaction::<FuotaDeploymentJob, Error, _>(&mut c, |c| {
|
||||
Box::pin(async move {
|
||||
// set pending job(s) to completed
|
||||
diesel::update(
|
||||
fuota_deployment_job::dsl::fuota_deployment_job
|
||||
.filter(
|
||||
fuota_deployment_job::dsl::fuota_deployment_id.eq(&j.fuota_deployment_id),
|
||||
)
|
||||
.filter(fuota_deployment_job::dsl::completed_at.is_null()),
|
||||
)
|
||||
.set(fuota_deployment_job::dsl::completed_at.eq(Utc::now()))
|
||||
.execute(c)
|
||||
.await?;
|
||||
|
||||
// create new job
|
||||
diesel::insert_into(fuota_deployment_job::table)
|
||||
.values(&j)
|
||||
.get_result(c)
|
||||
.await
|
||||
.map_err(|e| Error::from_diesel(e, j.fuota_deployment_id.to_string()))
|
||||
})
|
||||
})
|
||||
.await?;
|
||||
|
||||
info!(fuota_deployment_id = %j.fuota_deployment_id, job = %j.job, "FUOTA deployment job created");
|
||||
Ok(j)
|
||||
}
|
||||
|
||||
pub async fn update_job(j: FuotaDeploymentJob) -> Result<FuotaDeploymentJob, Error> {
|
||||
let j: FuotaDeploymentJob = diesel::update(
|
||||
fuota_deployment_job::dsl::fuota_deployment_job.find((&j.fuota_deployment_id, &j.job)),
|
||||
)
|
||||
.set((
|
||||
fuota_deployment_job::completed_at.eq(&j.completed_at),
|
||||
fuota_deployment_job::attempt_count.eq(&j.attempt_count),
|
||||
fuota_deployment_job::scheduler_run_after.eq(&j.scheduler_run_after),
|
||||
))
|
||||
.get_result(&mut get_async_db_conn().await?)
|
||||
.await
|
||||
.map_err(|e| Error::from_diesel(e, j.fuota_deployment_id.to_string()))?;
|
||||
|
||||
info!(fuota_deployment_id = %j.fuota_deployment_id, job = %j.job, "FUOTA deployment job updated");
|
||||
Ok(j)
|
||||
}
|
||||
|
||||
pub async fn list_jobs(fuota_deployment_id: Uuid) -> Result<Vec<FuotaDeploymentJob>, Error> {
|
||||
fuota_deployment_job::dsl::fuota_deployment_job
|
||||
.filter(
|
||||
fuota_deployment_job::dsl::fuota_deployment_id
|
||||
.eq(fields::Uuid::from(fuota_deployment_id)),
|
||||
)
|
||||
.order_by(fuota_deployment_job::dsl::scheduler_run_after)
|
||||
.load(&mut get_async_db_conn().await?)
|
||||
.await
|
||||
.map_err(|e| Error::from_diesel(e, fuota_deployment_id.to_string()))
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod test {
|
||||
use super::*;
|
||||
use crate::storage::{application, device, device_profile, gateway, tenant};
|
||||
use crate::test;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fuota() {
|
||||
let _guard = test::prepare().await;
|
||||
|
||||
let t = tenant::create(tenant::Tenant {
|
||||
name: "test-tenant".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let app = application::create(application::Application {
|
||||
name: "test-app".into(),
|
||||
tenant_id: t.id,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let dp = device_profile::create(device_profile::DeviceProfile {
|
||||
tenant_id: t.id,
|
||||
name: "test-dp".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// create
|
||||
let mut d = create_deployment(FuotaDeployment {
|
||||
application_id: app.id,
|
||||
device_profile_id: dp.id,
|
||||
name: "test-fuota-deployment".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let d_get = get_deployment(d.id.into()).await.unwrap();
|
||||
assert_eq!(d, d_get);
|
||||
|
||||
// update
|
||||
d.name = "updated-test-fuota-deployment".into();
|
||||
let d = update_deployment(d).await.unwrap();
|
||||
|
||||
// count
|
||||
assert_eq!(1, get_deployment_count(app.id.into()).await.unwrap());
|
||||
|
||||
// list
|
||||
assert_eq!(
|
||||
vec![FuotaDeploymentListItem {
|
||||
id: d.id,
|
||||
created_at: d.created_at,
|
||||
updated_at: d.updated_at,
|
||||
started_at: None,
|
||||
completed_at: None,
|
||||
name: d.name.clone(),
|
||||
}],
|
||||
list_deployments(app.id.into(), 10, 0).await.unwrap()
|
||||
);
|
||||
|
||||
// delete
|
||||
delete_deployment(d.id.into()).await.unwrap();
|
||||
assert!(delete_deployment(d.id.into()).await.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fuota_devices() {
|
||||
let _guard = test::prepare().await;
|
||||
|
||||
let t = tenant::create(tenant::Tenant {
|
||||
name: "test-tenant".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let app = application::create(application::Application {
|
||||
name: "test-app".into(),
|
||||
tenant_id: t.id,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let app2 = application::create(application::Application {
|
||||
name: "test-app".into(),
|
||||
tenant_id: t.id,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let dp = device_profile::create(device_profile::DeviceProfile {
|
||||
tenant_id: t.id,
|
||||
name: "test-dp".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let dp2 = device_profile::create(device_profile::DeviceProfile {
|
||||
tenant_id: t.id,
|
||||
name: "test-dp".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let dev = device::create(device::Device {
|
||||
application_id: app.id,
|
||||
device_profile_id: dp.id,
|
||||
name: "test-device".into(),
|
||||
dev_eui: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let dev2 = device::create(device::Device {
|
||||
application_id: app.id,
|
||||
device_profile_id: dp2.id,
|
||||
name: "test-device".into(),
|
||||
dev_eui: EUI64::from_be_bytes([2, 2, 3, 4, 5, 6, 7, 8]),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let dev3 = device::create(device::Device {
|
||||
application_id: app2.id,
|
||||
device_profile_id: dp.id,
|
||||
name: "test-device".into(),
|
||||
dev_eui: EUI64::from_be_bytes([3, 2, 3, 4, 5, 6, 7, 8]),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// create
|
||||
let d = create_deployment(FuotaDeployment {
|
||||
application_id: app.id,
|
||||
device_profile_id: dp.id,
|
||||
name: "test-fuota-deployment".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// can't add devices from multiple device-profiles
|
||||
assert!(add_devices(d.id.into(), vec![dev2.dev_eui]).await.is_err());
|
||||
|
||||
// can't add devices from other applications
|
||||
assert!(add_devices(d.id.into(), vec![dev3.dev_eui]).await.is_err());
|
||||
|
||||
// add devices
|
||||
add_devices(d.id.into(), vec![dev.dev_eui]).await.unwrap();
|
||||
|
||||
// get device count
|
||||
assert_eq!(1, get_device_count(d.id.into()).await.unwrap());
|
||||
|
||||
// get devices
|
||||
let devices = get_devices(d.id.into(), 10, 0).await.unwrap();
|
||||
assert_eq!(1, devices.len());
|
||||
assert_eq!(dev.dev_eui, devices[0].dev_eui);
|
||||
assert_eq!(d.id, devices[0].fuota_deployment_id);
|
||||
|
||||
// remove devices
|
||||
remove_devices(d.id.into(), vec![dev.dev_eui])
|
||||
.await
|
||||
.unwrap();
|
||||
assert_eq!(0, get_device_count(d.id.into()).await.unwrap());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_fuota_gateways() {
|
||||
let _guard = test::prepare().await;
|
||||
|
||||
let t = tenant::create(tenant::Tenant {
|
||||
name: "test-tenant".into(),
|
||||
can_have_gateways: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let t2 = tenant::create(tenant::Tenant {
|
||||
name: "test-tenant-2".into(),
|
||||
can_have_gateways: true,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let app = application::create(application::Application {
|
||||
name: "test-app".into(),
|
||||
tenant_id: t.id,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let dp = device_profile::create(device_profile::DeviceProfile {
|
||||
tenant_id: t.id,
|
||||
name: "test-dp".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let d = create_deployment(FuotaDeployment {
|
||||
application_id: app.id,
|
||||
device_profile_id: dp.id,
|
||||
name: "test-fuota-deployment".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let gw = gateway::create(gateway::Gateway {
|
||||
gateway_id: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]),
|
||||
name: "gw-1".into(),
|
||||
tenant_id: t.id,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let gw2 = gateway::create(gateway::Gateway {
|
||||
gateway_id: EUI64::from_be_bytes([2, 2, 3, 4, 5, 6, 7, 8]),
|
||||
name: "gw-2".into(),
|
||||
tenant_id: t2.id,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// adding gateteway from other tenant fails
|
||||
assert!(add_gateways(d.id.into(), vec![gw2.gateway_id])
|
||||
.await
|
||||
.is_err());
|
||||
|
||||
// add gateway
|
||||
add_gateways(d.id.into(), vec![gw.gateway_id])
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// get count
|
||||
assert_eq!(1, get_gateway_count(d.id.into()).await.unwrap());
|
||||
|
||||
// get gateways
|
||||
let gateways = get_gateways(d.id.into(), 10, 0).await.unwrap();
|
||||
assert_eq!(1, gateways.len());
|
||||
|
||||
// remove gateways
|
||||
remove_gateways(d.id.into(), vec![gw.gateway_id])
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_jobs() {
|
||||
let _guard = test::prepare().await;
|
||||
|
||||
let t = tenant::create(tenant::Tenant {
|
||||
name: "test-tenant".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let app = application::create(application::Application {
|
||||
name: "test-app".into(),
|
||||
tenant_id: t.id,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let dp = device_profile::create(device_profile::DeviceProfile {
|
||||
tenant_id: t.id,
|
||||
name: "test-dp".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// create
|
||||
let d = create_deployment(FuotaDeployment {
|
||||
application_id: app.id,
|
||||
device_profile_id: dp.id,
|
||||
name: "test-fuota-deployment".into(),
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// create job
|
||||
let mut job = create_job(FuotaDeploymentJob {
|
||||
fuota_deployment_id: d.id,
|
||||
job: fields::FuotaJob::McGroupSetup,
|
||||
max_attempt_count: 3,
|
||||
attempt_count: 1,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// list jobs
|
||||
let jobs = list_jobs(d.id.into()).await.unwrap();
|
||||
assert_eq!(vec![job.clone()], jobs);
|
||||
|
||||
// update job
|
||||
job.attempt_count = 2;
|
||||
let job = update_job(job).await.unwrap();
|
||||
let jobs = list_jobs(d.id.into()).await.unwrap();
|
||||
assert_eq!(vec![job.clone()], jobs);
|
||||
|
||||
// create new job
|
||||
// we expect that this sets the previous one as completed
|
||||
let job2 = create_job(FuotaDeploymentJob {
|
||||
fuota_deployment_id: d.id,
|
||||
job: fields::FuotaJob::FragStatus,
|
||||
max_attempt_count: 3,
|
||||
attempt_count: 1,
|
||||
..Default::default()
|
||||
})
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let jobs = list_jobs(d.id.into()).await.unwrap();
|
||||
assert_eq!(2, jobs.len());
|
||||
assert_eq!(job.job, jobs[0].job);
|
||||
assert!(jobs[0].completed_at.is_some());
|
||||
assert_eq!(job2.job, jobs[1].job);
|
||||
assert!(jobs[1].completed_at.is_none());
|
||||
}
|
||||
}
|
||||
|
@ -234,6 +234,19 @@ diesel::table! {
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
fuota_deployment_job (fuota_deployment_id, job) {
|
||||
fuota_deployment_id -> Uuid,
|
||||
#[max_length = 20]
|
||||
job -> Varchar,
|
||||
created_at -> Timestamptz,
|
||||
completed_at -> Nullable<Timestamptz>,
|
||||
max_attempt_count -> Int2,
|
||||
attempt_count -> Int2,
|
||||
scheduler_run_after -> Timestamptz,
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
gateway (gateway_id) {
|
||||
gateway_id -> Bytea,
|
||||
@ -392,6 +405,7 @@ diesel::joinable!(fuota_deployment_device -> device (dev_eui));
|
||||
diesel::joinable!(fuota_deployment_device -> fuota_deployment (fuota_deployment_id));
|
||||
diesel::joinable!(fuota_deployment_gateway -> fuota_deployment (fuota_deployment_id));
|
||||
diesel::joinable!(fuota_deployment_gateway -> gateway (gateway_id));
|
||||
diesel::joinable!(fuota_deployment_job -> fuota_deployment (fuota_deployment_id));
|
||||
diesel::joinable!(gateway -> tenant (tenant_id));
|
||||
diesel::joinable!(multicast_group -> application (application_id));
|
||||
diesel::joinable!(multicast_group_device -> device (dev_eui));
|
||||
@ -416,6 +430,7 @@ diesel::allow_tables_to_appear_in_same_query!(
|
||||
fuota_deployment,
|
||||
fuota_deployment_device,
|
||||
fuota_deployment_gateway,
|
||||
fuota_deployment_job,
|
||||
gateway,
|
||||
multicast_group,
|
||||
multicast_group_device,
|
||||
|
@ -210,6 +210,18 @@ diesel::table! {
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
fuota_deployment_job (fuota_deployment_id, job) {
|
||||
fuota_deployment_id -> Text,
|
||||
job -> Text,
|
||||
created_at -> TimestamptzSqlite,
|
||||
completed_at -> Nullable<TimestamptzSqlite>,
|
||||
max_attempt_count -> SmallInt,
|
||||
attempt_count -> SmallInt,
|
||||
scheduler_run_after -> TimestamptzSqlite,
|
||||
}
|
||||
}
|
||||
|
||||
diesel::table! {
|
||||
gateway (gateway_id) {
|
||||
gateway_id -> Binary,
|
||||
@ -359,6 +371,7 @@ diesel::joinable!(fuota_deployment_device -> device (dev_eui));
|
||||
diesel::joinable!(fuota_deployment_device -> fuota_deployment (fuota_deployment_id));
|
||||
diesel::joinable!(fuota_deployment_gateway -> fuota_deployment (fuota_deployment_id));
|
||||
diesel::joinable!(fuota_deployment_gateway -> gateway (gateway_id));
|
||||
diesel::joinable!(fuota_deployment_job -> fuota_deployment (fuota_deployment_id));
|
||||
diesel::joinable!(gateway -> tenant (tenant_id));
|
||||
diesel::joinable!(multicast_group -> application (application_id));
|
||||
diesel::joinable!(multicast_group_device -> device (dev_eui));
|
||||
@ -383,6 +396,7 @@ diesel::allow_tables_to_appear_in_same_query!(
|
||||
fuota_deployment,
|
||||
fuota_deployment_device,
|
||||
fuota_deployment_gateway,
|
||||
fuota_deployment_job,
|
||||
gateway,
|
||||
multicast_group,
|
||||
multicast_group_device,
|
||||
|
Loading…
x
Reference in New Issue
Block a user