diff --git a/chirpstack/src/api/internal.rs b/chirpstack/src/api/internal.rs index b3547af1..8cb0270e 100644 --- a/chirpstack/src/api/internal.rs +++ b/chirpstack/src/api/internal.rs @@ -22,7 +22,7 @@ use super::error::ToStatus; use super::helpers::ToProto; use super::{helpers, oidc}; use crate::storage::{api_key, device, error::Error, gateway, redis_key, search, tenant, user}; -use crate::{config, eventlog, framelog, region}; +use crate::{config, region, streams}; use lrwn::EUI64; pub struct Internal { @@ -599,7 +599,7 @@ impl InternalService for Internal { let (redis_tx, mut redis_rx) = mpsc::channel(1); let (stream_tx, stream_rx) = mpsc::channel(1); - let mut framelog_future = Box::pin(framelog::get_frame_logs(key, 10, redis_tx)); + let mut framelog_future = Box::pin(streams::frames::get_frame_logs(key, 10, redis_tx)); let (drop_receiver, mut close_rx) = DropReceiver::new(ReceiverStream::new(stream_rx)); tokio::spawn(async move { @@ -666,7 +666,7 @@ impl InternalService for Internal { let (redis_tx, mut redis_rx) = mpsc::channel(1); let (stream_tx, stream_rx) = mpsc::channel(1); - let mut framelog_future = Box::pin(framelog::get_frame_logs(key, 10, redis_tx)); + let mut framelog_future = Box::pin(streams::frames::get_frame_logs(key, 10, redis_tx)); let (drop_receiver, mut close_rx) = DropReceiver::new(ReceiverStream::new(stream_rx)); tokio::spawn(async move { @@ -734,7 +734,7 @@ impl InternalService for Internal { let (redis_tx, mut redis_rx) = mpsc::channel(1); let (stream_tx, stream_rx) = mpsc::channel(1); - let mut eventlog_future = Box::pin(eventlog::get_event_logs(key, 10, redis_tx)); + let mut eventlog_future = Box::pin(streams::events::get_event_logs(key, 10, redis_tx)); let (drop_receiver, mut close_rx) = DropReceiver::new(ReceiverStream::new(stream_rx)); tokio::spawn(async move { diff --git a/chirpstack/src/api/mod.rs b/chirpstack/src/api/mod.rs index c9438f00..84255413 100644 --- a/chirpstack/src/api/mod.rs +++ b/chirpstack/src/api/mod.rs @@ -41,7 +41,7 @@ use super::config; use crate::api::auth::validator; use crate::helpers::errors::PrintFullError; use crate::monitoring::prometheus; -use crate::requestlog; +use crate::streams::api_requests; pub mod application; pub mod auth; @@ -421,7 +421,7 @@ where }; task::spawn(async move { - if let Err(e) = requestlog::log_request(&req_log).await { + if let Err(e) = api_requests::log_request(&req_log).await { error!(error = %e.full(), "Log request error"); } }); diff --git a/chirpstack/src/downlink/tx_ack.rs b/chirpstack/src/downlink/tx_ack.rs index 1b441e90..7274e31a 100644 --- a/chirpstack/src/downlink/tx_ack.rs +++ b/chirpstack/src/downlink/tx_ack.rs @@ -11,8 +11,8 @@ use crate::storage::{ device::{self, DeviceClass}, device_profile, device_queue, device_session, downlink_frame, multicast, tenant, }; -use crate::{framelog, integration, metalog}; -use chirpstack_api::{common, gw, integration as integration_pb, internal, streams}; +use crate::{integration, streams}; +use chirpstack_api::{common, gw, integration as integration_pb, internal, streams as streams_pb}; pub struct TxAck { downlink_tx_ack: gw::DownlinkTxAck, @@ -609,7 +609,7 @@ impl TxAck { let dfi = self.downlink_frame_item.as_ref().unwrap(); let phy = self.phy_payload.as_mut().unwrap(); - let dfl = streams::DownlinkFrameLog { + let dfl = streams_pb::DownlinkFrameLog { time: Some(Utc::now().into()), phy_payload: dfi.phy_payload.clone(), tx_info: dfi.tx_info.clone(), @@ -642,7 +642,7 @@ impl TxAck { // Log for gateway (with potentially encrypted mac-commands). info!(gateway_id = %dfl.gateway_id, "Log downlink-frame for gateway"); - framelog::log_downlink_for_gateway(&dfl).await?; + streams::frames::log_downlink_for_gateway(&dfl).await?; // Downlink is not related to a device / DevEUI, e.g. it could be a multicast // or proprietary downlink. Therefore we can't log it for a specific DevEUI. @@ -678,7 +678,7 @@ impl TxAck { phy.decrypt_f_opts(&nwk_s_enc_key)?; } - let dfl = streams::DownlinkFrameLog { + let dfl = streams_pb::DownlinkFrameLog { time: dfl.time.clone(), phy_payload: phy.to_vec()?, tx_info: dfl.tx_info.clone(), @@ -693,7 +693,7 @@ impl TxAck { // Log for device. info!(device_eui = %dfl.dev_eui, "Log downlink-frame for device"); - framelog::log_downlink_for_device(&dfl).await?; + streams::frames::log_downlink_for_device(&dfl).await?; Ok(()) } @@ -705,7 +705,7 @@ impl TxAck { let dfi = self.downlink_frame_item.as_ref().unwrap(); let phy = self.phy_payload.as_ref().unwrap(); - let dm = streams::DownlinkMeta { + let dm = streams_pb::DownlinkMeta { dev_eui: if !df.dev_eui.is_empty() { EUI64::from_slice(&df.dev_eui)?.to_string() } else { @@ -749,7 +749,7 @@ impl TxAck { gateway_id: df.downlink_frame.as_ref().unwrap().gateway_id.clone(), }; - metalog::log_downlink(&dm).await + streams::meta::log_downlink(&dm).await } fn is_error(&self) -> bool { diff --git a/chirpstack/src/integration/redis.rs b/chirpstack/src/integration/redis.rs index 9209bec8..75b956e1 100644 --- a/chirpstack/src/integration/redis.rs +++ b/chirpstack/src/integration/redis.rs @@ -6,7 +6,7 @@ use prost::Message; use tracing::info; use super::Integration as IntegrationTrait; -use crate::eventlog; +use crate::streams; use chirpstack_api::integration; pub struct Integration {} @@ -30,7 +30,7 @@ impl IntegrationTrait for Integration { .as_ref() .ok_or_else(|| anyhow!("device_info is None"))?; let b = pl.encode_to_vec(); - eventlog::log_event_for_device("up", &dev_info.dev_eui, &b).await + streams::events::log_event_for_device("up", &dev_info.dev_eui, &b).await } async fn join_event( @@ -43,7 +43,7 @@ impl IntegrationTrait for Integration { .as_ref() .ok_or_else(|| anyhow!("device_info is None"))?; let b = pl.encode_to_vec(); - eventlog::log_event_for_device("join", &dev_info.dev_eui, &b).await + streams::events::log_event_for_device("join", &dev_info.dev_eui, &b).await } async fn ack_event( @@ -56,7 +56,7 @@ impl IntegrationTrait for Integration { .as_ref() .ok_or_else(|| anyhow!("device_info is None"))?; let b = pl.encode_to_vec(); - eventlog::log_event_for_device("ack", &dev_info.dev_eui, &b).await + streams::events::log_event_for_device("ack", &dev_info.dev_eui, &b).await } async fn txack_event( @@ -69,7 +69,7 @@ impl IntegrationTrait for Integration { .as_ref() .ok_or_else(|| anyhow!("device_info is None"))?; let b = pl.encode_to_vec(); - eventlog::log_event_for_device("txack", &dev_info.dev_eui, &b).await + streams::events::log_event_for_device("txack", &dev_info.dev_eui, &b).await } async fn log_event( @@ -82,7 +82,7 @@ impl IntegrationTrait for Integration { .as_ref() .ok_or_else(|| anyhow!("device_info is None"))?; let b = pl.encode_to_vec(); - eventlog::log_event_for_device("log", &dev_info.dev_eui, &b).await + streams::events::log_event_for_device("log", &dev_info.dev_eui, &b).await } async fn status_event( @@ -95,7 +95,7 @@ impl IntegrationTrait for Integration { .as_ref() .ok_or_else(|| anyhow!("device_info is None"))?; let b = pl.encode_to_vec(); - eventlog::log_event_for_device("status", &dev_info.dev_eui, &b).await + streams::events::log_event_for_device("status", &dev_info.dev_eui, &b).await } async fn location_event( @@ -108,7 +108,7 @@ impl IntegrationTrait for Integration { .as_ref() .ok_or_else(|| anyhow!("device_info is None"))?; let b = pl.encode_to_vec(); - eventlog::log_event_for_device("location", &dev_info.dev_eui, &b).await + streams::events::log_event_for_device("location", &dev_info.dev_eui, &b).await } async fn integration_event( @@ -121,7 +121,7 @@ impl IntegrationTrait for Integration { .as_ref() .ok_or_else(|| anyhow!("device_info is None"))?; let b = pl.encode_to_vec(); - eventlog::log_event_for_device("integration", &dev_info.dev_eui, &b).await + streams::events::log_event_for_device("integration", &dev_info.dev_eui, &b).await } } diff --git a/chirpstack/src/main.rs b/chirpstack/src/main.rs index 5f9c6a87..933c0280 100644 --- a/chirpstack/src/main.rs +++ b/chirpstack/src/main.rs @@ -30,19 +30,16 @@ mod codec; mod config; mod devaddr; mod downlink; -mod eventlog; -mod framelog; mod gateway; mod gpstime; mod helpers; mod integration; mod maccommand; -mod metalog; mod monitoring; mod region; -mod requestlog; mod sensitivity; mod storage; +mod streams; #[cfg(test)] mod test; mod uplink; diff --git a/chirpstack/src/requestlog.rs b/chirpstack/src/streams/api_requests.rs similarity index 100% rename from chirpstack/src/requestlog.rs rename to chirpstack/src/streams/api_requests.rs diff --git a/chirpstack/src/eventlog.rs b/chirpstack/src/streams/events.rs similarity index 100% rename from chirpstack/src/eventlog.rs rename to chirpstack/src/streams/events.rs diff --git a/chirpstack/src/framelog.rs b/chirpstack/src/streams/frames.rs similarity index 100% rename from chirpstack/src/framelog.rs rename to chirpstack/src/streams/frames.rs diff --git a/chirpstack/src/metalog.rs b/chirpstack/src/streams/meta.rs similarity index 100% rename from chirpstack/src/metalog.rs rename to chirpstack/src/streams/meta.rs diff --git a/chirpstack/src/streams/mod.rs b/chirpstack/src/streams/mod.rs new file mode 100644 index 00000000..d62c335c --- /dev/null +++ b/chirpstack/src/streams/mod.rs @@ -0,0 +1,4 @@ +pub mod api_requests; +pub mod events; +pub mod frames; +pub mod meta; diff --git a/chirpstack/src/uplink/data.rs b/chirpstack/src/uplink/data.rs index 75811c58..4ca45eaf 100644 --- a/chirpstack/src/uplink/data.rs +++ b/chirpstack/src/uplink/data.rs @@ -19,8 +19,8 @@ use crate::storage::{ device::{self, DeviceClass}, device_gateway, device_profile, device_queue, device_session, fields, metrics, tenant, }; -use crate::{codec, config, downlink, framelog, integration, maccommand, metalog, region}; -use chirpstack_api::{integration as integration_pb, internal, streams}; +use crate::{codec, config, downlink, integration, maccommand, region, streams}; +use chirpstack_api::{integration as integration_pb, internal, streams as streams_pb}; use lrwn::{AES128Key, EUI64}; pub struct Data { @@ -276,9 +276,10 @@ impl Data { info!(dev_addr = %dev_addr, "None of the device-sessions for dev_addr resulted in valid MIC"); // Log uplink for null DevEUI. - let mut ufl: streams::UplinkFrameLog = (&self.uplink_frame_set).try_into()?; + let mut ufl: streams_pb::UplinkFrameLog = + (&self.uplink_frame_set).try_into()?; ufl.dev_eui = "0000000000000000".to_string(); - framelog::log_uplink_for_device(&ufl).await?; + streams::frames::log_uplink_for_device(&ufl).await?; return Err(Error::Abort); } @@ -648,7 +649,7 @@ impl Data { async fn log_uplink_frame_set(&self) -> Result<()> { trace!("Logging uplink frame-set"); - let mut ufl: streams::UplinkFrameLog = (&self.uplink_frame_set).try_into()?; + let mut ufl: streams_pb::UplinkFrameLog = (&self.uplink_frame_set).try_into()?; ufl.dev_eui = self.device.as_ref().unwrap().dev_eui.to_string(); // self.phy_payload holds the decrypted payload. @@ -656,7 +657,7 @@ impl Data { ufl.plaintext_frm_payload = true; ufl.phy_payload = self.phy_payload.to_vec()?; - framelog::log_uplink_for_device(&ufl).await?; + streams::frames::log_uplink_for_device(&ufl).await?; Ok(()) } @@ -733,7 +734,7 @@ impl Data { trace!("Logging uplink meta"); if let lrwn::Payload::MACPayload(mac_pl) = &self.phy_payload.payload { - let um = streams::UplinkMeta { + let um = streams_pb::UplinkMeta { dev_eui: self.device.as_ref().unwrap().dev_eui.to_string(), tx_info: Some(self.uplink_frame_set.tx_info.clone()), rx_info: self.uplink_frame_set.rx_info_set.clone(), @@ -763,7 +764,7 @@ impl Data { message_type: self.phy_payload.mhdr.m_type.to_proto().into(), }; - metalog::log_uplink(&um).await?; + streams::meta::log_uplink(&um).await?; } Ok(()) diff --git a/chirpstack/src/uplink/join.rs b/chirpstack/src/uplink/join.rs index ecaaa89f..16a0c177 100644 --- a/chirpstack/src/uplink/join.rs +++ b/chirpstack/src/uplink/join.rs @@ -28,10 +28,8 @@ use crate::storage::{ error::Error as StorageError, metrics, tenant, }; -use crate::{ - config, devaddr::get_random_dev_addr, downlink, framelog, integration, metalog, region, -}; -use chirpstack_api::{common, integration as integration_pb, internal, streams}; +use crate::{config, devaddr::get_random_dev_addr, downlink, integration, region, streams}; +use chirpstack_api::{common, integration as integration_pb, internal, streams as streams_pb}; pub struct JoinRequest { uplink_frame_set: UplinkFrameSet, @@ -383,8 +381,8 @@ impl JoinRequest { async fn log_uplink_frame_set(&self) -> Result<()> { trace!("Logging uplink frame-set"); - let ufl: streams::UplinkFrameLog = (&self.uplink_frame_set).try_into()?; - framelog::log_uplink_for_device(&ufl).await?; + let ufl: streams_pb::UplinkFrameLog = (&self.uplink_frame_set).try_into()?; + streams::frames::log_uplink_for_device(&ufl).await?; Ok(()) } @@ -768,7 +766,7 @@ impl JoinRequest { async fn log_uplink_meta(&self) -> Result<()> { trace!("Logging uplink meta"); - let um = streams::UplinkMeta { + let um = streams_pb::UplinkMeta { dev_eui: self.device.as_ref().unwrap().dev_eui.to_string(), tx_info: Some(self.uplink_frame_set.tx_info.clone()), rx_info: self.uplink_frame_set.rx_info_set.clone(), @@ -777,7 +775,7 @@ impl JoinRequest { ..Default::default() }; - metalog::log_uplink(&um).await?; + streams::meta::log_uplink(&um).await?; Ok(()) } diff --git a/chirpstack/src/uplink/join_sns.rs b/chirpstack/src/uplink/join_sns.rs index 019b7928..f5398a97 100644 --- a/chirpstack/src/uplink/join_sns.rs +++ b/chirpstack/src/uplink/join_sns.rs @@ -14,9 +14,9 @@ use crate::storage::{ error::Error as StorageError, metrics, tenant, }; -use crate::{config, devaddr::get_random_dev_addr, integration, metalog, region}; +use crate::{config, devaddr::get_random_dev_addr, integration, region, streams}; use backend::{PRStartAnsPayload, PRStartReqPayload}; -use chirpstack_api::{common, integration as integration_pb, internal, streams}; +use chirpstack_api::{common, integration as integration_pb, internal, streams as streams_pb}; use lrwn::{keys, AES128Key, DevAddr, NetID}; pub struct JoinRequest { @@ -555,7 +555,7 @@ impl JoinRequest { async fn log_uplink_meta(&self) -> Result<()> { trace!("Logging uplink meta"); - let req = streams::UplinkMeta { + let req = streams_pb::UplinkMeta { dev_eui: self.device.as_ref().unwrap().dev_eui.to_string(), tx_info: Some(self.uplink_frame_set.tx_info.clone()), rx_info: self.uplink_frame_set.rx_info_set.clone(), @@ -564,7 +564,7 @@ impl JoinRequest { ..Default::default() }; - metalog::log_uplink(&req).await?; + streams::meta::log_uplink(&req).await?; Ok(()) } diff --git a/chirpstack/src/uplink/mod.rs b/chirpstack/src/uplink/mod.rs index fe88f6a2..d828dd3b 100644 --- a/chirpstack/src/uplink/mod.rs +++ b/chirpstack/src/uplink/mod.rs @@ -13,12 +13,12 @@ use tracing::{debug, error, info, span, trace, warn, Instrument, Level}; use uuid::Uuid; use crate::config; -use crate::framelog; use crate::helpers::errors::PrintFullError; use crate::storage::{ device, device_profile, error::Error as StorageError, gateway, get_redis_conn, redis_key, }; -use chirpstack_api::{common, gw, internal, streams}; +use crate::streams; +use chirpstack_api::{common, gw, internal, streams as streams_pb}; use lrwn::region::CommonName; use lrwn::{ForwardUplinkReq, MType, PhyPayload, EUI64}; @@ -57,11 +57,13 @@ pub struct UplinkFrameSet { pub roaming_meta_data: Option, } -impl TryFrom<&UplinkFrameSet> for streams::UplinkFrameLog { +impl TryFrom<&UplinkFrameSet> for streams_pb::UplinkFrameLog { type Error = anyhow::Error; - fn try_from(ufs: &UplinkFrameSet) -> std::result::Result { - let mut ufl = streams::UplinkFrameLog { + fn try_from( + ufs: &UplinkFrameSet, + ) -> std::result::Result { + let mut ufl = streams_pb::UplinkFrameLog { phy_payload: ufs.phy_payload.to_vec()?, tx_info: Some(ufs.tx_info.clone()), rx_info: ufs.rx_info_set.clone(), @@ -315,8 +317,8 @@ pub async fn handle_uplink(deduplication_id: Uuid, uplink: gw::UplinkFrameSet) - .context("Update gateway meta-data")?; debug!("Logging uplink frame to Redis Stream"); - let ufl: streams::UplinkFrameLog = (&uplink).try_into()?; - framelog::log_uplink_for_gateways(&ufl) + let ufl: streams_pb::UplinkFrameLog = (&uplink).try_into()?; + streams::frames::log_uplink_for_gateways(&ufl) .await .context("Log uplink for gateways")?;