diff --git a/chirpstack/src/downlink/data.rs b/chirpstack/src/downlink/data.rs index fe061cd4..222fb398 100644 --- a/chirpstack/src/downlink/data.rs +++ b/chirpstack/src/downlink/data.rs @@ -333,10 +333,7 @@ impl Data { }, }; - integration::ack_event(&self.application.id, &self.device.variables, &pl) - .await - .context("Publish ack event")?; - + integration::ack_event(self.application.id, &self.device.variables, &pl).await; warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of timeout"); continue; @@ -366,10 +363,7 @@ impl Data { .collect(), }; - integration::log_event(&self.application.id, &self.device.variables, &pl) - .await - .context("Publish log event")?; - + integration::log_event(self.application.id, &self.device.variables, &pl).await; warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of max. payload size"); continue; diff --git a/chirpstack/src/downlink/tx_ack.rs b/chirpstack/src/downlink/tx_ack.rs index df85d5b2..760fda38 100644 --- a/chirpstack/src/downlink/tx_ack.rs +++ b/chirpstack/src/downlink/tx_ack.rs @@ -318,7 +318,7 @@ impl TxAck { ..Default::default() }; - integration::log_event(&app.id, &dev.variables, &pl).await?; + integration::log_event(app.id, &dev.variables, &pl).await; Ok(()) } @@ -366,7 +366,7 @@ impl TxAck { tx_info: self.downlink_frame_item.as_ref().unwrap().tx_info.clone(), }; - integration::txack_event(&app.id, &dev.variables, &pl).await?; + integration::txack_event(app.id, &dev.variables, &pl).await; Ok(()) } diff --git a/chirpstack/src/integration/loracloud/mod.rs b/chirpstack/src/integration/loracloud/mod.rs index e65dd570..b4874f9e 100644 --- a/chirpstack/src/integration/loracloud/mod.rs +++ b/chirpstack/src/integration/loracloud/mod.rs @@ -378,7 +378,8 @@ impl Integration { )?)), }; - integration_event(&Uuid::from_str(&di.application_id)?, vars, &int_pl).await + integration_event(Uuid::from_str(&di.application_id)?, vars, &int_pl).await; + Ok(()) } async fn handle_response_downlink( @@ -431,7 +432,8 @@ impl Integration { }), }; - location_event(&Uuid::from_str(&di.application_id)?, vars, &loc_pl).await + location_event(Uuid::from_str(&di.application_id)?, vars, &loc_pl).await; + Ok(()) } async fn update_geoloc_buffer( @@ -729,7 +731,7 @@ impl IntegrationTrait for Integration { location: Some(v), }; - location_event(&Uuid::from_str(&di.application_id)?, vars, &loc_pl).await?; + location_event(Uuid::from_str(&di.application_id)?, vars, &loc_pl).await; } Ok(()) diff --git a/chirpstack/src/integration/mod.rs b/chirpstack/src/integration/mod.rs index 0e22c24f..ec9c07a7 100644 --- a/chirpstack/src/integration/mod.rs +++ b/chirpstack/src/integration/mod.rs @@ -133,7 +133,7 @@ pub trait Integration { } // Returns a Vec of integrations for the given Application ID. -async fn for_application_id(id: &Uuid) -> Result>> { +async fn for_application_id(id: Uuid) -> Result>> { #[cfg(test)] { let m = MOCK_INTEGRATION.read().await; @@ -143,7 +143,7 @@ async fn for_application_id(id: &Uuid) -> Result> = Vec::new(); - let integrations = application::get_integrations_for_application(id).await?; + let integrations = application::get_integrations_for_application(&id).await?; for app_i in &integrations { out.push(match &app_i.configuration { @@ -187,7 +187,24 @@ async fn for_application_id(id: &Uuid) -> Result, + pl: &integration::UplinkEvent, +) { + tokio::spawn({ + let vars = vars.clone(); + let pl = pl.clone(); + + async move { + if let Err(err) = _uplink_event(application_id, &vars, &pl).await { + error!(application_id = %application_id, error = %err, "Uplink event error"); + } + } + }); +} + +async fn _uplink_event( + application_id: Uuid, vars: &HashMap, pl: &integration::UplinkEvent, ) -> Result<()> { @@ -212,7 +229,24 @@ pub async fn uplink_event( } pub async fn join_event( - application_id: &Uuid, + application_id: Uuid, + vars: &HashMap, + pl: &integration::JoinEvent, +) { + tokio::spawn({ + let vars = vars.clone(); + let pl = pl.clone(); + + async move { + if let Err(err) = _join_event(application_id, &vars, &pl).await { + error!(application_id = %application_id, error = %err, "Join event error"); + } + } + }); +} + +async fn _join_event( + application_id: Uuid, vars: &HashMap, pl: &integration::JoinEvent, ) -> Result<()> { @@ -237,7 +271,24 @@ pub async fn join_event( } pub async fn ack_event( - application_id: &Uuid, + application_id: Uuid, + vars: &HashMap, + pl: &integration::AckEvent, +) { + tokio::spawn({ + let vars = vars.clone(); + let pl = pl.clone(); + + async move { + if let Err(err) = _ack_event(application_id, &vars, &pl).await { + error!(application_id = %application_id, error = %err, "Ack event error"); + } + } + }); +} + +async fn _ack_event( + application_id: Uuid, vars: &HashMap, pl: &integration::AckEvent, ) -> Result<()> { @@ -262,7 +313,24 @@ pub async fn ack_event( } pub async fn txack_event( - application_id: &Uuid, + application_id: Uuid, + vars: &HashMap, + pl: &integration::TxAckEvent, +) { + tokio::spawn({ + let vars = vars.clone(); + let pl = pl.clone(); + + async move { + if let Err(err) = _txack_event(application_id, &vars, &pl).await { + error!(application_id = %application_id, error = %err, "Txack event error"); + } + } + }); +} + +async fn _txack_event( + application_id: Uuid, vars: &HashMap, pl: &integration::TxAckEvent, ) -> Result<()> { @@ -287,7 +355,24 @@ pub async fn txack_event( } pub async fn log_event( - application_id: &Uuid, + application_id: Uuid, + vars: &HashMap, + pl: &integration::LogEvent, +) { + tokio::spawn({ + let vars = vars.clone(); + let pl = pl.clone(); + + async move { + if let Err(err) = _log_event(application_id, &vars, &pl).await { + error!(application_id = %application_id, error = %err, "Log event error"); + } + } + }); +} + +async fn _log_event( + application_id: Uuid, vars: &HashMap, pl: &integration::LogEvent, ) -> Result<()> { @@ -312,7 +397,24 @@ pub async fn log_event( } pub async fn status_event( - application_id: &Uuid, + application_id: Uuid, + vars: &HashMap, + pl: &integration::StatusEvent, +) { + tokio::spawn({ + let vars = vars.clone(); + let pl = pl.clone(); + + async move { + if let Err(err) = _status_event(application_id, &vars, &pl).await { + error!(application_id = %application_id, error = %err, "Status event error"); + } + } + }); +} + +async fn _status_event( + application_id: Uuid, vars: &HashMap, pl: &integration::StatusEvent, ) -> Result<()> { @@ -337,7 +439,24 @@ pub async fn status_event( } pub async fn location_event( - application_id: &Uuid, + application_id: Uuid, + vars: &HashMap, + pl: &integration::LocationEvent, +) { + tokio::spawn({ + let vars = vars.clone(); + let pl = pl.clone(); + + async move { + if let Err(err) = _location_event(application_id, &vars, &pl).await { + error!(application_id = %application_id, error = %err, "Location event error"); + } + } + }); +} + +async fn _location_event( + application_id: Uuid, vars: &HashMap, pl: &integration::LocationEvent, ) -> Result<()> { @@ -362,7 +481,24 @@ pub async fn location_event( } pub async fn integration_event( - application_id: &Uuid, + application_id: Uuid, + vars: &HashMap, + pl: &integration::IntegrationEvent, +) { + tokio::spawn({ + let vars = vars.clone(); + let pl = pl.clone(); + + async move { + if let Err(err) = _integration_event(application_id, &vars, &pl).await { + error!(application_id = %application_id, error = %err, "Location event error"); + } + } + }); +} + +async fn _integration_event( + application_id: Uuid, vars: &HashMap, pl: &integration::IntegrationEvent, ) -> Result<()> { diff --git a/chirpstack/src/maccommand/dev_status.rs b/chirpstack/src/maccommand/dev_status.rs index b6ac3628..123a8eee 100644 --- a/chirpstack/src/maccommand/dev_status.rs +++ b/chirpstack/src/maccommand/dev_status.rs @@ -1,7 +1,7 @@ use anyhow::Result; use bigdecimal::BigDecimal; use chrono::{DateTime, Utc}; -use tracing::{error, info}; +use tracing::info; use crate::integration; use crate::storage::{application, device, device_profile, tenant}; @@ -48,8 +48,8 @@ pub async fn handle( let rx_time: DateTime = helpers::get_rx_timestamp(&uplink_frame_set.rx_info_set).into(); - if let Err(e) = integration::status_event( - &app.id, + integration::status_event( + app.id, &dev.variables, &integration_pb::StatusEvent { deduplication_id: uplink_frame_set.uplink_set_id.to_string(), @@ -75,10 +75,7 @@ pub async fn handle( }, }, ) - .await - { - error!(error = %e, "Sending status event error"); - } + .await; } Ok(None) @@ -94,6 +91,8 @@ pub mod test { use lrwn::EUI64; use std::collections::HashMap; use std::str::FromStr; + use std::time::Duration; + use tokio::time::sleep; use uuid::Uuid; #[test] @@ -189,6 +188,9 @@ pub mod test { .unwrap(); assert_eq!(true, resp.is_none()); + // Integration events are handled async. + sleep(Duration::from_millis(100)).await; + let status_events = mock::get_status_events().await; assert_eq!( vec![integration_pb::StatusEvent { diff --git a/chirpstack/src/test/assert.rs b/chirpstack/src/test/assert.rs index bf6a43ba..064ba42b 100644 --- a/chirpstack/src/test/assert.rs +++ b/chirpstack/src/test/assert.rs @@ -1,10 +1,12 @@ use std::future::Future; use std::io::Cursor; use std::pin::Pin; +use std::time::Duration; use prost::Message; use redis::streams::StreamReadReply; use tokio::sync::RwLock; +use tokio::time::sleep; use crate::gateway::backend::mock as gateway_mock; use crate::integration::mock; @@ -122,6 +124,9 @@ pub fn integration_log(logs: Vec) -> Validator { Box::new(move || { let logs = logs.clone(); Box::pin(async move { + // Integration events are handled async. + sleep(Duration::from_millis(100)).await; + let mock_logs = mock::get_log_events().await; assert_eq!(logs.len(), mock_logs.len()); @@ -136,6 +141,9 @@ pub fn integration_log(logs: Vec) -> Validator { pub fn no_uplink_event() -> Validator { Box::new(move || { Box::pin(async move { + // Integration events are handled async. + sleep(Duration::from_millis(100)).await; + let mock_events = mock::get_uplink_events().await; assert_eq!(0, mock_events.len()); }) @@ -146,6 +154,9 @@ pub fn uplink_event(up: integration_pb::UplinkEvent) -> Validator { Box::new(move || { let up = up.clone(); Box::pin(async move { + // Integration events are handled async. + sleep(Duration::from_millis(100)).await; + let mut mock_events = mock::get_uplink_events().await; assert_eq!(1, mock_events.len()); @@ -163,6 +174,9 @@ pub fn ack_event(ack: integration_pb::AckEvent) -> Validator { Box::new(move || { let ack = ack.clone(); Box::pin(async move { + // Integration events are handled async. + sleep(Duration::from_millis(100)).await; + let mut mock_events = mock::get_ack_events().await; assert_eq!(1, mock_events.len()); @@ -180,6 +194,9 @@ pub fn status_event(st: integration_pb::StatusEvent) -> Validator { Box::new(move || { let st = st.clone(); Box::pin(async move { + // Integration events are handled async. + sleep(Duration::from_millis(100)).await; + let mut mock_events = mock::get_status_events().await; assert_eq!(1, mock_events.len()); diff --git a/chirpstack/src/uplink/data.rs b/chirpstack/src/uplink/data.rs index be080eeb..9b2bf71a 100644 --- a/chirpstack/src/uplink/data.rs +++ b/chirpstack/src/uplink/data.rs @@ -315,7 +315,7 @@ impl Data { .cloned() .collect(), }; - integration::log_event(&app.id, &dev.variables, &pl).await?; + integration::log_event(app.id, &dev.variables, &pl).await; } if self.reset { @@ -333,7 +333,7 @@ impl Data { .cloned() .collect(), }; - integration::log_event(&app.id, &dev.variables, &pl).await?; + integration::log_event(app.id, &dev.variables, &pl).await; } Err(Error::Abort) @@ -700,7 +700,7 @@ impl Data { Ok(v) => v, Err(e) => { integration::log_event( - &app.id, + app.id, &dev.variables, &integration_pb::LogEvent { time: Some(Utc::now().into()), @@ -714,12 +714,12 @@ impl Data { .collect(), }, ) - .await?; + .await; None } }; - integration::uplink_event(&app.id, &dev.variables, &pl).await?; + integration::uplink_event(app.id, &dev.variables, &pl).await; self.uplink_event = Some(pl); @@ -871,7 +871,7 @@ impl Data { tags.extend((*dev.tags).clone()); integration::ack_event( - &app.id, + app.id, &dev.variables, &integration_pb::AckEvent { deduplication_id: self.uplink_frame_set.uplink_set_id.to_string(), @@ -892,7 +892,7 @@ impl Data { f_cnt_down: qi.f_cnt_down.unwrap_or(0) as u32, }, ) - .await?; + .await; Ok(()) } diff --git a/chirpstack/src/uplink/join.rs b/chirpstack/src/uplink/join.rs index 449a5215..fb416206 100644 --- a/chirpstack/src/uplink/join.rs +++ b/chirpstack/src/uplink/join.rs @@ -259,7 +259,7 @@ impl JoinRequest { Err(v) => match v { StorageError::InvalidDevNonce => { integration::log_event( - &app.id, + app.id, &dev.variables, &integration_pb::LogEvent { time: Some(Utc::now().into()), @@ -276,7 +276,7 @@ impl JoinRequest { .collect(), }, ) - .await?; + .await; metrics::save( &format!("device:{}", dev.dev_eui), @@ -314,7 +314,7 @@ impl JoinRequest { let dev = self.device.as_ref().unwrap(); integration::log_event( - &app.id, + app.id, &dev.variables, &integration_pb::LogEvent { time: Some(Utc::now().into()), @@ -331,7 +331,7 @@ impl JoinRequest { .collect(), }, ) - .await?; + .await; metrics::save( &format!("device:{}", dev.dev_eui), @@ -739,7 +739,7 @@ impl JoinRequest { dev_addr: self.dev_addr.as_ref().unwrap().to_string(), }; - integration::join_event(&app.id, &dev.variables, &pl).await?; + integration::join_event(app.id, &dev.variables, &pl).await; Ok(()) } } diff --git a/chirpstack/src/uplink/join_sns.rs b/chirpstack/src/uplink/join_sns.rs index 167c903e..a20b78ac 100644 --- a/chirpstack/src/uplink/join_sns.rs +++ b/chirpstack/src/uplink/join_sns.rs @@ -301,7 +301,7 @@ impl JoinRequest { Err(v) => match v { StorageError::InvalidDevNonce => { integration::log_event( - &app.id, + app.id, &dev.variables, &integration_pb::LogEvent { time: Some(Utc::now().into()), @@ -318,7 +318,7 @@ impl JoinRequest { .collect(), }, ) - .await?; + .await; metrics::save( &format!("device:{}", dev.dev_eui), @@ -356,7 +356,7 @@ impl JoinRequest { let dev = self.device.as_ref().unwrap(); integration::log_event( - &app.id, + app.id, &dev.variables, &integration_pb::LogEvent { time: Some(Utc::now().into()), @@ -373,7 +373,7 @@ impl JoinRequest { .collect(), }, ) - .await?; + .await; metrics::save( &format!("device:{}", dev.dev_eui), @@ -676,7 +676,7 @@ impl JoinRequest { dev_addr: self.dev_addr.as_ref().unwrap().to_string(), }; - integration::join_event(&app.id, &dev.variables, &pl).await?; + integration::join_event(app.id, &dev.variables, &pl).await; Ok(()) }