From fda489d3152cc5482f64f41f8d049f9ba01b3ab5 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Fri, 13 Oct 2023 11:37:47 +0100 Subject: [PATCH] Add chirpstack_integration crate. This crate can be used to build external ChirpStack integrations. The plan is to move all non-HTTP based integrations to external repositories. The reason is that these integrations usually require quite some external dependencies as these rely on their own SDKs. --- Cargo.lock | 30 ++ Cargo.toml | 1 + Makefile | 1 + chirpstack-integration/Cargo.toml | 29 ++ chirpstack-integration/src/lib.rs | 694 ++++++++++++++++++++++++++++++ 5 files changed, 755 insertions(+) create mode 100644 chirpstack-integration/Cargo.toml create mode 100644 chirpstack-integration/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 9216d45d..c0c601b5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1099,6 +1099,23 @@ dependencies = [ "tonic-build", ] +[[package]] +name = "chirpstack_integration" +version = "4.5.1" +dependencies = [ + "anyhow", + "async-trait", + "chirpstack_api", + "lazy_static", + "redis", + "serde", + "serde_json", + "tokio", + "toml", + "tracing", + "tracing-subscriber", +] + [[package]] name = "chrono" version = "0.4.31" @@ -1208,7 +1225,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4" dependencies = [ "bytes", + "futures-core", "memchr", + "pin-project-lite", + "tokio", + "tokio-util", ] [[package]] @@ -3440,10 +3461,16 @@ version = "0.23.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba" dependencies = [ + "async-trait", + "bytes", "combine", "crc16", + "futures", + "futures-util", "itoa", + "log", "percent-encoding", + "pin-project-lite", "r2d2", "rand", "rustls 0.21.7", @@ -3451,6 +3478,9 @@ dependencies = [ "ryu", "sha1_smol", "socket2 0.4.9", + "tokio", + "tokio-rustls 0.24.1", + "tokio-util", "url", ] diff --git a/Cargo.toml b/Cargo.toml index f278fa2e..d50cf741 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ "chirpstack", + "chirpstack-integration", "lrwn", "lrwn-filters", "backend", diff --git a/Makefile b/Makefile index 4dc77dc9..828a88f5 100644 --- a/Makefile +++ b/Makefile @@ -65,5 +65,6 @@ test: test-all: cd backend && cargo test cd chirpstack && make test-all + cd chirpstack-integration && cargo test cd lrwn && make test cd lrwn-filters && make test diff --git a/chirpstack-integration/Cargo.toml b/chirpstack-integration/Cargo.toml new file mode 100644 index 00000000..6f6fb0fe --- /dev/null +++ b/chirpstack-integration/Cargo.toml @@ -0,0 +1,29 @@ +[package] + name = "chirpstack_integration" + description = "Library for building external ChirpStack integrations" + homepage = "https://www.chirpstack.io/" + license = "MIT" + version = "4.5.1" + authors = ["Orne Brocaar "] + edition = "2021" + repository = "https://github.com/chirpstack/chirpstack" + +[dependencies] + chirpstack_api = { path = "../api/rust", version = "4.5" } + redis = { version = "0.23", features = [ + "cluster-async", + "tokio-rustls-comp", + ] } + anyhow = "1.0" + tracing = "0.1" + tracing-subscriber = { version = "0.3", features = [ + "fmt", + "ansi", + "json", + ], default-features = true } + async-trait = "0.1.73" + serde = { version = "1.0", features = ["derive"] } + tokio = { version = "1.32", features = ["macros", "rt-multi-thread"] } + lazy_static = "1.4" + serde_json = "1.0" + toml = "0.7" diff --git a/chirpstack-integration/src/lib.rs b/chirpstack-integration/src/lib.rs new file mode 100644 index 00000000..b483ab65 --- /dev/null +++ b/chirpstack-integration/src/lib.rs @@ -0,0 +1,694 @@ +#[macro_use] +extern crate lazy_static; + +use std::io::Cursor; +use std::str::FromStr; + +use anyhow::Result; +use async_trait::async_trait; +use serde::Deserialize; +use tokio::sync::RwLock; +use tracing::{error, info, warn, Level}; +use tracing_subscriber::{filter, prelude::*}; + +use chirpstack_api::{integration as integration_pb, prost::Message}; + +lazy_static! { + static ref INTEGRATION: RwLock>> = + RwLock::new(None); +} + +#[derive(Default, Deserialize, Clone)] +#[serde(default)] +pub struct Configuration { + pub logging: Logging, + pub redis: Redis, +} + +#[derive(Deserialize, Clone)] +#[serde(default)] +pub struct Logging { + pub level: String, +} + +impl Default for Logging { + fn default() -> Self { + Logging { + level: "info".into(), + } + } +} + +#[derive(Deserialize, Clone)] +#[serde(default)] +pub struct Redis { + pub servers: Vec, + pub cluster: bool, + pub key_prefix: String, + pub consumer_group: String, + pub consumer_name: String, +} + +impl Default for Redis { + fn default() -> Self { + Redis { + servers: vec!["redis://127.0.0.1/".into()], + cluster: false, + key_prefix: "".into(), + consumer_group: "integration_pulsar".into(), + consumer_name: "main".into(), + } + } +} + +#[async_trait] +pub trait IntegrationTrait { + async fn uplink_event(&self, pl: &integration_pb::UplinkEvent) -> Result<()>; + + async fn join_event(&self, pl: &integration_pb::JoinEvent) -> Result<()>; + + async fn ack_event(&self, pl: &integration_pb::AckEvent) -> Result<()>; + + async fn txack_event(&self, pl: &integration_pb::TxAckEvent) -> Result<()>; + + async fn log_event(&self, pl: &integration_pb::LogEvent) -> Result<()>; + + async fn status_event(&self, pl: &integration_pb::StatusEvent) -> Result<()>; + + async fn location_event(&self, pl: &integration_pb::LocationEvent) -> Result<()>; + + async fn integration_event(&self, pl: &integration_pb::IntegrationEvent) -> Result<()>; +} + +struct Integration { + redis_client: RedisClient, + key_prefix: String, + consumer_group: String, + consumer_name: String, +} + +enum RedisClient { + Client(redis::Client), + ClusterClient(redis::cluster::ClusterClient), +} + +impl RedisClient { + async fn get_async_connection(&self) -> Result { + match self { + RedisClient::Client(c) => Ok(RedisConnection::Client(c.get_async_connection().await?)), + RedisClient::ClusterClient(c) => Ok(RedisConnection::ClusterClient( + c.get_async_connection().await?, + )), + } + } +} + +enum RedisConnection { + Client(redis::aio::Connection), + ClusterClient(redis::cluster_async::ClusterConnection), +} + +impl redis::aio::ConnectionLike for RedisConnection { + fn get_db(&self) -> i64 { + match self { + RedisConnection::Client(c) => c.get_db(), + RedisConnection::ClusterClient(c) => c.get_db(), + } + } + + fn req_packed_command<'a>( + &'a mut self, + cmd: &'a redis::Cmd, + ) -> redis::RedisFuture<'a, redis::Value> { + match self { + RedisConnection::Client(c) => c.req_packed_command(cmd), + RedisConnection::ClusterClient(c) => c.req_packed_command(cmd), + } + } + + fn req_packed_commands<'a>( + &'a mut self, + cmd: &'a redis::Pipeline, + offset: usize, + count: usize, + ) -> redis::RedisFuture<'a, Vec> { + match self { + RedisConnection::Client(c) => c.req_packed_commands(cmd, offset, count), + RedisConnection::ClusterClient(c) => c.req_packed_commands(cmd, offset, count), + } + } +} + +impl Integration { + fn new(conf: &Configuration) -> Result { + info!("Initializing ChirpStack Integration backend"); + + let redis_client = if conf.redis.cluster { + info!("Setting up Redis Cluster client"); + RedisClient::ClusterClient( + redis::cluster::ClusterClientBuilder::new(conf.redis.servers.clone()).build()?, + ) + } else { + info!(server = %conf.redis.servers[0], "Setting up Redis client"); + RedisClient::Client(redis::Client::open(conf.redis.servers[0].clone())?) + }; + + Ok(Integration { + redis_client, + key_prefix: conf.redis.key_prefix.clone(), + consumer_group: conf.redis.consumer_group.clone(), + consumer_name: conf.redis.consumer_name.clone(), + }) + } + + async fn start(&self) -> Result<()> { + info!("Getting Redis connection"); + let mut redis_conn = self.redis_client.get_async_connection().await?; + + let key = format!("{}device:stream:event", self.key_prefix); + + // Try to create the consumer group. This will fail in case the consumer group already exists. + let _: usize = match redis::cmd("XGROUP") + .arg("CREATE") + .arg(&key) + .arg(&self.consumer_group) + .arg(0) + .arg("MKSTREAM") + .query_async(&mut redis_conn) + .await + { + Ok(v) => v, + Err(e) => { + warn!(error = %e, "Could not create Redis consumer group, ignore this error if the group already exists"); + 0 + } + }; + + loop { + let srr: redis::streams::StreamReadReply = redis::cmd("XREADGROUP") + .arg("GROUP") + .arg(&self.consumer_group) + .arg(&self.consumer_name) + .arg("COUNT") + .arg(10) + .arg("BLOCK") + .arg(1000) + .arg("STREAMS") + .arg(&key) + .arg(">") + .query_async(&mut redis_conn) + .await?; + + for stream_key in &srr.keys { + for stream_id in &stream_key.ids { + redis::cmd("XACK") + .arg(&key) + .arg(&self.consumer_group) + .arg(&stream_id.id) + .query_async(&mut redis_conn) + .await?; + + for (k, v) in &stream_id.map { + let res = || -> Result<()> { + info!(key = %k, "Event received from Redis stream"); + match k.as_ref() { + "up" => { + if let redis::Value::Data(b) = v { + let pl = integration_pb::UplinkEvent::decode( + &mut Cursor::new(b), + )?; + tokio::spawn(uplink_event(pl)); + } + } + "join" => { + if let redis::Value::Data(b) = v { + let pl = + integration_pb::JoinEvent::decode(&mut Cursor::new(b))?; + tokio::spawn(join_event(pl)); + } + } + "ack" => { + if let redis::Value::Data(b) = v { + let pl = + integration_pb::AckEvent::decode(&mut Cursor::new(b))?; + tokio::spawn(ack_event(pl)); + } + } + "txack" => { + if let redis::Value::Data(b) = v { + let pl = integration_pb::TxAckEvent::decode( + &mut Cursor::new(b), + )?; + tokio::spawn(txack_event(pl)); + } + } + "status" => { + if let redis::Value::Data(b) = v { + let pl = integration_pb::StatusEvent::decode( + &mut Cursor::new(b), + )?; + tokio::spawn(status_event(pl)); + } + } + "log" => { + if let redis::Value::Data(b) = v { + let pl = + integration_pb::LogEvent::decode(&mut Cursor::new(b))?; + tokio::spawn(log_event(pl)); + } + } + "location" => { + if let redis::Value::Data(b) = v { + let pl = integration_pb::LocationEvent::decode( + &mut Cursor::new(b), + )?; + tokio::spawn(location_event(pl)); + } + } + "integration" => { + if let redis::Value::Data(b) = v { + let pl = integration_pb::IntegrationEvent::decode( + &mut Cursor::new(b), + )?; + tokio::spawn(integration_event(pl)); + } + } + _ => { + error!(key = %k, "Unexpected event key"); + } + } + + Ok(()) + }(); + + if let Err(e) = res { + error!(key = %k, error = %e, "Parsing event error"); + } + } + } + } + } + } +} + +pub fn setup_log(conf: &Configuration) -> Result<()> { + let filter = filter::LevelFilter::from_level(Level::from_str(&conf.logging.level).unwrap()); + + tracing_subscriber::registry() + .with(tracing_subscriber::fmt::layer()) + .with(filter) + .init(); + + info!( + "Starting {} (version: {}, docs: {})", + env!("CARGO_PKG_DESCRIPTION"), + env!("CARGO_PKG_VERSION"), + env!("CARGO_PKG_HOMEPAGE") + ); + + Ok(()) +} + +pub async fn register(integration: Box) { + let mut int = INTEGRATION.write().await; + *int = Some(integration); +} + +pub async fn start(conf: Configuration) -> Result<()> { + let int = Integration::new(&conf)?; + int.start().await +} + +async fn uplink_event(pl: integration_pb::UplinkEvent) { + let integration = INTEGRATION.read().await; + if let Err(e) = integration.as_ref().unwrap().uplink_event(&pl).await { + error!(error = %e, "Uplink event error"); + } +} + +async fn join_event(pl: integration_pb::JoinEvent) { + let integration = INTEGRATION.read().await; + if let Err(e) = integration.as_ref().unwrap().join_event(&pl).await { + error!(error = %e, "Join event error"); + } +} + +async fn ack_event(pl: integration_pb::AckEvent) { + let integration = INTEGRATION.read().await; + if let Err(e) = integration.as_ref().unwrap().ack_event(&pl).await { + error!(error = %e, "Ack event error"); + } +} + +async fn txack_event(pl: integration_pb::TxAckEvent) { + let integration = INTEGRATION.read().await; + if let Err(e) = integration.as_ref().unwrap().txack_event(&pl).await { + error!(error = %e, "Tx ack event error"); + } +} + +async fn status_event(pl: integration_pb::StatusEvent) { + let integration = INTEGRATION.read().await; + if let Err(e) = integration.as_ref().unwrap().status_event(&pl).await { + error!(error = %e, "Status event error"); + } +} + +async fn log_event(pl: integration_pb::LogEvent) { + let integration = INTEGRATION.read().await; + if let Err(e) = integration.as_ref().unwrap().log_event(&pl).await { + error!(error = %e, "Log event error"); + } +} + +async fn location_event(pl: integration_pb::LocationEvent) { + let integration = INTEGRATION.read().await; + if let Err(e) = integration.as_ref().unwrap().location_event(&pl).await { + error!(error = %e, "Location event error"); + } +} + +async fn integration_event(pl: integration_pb::IntegrationEvent) { + let integration = INTEGRATION.read().await; + if let Err(e) = integration.as_ref().unwrap().integration_event(&pl).await { + error!(error = %e, "Integration event error"); + } +} + +#[cfg(test)] +mod test { + use super::*; + use std::env; + use std::time::Duration; + + use tokio::sync::RwLock; + use tokio::time::sleep; + + lazy_static! { + static ref UPLINK_EVENTS: RwLock> = + RwLock::new(Vec::new()); + static ref JOIN_EVENTS: RwLock> = RwLock::new(Vec::new()); + static ref ACK_EVENTS: RwLock> = RwLock::new(Vec::new()); + static ref TXACK_EVENTS: RwLock> = RwLock::new(Vec::new()); + static ref LOG_EVENTS: RwLock> = RwLock::new(Vec::new()); + static ref STATUS_EVENTS: RwLock> = + RwLock::new(Vec::new()); + static ref LOCATION_EVENTS: RwLock> = + RwLock::new(Vec::new()); + static ref INTEGRATION_EVENTS: RwLock> = + RwLock::new(Vec::new()); + } + + struct MockIntegration {} + + #[async_trait] + impl IntegrationTrait for MockIntegration { + async fn uplink_event(&self, pl: &integration_pb::UplinkEvent) -> Result<()> { + UPLINK_EVENTS.write().await.push(pl.clone()); + Ok(()) + } + + async fn join_event(&self, pl: &integration_pb::JoinEvent) -> Result<()> { + JOIN_EVENTS.write().await.push(pl.clone()); + Ok(()) + } + + async fn ack_event(&self, pl: &integration_pb::AckEvent) -> Result<()> { + ACK_EVENTS.write().await.push(pl.clone()); + Ok(()) + } + + async fn txack_event(&self, pl: &integration_pb::TxAckEvent) -> Result<()> { + TXACK_EVENTS.write().await.push(pl.clone()); + Ok(()) + } + + async fn log_event(&self, pl: &integration_pb::LogEvent) -> Result<()> { + LOG_EVENTS.write().await.push(pl.clone()); + Ok(()) + } + + async fn status_event(&self, pl: &integration_pb::StatusEvent) -> Result<()> { + STATUS_EVENTS.write().await.push(pl.clone()); + Ok(()) + } + + async fn location_event(&self, pl: &integration_pb::LocationEvent) -> Result<()> { + LOCATION_EVENTS.write().await.push(pl.clone()); + Ok(()) + } + + async fn integration_event(&self, pl: &integration_pb::IntegrationEvent) -> Result<()> { + INTEGRATION_EVENTS.write().await.push(pl.clone()); + Ok(()) + } + } + + #[tokio::test] + async fn test_integration() { + let redis_url = env::var("TEST_REDIS_URL").unwrap_or("redis://127.0.0.1/1".to_string()); + + setup_log(&Configuration::default()).unwrap(); + register(Box::new(MockIntegration {})).await; + + let conf = Configuration { + redis: Redis { + servers: vec![redis_url.clone()], + consumer_group: "test_group".into(), + consumer_name: "test_consumer".into(), + ..Default::default() + }, + ..Default::default() + }; + + tokio::spawn(start(conf)); + sleep(Duration::from_millis(100)).await; + + let redis_client = redis::Client::open(redis_url).unwrap(); + let mut redis_conn = redis_client.get_async_connection().await.unwrap(); + + println!("Uplink"); + + // uplink + let pl = integration_pb::UplinkEvent::default(); + let _: String = redis::cmd("XADD") + .arg("device:stream:event") + .arg("MAXLEN") + .arg(1) + .arg("*") + .arg("up") + .arg(pl.encode_to_vec()) + .query_async(&mut redis_conn) + .await + .unwrap(); + + sleep(Duration::from_millis(100)).await; + + let pl_recv = UPLINK_EVENTS + .write() + .await + .drain(0..1) + .collect::>() + .first() + .cloned() + .unwrap(); + + assert_eq!(pl, pl_recv); + + println!("Join"); + + // join + let pl = integration_pb::JoinEvent::default(); + let _: String = redis::cmd("XADD") + .arg("device:stream:event") + .arg("MAXLEN") + .arg(1) + .arg("*") + .arg("join") + .arg(pl.encode_to_vec()) + .query_async(&mut redis_conn) + .await + .unwrap(); + + sleep(Duration::from_millis(100)).await; + + let pl_recv = JOIN_EVENTS + .write() + .await + .drain(0..1) + .collect::>() + .first() + .cloned() + .unwrap(); + + assert_eq!(pl, pl_recv); + + println!("Ack"); + + // ack + let pl = integration_pb::AckEvent::default(); + let _: String = redis::cmd("XADD") + .arg("device:stream:event") + .arg("MAXLEN") + .arg(1) + .arg("*") + .arg("ack") + .arg(pl.encode_to_vec()) + .query_async(&mut redis_conn) + .await + .unwrap(); + + sleep(Duration::from_millis(100)).await; + + let pl_recv = ACK_EVENTS + .write() + .await + .drain(0..1) + .collect::>() + .first() + .cloned() + .unwrap(); + + assert_eq!(pl, pl_recv); + + println!("TxAck"); + + // txack + let pl = integration_pb::TxAckEvent::default(); + let _: String = redis::cmd("XADD") + .arg("device:stream:event") + .arg("MAXLEN") + .arg(1) + .arg("*") + .arg("txack") + .arg(pl.encode_to_vec()) + .query_async(&mut redis_conn) + .await + .unwrap(); + + sleep(Duration::from_millis(100)).await; + + let pl_recv = TXACK_EVENTS + .write() + .await + .drain(0..1) + .collect::>() + .first() + .cloned() + .unwrap(); + + assert_eq!(pl, pl_recv); + + println!("Log"); + + // log + let pl = integration_pb::LogEvent::default(); + let _: String = redis::cmd("XADD") + .arg("device:stream:event") + .arg("MAXLEN") + .arg(1) + .arg("*") + .arg("log") + .arg(pl.encode_to_vec()) + .query_async(&mut redis_conn) + .await + .unwrap(); + + sleep(Duration::from_millis(100)).await; + + let pl_recv = LOG_EVENTS + .write() + .await + .drain(0..1) + .collect::>() + .first() + .cloned() + .unwrap(); + + assert_eq!(pl, pl_recv); + + println!("Status"); + + // status + let pl = integration_pb::StatusEvent::default(); + let _: String = redis::cmd("XADD") + .arg("device:stream:event") + .arg("MAXLEN") + .arg(1) + .arg("*") + .arg("status") + .arg(pl.encode_to_vec()) + .query_async(&mut redis_conn) + .await + .unwrap(); + + sleep(Duration::from_millis(100)).await; + + let pl_recv = STATUS_EVENTS + .write() + .await + .drain(0..1) + .collect::>() + .first() + .cloned() + .unwrap(); + + assert_eq!(pl, pl_recv); + + println!("Location"); + + // location + let pl = integration_pb::LocationEvent::default(); + let _: String = redis::cmd("XADD") + .arg("device:stream:event") + .arg("MAXLEN") + .arg(1) + .arg("*") + .arg("location") + .arg(pl.encode_to_vec()) + .query_async(&mut redis_conn) + .await + .unwrap(); + + sleep(Duration::from_millis(100)).await; + + let pl_recv = LOCATION_EVENTS + .write() + .await + .drain(0..1) + .collect::>() + .first() + .cloned() + .unwrap(); + + assert_eq!(pl, pl_recv); + + println!("Integration"); + + // integration + let pl = integration_pb::IntegrationEvent::default(); + let _: String = redis::cmd("XADD") + .arg("device:stream:event") + .arg("MAXLEN") + .arg(1) + .arg("*") + .arg("integration") + .arg(pl.encode_to_vec()) + .query_async(&mut redis_conn) + .await + .unwrap(); + + sleep(Duration::from_millis(100)).await; + + let pl_recv = INTEGRATION_EVENTS + .write() + .await + .drain(0..1) + .collect::>() + .first() + .cloned() + .unwrap(); + + assert_eq!(pl, pl_recv); + } +}