mirror of
https://github.com/chirpstack/chirpstack.git
synced 2025-05-31 06:51:01 +00:00
Add start job + get schedulable jobs functions + API.
This commit is contained in:
parent
c04d867864
commit
1a51aa836d
8
api/proto/api/fuota.proto
vendored
8
api/proto/api/fuota.proto
vendored
@ -30,6 +30,9 @@ service FuotaService {
|
|||||||
// Delete the FUOTA deployment for the given ID.
|
// Delete the FUOTA deployment for the given ID.
|
||||||
rpc DeleteDeployment(DeleteFuotaDeploymentRequest) returns (google.protobuf.Empty) {}
|
rpc DeleteDeployment(DeleteFuotaDeploymentRequest) returns (google.protobuf.Empty) {}
|
||||||
|
|
||||||
|
// Start the FUOTA deployment.
|
||||||
|
rpc StartDeployment(StartFuotaDeploymentRequest) returns (google.protobuf.Empty) {}
|
||||||
|
|
||||||
// List the FUOTA deployments.
|
// List the FUOTA deployments.
|
||||||
rpc ListDeployments(ListFuotaDeploymentsRequest) returns (ListFuotaDeploymentsResponse) {}
|
rpc ListDeployments(ListFuotaDeploymentsRequest) returns (ListFuotaDeploymentsResponse) {}
|
||||||
|
|
||||||
@ -237,6 +240,11 @@ message DeleteFuotaDeploymentRequest {
|
|||||||
string id = 1;
|
string id = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message StartFuotaDeploymentRequest {
|
||||||
|
// FUOTA deployment ID.
|
||||||
|
string id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message ListFuotaDeploymentsRequest {
|
message ListFuotaDeploymentsRequest {
|
||||||
// Max number of FUOTA deployments to return in the result-set.
|
// Max number of FUOTA deployments to return in the result-set.
|
||||||
// If not set, it will be treated as 0, and the response will only return the total_count.
|
// If not set, it will be treated as 0, and the response will only return the total_count.
|
||||||
|
8
api/rust/proto/chirpstack/api/fuota.proto
vendored
8
api/rust/proto/chirpstack/api/fuota.proto
vendored
@ -30,6 +30,9 @@ service FuotaService {
|
|||||||
// Delete the FUOTA deployment for the given ID.
|
// Delete the FUOTA deployment for the given ID.
|
||||||
rpc DeleteDeployment(DeleteFuotaDeploymentRequest) returns (google.protobuf.Empty) {}
|
rpc DeleteDeployment(DeleteFuotaDeploymentRequest) returns (google.protobuf.Empty) {}
|
||||||
|
|
||||||
|
// Start the FUOTA deployment.
|
||||||
|
rpc StartDeployment(StartFuotaDeploymentRequest) returns (google.protobuf.Empty) {}
|
||||||
|
|
||||||
// List the FUOTA deployments.
|
// List the FUOTA deployments.
|
||||||
rpc ListDeployments(ListFuotaDeploymentsRequest) returns (ListFuotaDeploymentsResponse) {}
|
rpc ListDeployments(ListFuotaDeploymentsRequest) returns (ListFuotaDeploymentsResponse) {}
|
||||||
|
|
||||||
@ -237,6 +240,11 @@ message DeleteFuotaDeploymentRequest {
|
|||||||
string id = 1;
|
string id = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message StartFuotaDeploymentRequest {
|
||||||
|
// FUOTA deployment ID.
|
||||||
|
string id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
message ListFuotaDeploymentsRequest {
|
message ListFuotaDeploymentsRequest {
|
||||||
// Max number of FUOTA deployments to return in the result-set.
|
// Max number of FUOTA deployments to return in the result-set.
|
||||||
// If not set, it will be treated as 0, and the response will only return the total_count.
|
// If not set, it will be treated as 0, and the response will only return the total_count.
|
||||||
|
@ -58,4 +58,5 @@ create table fuota_deployment_job (
|
|||||||
primary key (fuota_deployment_id, job)
|
primary key (fuota_deployment_id, job)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
create index idx_fuota_deployment_job_completed_at on fuota_deployment_job(completed_at);
|
||||||
create index idx_fuota_deployment_job_scheduler_run_after on fuota_deployment_job(scheduler_run_after);
|
create index idx_fuota_deployment_job_scheduler_run_after on fuota_deployment_job(scheduler_run_after);
|
||||||
|
@ -58,4 +58,5 @@ create table fuota_deployment_job (
|
|||||||
primary key (fuota_deployment_id, job)
|
primary key (fuota_deployment_id, job)
|
||||||
);
|
);
|
||||||
|
|
||||||
|
create index idx_fuota_deployment_job_completed_at on fuota_deployment_job(completed_at);
|
||||||
create index idx_fuota_deployment_job_scheduler_run_after on fuota_deployment_job(scheduler_run_after);
|
create index idx_fuota_deployment_job_scheduler_run_after on fuota_deployment_job(scheduler_run_after);
|
||||||
|
@ -10,7 +10,7 @@ use lrwn::EUI64;
|
|||||||
use crate::api::auth::validator;
|
use crate::api::auth::validator;
|
||||||
use crate::api::error::ToStatus;
|
use crate::api::error::ToStatus;
|
||||||
use crate::api::helpers::{self, FromProto, ToProto};
|
use crate::api::helpers::{self, FromProto, ToProto};
|
||||||
use crate::storage::fuota;
|
use crate::storage::{fields, fuota};
|
||||||
|
|
||||||
pub struct Fuota {
|
pub struct Fuota {
|
||||||
validator: validator::RequestValidator,
|
validator: validator::RequestValidator,
|
||||||
@ -228,6 +228,37 @@ impl FuotaService for Fuota {
|
|||||||
Ok(resp)
|
Ok(resp)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn start_deployment(
|
||||||
|
&self,
|
||||||
|
request: Request<api::StartFuotaDeploymentRequest>,
|
||||||
|
) -> Result<Response<()>, Status> {
|
||||||
|
let req = request.get_ref();
|
||||||
|
let id = Uuid::from_str(&req.id).map_err(|e| e.status())?;
|
||||||
|
|
||||||
|
self.validator
|
||||||
|
.validate(
|
||||||
|
request.extensions(),
|
||||||
|
validator::ValidateFuotaDeploymentAccess::new(validator::Flag::Update, id),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let d = fuota::get_deployment(id).await.map_err(|e| e.status())?;
|
||||||
|
|
||||||
|
fuota::create_job(fuota::FuotaDeploymentJob {
|
||||||
|
fuota_deployment_id: d.id,
|
||||||
|
job: fields::FuotaJob::McGroupSetup,
|
||||||
|
max_attempt_count: d.unicast_attempt_count,
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.status())?;
|
||||||
|
|
||||||
|
let mut resp = Response::new(());
|
||||||
|
resp.metadata_mut()
|
||||||
|
.insert("x-log-fuota_deployment_id", req.id.parse().unwrap());
|
||||||
|
Ok(resp)
|
||||||
|
}
|
||||||
|
|
||||||
async fn list_deployments(
|
async fn list_deployments(
|
||||||
&self,
|
&self,
|
||||||
request: Request<api::ListFuotaDeploymentsRequest>,
|
request: Request<api::ListFuotaDeploymentsRequest>,
|
||||||
@ -651,6 +682,21 @@ mod test {
|
|||||||
assert_eq!(1, list_resp.result.len());
|
assert_eq!(1, list_resp.result.len());
|
||||||
assert_eq!(create_resp.id, list_resp.result[0].id);
|
assert_eq!(create_resp.id, list_resp.result[0].id);
|
||||||
|
|
||||||
|
// start deployment
|
||||||
|
let start_req = get_request(
|
||||||
|
&u.id,
|
||||||
|
api::StartFuotaDeploymentRequest {
|
||||||
|
id: create_resp.id.clone(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
service.start_deployment(start_req).await.unwrap();
|
||||||
|
let jobs = fuota::list_jobs(Uuid::from_str(&create_resp.id).unwrap())
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
assert_eq!(1, jobs.len());
|
||||||
|
assert_eq!(create_resp.id, jobs[0].fuota_deployment_id.to_string());
|
||||||
|
assert_eq!(fields::FuotaJob::McGroupSetup, jobs[0].job);
|
||||||
|
|
||||||
// add device
|
// add device
|
||||||
let add_dev_req = get_request(
|
let add_dev_req = get_request(
|
||||||
&u.id,
|
&u.id,
|
||||||
|
@ -1,11 +1,12 @@
|
|||||||
use anyhow::Result;
|
use anyhow::{Context, Result};
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Duration, Utc};
|
||||||
use diesel::{dsl, prelude::*};
|
use diesel::{dsl, prelude::*};
|
||||||
use diesel_async::RunQueryDsl;
|
use diesel_async::RunQueryDsl;
|
||||||
use tracing::info;
|
use tracing::info;
|
||||||
use uuid::Uuid;
|
use uuid::Uuid;
|
||||||
use validator::Validate;
|
use validator::Validate;
|
||||||
|
|
||||||
|
use crate::config;
|
||||||
use crate::storage::error::Error;
|
use crate::storage::error::Error;
|
||||||
use crate::storage::schema::{
|
use crate::storage::schema::{
|
||||||
application, device, fuota_deployment, fuota_deployment_device, fuota_deployment_gateway,
|
application, device, fuota_deployment, fuota_deployment_device, fuota_deployment_gateway,
|
||||||
@ -133,7 +134,7 @@ impl Default for FuotaDeploymentGateway {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Queryable, Insertable, Debug, PartialEq, Eq)]
|
#[derive(Clone, Queryable, QueryableByName, Insertable, Debug, PartialEq, Eq)]
|
||||||
#[diesel(table_name = fuota_deployment_job)]
|
#[diesel(table_name = fuota_deployment_job)]
|
||||||
pub struct FuotaDeploymentJob {
|
pub struct FuotaDeploymentJob {
|
||||||
pub fuota_deployment_id: fields::Uuid,
|
pub fuota_deployment_id: fields::Uuid,
|
||||||
@ -520,6 +521,72 @@ pub async fn list_jobs(fuota_deployment_id: Uuid) -> Result<Vec<FuotaDeploymentJ
|
|||||||
.map_err(|e| Error::from_diesel(e, fuota_deployment_id.to_string()))
|
.map_err(|e| Error::from_diesel(e, fuota_deployment_id.to_string()))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Selected jobs will automatically have their scheduler_run_after column updated to now + 2 x scheduler interval value.
|
||||||
|
// This is such that concurrent queries will not result in the same job being executed twice.
|
||||||
|
pub async fn get_schedulable_jobs(limit: usize) -> Result<Vec<FuotaDeploymentJob>> {
|
||||||
|
let mut c = get_async_db_conn().await?;
|
||||||
|
db_transaction::<Vec<FuotaDeploymentJob>, Error, _>(&mut c, |c| {
|
||||||
|
Box::pin(async move {
|
||||||
|
let conf = config::get();
|
||||||
|
diesel::sql_query(if cfg!(feature = "sqlite") {
|
||||||
|
r#"
|
||||||
|
update
|
||||||
|
fuota_deployment_job
|
||||||
|
set
|
||||||
|
scheduler_run_after = ?3
|
||||||
|
where
|
||||||
|
(fuota_deployment_id, job) in (
|
||||||
|
select
|
||||||
|
fuota_deployment_id,
|
||||||
|
job
|
||||||
|
from
|
||||||
|
fuota_deployment_job
|
||||||
|
where
|
||||||
|
completed_at is null
|
||||||
|
and scheduler_run_after <= ?2
|
||||||
|
order by
|
||||||
|
created_at
|
||||||
|
limit ?1
|
||||||
|
)
|
||||||
|
returning *
|
||||||
|
"#
|
||||||
|
} else {
|
||||||
|
r#"
|
||||||
|
update
|
||||||
|
fuota_deployment_job
|
||||||
|
set
|
||||||
|
scheduler_run_after = $3
|
||||||
|
where
|
||||||
|
(fuota_deployment_id, job) in (
|
||||||
|
select
|
||||||
|
fuota_deployment_id,
|
||||||
|
job
|
||||||
|
from
|
||||||
|
fuota_deployment_job
|
||||||
|
where
|
||||||
|
completed_at is null
|
||||||
|
and scheduler_run_after <= $2
|
||||||
|
order by
|
||||||
|
created_at
|
||||||
|
limit $1
|
||||||
|
)
|
||||||
|
returning *
|
||||||
|
"#
|
||||||
|
})
|
||||||
|
.bind::<diesel::sql_types::Integer, _>(limit as i32)
|
||||||
|
.bind::<fields::sql_types::Timestamptz, _>(Utc::now())
|
||||||
|
.bind::<fields::sql_types::Timestamptz, _>(
|
||||||
|
Utc::now() + Duration::from_std(2 * conf.network.scheduler.interval).unwrap(),
|
||||||
|
)
|
||||||
|
.load(c)
|
||||||
|
.await
|
||||||
|
.map_err(|e| Error::from_diesel(e, "".into()))
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.context("Get FUOTA jobs")
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod test {
|
mod test {
|
||||||
use super::*;
|
use super::*;
|
||||||
@ -861,5 +928,13 @@ mod test {
|
|||||||
assert!(jobs[0].completed_at.is_some());
|
assert!(jobs[0].completed_at.is_some());
|
||||||
assert_eq!(job2.job, jobs[1].job);
|
assert_eq!(job2.job, jobs[1].job);
|
||||||
assert!(jobs[1].completed_at.is_none());
|
assert!(jobs[1].completed_at.is_none());
|
||||||
|
|
||||||
|
// get schedulable jobs
|
||||||
|
let jobs = get_schedulable_jobs(10).await.unwrap();
|
||||||
|
assert_eq!(1, jobs.len());
|
||||||
|
assert_eq!(job2.job, jobs[0].job);
|
||||||
|
|
||||||
|
let jobs = get_schedulable_jobs(10).await.unwrap();
|
||||||
|
assert_eq!(0, jobs.len());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user