Group stream modules together.

This commit is contained in:
Orne Brocaar 2023-10-31 13:26:59 +00:00
parent 1d38ae2544
commit 9596f7c2d0
14 changed files with 56 additions and 54 deletions

View File

@ -22,7 +22,7 @@ use super::error::ToStatus;
use super::helpers::ToProto; use super::helpers::ToProto;
use super::{helpers, oidc}; use super::{helpers, oidc};
use crate::storage::{api_key, device, error::Error, gateway, redis_key, search, tenant, user}; use crate::storage::{api_key, device, error::Error, gateway, redis_key, search, tenant, user};
use crate::{config, eventlog, framelog, region}; use crate::{config, region, streams};
use lrwn::EUI64; use lrwn::EUI64;
pub struct Internal { pub struct Internal {
@ -599,7 +599,7 @@ impl InternalService for Internal {
let (redis_tx, mut redis_rx) = mpsc::channel(1); let (redis_tx, mut redis_rx) = mpsc::channel(1);
let (stream_tx, stream_rx) = mpsc::channel(1); let (stream_tx, stream_rx) = mpsc::channel(1);
let mut framelog_future = Box::pin(framelog::get_frame_logs(key, 10, redis_tx)); let mut framelog_future = Box::pin(streams::frames::get_frame_logs(key, 10, redis_tx));
let (drop_receiver, mut close_rx) = DropReceiver::new(ReceiverStream::new(stream_rx)); let (drop_receiver, mut close_rx) = DropReceiver::new(ReceiverStream::new(stream_rx));
tokio::spawn(async move { tokio::spawn(async move {
@ -666,7 +666,7 @@ impl InternalService for Internal {
let (redis_tx, mut redis_rx) = mpsc::channel(1); let (redis_tx, mut redis_rx) = mpsc::channel(1);
let (stream_tx, stream_rx) = mpsc::channel(1); let (stream_tx, stream_rx) = mpsc::channel(1);
let mut framelog_future = Box::pin(framelog::get_frame_logs(key, 10, redis_tx)); let mut framelog_future = Box::pin(streams::frames::get_frame_logs(key, 10, redis_tx));
let (drop_receiver, mut close_rx) = DropReceiver::new(ReceiverStream::new(stream_rx)); let (drop_receiver, mut close_rx) = DropReceiver::new(ReceiverStream::new(stream_rx));
tokio::spawn(async move { tokio::spawn(async move {
@ -734,7 +734,7 @@ impl InternalService for Internal {
let (redis_tx, mut redis_rx) = mpsc::channel(1); let (redis_tx, mut redis_rx) = mpsc::channel(1);
let (stream_tx, stream_rx) = mpsc::channel(1); let (stream_tx, stream_rx) = mpsc::channel(1);
let mut eventlog_future = Box::pin(eventlog::get_event_logs(key, 10, redis_tx)); let mut eventlog_future = Box::pin(streams::events::get_event_logs(key, 10, redis_tx));
let (drop_receiver, mut close_rx) = DropReceiver::new(ReceiverStream::new(stream_rx)); let (drop_receiver, mut close_rx) = DropReceiver::new(ReceiverStream::new(stream_rx));
tokio::spawn(async move { tokio::spawn(async move {

View File

@ -41,7 +41,7 @@ use super::config;
use crate::api::auth::validator; use crate::api::auth::validator;
use crate::helpers::errors::PrintFullError; use crate::helpers::errors::PrintFullError;
use crate::monitoring::prometheus; use crate::monitoring::prometheus;
use crate::requestlog; use crate::streams::api_requests;
pub mod application; pub mod application;
pub mod auth; pub mod auth;
@ -421,7 +421,7 @@ where
}; };
task::spawn(async move { task::spawn(async move {
if let Err(e) = requestlog::log_request(&req_log).await { if let Err(e) = api_requests::log_request(&req_log).await {
error!(error = %e.full(), "Log request error"); error!(error = %e.full(), "Log request error");
} }
}); });

View File

@ -11,8 +11,8 @@ use crate::storage::{
device::{self, DeviceClass}, device::{self, DeviceClass},
device_profile, device_queue, device_session, downlink_frame, multicast, tenant, device_profile, device_queue, device_session, downlink_frame, multicast, tenant,
}; };
use crate::{framelog, integration, metalog}; use crate::{integration, streams};
use chirpstack_api::{common, gw, integration as integration_pb, internal, streams}; use chirpstack_api::{common, gw, integration as integration_pb, internal, streams as streams_pb};
pub struct TxAck { pub struct TxAck {
downlink_tx_ack: gw::DownlinkTxAck, downlink_tx_ack: gw::DownlinkTxAck,
@ -609,7 +609,7 @@ impl TxAck {
let dfi = self.downlink_frame_item.as_ref().unwrap(); let dfi = self.downlink_frame_item.as_ref().unwrap();
let phy = self.phy_payload.as_mut().unwrap(); let phy = self.phy_payload.as_mut().unwrap();
let dfl = streams::DownlinkFrameLog { let dfl = streams_pb::DownlinkFrameLog {
time: Some(Utc::now().into()), time: Some(Utc::now().into()),
phy_payload: dfi.phy_payload.clone(), phy_payload: dfi.phy_payload.clone(),
tx_info: dfi.tx_info.clone(), tx_info: dfi.tx_info.clone(),
@ -642,7 +642,7 @@ impl TxAck {
// Log for gateway (with potentially encrypted mac-commands). // Log for gateway (with potentially encrypted mac-commands).
info!(gateway_id = %dfl.gateway_id, "Log downlink-frame for gateway"); info!(gateway_id = %dfl.gateway_id, "Log downlink-frame for gateway");
framelog::log_downlink_for_gateway(&dfl).await?; streams::frames::log_downlink_for_gateway(&dfl).await?;
// Downlink is not related to a device / DevEUI, e.g. it could be a multicast // Downlink is not related to a device / DevEUI, e.g. it could be a multicast
// or proprietary downlink. Therefore we can't log it for a specific DevEUI. // or proprietary downlink. Therefore we can't log it for a specific DevEUI.
@ -678,7 +678,7 @@ impl TxAck {
phy.decrypt_f_opts(&nwk_s_enc_key)?; phy.decrypt_f_opts(&nwk_s_enc_key)?;
} }
let dfl = streams::DownlinkFrameLog { let dfl = streams_pb::DownlinkFrameLog {
time: dfl.time.clone(), time: dfl.time.clone(),
phy_payload: phy.to_vec()?, phy_payload: phy.to_vec()?,
tx_info: dfl.tx_info.clone(), tx_info: dfl.tx_info.clone(),
@ -693,7 +693,7 @@ impl TxAck {
// Log for device. // Log for device.
info!(device_eui = %dfl.dev_eui, "Log downlink-frame for device"); info!(device_eui = %dfl.dev_eui, "Log downlink-frame for device");
framelog::log_downlink_for_device(&dfl).await?; streams::frames::log_downlink_for_device(&dfl).await?;
Ok(()) Ok(())
} }
@ -705,7 +705,7 @@ impl TxAck {
let dfi = self.downlink_frame_item.as_ref().unwrap(); let dfi = self.downlink_frame_item.as_ref().unwrap();
let phy = self.phy_payload.as_ref().unwrap(); let phy = self.phy_payload.as_ref().unwrap();
let dm = streams::DownlinkMeta { let dm = streams_pb::DownlinkMeta {
dev_eui: if !df.dev_eui.is_empty() { dev_eui: if !df.dev_eui.is_empty() {
EUI64::from_slice(&df.dev_eui)?.to_string() EUI64::from_slice(&df.dev_eui)?.to_string()
} else { } else {
@ -749,7 +749,7 @@ impl TxAck {
gateway_id: df.downlink_frame.as_ref().unwrap().gateway_id.clone(), gateway_id: df.downlink_frame.as_ref().unwrap().gateway_id.clone(),
}; };
metalog::log_downlink(&dm).await streams::meta::log_downlink(&dm).await
} }
fn is_error(&self) -> bool { fn is_error(&self) -> bool {

View File

@ -6,7 +6,7 @@ use prost::Message;
use tracing::info; use tracing::info;
use super::Integration as IntegrationTrait; use super::Integration as IntegrationTrait;
use crate::eventlog; use crate::streams;
use chirpstack_api::integration; use chirpstack_api::integration;
pub struct Integration {} pub struct Integration {}
@ -30,7 +30,7 @@ impl IntegrationTrait for Integration {
.as_ref() .as_ref()
.ok_or_else(|| anyhow!("device_info is None"))?; .ok_or_else(|| anyhow!("device_info is None"))?;
let b = pl.encode_to_vec(); let b = pl.encode_to_vec();
eventlog::log_event_for_device("up", &dev_info.dev_eui, &b).await streams::events::log_event_for_device("up", &dev_info.dev_eui, &b).await
} }
async fn join_event( async fn join_event(
@ -43,7 +43,7 @@ impl IntegrationTrait for Integration {
.as_ref() .as_ref()
.ok_or_else(|| anyhow!("device_info is None"))?; .ok_or_else(|| anyhow!("device_info is None"))?;
let b = pl.encode_to_vec(); let b = pl.encode_to_vec();
eventlog::log_event_for_device("join", &dev_info.dev_eui, &b).await streams::events::log_event_for_device("join", &dev_info.dev_eui, &b).await
} }
async fn ack_event( async fn ack_event(
@ -56,7 +56,7 @@ impl IntegrationTrait for Integration {
.as_ref() .as_ref()
.ok_or_else(|| anyhow!("device_info is None"))?; .ok_or_else(|| anyhow!("device_info is None"))?;
let b = pl.encode_to_vec(); let b = pl.encode_to_vec();
eventlog::log_event_for_device("ack", &dev_info.dev_eui, &b).await streams::events::log_event_for_device("ack", &dev_info.dev_eui, &b).await
} }
async fn txack_event( async fn txack_event(
@ -69,7 +69,7 @@ impl IntegrationTrait for Integration {
.as_ref() .as_ref()
.ok_or_else(|| anyhow!("device_info is None"))?; .ok_or_else(|| anyhow!("device_info is None"))?;
let b = pl.encode_to_vec(); let b = pl.encode_to_vec();
eventlog::log_event_for_device("txack", &dev_info.dev_eui, &b).await streams::events::log_event_for_device("txack", &dev_info.dev_eui, &b).await
} }
async fn log_event( async fn log_event(
@ -82,7 +82,7 @@ impl IntegrationTrait for Integration {
.as_ref() .as_ref()
.ok_or_else(|| anyhow!("device_info is None"))?; .ok_or_else(|| anyhow!("device_info is None"))?;
let b = pl.encode_to_vec(); let b = pl.encode_to_vec();
eventlog::log_event_for_device("log", &dev_info.dev_eui, &b).await streams::events::log_event_for_device("log", &dev_info.dev_eui, &b).await
} }
async fn status_event( async fn status_event(
@ -95,7 +95,7 @@ impl IntegrationTrait for Integration {
.as_ref() .as_ref()
.ok_or_else(|| anyhow!("device_info is None"))?; .ok_or_else(|| anyhow!("device_info is None"))?;
let b = pl.encode_to_vec(); let b = pl.encode_to_vec();
eventlog::log_event_for_device("status", &dev_info.dev_eui, &b).await streams::events::log_event_for_device("status", &dev_info.dev_eui, &b).await
} }
async fn location_event( async fn location_event(
@ -108,7 +108,7 @@ impl IntegrationTrait for Integration {
.as_ref() .as_ref()
.ok_or_else(|| anyhow!("device_info is None"))?; .ok_or_else(|| anyhow!("device_info is None"))?;
let b = pl.encode_to_vec(); let b = pl.encode_to_vec();
eventlog::log_event_for_device("location", &dev_info.dev_eui, &b).await streams::events::log_event_for_device("location", &dev_info.dev_eui, &b).await
} }
async fn integration_event( async fn integration_event(
@ -121,7 +121,7 @@ impl IntegrationTrait for Integration {
.as_ref() .as_ref()
.ok_or_else(|| anyhow!("device_info is None"))?; .ok_or_else(|| anyhow!("device_info is None"))?;
let b = pl.encode_to_vec(); let b = pl.encode_to_vec();
eventlog::log_event_for_device("integration", &dev_info.dev_eui, &b).await streams::events::log_event_for_device("integration", &dev_info.dev_eui, &b).await
} }
} }

View File

@ -30,19 +30,16 @@ mod codec;
mod config; mod config;
mod devaddr; mod devaddr;
mod downlink; mod downlink;
mod eventlog;
mod framelog;
mod gateway; mod gateway;
mod gpstime; mod gpstime;
mod helpers; mod helpers;
mod integration; mod integration;
mod maccommand; mod maccommand;
mod metalog;
mod monitoring; mod monitoring;
mod region; mod region;
mod requestlog;
mod sensitivity; mod sensitivity;
mod storage; mod storage;
mod streams;
#[cfg(test)] #[cfg(test)]
mod test; mod test;
mod uplink; mod uplink;

View File

@ -0,0 +1,4 @@
pub mod api_requests;
pub mod events;
pub mod frames;
pub mod meta;

View File

@ -19,8 +19,8 @@ use crate::storage::{
device::{self, DeviceClass}, device::{self, DeviceClass},
device_gateway, device_profile, device_queue, device_session, fields, metrics, tenant, device_gateway, device_profile, device_queue, device_session, fields, metrics, tenant,
}; };
use crate::{codec, config, downlink, framelog, integration, maccommand, metalog, region}; use crate::{codec, config, downlink, integration, maccommand, region, streams};
use chirpstack_api::{integration as integration_pb, internal, streams}; use chirpstack_api::{integration as integration_pb, internal, streams as streams_pb};
use lrwn::{AES128Key, EUI64}; use lrwn::{AES128Key, EUI64};
pub struct Data { pub struct Data {
@ -276,9 +276,10 @@ impl Data {
info!(dev_addr = %dev_addr, "None of the device-sessions for dev_addr resulted in valid MIC"); info!(dev_addr = %dev_addr, "None of the device-sessions for dev_addr resulted in valid MIC");
// Log uplink for null DevEUI. // Log uplink for null DevEUI.
let mut ufl: streams::UplinkFrameLog = (&self.uplink_frame_set).try_into()?; let mut ufl: streams_pb::UplinkFrameLog =
(&self.uplink_frame_set).try_into()?;
ufl.dev_eui = "0000000000000000".to_string(); ufl.dev_eui = "0000000000000000".to_string();
framelog::log_uplink_for_device(&ufl).await?; streams::frames::log_uplink_for_device(&ufl).await?;
return Err(Error::Abort); return Err(Error::Abort);
} }
@ -648,7 +649,7 @@ impl Data {
async fn log_uplink_frame_set(&self) -> Result<()> { async fn log_uplink_frame_set(&self) -> Result<()> {
trace!("Logging uplink frame-set"); trace!("Logging uplink frame-set");
let mut ufl: streams::UplinkFrameLog = (&self.uplink_frame_set).try_into()?; let mut ufl: streams_pb::UplinkFrameLog = (&self.uplink_frame_set).try_into()?;
ufl.dev_eui = self.device.as_ref().unwrap().dev_eui.to_string(); ufl.dev_eui = self.device.as_ref().unwrap().dev_eui.to_string();
// self.phy_payload holds the decrypted payload. // self.phy_payload holds the decrypted payload.
@ -656,7 +657,7 @@ impl Data {
ufl.plaintext_frm_payload = true; ufl.plaintext_frm_payload = true;
ufl.phy_payload = self.phy_payload.to_vec()?; ufl.phy_payload = self.phy_payload.to_vec()?;
framelog::log_uplink_for_device(&ufl).await?; streams::frames::log_uplink_for_device(&ufl).await?;
Ok(()) Ok(())
} }
@ -733,7 +734,7 @@ impl Data {
trace!("Logging uplink meta"); trace!("Logging uplink meta");
if let lrwn::Payload::MACPayload(mac_pl) = &self.phy_payload.payload { if let lrwn::Payload::MACPayload(mac_pl) = &self.phy_payload.payload {
let um = streams::UplinkMeta { let um = streams_pb::UplinkMeta {
dev_eui: self.device.as_ref().unwrap().dev_eui.to_string(), dev_eui: self.device.as_ref().unwrap().dev_eui.to_string(),
tx_info: Some(self.uplink_frame_set.tx_info.clone()), tx_info: Some(self.uplink_frame_set.tx_info.clone()),
rx_info: self.uplink_frame_set.rx_info_set.clone(), rx_info: self.uplink_frame_set.rx_info_set.clone(),
@ -763,7 +764,7 @@ impl Data {
message_type: self.phy_payload.mhdr.m_type.to_proto().into(), message_type: self.phy_payload.mhdr.m_type.to_proto().into(),
}; };
metalog::log_uplink(&um).await?; streams::meta::log_uplink(&um).await?;
} }
Ok(()) Ok(())

View File

@ -28,10 +28,8 @@ use crate::storage::{
error::Error as StorageError, error::Error as StorageError,
metrics, tenant, metrics, tenant,
}; };
use crate::{ use crate::{config, devaddr::get_random_dev_addr, downlink, integration, region, streams};
config, devaddr::get_random_dev_addr, downlink, framelog, integration, metalog, region, use chirpstack_api::{common, integration as integration_pb, internal, streams as streams_pb};
};
use chirpstack_api::{common, integration as integration_pb, internal, streams};
pub struct JoinRequest { pub struct JoinRequest {
uplink_frame_set: UplinkFrameSet, uplink_frame_set: UplinkFrameSet,
@ -383,8 +381,8 @@ impl JoinRequest {
async fn log_uplink_frame_set(&self) -> Result<()> { async fn log_uplink_frame_set(&self) -> Result<()> {
trace!("Logging uplink frame-set"); trace!("Logging uplink frame-set");
let ufl: streams::UplinkFrameLog = (&self.uplink_frame_set).try_into()?; let ufl: streams_pb::UplinkFrameLog = (&self.uplink_frame_set).try_into()?;
framelog::log_uplink_for_device(&ufl).await?; streams::frames::log_uplink_for_device(&ufl).await?;
Ok(()) Ok(())
} }
@ -768,7 +766,7 @@ impl JoinRequest {
async fn log_uplink_meta(&self) -> Result<()> { async fn log_uplink_meta(&self) -> Result<()> {
trace!("Logging uplink meta"); trace!("Logging uplink meta");
let um = streams::UplinkMeta { let um = streams_pb::UplinkMeta {
dev_eui: self.device.as_ref().unwrap().dev_eui.to_string(), dev_eui: self.device.as_ref().unwrap().dev_eui.to_string(),
tx_info: Some(self.uplink_frame_set.tx_info.clone()), tx_info: Some(self.uplink_frame_set.tx_info.clone()),
rx_info: self.uplink_frame_set.rx_info_set.clone(), rx_info: self.uplink_frame_set.rx_info_set.clone(),
@ -777,7 +775,7 @@ impl JoinRequest {
..Default::default() ..Default::default()
}; };
metalog::log_uplink(&um).await?; streams::meta::log_uplink(&um).await?;
Ok(()) Ok(())
} }

View File

@ -14,9 +14,9 @@ use crate::storage::{
error::Error as StorageError, error::Error as StorageError,
metrics, tenant, metrics, tenant,
}; };
use crate::{config, devaddr::get_random_dev_addr, integration, metalog, region}; use crate::{config, devaddr::get_random_dev_addr, integration, region, streams};
use backend::{PRStartAnsPayload, PRStartReqPayload}; use backend::{PRStartAnsPayload, PRStartReqPayload};
use chirpstack_api::{common, integration as integration_pb, internal, streams}; use chirpstack_api::{common, integration as integration_pb, internal, streams as streams_pb};
use lrwn::{keys, AES128Key, DevAddr, NetID}; use lrwn::{keys, AES128Key, DevAddr, NetID};
pub struct JoinRequest { pub struct JoinRequest {
@ -555,7 +555,7 @@ impl JoinRequest {
async fn log_uplink_meta(&self) -> Result<()> { async fn log_uplink_meta(&self) -> Result<()> {
trace!("Logging uplink meta"); trace!("Logging uplink meta");
let req = streams::UplinkMeta { let req = streams_pb::UplinkMeta {
dev_eui: self.device.as_ref().unwrap().dev_eui.to_string(), dev_eui: self.device.as_ref().unwrap().dev_eui.to_string(),
tx_info: Some(self.uplink_frame_set.tx_info.clone()), tx_info: Some(self.uplink_frame_set.tx_info.clone()),
rx_info: self.uplink_frame_set.rx_info_set.clone(), rx_info: self.uplink_frame_set.rx_info_set.clone(),
@ -564,7 +564,7 @@ impl JoinRequest {
..Default::default() ..Default::default()
}; };
metalog::log_uplink(&req).await?; streams::meta::log_uplink(&req).await?;
Ok(()) Ok(())
} }

View File

@ -13,12 +13,12 @@ use tracing::{debug, error, info, span, trace, warn, Instrument, Level};
use uuid::Uuid; use uuid::Uuid;
use crate::config; use crate::config;
use crate::framelog;
use crate::helpers::errors::PrintFullError; use crate::helpers::errors::PrintFullError;
use crate::storage::{ use crate::storage::{
device, device_profile, error::Error as StorageError, gateway, get_redis_conn, redis_key, device, device_profile, error::Error as StorageError, gateway, get_redis_conn, redis_key,
}; };
use chirpstack_api::{common, gw, internal, streams}; use crate::streams;
use chirpstack_api::{common, gw, internal, streams as streams_pb};
use lrwn::region::CommonName; use lrwn::region::CommonName;
use lrwn::{ForwardUplinkReq, MType, PhyPayload, EUI64}; use lrwn::{ForwardUplinkReq, MType, PhyPayload, EUI64};
@ -57,11 +57,13 @@ pub struct UplinkFrameSet {
pub roaming_meta_data: Option<RoamingMetaData>, pub roaming_meta_data: Option<RoamingMetaData>,
} }
impl TryFrom<&UplinkFrameSet> for streams::UplinkFrameLog { impl TryFrom<&UplinkFrameSet> for streams_pb::UplinkFrameLog {
type Error = anyhow::Error; type Error = anyhow::Error;
fn try_from(ufs: &UplinkFrameSet) -> std::result::Result<streams::UplinkFrameLog, Self::Error> { fn try_from(
let mut ufl = streams::UplinkFrameLog { ufs: &UplinkFrameSet,
) -> std::result::Result<streams_pb::UplinkFrameLog, Self::Error> {
let mut ufl = streams_pb::UplinkFrameLog {
phy_payload: ufs.phy_payload.to_vec()?, phy_payload: ufs.phy_payload.to_vec()?,
tx_info: Some(ufs.tx_info.clone()), tx_info: Some(ufs.tx_info.clone()),
rx_info: ufs.rx_info_set.clone(), rx_info: ufs.rx_info_set.clone(),
@ -315,8 +317,8 @@ pub async fn handle_uplink(deduplication_id: Uuid, uplink: gw::UplinkFrameSet) -
.context("Update gateway meta-data")?; .context("Update gateway meta-data")?;
debug!("Logging uplink frame to Redis Stream"); debug!("Logging uplink frame to Redis Stream");
let ufl: streams::UplinkFrameLog = (&uplink).try_into()?; let ufl: streams_pb::UplinkFrameLog = (&uplink).try_into()?;
framelog::log_uplink_for_gateways(&ufl) streams::frames::log_uplink_for_gateways(&ufl)
.await .await
.context("Log uplink for gateways")?; .context("Log uplink for gateways")?;