Immediately return PG conn after use.

Same as the previous commit, this will returns the PG connection back to
the pool after usage, avoiding the risk that we are holding the conn
longer than needed.
This commit is contained in:
Orne Brocaar 2023-12-12 11:22:15 +00:00
parent fccf762c39
commit c62f2b6474
16 changed files with 177 additions and 330 deletions

View File

@ -92,12 +92,11 @@ impl Validator for ValidateActiveUser {
}
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let count = user::dsl::user
.select(dsl::count_star())
.find(id)
.filter(user::dsl::is_active.eq(true))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await?;
Ok(count)
}
@ -118,7 +117,6 @@ impl Validator for ValidateIsAdmin {
}
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let count = user::dsl::user
.select(dsl::count_star())
.find(id)
@ -127,7 +125,7 @@ impl Validator for ValidateIsAdmin {
.eq(true)
.and(user::dsl::is_admin.eq(true)),
)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await?;
Ok(count)
}
@ -144,22 +142,20 @@ impl ValidateActiveUserOrKey {
#[async_trait]
impl Validator for ValidateActiveUserOrKey {
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let count = api_key::dsl::api_key
.select(dsl::count_star())
.find(&id)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await?;
Ok(count)
}
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let count = user::dsl::user
.select(dsl::count_star())
.find(id)
.filter(user::dsl::is_active.eq(true))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await?;
Ok(count)
}
@ -178,7 +174,6 @@ impl ValidateUsersAccess {
#[async_trait]
impl Validator for ValidateUsersAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.find(&id)
@ -195,17 +190,16 @@ impl Validator for ValidateUsersAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
// admin api key
let count = api_key::dsl::api_key
.select(dsl::count_star())
.find(&id)
.filter(api_key::dsl::is_admin.eq(true))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await?;
Ok(count)
}
@ -225,7 +219,6 @@ impl ValidateUserAccess {
#[async_trait]
impl Validator for ValidateUserAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.find(&id)
@ -251,17 +244,16 @@ impl Validator for ValidateUserAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
// admin api key
let count = api_key::dsl::api_key
.select(dsl::count_star())
.find(&id)
.filter(api_key::dsl::is_admin.eq(true))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await?;
Ok(count)
}
@ -287,8 +279,6 @@ impl ValidateApiKeysAccess {
#[async_trait]
impl Validator for ValidateApiKeysAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(&id).and(user::dsl::is_active.eq(true)))
@ -327,7 +317,7 @@ impl Validator for ValidateApiKeysAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, _id: &Uuid) -> Result<i64, Error> {
@ -349,8 +339,6 @@ impl ValidateApiKeyAccess {
#[async_trait]
impl Validator for ValidateApiKeyAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -380,7 +368,7 @@ impl Validator for ValidateApiKeyAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, _id: &Uuid) -> Result<i64, Error> {
@ -401,8 +389,6 @@ impl ValidateTenantsAccess {
#[async_trait]
impl Validator for ValidateTenantsAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.find(&id)
@ -421,17 +407,16 @@ impl Validator for ValidateTenantsAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
// admin api key
let count = api_key::dsl::api_key
.select(dsl::count_star())
.find(&id)
.filter(api_key::dsl::is_admin.eq(true))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await?;
Ok(count)
}
@ -451,8 +436,6 @@ impl ValidateTenantAccess {
#[async_trait]
impl Validator for ValidateTenantAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -482,12 +465,10 @@ impl Validator for ValidateTenantAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.find(id)
@ -512,7 +493,7 @@ impl Validator for ValidateTenantAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -530,7 +511,6 @@ impl ValidateTenantUsersAccess {
#[async_trait]
impl Validator for ValidateTenantUsersAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -569,11 +549,10 @@ impl Validator for ValidateTenantUsersAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.find(id)
@ -594,7 +573,7 @@ impl Validator for ValidateTenantUsersAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -617,7 +596,6 @@ impl ValidateTenantUserAccess {
#[async_trait]
impl Validator for ValidateTenantUserAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -662,11 +640,10 @@ impl Validator for ValidateTenantUserAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.find(id)
@ -687,7 +664,7 @@ impl Validator for ValidateTenantUserAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -705,7 +682,6 @@ impl ValidateApplicationsAccess {
#[async_trait]
impl Validator for ValidateApplicationsAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -749,11 +725,10 @@ impl Validator for ValidateApplicationsAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.find(id)
@ -783,7 +758,7 @@ impl Validator for ValidateApplicationsAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -804,7 +779,6 @@ impl ValidateApplicationAccess {
#[async_trait]
impl Validator for ValidateApplicationAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -858,11 +832,10 @@ impl Validator for ValidateApplicationAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.filter(api_key::dsl::id.eq(id))
@ -885,7 +858,7 @@ impl Validator for ValidateApplicationAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -902,7 +875,6 @@ impl ValidateDeviceProfileTemplatesAccess {
#[async_trait]
impl Validator for ValidateDeviceProfileTemplatesAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -920,11 +892,10 @@ impl Validator for ValidateDeviceProfileTemplatesAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.find(id)
@ -942,7 +913,7 @@ impl Validator for ValidateDeviceProfileTemplatesAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -959,7 +930,6 @@ impl ValidateDeviceProfileTemplateAccess {
#[async_trait]
impl Validator for ValidateDeviceProfileTemplateAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -977,11 +947,10 @@ impl Validator for ValidateDeviceProfileTemplateAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.find(id)
@ -999,7 +968,7 @@ impl Validator for ValidateDeviceProfileTemplateAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -1017,7 +986,6 @@ impl ValidateDeviceProfilesAccess {
#[async_trait]
impl Validator for ValidateDeviceProfilesAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -1061,11 +1029,10 @@ impl Validator for ValidateDeviceProfilesAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.find(id)
@ -1086,7 +1053,7 @@ impl Validator for ValidateDeviceProfilesAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -1107,7 +1074,6 @@ impl ValidateDeviceProfileAccess {
#[async_trait]
impl Validator for ValidateDeviceProfileAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -1161,11 +1127,10 @@ impl Validator for ValidateDeviceProfileAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.filter(api_key::dsl::id.eq(id))
@ -1188,7 +1153,7 @@ impl Validator for ValidateDeviceProfileAccess {
}
};
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -1209,7 +1174,6 @@ impl ValidateDevicesAccess {
#[async_trait]
impl Validator for ValidateDevicesAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -1263,11 +1227,10 @@ impl Validator for ValidateDevicesAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.filter(api_key::dsl::id.eq(id))
@ -1290,7 +1253,7 @@ impl Validator for ValidateDevicesAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -1308,7 +1271,6 @@ impl ValidateDeviceAccess {
#[async_trait]
impl Validator for ValidateDeviceAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -1364,11 +1326,10 @@ impl Validator for ValidateDeviceAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.filter(api_key::dsl::id.eq(id))
@ -1391,7 +1352,7 @@ impl Validator for ValidateDeviceAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -1409,7 +1370,6 @@ impl ValidateDeviceQueueAccess {
#[async_trait]
impl Validator for ValidateDeviceQueueAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -1440,11 +1400,10 @@ impl Validator for ValidateDeviceQueueAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.filter(api_key::dsl::id.eq(id))
@ -1467,7 +1426,7 @@ impl Validator for ValidateDeviceQueueAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -1485,7 +1444,6 @@ impl ValidateGatewaysAccess {
#[async_trait]
impl Validator for ValidateGatewaysAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -1529,11 +1487,10 @@ impl Validator for ValidateGatewaysAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.find(id)
@ -1554,7 +1511,7 @@ impl Validator for ValidateGatewaysAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -1572,7 +1529,6 @@ impl ValidateGatewayAccess {
#[async_trait]
impl Validator for ValidateGatewayAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -1626,11 +1582,10 @@ impl Validator for ValidateGatewayAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.filter(api_key::dsl::id.eq(id))
@ -1654,7 +1609,7 @@ impl Validator for ValidateGatewayAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -1675,7 +1630,6 @@ impl ValidateMulticastGroupsAccess {
#[async_trait]
impl Validator for ValidateMulticastGroupsAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -1729,11 +1683,10 @@ impl Validator for ValidateMulticastGroupsAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.filter(api_key::dsl::id.eq(id))
@ -1756,7 +1709,7 @@ impl Validator for ValidateMulticastGroupsAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -1777,7 +1730,6 @@ impl ValidateMulticastGroupAccess {
#[async_trait]
impl Validator for ValidateMulticastGroupAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -1833,11 +1785,10 @@ impl Validator for ValidateMulticastGroupAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.filter(api_key::dsl::id.eq(id))
@ -1862,7 +1813,7 @@ impl Validator for ValidateMulticastGroupAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}
@ -1883,7 +1834,6 @@ impl ValidateMulticastGroupQueueAccess {
#[async_trait]
impl Validator for ValidateMulticastGroupQueueAccess {
async fn validate_user(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = user::dsl::user
.select(dsl::count_star())
.filter(user::dsl::id.eq(id).and(user::dsl::is_active.eq(true)))
@ -1939,11 +1889,10 @@ impl Validator for ValidateMulticastGroupQueueAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
async fn validate_key(&self, id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.filter(api_key::dsl::id.eq(id))
@ -1968,7 +1917,7 @@ impl Validator for ValidateMulticastGroupQueueAccess {
}
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
}

View File

@ -49,9 +49,8 @@ async fn health_handler() -> Result<impl warp::Reply, Infallible> {
}
async fn _health_handler() -> Result<()> {
let mut c = get_async_db_conn().await?;
diesel::sql_query("select 1")
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await
.context("PostgreSQL connection error")?;

View File

@ -51,10 +51,9 @@ pub struct Filters {
pub async fn create(ak: ApiKey) -> Result<ApiKey, Error> {
ak.validate()?;
let mut c = get_async_db_conn().await?;
let ak: ApiKey = diesel::insert_into(api_key::table)
.values(&ak)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| error::Error::from_diesel(e, ak.id.to_string()))?;
info!(id = %ak.id, "Api-key created");
@ -62,9 +61,8 @@ pub async fn create(ak: ApiKey) -> Result<ApiKey, Error> {
}
pub async fn delete(id: &Uuid) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(api_key::dsl::api_key.find(&id))
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(id.to_string()));
@ -74,8 +72,6 @@ pub async fn delete(id: &Uuid) -> Result<(), Error> {
}
pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.select(dsl::count_star())
.filter(api_key::dsl::is_admin.eq(filters.is_admin))
@ -85,12 +81,10 @@ pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
q = q.filter(api_key::dsl::tenant_id.eq(tenant_id));
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
pub async fn list(limit: i64, offset: i64, filters: &Filters) -> Result<Vec<ApiKey>, Error> {
let mut c = get_async_db_conn().await?;
let mut q = api_key::dsl::api_key
.filter(api_key::dsl::is_admin.eq(filters.is_admin))
.into_boxed();
@ -103,7 +97,7 @@ pub async fn list(limit: i64, offset: i64, filters: &Filters) -> Result<Vec<ApiK
.order_by(api_key::dsl::name)
.limit(limit)
.offset(offset)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await?;
Ok(items)
}
@ -123,10 +117,9 @@ pub mod test {
}
pub async fn get(id: &Uuid) -> Result<ApiKey, Error> {
let mut c = get_async_db_conn().await?;
api_key::dsl::api_key
.find(&id)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| error::Error::from_diesel(e, id.to_string()))
}

View File

@ -292,10 +292,9 @@ impl Default for Integration {
pub async fn create(a: Application) -> Result<Application, Error> {
a.validate()?;
let mut c = get_async_db_conn().await?;
let a: Application = diesel::insert_into(application::table)
.values(&a)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, a.id.to_string()))?;
@ -305,10 +304,9 @@ pub async fn create(a: Application) -> Result<Application, Error> {
}
pub async fn get(id: &Uuid) -> Result<Application, Error> {
let mut c = get_async_db_conn().await?;
let a = application::dsl::application
.find(&id)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))?;
Ok(a)
@ -316,7 +314,7 @@ pub async fn get(id: &Uuid) -> Result<Application, Error> {
pub async fn update(a: Application) -> Result<Application, Error> {
a.validate()?;
let mut c = get_async_db_conn().await?;
let a: Application = diesel::update(application::dsl::application.find(&a.id))
.set((
application::updated_at.eq(Utc::now()),
@ -324,7 +322,7 @@ pub async fn update(a: Application) -> Result<Application, Error> {
application::description.eq(&a.description),
application::tags.eq(&a.tags),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, a.id.to_string()))?;
@ -337,10 +335,9 @@ pub async fn update(a: Application) -> Result<Application, Error> {
}
pub async fn update_mqtt_cls_cert(id: &Uuid, cert: &[u8]) -> Result<Application, Error> {
let mut c = get_async_db_conn().await?;
let app: Application = diesel::update(application::dsl::application.find(&id))
.set(application::mqtt_tls_cert.eq(cert))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))?;
@ -353,9 +350,8 @@ pub async fn update_mqtt_cls_cert(id: &Uuid, cert: &[u8]) -> Result<Application,
}
pub async fn delete(id: &Uuid) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(application::dsl::application.find(&id))
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(id.to_string()));
@ -370,7 +366,6 @@ pub async fn delete(id: &Uuid) -> Result<(), Error> {
}
pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = application::dsl::application
.select(dsl::count_star())
.into_boxed();
@ -383,7 +378,7 @@ pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
q = q.filter(application::dsl::name.ilike(format!("%{}%", search)));
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
pub async fn list(
@ -391,7 +386,6 @@ pub async fn list(
offset: i64,
filters: &Filters,
) -> Result<Vec<ApplicationListItem>, Error> {
let mut c = get_async_db_conn().await?;
let mut q = application::dsl::application
.select((
application::id,
@ -414,16 +408,15 @@ pub async fn list(
.order_by(application::dsl::name)
.limit(limit)
.offset(offset)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await?;
Ok(items)
}
pub async fn create_integration(i: Integration) -> Result<Integration, Error> {
let mut c = get_async_db_conn().await?;
let i: Integration = diesel::insert_into(application_integration::table)
.values(&i)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, i.kind.to_string()))?;
@ -435,14 +428,13 @@ pub async fn get_integration(
application_id: &Uuid,
kind: IntegrationKind,
) -> Result<Integration, Error> {
let mut c = get_async_db_conn().await?;
let mut i: Integration = application_integration::dsl::application_integration
.filter(
application_integration::dsl::application_id
.eq(application_id)
.and(application_integration::dsl::kind.eq(kind)),
)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, application_id.to_string()))?;
@ -462,7 +454,6 @@ pub async fn get_integration(
}
pub async fn update_integration(i: Integration) -> Result<Integration, Error> {
let mut c = get_async_db_conn().await?;
let i: Integration = diesel::update(
application_integration::dsl::application_integration.filter(
application_integration::dsl::application_id
@ -474,7 +465,7 @@ pub async fn update_integration(i: Integration) -> Result<Integration, Error> {
application_integration::updated_at.eq(Utc::now()),
application_integration::configuration.eq(&i.configuration),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, i.application_id.to_string()))?;
@ -484,7 +475,6 @@ pub async fn update_integration(i: Integration) -> Result<Integration, Error> {
}
pub async fn delete_integration(application_id: &Uuid, kind: IntegrationKind) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(
application_integration::dsl::application_integration.filter(
application_integration::dsl::application_id
@ -492,7 +482,7 @@ pub async fn delete_integration(application_id: &Uuid, kind: IntegrationKind) ->
.and(application_integration::dsl::kind.eq(&kind)),
),
)
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
@ -506,11 +496,10 @@ pub async fn delete_integration(application_id: &Uuid, kind: IntegrationKind) ->
pub async fn get_integrations_for_application(
application_id: &Uuid,
) -> Result<Vec<Integration>, Error> {
let mut c = get_async_db_conn().await?;
let items: Vec<Integration> = application_integration::dsl::application_integration
.filter(application_integration::dsl::application_id.eq(&application_id))
.order_by(application_integration::dsl::kind)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await?;
Ok(items)
}
@ -522,7 +511,6 @@ pub async fn get_measurement_keys(application_id: &Uuid) -> Result<Vec<String>,
pub key: String,
}
let mut c = get_async_db_conn().await?;
let keys: Vec<Measurement> = diesel::sql_query(
r#"
select
@ -538,7 +526,7 @@ pub async fn get_measurement_keys(application_id: &Uuid) -> Result<Vec<String>,
"#,
)
.bind::<diesel::sql_types::Uuid, _>(application_id)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, application_id.to_string()))?;
Ok(keys.iter().map(|k| k.key.clone()).collect())

View File

@ -229,10 +229,9 @@ pub async fn create(d: Device) -> Result<Device, Error> {
}
pub async fn get(dev_eui: &EUI64) -> Result<Device, Error> {
let mut c = get_async_db_conn().await?;
let d = device::dsl::device
.find(&dev_eui)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
Ok(d)
@ -240,7 +239,7 @@ pub async fn get(dev_eui: &EUI64) -> Result<Device, Error> {
pub async fn update(d: Device) -> Result<Device, Error> {
d.validate()?;
let mut c = get_async_db_conn().await?;
let d: Device = diesel::update(device::dsl::device.find(&d.dev_eui))
.set((
device::updated_at.eq(Utc::now()),
@ -254,7 +253,7 @@ pub async fn update(d: Device) -> Result<Device, Error> {
device::variables.eq(&d.variables),
device::join_eui.eq(&d.join_eui),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, d.dev_eui.to_string()))?;
info!(dev_eui = %d.dev_eui, "Device updated");
@ -262,10 +261,9 @@ pub async fn update(d: Device) -> Result<Device, Error> {
}
pub async fn set_enabled_class(dev_eui: &EUI64, mode: DeviceClass) -> Result<Device, Error> {
let mut c = get_async_db_conn().await?;
let d: Device = diesel::update(device::dsl::device.find(&dev_eui))
.set(device::enabled_class.eq(&mode))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
info!(dev_eui = %dev_eui, enabled_class = %mode, "Enabled class updated");
@ -273,10 +271,9 @@ pub async fn set_enabled_class(dev_eui: &EUI64, mode: DeviceClass) -> Result<Dev
}
pub async fn set_join_eui(dev_eui: EUI64, join_eui: EUI64) -> Result<Device, Error> {
let mut c = get_async_db_conn().await?;
let d: Device = diesel::update(device::dsl::device.find(&dev_eui))
.set(device::join_eui.eq(&join_eui))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
info!(dev_eui = %dev_eui, join_eui = %join_eui, "Updated JoinEUI");
@ -284,10 +281,9 @@ pub async fn set_join_eui(dev_eui: EUI64, join_eui: EUI64) -> Result<Device, Err
}
pub async fn set_dev_addr(dev_eui: EUI64, dev_addr: DevAddr) -> Result<Device, Error> {
let mut c = get_async_db_conn().await?;
let d: Device = diesel::update(device::dsl::device.find(&dev_eui))
.set(device::dev_addr.eq(&dev_addr))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
info!(dev_eui = %dev_eui, dev_addr = %dev_addr, "Updated DevAddr");
@ -302,22 +298,20 @@ pub async fn set_scheduler_run_after(
dev_eui: &EUI64,
new_ts: Option<DateTime<Utc>>,
) -> Result<Device, Error> {
let mut c = get_async_db_conn().await?;
diesel::update(device::dsl::device.find(&dev_eui))
.set(device::scheduler_run_after.eq(&new_ts))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))
}
pub async fn set_last_seen_dr(dev_eui: &EUI64, dr: u8) -> Result<Device, Error> {
let mut c = get_async_db_conn().await?;
let d: Device = diesel::update(device::dsl::device.find(&dev_eui))
.set((
device::last_seen_at.eq(Utc::now()),
device::dr.eq(dr as i16),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
info!(dev_eui = %dev_eui, dr = dr, "Data-rate updated");
@ -330,14 +324,13 @@ pub async fn set_status(
external_power_source: bool,
battery_level: Option<BigDecimal>,
) -> Result<Device, Error> {
let mut c = get_async_db_conn().await?;
let d: Device = diesel::update(device::dsl::device.find(&dev_eui))
.set((
device::margin.eq(Some(margin)),
device::external_power_source.eq(external_power_source),
device::battery_level.eq(battery_level),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
info!(dev_eui = %dev_eui, "Device status updated");
@ -345,9 +338,8 @@ pub async fn set_status(
}
pub async fn delete(dev_eui: &EUI64) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(device::dsl::device.find(&dev_eui))
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(dev_eui.to_string()));
@ -357,7 +349,6 @@ pub async fn delete(dev_eui: &EUI64) -> Result<(), Error> {
}
pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = device::dsl::device
.select(dsl::count_star())
.distinct()
@ -376,7 +367,7 @@ pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
q = q.filter(multicast_group_device::dsl::multicast_group_id.eq(multicast_group_id));
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
pub async fn list(
@ -384,7 +375,6 @@ pub async fn list(
offset: i64,
filters: &Filters,
) -> Result<Vec<DeviceListItem>, Error> {
let mut c = get_async_db_conn().await?;
let mut q = device::dsl::device
.inner_join(device_profile::table)
.left_join(multicast_group_device::table)
@ -419,13 +409,12 @@ pub async fn list(
q.order_by(device::dsl::name)
.limit(limit)
.offset(offset)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, "".into()))
}
pub async fn get_active_inactive(tenant_id: &Option<Uuid>) -> Result<DevicesActiveInactive, Error> {
let mut c = get_async_db_conn().await?;
diesel::sql_query(r#"
with device_active_inactive as (
select
@ -446,12 +435,11 @@ pub async fn get_active_inactive(tenant_id: &Option<Uuid>) -> Result<DevicesActi
device_active_inactive
"#)
.bind::<diesel::sql_types::Nullable<diesel::sql_types::Uuid>, _>(tenant_id)
.get_result(&mut c).await
.get_result(&mut get_async_db_conn().await?).await
.map_err(|e| Error::from_diesel(e, "".into()))
}
pub async fn get_data_rates(tenant_id: &Option<Uuid>) -> Result<Vec<DevicesDataRate>, Error> {
let mut c = get_async_db_conn().await?;
let mut q = device::dsl::device
.inner_join(device_profile::table)
//.select((device::dr, dsl::count_star()))
@ -467,7 +455,7 @@ pub async fn get_data_rates(tenant_id: &Option<Uuid>) -> Result<Vec<DevicesDataR
q = q.filter(device_profile::dsl::tenant_id.eq(id));
}
q.load(&mut c)
q.load(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, "".into()))
}

View File

@ -45,10 +45,9 @@ impl Default for DeviceKeys {
}
pub async fn create(dk: DeviceKeys) -> Result<DeviceKeys, Error> {
let mut c = get_async_db_conn().await?;
let dk: DeviceKeys = diesel::insert_into(device_keys::table)
.values(&dk)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dk.dev_eui.to_string()))?;
info!(
@ -59,20 +58,18 @@ pub async fn create(dk: DeviceKeys) -> Result<DeviceKeys, Error> {
}
pub async fn get(dev_eui: &EUI64) -> Result<DeviceKeys, Error> {
let mut c = get_async_db_conn().await?;
let dk = device_keys::dsl::device_keys
.find(&dev_eui)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
Ok(dk)
}
pub async fn update(dk: DeviceKeys) -> Result<DeviceKeys, Error> {
let mut c = get_async_db_conn().await?;
let dk: DeviceKeys = diesel::update(device_keys::dsl::device_keys.find(&dk.dev_eui))
.set(&dk)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dk.dev_eui.to_string()))?;
info!(
@ -83,9 +80,8 @@ pub async fn update(dk: DeviceKeys) -> Result<DeviceKeys, Error> {
}
pub async fn delete(dev_eui: &EUI64) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(device_keys::dsl::device_keys.find(&dev_eui))
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(dev_eui.to_string()));
@ -98,10 +94,9 @@ pub async fn delete(dev_eui: &EUI64) -> Result<(), Error> {
}
pub async fn set_dev_nonces(dev_eui: &EUI64, nonces: &[i32]) -> Result<DeviceKeys, Error> {
let mut c = get_async_db_conn().await?;
let dk: DeviceKeys = diesel::update(device_keys::dsl::device_keys.find(dev_eui))
.set(device_keys::dev_nonces.eq(nonces))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
info!(
@ -158,13 +153,12 @@ pub mod test {
use crate::test;
pub async fn reset_nonces(dev_eui: &EUI64) -> Result<DeviceKeys, Error> {
let mut c = get_async_db_conn().await?;
let dk: DeviceKeys = diesel::update(device_keys::dsl::device_keys.find(&dev_eui))
.set((
device_keys::dev_nonces.eq::<Vec<i32>>(Vec::new()),
device_keys::join_nonce.eq(0),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;

View File

@ -198,10 +198,10 @@ pub struct Filters {
pub async fn create(dp: DeviceProfile) -> Result<DeviceProfile, Error> {
dp.validate()?;
let mut c = get_async_db_conn().await?;
let dp: DeviceProfile = diesel::insert_into(device_profile::table)
.values(&dp)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| error::Error::from_diesel(e, dp.id.to_string()))?;
info!(id = %dp.id, "Device-profile created");
@ -209,10 +209,9 @@ pub async fn create(dp: DeviceProfile) -> Result<DeviceProfile, Error> {
}
pub async fn get(id: &Uuid) -> Result<DeviceProfile, Error> {
let mut c = get_async_db_conn().await?;
let dp = device_profile::dsl::device_profile
.find(&id)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| error::Error::from_diesel(e, id.to_string()))?;
Ok(dp)
@ -220,7 +219,6 @@ pub async fn get(id: &Uuid) -> Result<DeviceProfile, Error> {
pub async fn update(dp: DeviceProfile) -> Result<DeviceProfile, Error> {
dp.validate()?;
let mut c = get_async_db_conn().await?;
let dp: DeviceProfile = diesel::update(device_profile::dsl::device_profile.find(&dp.id))
.set((
@ -282,7 +280,7 @@ pub async fn update(dp: DeviceProfile) -> Result<DeviceProfile, Error> {
device_profile::relay_overall_limit_bucket_size.eq(&dp.relay_overall_limit_bucket_size),
device_profile::allow_roaming.eq(&dp.allow_roaming),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| error::Error::from_diesel(e, dp.id.to_string()))?;
@ -291,10 +289,9 @@ pub async fn update(dp: DeviceProfile) -> Result<DeviceProfile, Error> {
}
pub async fn set_measurements(id: Uuid, m: &fields::Measurements) -> Result<DeviceProfile, Error> {
let mut c = get_async_db_conn().await?;
let dp: DeviceProfile = diesel::update(device_profile::dsl::device_profile.find(&id))
.set(device_profile::measurements.eq(m))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))?;
info!(id = %id, "Device-profile measurements updated");
@ -302,9 +299,8 @@ pub async fn set_measurements(id: Uuid, m: &fields::Measurements) -> Result<Devi
}
pub async fn delete(id: &Uuid) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(device_profile::dsl::device_profile.find(&id))
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(error::Error::NotFound(id.to_string()));
@ -314,7 +310,6 @@ pub async fn delete(id: &Uuid) -> Result<(), Error> {
}
pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = device_profile::dsl::device_profile
.select(dsl::count_star())
.into_boxed();
@ -327,7 +322,7 @@ pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
q = q.filter(device_profile::dsl::name.ilike(format!("%{}%", search)));
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
pub async fn list(
@ -335,7 +330,6 @@ pub async fn list(
offset: i64,
filters: &Filters,
) -> Result<Vec<DeviceProfileListItem>, Error> {
let mut c = get_async_db_conn().await?;
let mut q = device_profile::dsl::device_profile
.select((
device_profile::id,
@ -363,7 +357,7 @@ pub async fn list(
.order_by(device_profile::dsl::name)
.limit(limit)
.offset(offset)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await?;
Ok(items)
}

View File

@ -133,10 +133,10 @@ pub struct DeviceProfileTemplateListItem {
pub async fn create(dp: DeviceProfileTemplate) -> Result<DeviceProfileTemplate, Error> {
dp.validate()?;
let mut c = get_async_db_conn().await?;
let dp: DeviceProfileTemplate = diesel::insert_into(device_profile_template::table)
.values(&dp)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| error::Error::from_diesel(e, dp.id.to_string()))?;
info!(id = %dp.id, "Device-profile template created");
@ -145,7 +145,7 @@ pub async fn create(dp: DeviceProfileTemplate) -> Result<DeviceProfileTemplate,
pub async fn upsert(dp: DeviceProfileTemplate) -> Result<DeviceProfileTemplate, Error> {
dp.validate()?;
let mut c = get_async_db_conn().await?;
let dp: DeviceProfileTemplate = diesel::insert_into(device_profile_template::table)
.values(&dp)
.on_conflict(device_profile_template::id)
@ -181,7 +181,7 @@ pub async fn upsert(dp: DeviceProfileTemplate) -> Result<DeviceProfileTemplate,
device_profile_template::measurements.eq(&dp.measurements),
device_profile_template::auto_detect_measurements.eq(&dp.auto_detect_measurements),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| error::Error::from_diesel(e, dp.id.to_string()))?;
info!(id = %dp.id, "Device-profile template upserted");
@ -190,10 +190,10 @@ pub async fn upsert(dp: DeviceProfileTemplate) -> Result<DeviceProfileTemplate,
pub async fn get(id: &str) -> Result<DeviceProfileTemplate, Error> {
let id = id.to_string();
let mut c = get_async_db_conn().await?;
let dp = device_profile_template::dsl::device_profile_template
.find(&id)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| error::Error::from_diesel(e, id.clone()))?;
Ok(dp)
@ -201,7 +201,6 @@ pub async fn get(id: &str) -> Result<DeviceProfileTemplate, Error> {
pub async fn update(dp: DeviceProfileTemplate) -> Result<DeviceProfileTemplate, Error> {
dp.validate()?;
let mut c = get_async_db_conn().await?;
let dp: DeviceProfileTemplate =
diesel::update(device_profile_template::dsl::device_profile_template.find(&dp.id))
@ -235,7 +234,7 @@ pub async fn update(dp: DeviceProfileTemplate) -> Result<DeviceProfileTemplate,
device_profile_template::abp_rx2_freq.eq(&dp.abp_rx2_freq),
device_profile_template::tags.eq(&dp.tags),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| error::Error::from_diesel(e, dp.id.clone()))?;
info!(id = %dp.id, "Device-profile template updated");
@ -244,9 +243,9 @@ pub async fn update(dp: DeviceProfileTemplate) -> Result<DeviceProfileTemplate,
pub async fn delete(id: &str) -> Result<(), Error> {
let id = id.to_string();
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(device_profile_template::dsl::device_profile_template.find(&id))
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(error::Error::NotFound(id));
@ -256,15 +255,13 @@ pub async fn delete(id: &str) -> Result<(), Error> {
}
pub async fn get_count() -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
Ok(device_profile_template::dsl::device_profile_template
.select(dsl::count_star())
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await?)
}
pub async fn list(limit: i64, offset: i64) -> Result<Vec<DeviceProfileTemplateListItem>, Error> {
let mut c = get_async_db_conn().await?;
let items = device_profile_template::dsl::device_profile_template
.select((
device_profile_template::id,
@ -288,7 +285,7 @@ pub async fn list(limit: i64, offset: i64) -> Result<Vec<DeviceProfileTemplateLi
))
.limit(limit)
.offset(offset)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await?;
Ok(items)
}

View File

@ -64,10 +64,10 @@ impl Default for DeviceQueueItem {
pub async fn enqueue_item(qi: DeviceQueueItem) -> Result<DeviceQueueItem, Error> {
qi.validate()?;
let mut c = get_async_db_conn().await?;
let qi: DeviceQueueItem = diesel::insert_into(device_queue_item::table)
.values(&qi)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, qi.id.to_string()))?;
info!(id = %qi.id, dev_eui = %qi.dev_eui, "Device queue-item enqueued");
@ -75,17 +75,15 @@ pub async fn enqueue_item(qi: DeviceQueueItem) -> Result<DeviceQueueItem, Error>
}
pub async fn get_item(id: &Uuid) -> Result<DeviceQueueItem, Error> {
let mut c = get_async_db_conn().await?;
let qi = device_queue_item::dsl::device_queue_item
.find(id)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))?;
Ok(qi)
}
pub async fn update_item(qi: DeviceQueueItem) -> Result<DeviceQueueItem, Error> {
let mut c = get_async_db_conn().await?;
let qi: DeviceQueueItem =
diesel::update(device_queue_item::dsl::device_queue_item.find(&qi.id))
.set((
@ -93,7 +91,7 @@ pub async fn update_item(qi: DeviceQueueItem) -> Result<DeviceQueueItem, Error>
device_queue_item::f_cnt_down.eq(&qi.f_cnt_down),
device_queue_item::timeout_after.eq(&qi.timeout_after),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, qi.id.to_string()))?;
info!(id = %qi.id, dev_eui = %qi.dev_eui, "Device queue-item updated");
@ -101,9 +99,8 @@ pub async fn update_item(qi: DeviceQueueItem) -> Result<DeviceQueueItem, Error>
}
pub async fn delete_item(id: &Uuid) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(device_queue_item::dsl::device_queue_item.find(&id))
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(id.to_string()));
@ -114,12 +111,11 @@ pub async fn delete_item(id: &Uuid) -> Result<(), Error> {
/// It returns the device queue-item and a bool indicating if there are more items in the queue.
pub async fn get_next_for_dev_eui(dev_eui: &EUI64) -> Result<(DeviceQueueItem, bool), Error> {
let mut c = get_async_db_conn().await?;
let items: Vec<DeviceQueueItem> = device_queue_item::dsl::device_queue_item
.filter(device_queue_item::dev_eui.eq(&dev_eui))
.order_by(device_queue_item::created_at)
.limit(2)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
@ -143,22 +139,20 @@ pub async fn get_next_for_dev_eui(dev_eui: &EUI64) -> Result<(DeviceQueueItem, b
}
pub async fn get_for_dev_eui(dev_eui: &EUI64) -> Result<Vec<DeviceQueueItem>, Error> {
let mut c = get_async_db_conn().await?;
let items = device_queue_item::dsl::device_queue_item
.filter(device_queue_item::dev_eui.eq(&dev_eui))
.order_by(device_queue_item::created_at)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
Ok(items)
}
pub async fn flush_for_dev_eui(dev_eui: &EUI64) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let count: usize = diesel::delete(
device_queue_item::dsl::device_queue_item.filter(device_queue_item::dev_eui.eq(&dev_eui)),
)
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
info!(dev_eui = %dev_eui, count = count, "Device queue flushed");
@ -166,25 +160,23 @@ pub async fn flush_for_dev_eui(dev_eui: &EUI64) -> Result<(), Error> {
}
pub async fn get_pending_for_dev_eui(dev_eui: &EUI64) -> Result<DeviceQueueItem, Error> {
let mut c = get_async_db_conn().await?;
let qi = device_queue_item::dsl::device_queue_item
.filter(
device_queue_item::dev_eui
.eq(&dev_eui)
.and(device_queue_item::is_pending.eq(true)),
)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
Ok(qi)
}
pub async fn get_max_f_cnt_down(dev_eui: EUI64) -> Result<Option<i64>, Error> {
let mut c = get_async_db_conn().await?;
Ok(device_queue_item::dsl::device_queue_item
.select(dsl::max(device_queue_item::f_cnt_down))
.filter(device_queue_item::dsl::dev_eui.eq(dev_eui))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await?)
}

View File

@ -154,10 +154,9 @@ pub async fn create(gw: Gateway) -> Result<Gateway, Error> {
}
pub async fn get(gateway_id: &EUI64) -> Result<Gateway, Error> {
let mut c = get_async_db_conn().await?;
let gw = gateway::dsl::gateway
.find(&gateway_id)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, gateway_id.to_string()))?;
Ok(gw)
@ -165,7 +164,7 @@ pub async fn get(gateway_id: &EUI64) -> Result<Gateway, Error> {
pub async fn update(gw: Gateway) -> Result<Gateway, Error> {
gw.validate()?;
let mut c = get_async_db_conn().await?;
let gw: Gateway = diesel::update(gateway::dsl::gateway.find(&gw.gateway_id))
.set((
gateway::updated_at.eq(Utc::now()),
@ -177,7 +176,7 @@ pub async fn update(gw: Gateway) -> Result<Gateway, Error> {
gateway::stats_interval_secs.eq(&gw.stats_interval_secs),
gateway::tags.eq(&gw.tags),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, gw.gateway_id.to_string()))?;
info!(
@ -189,13 +188,13 @@ pub async fn update(gw: Gateway) -> Result<Gateway, Error> {
pub async fn update_state(id: &EUI64, props: &HashMap<String, String>) -> Result<Gateway, Error> {
let props = fields::KeyValue::new(props.clone());
let mut c = get_async_db_conn().await?;
let gw: Gateway = diesel::update(gateway::dsl::gateway.find(&id))
.set((
gateway::last_seen_at.eq(Some(Utc::now())),
gateway::properties.eq(props),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))?;
@ -215,7 +214,7 @@ pub async fn update_state_and_loc(
props: &HashMap<String, String>,
) -> Result<Gateway, Error> {
let props = fields::KeyValue::new(props.clone());
let mut c = get_async_db_conn().await?;
let gw: Gateway = diesel::update(gateway::dsl::gateway.find(&id))
.set((
gateway::last_seen_at.eq(Some(Utc::now())),
@ -224,7 +223,7 @@ pub async fn update_state_and_loc(
gateway::altitude.eq(alt),
gateway::properties.eq(props),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))?;
@ -237,10 +236,9 @@ pub async fn update_state_and_loc(
}
pub async fn update_tls_cert(id: &EUI64, cert: &[u8]) -> Result<Gateway, Error> {
let mut c = get_async_db_conn().await?;
let gw: Gateway = diesel::update(gateway::dsl::gateway.find(&id))
.set(gateway::tls_certificate.eq(cert))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))?;
info!(
@ -252,9 +250,8 @@ pub async fn update_tls_cert(id: &EUI64, cert: &[u8]) -> Result<Gateway, Error>
}
pub async fn delete(gateway_id: &EUI64) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(gateway::dsl::gateway.find(&gateway_id))
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(gateway_id.to_string()));
@ -267,7 +264,6 @@ pub async fn delete(gateway_id: &EUI64) -> Result<(), Error> {
}
pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = gateway::dsl::gateway
.select(dsl::count_star())
.distinct()
@ -286,7 +282,7 @@ pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
q = q.filter(gateway::dsl::name.ilike(format!("%{}%", search)));
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
pub async fn list(
@ -294,7 +290,6 @@ pub async fn list(
offset: i64,
filters: &Filters,
) -> Result<Vec<GatewayListItem>, Error> {
let mut c = get_async_db_conn().await?;
let mut q = gateway::dsl::gateway
.left_join(multicast_group_gateway::table)
.select((
@ -330,13 +325,12 @@ pub async fn list(
.order_by(gateway::dsl::name)
.limit(limit)
.offset(offset)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await?;
Ok(items)
}
pub async fn get_meta(gateway_id: &EUI64) -> Result<GatewayMeta, Error> {
let mut c = get_async_db_conn().await?;
let meta = gateway::dsl::gateway
.inner_join(tenant::table)
.select((
@ -349,14 +343,13 @@ pub async fn get_meta(gateway_id: &EUI64) -> Result<GatewayMeta, Error> {
tenant::private_gateways_down,
))
.filter(gateway::dsl::gateway_id.eq(&gateway_id))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, gateway_id.to_string()))?;
Ok(meta)
}
pub async fn get_counts_by_state(tenant_id: &Option<Uuid>) -> Result<GatewayCountsByState, Error> {
let mut c = get_async_db_conn().await?;
let counts: GatewayCountsByState = diesel::sql_query(r#"
select
coalesce(sum(case when last_seen_at is null then 1 end), 0) as never_seen_count,
@ -366,7 +359,7 @@ pub async fn get_counts_by_state(tenant_id: &Option<Uuid>) -> Result<GatewayCoun
gateway
where
$1 is null or tenant_id = $1
"#).bind::<diesel::sql_types::Nullable<diesel::sql_types::Uuid>, _>(tenant_id).get_result(&mut c).await?;
"#).bind::<diesel::sql_types::Nullable<diesel::sql_types::Uuid>, _>(tenant_id).get_result(&mut get_async_db_conn().await?).await?;
Ok(counts)
}

View File

@ -11,13 +11,12 @@ use lrwn::EUI64;
pub async fn get_all_device_data(
dev_eui: EUI64,
) -> Result<(Device, Application, Tenant, DeviceProfile), Error> {
let mut c = get_async_db_conn().await?;
let res = device::table
.inner_join(application::table)
.inner_join(tenant::table.on(application::dsl::tenant_id.eq(tenant::dsl::id)))
.inner_join(device_profile::table)
.filter(device::dsl::dev_eui.eq(&dev_eui))
.first::<(Device, Application, Tenant, DeviceProfile)>(&mut c)
.first::<(Device, Application, Tenant, DeviceProfile)>(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, dev_eui.to_string()))?;
Ok(res)

View File

@ -132,10 +132,10 @@ impl Default for MulticastGroupQueueItem {
pub async fn create(mg: MulticastGroup) -> Result<MulticastGroup, Error> {
mg.validate()?;
let mut c = get_async_db_conn().await?;
let mg: MulticastGroup = diesel::insert_into(multicast_group::table)
.values(&mg)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, mg.id.to_string()))?;
info!(id = %mg.id, "Multicast-group created");
@ -143,17 +143,16 @@ pub async fn create(mg: MulticastGroup) -> Result<MulticastGroup, Error> {
}
pub async fn get(id: &Uuid) -> Result<MulticastGroup, Error> {
let mut c = get_async_db_conn().await?;
multicast_group::dsl::multicast_group
.find(&id)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))
}
pub async fn update(mg: MulticastGroup) -> Result<MulticastGroup, Error> {
mg.validate()?;
let mut c = get_async_db_conn().await?;
let mg: MulticastGroup = diesel::update(multicast_group::dsl::multicast_group.find(&mg.id))
.set((
multicast_group::updated_at.eq(Utc::now()),
@ -169,7 +168,7 @@ pub async fn update(mg: MulticastGroup) -> Result<MulticastGroup, Error> {
multicast_group::class_b_ping_slot_period.eq(&mg.class_b_ping_slot_period),
multicast_group::class_c_scheduling_type.eq(&mg.class_c_scheduling_type),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, mg.id.to_string()))?;
info!(id = %mg.id, "Multicast-group updated");
@ -177,9 +176,8 @@ pub async fn update(mg: MulticastGroup) -> Result<MulticastGroup, Error> {
}
pub async fn delete(id: &Uuid) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(multicast_group::dsl::multicast_group.find(&id))
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(id.to_string()));
@ -189,7 +187,6 @@ pub async fn delete(id: &Uuid) -> Result<(), Error> {
}
pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = multicast_group::dsl::multicast_group
.select(dsl::count_star())
.into_boxed();
@ -202,7 +199,7 @@ pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
q = q.filter(multicast_group::dsl::name.ilike(format!("%{}%", search)));
}
q.first(&mut c)
q.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, "".into()))
}
@ -212,7 +209,6 @@ pub async fn list(
offset: i64,
filters: &Filters,
) -> Result<Vec<MulticastGroupListItem>, Error> {
let mut c = get_async_db_conn().await?;
let mut q = multicast_group::dsl::multicast_group
.select((
multicast_group::id,
@ -235,7 +231,7 @@ pub async fn list(
q.order_by(multicast_group::dsl::name)
.limit(limit)
.offset(offset)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, "".into()))
}
@ -282,13 +278,12 @@ pub async fn add_device(group_id: &Uuid, dev_eui: &EUI64) -> Result<(), Error> {
}
pub async fn remove_device(group_id: &Uuid, dev_eui: &EUI64) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(
multicast_group_device::dsl::multicast_group_device
.filter(multicast_group_device::multicast_group_id.eq(&group_id))
.filter(multicast_group_device::dev_eui.eq(&dev_eui)),
)
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(format!(
@ -349,13 +344,12 @@ pub async fn add_gateway(group_id: &Uuid, gateway_id: &EUI64) -> Result<(), Erro
}
pub async fn remove_gateway(group_id: &Uuid, gateway_id: &EUI64) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(
multicast_group_gateway::dsl::multicast_group_gateway
.filter(multicast_group_gateway::multicast_group_id.eq(&group_id))
.filter(multicast_group_gateway::gateway_id.eq(&gateway_id)),
)
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(format!(
@ -368,21 +362,19 @@ pub async fn remove_gateway(group_id: &Uuid, gateway_id: &EUI64) -> Result<(), E
}
pub async fn get_dev_euis(group_id: &Uuid) -> Result<Vec<EUI64>, Error> {
let mut c = get_async_db_conn().await?;
multicast_group_device::dsl::multicast_group_device
.select(multicast_group_device::dev_eui)
.filter(multicast_group_device::dsl::multicast_group_id.eq(&group_id))
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, group_id.to_string()))
}
pub async fn get_gateway_ids(group_id: &Uuid) -> Result<Vec<EUI64>, Error> {
let mut c = get_async_db_conn().await?;
multicast_group_gateway::dsl::multicast_group_gateway
.select(multicast_group_gateway::gateway_id)
.filter(multicast_group_gateway::dsl::multicast_group_id.eq(&group_id))
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, group_id.to_string()))
}
@ -396,7 +388,6 @@ pub async fn enqueue(
gateway_ids: &[EUI64],
) -> Result<(Vec<Uuid>, u32), Error> {
qi.validate()?;
let mut c = get_async_db_conn().await?;
let conf = config::get();
let (ids, f_cnt) = c
@ -568,9 +559,8 @@ pub async fn enqueue(
}
pub async fn delete_queue_item(id: &Uuid) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(multicast_group_queue_item::dsl::multicast_group_queue_item.find(&id))
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(id.to_string()));
@ -580,12 +570,11 @@ pub async fn delete_queue_item(id: &Uuid) -> Result<(), Error> {
}
pub async fn flush_queue(multicast_group_id: &Uuid) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let _ = diesel::delete(
multicast_group_queue_item::dsl::multicast_group_queue_item
.filter(multicast_group_queue_item::multicast_group_id.eq(&multicast_group_id)),
)
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, multicast_group_id.to_string()))?;
info!(multicast_group_id = %multicast_group_id, "Multicast-group queue flushed");
@ -593,11 +582,10 @@ pub async fn flush_queue(multicast_group_id: &Uuid) -> Result<(), Error> {
}
pub async fn get_queue(multicast_group_id: &Uuid) -> Result<Vec<MulticastGroupQueueItem>, Error> {
let mut c = get_async_db_conn().await?;
multicast_group_queue_item::dsl::multicast_group_queue_item
.filter(multicast_group_queue_item::dsl::multicast_group_id.eq(&multicast_group_id))
.order_by(multicast_group_queue_item::created_at)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, multicast_group_id.to_string()))
}
@ -650,10 +638,9 @@ pub mod test {
use crate::test;
pub async fn get_queue_item(id: &Uuid) -> Result<MulticastGroupQueueItem, Error> {
let mut c = get_async_db_conn().await?;
multicast_group_queue_item::dsl::multicast_group_queue_item
.find(&id)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))
}

View File

@ -43,7 +43,6 @@ pub struct DeviceListItem {
}
pub async fn get_relay_count(filters: &RelayFilters) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = device::dsl::device
.select(dsl::count_star())
.inner_join(device_profile::table)
@ -54,7 +53,7 @@ pub async fn get_relay_count(filters: &RelayFilters) -> Result<i64, Error> {
q = q.filter(device::dsl::application_id.eq(application_id));
}
Ok(q.first(&mut c).await?)
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
pub async fn list_relays(
@ -62,7 +61,6 @@ pub async fn list_relays(
offset: i64,
filters: &RelayFilters,
) -> Result<Vec<RelayListItem>, Error> {
let mut c = get_async_db_conn().await?;
let mut q = device::dsl::device
.inner_join(device_profile::table)
.select((device::dev_eui, device::name))
@ -76,13 +74,12 @@ pub async fn list_relays(
q.order_by(device::dsl::name)
.limit(limit)
.offset(offset)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, "".into()))
}
pub async fn get_device_count(filters: &DeviceFilters) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = relay_device::dsl::relay_device
.select(dsl::count_star())
.into_boxed();
@ -91,7 +88,7 @@ pub async fn get_device_count(filters: &DeviceFilters) -> Result<i64, Error> {
q = q.filter(relay_device::dsl::relay_dev_eui.eq(relay_dev_eui));
}
q.first(&mut c)
q.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, "".into()))
}
@ -101,7 +98,6 @@ pub async fn list_devices(
offset: i64,
filters: &DeviceFilters,
) -> Result<Vec<DeviceListItem>, Error> {
let mut c = get_async_db_conn().await?;
let mut q = relay_device::dsl::relay_device
.inner_join(device::table.on(relay_device::dsl::dev_eui.eq(device::dsl::dev_eui)))
.inner_join(
@ -125,7 +121,7 @@ pub async fn list_devices(
q.order_by(device::dsl::name)
.limit(limit)
.offset(offset)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, "".into()))
}
@ -219,13 +215,12 @@ pub async fn add_device(relay_dev_eui: EUI64, device_dev_eui: EUI64) -> Result<(
}
pub async fn remove_device(relay_dev_eui: EUI64, device_dev_eui: EUI64) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(
relay_device::dsl::relay_device
.filter(relay_device::relay_dev_eui.eq(&relay_dev_eui))
.filter(relay_device::dev_eui.eq(&device_dev_eui)),
)
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(format!(

View File

@ -48,7 +48,6 @@ pub async fn global_search(
let query = format!("%{}%", query);
let tags = serde_json::to_value(tags).context("To serde_json value")?;
let mut c = get_async_db_conn().await?;
let res: Vec<SearchResult> = diesel::sql_query(
r#"
-- device
@ -157,7 +156,7 @@ pub async fn global_search(
.bind::<diesel::sql_types::BigInt, _>(limit as i64)
.bind::<diesel::sql_types::BigInt, _>(offset as i64)
.bind::<diesel::sql_types::Jsonb, _>(tags)
.load(&mut c).await?;
.load(&mut get_async_db_conn().await?).await?;
Ok(res)
}

View File

@ -104,10 +104,10 @@ pub struct Filters {
pub async fn create(t: Tenant) -> Result<Tenant, Error> {
t.validate()?;
let mut c = get_async_db_conn().await?;
let t: Tenant = diesel::insert_into(tenant::table)
.values(&t)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, t.id.to_string()))?;
info!(id = %t.id, "Tenant created");
@ -115,10 +115,9 @@ pub async fn create(t: Tenant) -> Result<Tenant, Error> {
}
pub async fn get(id: &Uuid) -> Result<Tenant, Error> {
let mut c = get_async_db_conn().await?;
let t = tenant::dsl::tenant
.find(&id)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))?;
Ok(t)
@ -126,7 +125,7 @@ pub async fn get(id: &Uuid) -> Result<Tenant, Error> {
pub async fn update(t: Tenant) -> Result<Tenant, Error> {
t.validate()?;
let mut c = get_async_db_conn().await?;
let t: Tenant = diesel::update(tenant::dsl::tenant.find(&t.id))
.set((
tenant::updated_at.eq(Utc::now()),
@ -139,7 +138,7 @@ pub async fn update(t: Tenant) -> Result<Tenant, Error> {
tenant::private_gateways_down.eq(&t.private_gateways_down),
tenant::tags.eq(&t.tags),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, t.id.to_string()))?;
info!(id = %t.id, "Tenant updated");
@ -147,9 +146,8 @@ pub async fn update(t: Tenant) -> Result<Tenant, Error> {
}
pub async fn delete(id: &Uuid) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(tenant::dsl::tenant.find(&id))
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))?;
@ -161,7 +159,6 @@ pub async fn delete(id: &Uuid) -> Result<(), Error> {
}
pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let mut q = tenant::dsl::tenant
.left_join(tenant_user::table)
.into_boxed();
@ -176,13 +173,12 @@ pub async fn get_count(filters: &Filters) -> Result<i64, Error> {
Ok(
q.select(dsl::sql::<diesel::sql_types::BigInt>("count(distinct id)"))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await?,
)
}
pub async fn list(limit: i64, offset: i64, filters: &Filters) -> Result<Vec<Tenant>, Error> {
let mut c = get_async_db_conn().await?;
let mut q = tenant::dsl::tenant
.left_join(tenant_user::table)
.select(tenant::all_columns)
@ -200,15 +196,14 @@ pub async fn list(limit: i64, offset: i64, filters: &Filters) -> Result<Vec<Tena
q = q.filter(tenant::dsl::name.ilike(format!("%{}%", search)));
}
let items = q.load(&mut c).await?;
let items = q.load(&mut get_async_db_conn().await?).await?;
Ok(items)
}
pub async fn add_user(tu: TenantUser) -> Result<TenantUser, Error> {
let mut c = get_async_db_conn().await?;
let tu: TenantUser = diesel::insert_into(tenant_user::table)
.values(&tu)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, tu.user_id.to_string()))?;
info!(
@ -220,14 +215,13 @@ pub async fn add_user(tu: TenantUser) -> Result<TenantUser, Error> {
}
pub async fn update_user(tu: TenantUser) -> Result<TenantUser, Error> {
let mut c = get_async_db_conn().await?;
let tu: TenantUser = diesel::update(
tenant_user::dsl::tenant_user
.filter(tenant_user::dsl::tenant_id.eq(&tu.tenant_id))
.filter(tenant_user::dsl::user_id.eq(&tu.user_id)),
)
.set(&tu)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, tu.user_id.to_string()))?;
info!(
@ -239,22 +233,20 @@ pub async fn update_user(tu: TenantUser) -> Result<TenantUser, Error> {
}
pub async fn get_user(tenant_id: &Uuid, user_id: &Uuid) -> Result<TenantUser, Error> {
let mut c = get_async_db_conn().await?;
let tu: TenantUser = tenant_user::dsl::tenant_user
.filter(tenant_user::dsl::tenant_id.eq(&tenant_id))
.filter(tenant_user::dsl::user_id.eq(&user_id))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, user_id.to_string()))?;
Ok(tu)
}
pub async fn get_user_count(tenant_id: &Uuid) -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let count = tenant_user::dsl::tenant_user
.select(dsl::count_star())
.filter(tenant_user::dsl::tenant_id.eq(&tenant_id))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await?;
Ok(count)
}
@ -264,7 +256,6 @@ pub async fn get_users(
limit: i64,
offset: i64,
) -> Result<Vec<TenantUserListItem>, Error> {
let mut c = get_async_db_conn().await?;
let items = tenant_user::dsl::tenant_user
.inner_join(user::table)
.select((
@ -281,20 +272,19 @@ pub async fn get_users(
.order_by(user::dsl::email)
.limit(limit)
.offset(offset)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await?;
Ok(items)
}
pub async fn delete_user(tenant_id: &Uuid, user_id: &Uuid) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(
tenant_user::dsl::tenant_user
.filter(tenant_user::dsl::tenant_id.eq(&tenant_id))
.filter(tenant_user::dsl::user_id.eq(&user_id)),
)
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(user_id.to_string()));
@ -308,10 +298,9 @@ pub async fn delete_user(tenant_id: &Uuid, user_id: &Uuid) -> Result<(), Error>
}
pub async fn get_tenant_users_for_user(user_id: &Uuid) -> Result<Vec<TenantUser>, Error> {
let mut c = get_async_db_conn().await?;
let items = tenant_user::dsl::tenant_user
.filter(tenant_user::dsl::user_id.eq(&user_id))
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await?;
Ok(items)
}

View File

@ -66,11 +66,10 @@ impl User {
pub async fn create(u: User) -> Result<User, Error> {
u.validate()?;
let mut c = get_async_db_conn().await?;
let u: User = diesel::insert_into(user::table)
.values(&u)
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, u.id.to_string()))?;
info!(id = %u.id, "User created");
@ -78,40 +77,36 @@ pub async fn create(u: User) -> Result<User, Error> {
}
pub async fn get(id: &Uuid) -> Result<User, Error> {
let mut c = get_async_db_conn().await?;
let u = user::dsl::user
.find(&id)
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))?;
Ok(u)
}
pub async fn get_by_email(email: &str) -> Result<User, Error> {
let mut c = get_async_db_conn().await?;
let u = user::dsl::user
.filter(user::dsl::email.eq(email))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, email.to_string()))?;
Ok(u)
}
pub async fn get_by_external_id(external_id: &str) -> Result<User, Error> {
let mut c = get_async_db_conn().await?;
let u = user::dsl::user
.filter(user::dsl::external_id.eq(external_id))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, external_id.to_string()))?;
Ok(u)
}
pub async fn get_by_email_and_pw(email: &str, pw: &str) -> Result<User, Error> {
let mut c = get_async_db_conn().await?;
let u: User = match user::dsl::user
.filter(user::dsl::email.eq(email))
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, email.to_string()))
{
@ -133,7 +128,7 @@ pub async fn get_by_email_and_pw(email: &str, pw: &str) -> Result<User, Error> {
pub async fn update(u: User) -> Result<User, Error> {
u.validate()?;
let mut c = get_async_db_conn().await?;
let u: User = diesel::update(user::dsl::user.find(&u.id))
.set((
user::updated_at.eq(Utc::now()),
@ -144,7 +139,7 @@ pub async fn update(u: User) -> Result<User, Error> {
user::note.eq(&u.note),
user::external_id.eq(&u.external_id),
))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, u.id.to_string()))?;
info!(user_id = %u.id, "User updated");
@ -152,10 +147,9 @@ pub async fn update(u: User) -> Result<User, Error> {
}
pub async fn set_password_hash(id: &Uuid, hash: &str) -> Result<User, Error> {
let mut c = get_async_db_conn().await?;
let u: User = diesel::update(user::dsl::user.find(&id))
.set(user::password_hash.eq(&hash))
.get_result(&mut c)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))?;
info!(id = %id, "Password set");
@ -163,9 +157,8 @@ pub async fn set_password_hash(id: &Uuid, hash: &str) -> Result<User, Error> {
}
pub async fn delete(id: &Uuid) -> Result<(), Error> {
let mut c = get_async_db_conn().await?;
let ra = diesel::delete(user::dsl::user.find(&id))
.execute(&mut c)
.execute(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, id.to_string()))?;
@ -177,21 +170,19 @@ pub async fn delete(id: &Uuid) -> Result<(), Error> {
}
pub async fn get_count() -> Result<i64, Error> {
let mut c = get_async_db_conn().await?;
let count = user::dsl::user
.select(dsl::count_star())
.first(&mut c)
.first(&mut get_async_db_conn().await?)
.await?;
Ok(count)
}
pub async fn list(limit: i64, offset: i64) -> Result<Vec<User>, Error> {
let mut c = get_async_db_conn().await?;
let items = user::dsl::user
.order_by(user::dsl::email)
.limit(limit)
.offset(offset)
.load(&mut c)
.load(&mut get_async_db_conn().await?)
.await?;
Ok(items)
}