Handle integration events async of uplink / downlink flow.

Wrapping the handling of integration events in a tokio::spawn should
already have been there, as we do not want to delay the downlink in case
of slow integrations.
This commit is contained in:
Orne Brocaar 2022-12-15 21:33:23 +00:00
parent fd061d4657
commit aa9923a60b
9 changed files with 198 additions and 47 deletions

View File

@ -333,10 +333,7 @@ impl Data {
},
};
integration::ack_event(&self.application.id, &self.device.variables, &pl)
.await
.context("Publish ack event")?;
integration::ack_event(self.application.id, &self.device.variables, &pl).await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of timeout");
continue;
@ -366,10 +363,7 @@ impl Data {
.collect(),
};
integration::log_event(&self.application.id, &self.device.variables, &pl)
.await
.context("Publish log event")?;
integration::log_event(self.application.id, &self.device.variables, &pl).await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of max. payload size");
continue;

View File

@ -318,7 +318,7 @@ impl TxAck {
..Default::default()
};
integration::log_event(&app.id, &dev.variables, &pl).await?;
integration::log_event(app.id, &dev.variables, &pl).await;
Ok(())
}
@ -366,7 +366,7 @@ impl TxAck {
tx_info: self.downlink_frame_item.as_ref().unwrap().tx_info.clone(),
};
integration::txack_event(&app.id, &dev.variables, &pl).await?;
integration::txack_event(app.id, &dev.variables, &pl).await;
Ok(())
}

View File

@ -378,7 +378,8 @@ impl Integration {
)?)),
};
integration_event(&Uuid::from_str(&di.application_id)?, vars, &int_pl).await
integration_event(Uuid::from_str(&di.application_id)?, vars, &int_pl).await;
Ok(())
}
async fn handle_response_downlink(
@ -431,7 +432,8 @@ impl Integration {
}),
};
location_event(&Uuid::from_str(&di.application_id)?, vars, &loc_pl).await
location_event(Uuid::from_str(&di.application_id)?, vars, &loc_pl).await;
Ok(())
}
async fn update_geoloc_buffer(
@ -729,7 +731,7 @@ impl IntegrationTrait for Integration {
location: Some(v),
};
location_event(&Uuid::from_str(&di.application_id)?, vars, &loc_pl).await?;
location_event(Uuid::from_str(&di.application_id)?, vars, &loc_pl).await;
}
Ok(())

View File

@ -133,7 +133,7 @@ pub trait Integration {
}
// Returns a Vec of integrations for the given Application ID.
async fn for_application_id(id: &Uuid) -> Result<Vec<Box<dyn Integration + Sync + Send>>> {
async fn for_application_id(id: Uuid) -> Result<Vec<Box<dyn Integration + Sync + Send>>> {
#[cfg(test)]
{
let m = MOCK_INTEGRATION.read().await;
@ -143,7 +143,7 @@ async fn for_application_id(id: &Uuid) -> Result<Vec<Box<dyn Integration + Sync
}
let mut out: Vec<Box<dyn Integration + Sync + Send>> = Vec::new();
let integrations = application::get_integrations_for_application(id).await?;
let integrations = application::get_integrations_for_application(&id).await?;
for app_i in &integrations {
out.push(match &app_i.configuration {
@ -187,7 +187,24 @@ async fn for_application_id(id: &Uuid) -> Result<Vec<Box<dyn Integration + Sync
}
pub async fn uplink_event(
application_id: &Uuid,
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::UplinkEvent,
) {
tokio::spawn({
let vars = vars.clone();
let pl = pl.clone();
async move {
if let Err(err) = _uplink_event(application_id, &vars, &pl).await {
error!(application_id = %application_id, error = %err, "Uplink event error");
}
}
});
}
async fn _uplink_event(
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::UplinkEvent,
) -> Result<()> {
@ -212,7 +229,24 @@ pub async fn uplink_event(
}
pub async fn join_event(
application_id: &Uuid,
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::JoinEvent,
) {
tokio::spawn({
let vars = vars.clone();
let pl = pl.clone();
async move {
if let Err(err) = _join_event(application_id, &vars, &pl).await {
error!(application_id = %application_id, error = %err, "Join event error");
}
}
});
}
async fn _join_event(
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::JoinEvent,
) -> Result<()> {
@ -237,7 +271,24 @@ pub async fn join_event(
}
pub async fn ack_event(
application_id: &Uuid,
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::AckEvent,
) {
tokio::spawn({
let vars = vars.clone();
let pl = pl.clone();
async move {
if let Err(err) = _ack_event(application_id, &vars, &pl).await {
error!(application_id = %application_id, error = %err, "Ack event error");
}
}
});
}
async fn _ack_event(
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::AckEvent,
) -> Result<()> {
@ -262,7 +313,24 @@ pub async fn ack_event(
}
pub async fn txack_event(
application_id: &Uuid,
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::TxAckEvent,
) {
tokio::spawn({
let vars = vars.clone();
let pl = pl.clone();
async move {
if let Err(err) = _txack_event(application_id, &vars, &pl).await {
error!(application_id = %application_id, error = %err, "Txack event error");
}
}
});
}
async fn _txack_event(
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::TxAckEvent,
) -> Result<()> {
@ -287,7 +355,24 @@ pub async fn txack_event(
}
pub async fn log_event(
application_id: &Uuid,
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::LogEvent,
) {
tokio::spawn({
let vars = vars.clone();
let pl = pl.clone();
async move {
if let Err(err) = _log_event(application_id, &vars, &pl).await {
error!(application_id = %application_id, error = %err, "Log event error");
}
}
});
}
async fn _log_event(
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::LogEvent,
) -> Result<()> {
@ -312,7 +397,24 @@ pub async fn log_event(
}
pub async fn status_event(
application_id: &Uuid,
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::StatusEvent,
) {
tokio::spawn({
let vars = vars.clone();
let pl = pl.clone();
async move {
if let Err(err) = _status_event(application_id, &vars, &pl).await {
error!(application_id = %application_id, error = %err, "Status event error");
}
}
});
}
async fn _status_event(
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::StatusEvent,
) -> Result<()> {
@ -337,7 +439,24 @@ pub async fn status_event(
}
pub async fn location_event(
application_id: &Uuid,
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::LocationEvent,
) {
tokio::spawn({
let vars = vars.clone();
let pl = pl.clone();
async move {
if let Err(err) = _location_event(application_id, &vars, &pl).await {
error!(application_id = %application_id, error = %err, "Location event error");
}
}
});
}
async fn _location_event(
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::LocationEvent,
) -> Result<()> {
@ -362,7 +481,24 @@ pub async fn location_event(
}
pub async fn integration_event(
application_id: &Uuid,
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::IntegrationEvent,
) {
tokio::spawn({
let vars = vars.clone();
let pl = pl.clone();
async move {
if let Err(err) = _integration_event(application_id, &vars, &pl).await {
error!(application_id = %application_id, error = %err, "Location event error");
}
}
});
}
async fn _integration_event(
application_id: Uuid,
vars: &HashMap<String, String>,
pl: &integration::IntegrationEvent,
) -> Result<()> {

View File

@ -1,7 +1,7 @@
use anyhow::Result;
use bigdecimal::BigDecimal;
use chrono::{DateTime, Utc};
use tracing::{error, info};
use tracing::info;
use crate::integration;
use crate::storage::{application, device, device_profile, tenant};
@ -48,8 +48,8 @@ pub async fn handle(
let rx_time: DateTime<Utc> =
helpers::get_rx_timestamp(&uplink_frame_set.rx_info_set).into();
if let Err(e) = integration::status_event(
&app.id,
integration::status_event(
app.id,
&dev.variables,
&integration_pb::StatusEvent {
deduplication_id: uplink_frame_set.uplink_set_id.to_string(),
@ -75,10 +75,7 @@ pub async fn handle(
},
},
)
.await
{
error!(error = %e, "Sending status event error");
}
.await;
}
Ok(None)
@ -94,6 +91,8 @@ pub mod test {
use lrwn::EUI64;
use std::collections::HashMap;
use std::str::FromStr;
use std::time::Duration;
use tokio::time::sleep;
use uuid::Uuid;
#[test]
@ -189,6 +188,9 @@ pub mod test {
.unwrap();
assert_eq!(true, resp.is_none());
// Integration events are handled async.
sleep(Duration::from_millis(100)).await;
let status_events = mock::get_status_events().await;
assert_eq!(
vec![integration_pb::StatusEvent {

View File

@ -1,10 +1,12 @@
use std::future::Future;
use std::io::Cursor;
use std::pin::Pin;
use std::time::Duration;
use prost::Message;
use redis::streams::StreamReadReply;
use tokio::sync::RwLock;
use tokio::time::sleep;
use crate::gateway::backend::mock as gateway_mock;
use crate::integration::mock;
@ -122,6 +124,9 @@ pub fn integration_log(logs: Vec<String>) -> Validator {
Box::new(move || {
let logs = logs.clone();
Box::pin(async move {
// Integration events are handled async.
sleep(Duration::from_millis(100)).await;
let mock_logs = mock::get_log_events().await;
assert_eq!(logs.len(), mock_logs.len());
@ -136,6 +141,9 @@ pub fn integration_log(logs: Vec<String>) -> Validator {
pub fn no_uplink_event() -> Validator {
Box::new(move || {
Box::pin(async move {
// Integration events are handled async.
sleep(Duration::from_millis(100)).await;
let mock_events = mock::get_uplink_events().await;
assert_eq!(0, mock_events.len());
})
@ -146,6 +154,9 @@ pub fn uplink_event(up: integration_pb::UplinkEvent) -> Validator {
Box::new(move || {
let up = up.clone();
Box::pin(async move {
// Integration events are handled async.
sleep(Duration::from_millis(100)).await;
let mut mock_events = mock::get_uplink_events().await;
assert_eq!(1, mock_events.len());
@ -163,6 +174,9 @@ pub fn ack_event(ack: integration_pb::AckEvent) -> Validator {
Box::new(move || {
let ack = ack.clone();
Box::pin(async move {
// Integration events are handled async.
sleep(Duration::from_millis(100)).await;
let mut mock_events = mock::get_ack_events().await;
assert_eq!(1, mock_events.len());
@ -180,6 +194,9 @@ pub fn status_event(st: integration_pb::StatusEvent) -> Validator {
Box::new(move || {
let st = st.clone();
Box::pin(async move {
// Integration events are handled async.
sleep(Duration::from_millis(100)).await;
let mut mock_events = mock::get_status_events().await;
assert_eq!(1, mock_events.len());

View File

@ -315,7 +315,7 @@ impl Data {
.cloned()
.collect(),
};
integration::log_event(&app.id, &dev.variables, &pl).await?;
integration::log_event(app.id, &dev.variables, &pl).await;
}
if self.reset {
@ -333,7 +333,7 @@ impl Data {
.cloned()
.collect(),
};
integration::log_event(&app.id, &dev.variables, &pl).await?;
integration::log_event(app.id, &dev.variables, &pl).await;
}
Err(Error::Abort)
@ -700,7 +700,7 @@ impl Data {
Ok(v) => v,
Err(e) => {
integration::log_event(
&app.id,
app.id,
&dev.variables,
&integration_pb::LogEvent {
time: Some(Utc::now().into()),
@ -714,12 +714,12 @@ impl Data {
.collect(),
},
)
.await?;
.await;
None
}
};
integration::uplink_event(&app.id, &dev.variables, &pl).await?;
integration::uplink_event(app.id, &dev.variables, &pl).await;
self.uplink_event = Some(pl);
@ -871,7 +871,7 @@ impl Data {
tags.extend((*dev.tags).clone());
integration::ack_event(
&app.id,
app.id,
&dev.variables,
&integration_pb::AckEvent {
deduplication_id: self.uplink_frame_set.uplink_set_id.to_string(),
@ -892,7 +892,7 @@ impl Data {
f_cnt_down: qi.f_cnt_down.unwrap_or(0) as u32,
},
)
.await?;
.await;
Ok(())
}

View File

@ -259,7 +259,7 @@ impl JoinRequest {
Err(v) => match v {
StorageError::InvalidDevNonce => {
integration::log_event(
&app.id,
app.id,
&dev.variables,
&integration_pb::LogEvent {
time: Some(Utc::now().into()),
@ -276,7 +276,7 @@ impl JoinRequest {
.collect(),
},
)
.await?;
.await;
metrics::save(
&format!("device:{}", dev.dev_eui),
@ -314,7 +314,7 @@ impl JoinRequest {
let dev = self.device.as_ref().unwrap();
integration::log_event(
&app.id,
app.id,
&dev.variables,
&integration_pb::LogEvent {
time: Some(Utc::now().into()),
@ -331,7 +331,7 @@ impl JoinRequest {
.collect(),
},
)
.await?;
.await;
metrics::save(
&format!("device:{}", dev.dev_eui),
@ -739,7 +739,7 @@ impl JoinRequest {
dev_addr: self.dev_addr.as_ref().unwrap().to_string(),
};
integration::join_event(&app.id, &dev.variables, &pl).await?;
integration::join_event(app.id, &dev.variables, &pl).await;
Ok(())
}
}

View File

@ -301,7 +301,7 @@ impl JoinRequest {
Err(v) => match v {
StorageError::InvalidDevNonce => {
integration::log_event(
&app.id,
app.id,
&dev.variables,
&integration_pb::LogEvent {
time: Some(Utc::now().into()),
@ -318,7 +318,7 @@ impl JoinRequest {
.collect(),
},
)
.await?;
.await;
metrics::save(
&format!("device:{}", dev.dev_eui),
@ -356,7 +356,7 @@ impl JoinRequest {
let dev = self.device.as_ref().unwrap();
integration::log_event(
&app.id,
app.id,
&dev.variables,
&integration_pb::LogEvent {
time: Some(Utc::now().into()),
@ -373,7 +373,7 @@ impl JoinRequest {
.collect(),
},
)
.await?;
.await;
metrics::save(
&format!("device:{}", dev.dev_eui),
@ -676,7 +676,7 @@ impl JoinRequest {
dev_addr: self.dev_addr.as_ref().unwrap().to_string(),
};
integration::join_event(&app.id, &dev.variables, &pl).await?;
integration::join_event(app.id, &dev.variables, &pl).await;
Ok(())
}