Add chirpstack_integration crate.

This crate can be used to build external ChirpStack integrations.
The plan is to move all non-HTTP based integrations to external
repositories. The reason is that these integrations usually require
quite some external dependencies as these rely on their own SDKs.
This commit is contained in:
Orne Brocaar 2023-10-13 11:37:47 +01:00
parent 81d37a1d6c
commit fda489d315
5 changed files with 755 additions and 0 deletions

30
Cargo.lock generated
View File

@ -1099,6 +1099,23 @@ dependencies = [
"tonic-build",
]
[[package]]
name = "chirpstack_integration"
version = "4.5.1"
dependencies = [
"anyhow",
"async-trait",
"chirpstack_api",
"lazy_static",
"redis",
"serde",
"serde_json",
"tokio",
"toml",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "chrono"
version = "0.4.31"
@ -1208,7 +1225,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35ed6e9d84f0b51a7f52daf1c7d71dd136fd7a3f41a8462b8cdb8c78d920fad4"
dependencies = [
"bytes",
"futures-core",
"memchr",
"pin-project-lite",
"tokio",
"tokio-util",
]
[[package]]
@ -3440,10 +3461,16 @@ version = "0.23.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f49cdc0bb3f412bf8e7d1bd90fe1d9eb10bc5c399ba90973c14662a27b3f8ba"
dependencies = [
"async-trait",
"bytes",
"combine",
"crc16",
"futures",
"futures-util",
"itoa",
"log",
"percent-encoding",
"pin-project-lite",
"r2d2",
"rand",
"rustls 0.21.7",
@ -3451,6 +3478,9 @@ dependencies = [
"ryu",
"sha1_smol",
"socket2 0.4.9",
"tokio",
"tokio-rustls 0.24.1",
"tokio-util",
"url",
]

View File

@ -2,6 +2,7 @@
resolver = "2"
members = [
"chirpstack",
"chirpstack-integration",
"lrwn",
"lrwn-filters",
"backend",

View File

@ -65,5 +65,6 @@ test:
test-all:
cd backend && cargo test
cd chirpstack && make test-all
cd chirpstack-integration && cargo test
cd lrwn && make test
cd lrwn-filters && make test

View File

@ -0,0 +1,29 @@
[package]
name = "chirpstack_integration"
description = "Library for building external ChirpStack integrations"
homepage = "https://www.chirpstack.io/"
license = "MIT"
version = "4.5.1"
authors = ["Orne Brocaar <info@brocaar.com>"]
edition = "2021"
repository = "https://github.com/chirpstack/chirpstack"
[dependencies]
chirpstack_api = { path = "../api/rust", version = "4.5" }
redis = { version = "0.23", features = [
"cluster-async",
"tokio-rustls-comp",
] }
anyhow = "1.0"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = [
"fmt",
"ansi",
"json",
], default-features = true }
async-trait = "0.1.73"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.32", features = ["macros", "rt-multi-thread"] }
lazy_static = "1.4"
serde_json = "1.0"
toml = "0.7"

View File

@ -0,0 +1,694 @@
#[macro_use]
extern crate lazy_static;
use std::io::Cursor;
use std::str::FromStr;
use anyhow::Result;
use async_trait::async_trait;
use serde::Deserialize;
use tokio::sync::RwLock;
use tracing::{error, info, warn, Level};
use tracing_subscriber::{filter, prelude::*};
use chirpstack_api::{integration as integration_pb, prost::Message};
lazy_static! {
static ref INTEGRATION: RwLock<Option<Box<dyn IntegrationTrait + Sync + Send>>> =
RwLock::new(None);
}
#[derive(Default, Deserialize, Clone)]
#[serde(default)]
pub struct Configuration {
pub logging: Logging,
pub redis: Redis,
}
#[derive(Deserialize, Clone)]
#[serde(default)]
pub struct Logging {
pub level: String,
}
impl Default for Logging {
fn default() -> Self {
Logging {
level: "info".into(),
}
}
}
#[derive(Deserialize, Clone)]
#[serde(default)]
pub struct Redis {
pub servers: Vec<String>,
pub cluster: bool,
pub key_prefix: String,
pub consumer_group: String,
pub consumer_name: String,
}
impl Default for Redis {
fn default() -> Self {
Redis {
servers: vec!["redis://127.0.0.1/".into()],
cluster: false,
key_prefix: "".into(),
consumer_group: "integration_pulsar".into(),
consumer_name: "main".into(),
}
}
}
#[async_trait]
pub trait IntegrationTrait {
async fn uplink_event(&self, pl: &integration_pb::UplinkEvent) -> Result<()>;
async fn join_event(&self, pl: &integration_pb::JoinEvent) -> Result<()>;
async fn ack_event(&self, pl: &integration_pb::AckEvent) -> Result<()>;
async fn txack_event(&self, pl: &integration_pb::TxAckEvent) -> Result<()>;
async fn log_event(&self, pl: &integration_pb::LogEvent) -> Result<()>;
async fn status_event(&self, pl: &integration_pb::StatusEvent) -> Result<()>;
async fn location_event(&self, pl: &integration_pb::LocationEvent) -> Result<()>;
async fn integration_event(&self, pl: &integration_pb::IntegrationEvent) -> Result<()>;
}
struct Integration {
redis_client: RedisClient,
key_prefix: String,
consumer_group: String,
consumer_name: String,
}
enum RedisClient {
Client(redis::Client),
ClusterClient(redis::cluster::ClusterClient),
}
impl RedisClient {
async fn get_async_connection(&self) -> Result<RedisConnection> {
match self {
RedisClient::Client(c) => Ok(RedisConnection::Client(c.get_async_connection().await?)),
RedisClient::ClusterClient(c) => Ok(RedisConnection::ClusterClient(
c.get_async_connection().await?,
)),
}
}
}
enum RedisConnection {
Client(redis::aio::Connection),
ClusterClient(redis::cluster_async::ClusterConnection),
}
impl redis::aio::ConnectionLike for RedisConnection {
fn get_db(&self) -> i64 {
match self {
RedisConnection::Client(c) => c.get_db(),
RedisConnection::ClusterClient(c) => c.get_db(),
}
}
fn req_packed_command<'a>(
&'a mut self,
cmd: &'a redis::Cmd,
) -> redis::RedisFuture<'a, redis::Value> {
match self {
RedisConnection::Client(c) => c.req_packed_command(cmd),
RedisConnection::ClusterClient(c) => c.req_packed_command(cmd),
}
}
fn req_packed_commands<'a>(
&'a mut self,
cmd: &'a redis::Pipeline,
offset: usize,
count: usize,
) -> redis::RedisFuture<'a, Vec<redis::Value>> {
match self {
RedisConnection::Client(c) => c.req_packed_commands(cmd, offset, count),
RedisConnection::ClusterClient(c) => c.req_packed_commands(cmd, offset, count),
}
}
}
impl Integration {
fn new(conf: &Configuration) -> Result<Self> {
info!("Initializing ChirpStack Integration backend");
let redis_client = if conf.redis.cluster {
info!("Setting up Redis Cluster client");
RedisClient::ClusterClient(
redis::cluster::ClusterClientBuilder::new(conf.redis.servers.clone()).build()?,
)
} else {
info!(server = %conf.redis.servers[0], "Setting up Redis client");
RedisClient::Client(redis::Client::open(conf.redis.servers[0].clone())?)
};
Ok(Integration {
redis_client,
key_prefix: conf.redis.key_prefix.clone(),
consumer_group: conf.redis.consumer_group.clone(),
consumer_name: conf.redis.consumer_name.clone(),
})
}
async fn start(&self) -> Result<()> {
info!("Getting Redis connection");
let mut redis_conn = self.redis_client.get_async_connection().await?;
let key = format!("{}device:stream:event", self.key_prefix);
// Try to create the consumer group. This will fail in case the consumer group already exists.
let _: usize = match redis::cmd("XGROUP")
.arg("CREATE")
.arg(&key)
.arg(&self.consumer_group)
.arg(0)
.arg("MKSTREAM")
.query_async(&mut redis_conn)
.await
{
Ok(v) => v,
Err(e) => {
warn!(error = %e, "Could not create Redis consumer group, ignore this error if the group already exists");
0
}
};
loop {
let srr: redis::streams::StreamReadReply = redis::cmd("XREADGROUP")
.arg("GROUP")
.arg(&self.consumer_group)
.arg(&self.consumer_name)
.arg("COUNT")
.arg(10)
.arg("BLOCK")
.arg(1000)
.arg("STREAMS")
.arg(&key)
.arg(">")
.query_async(&mut redis_conn)
.await?;
for stream_key in &srr.keys {
for stream_id in &stream_key.ids {
redis::cmd("XACK")
.arg(&key)
.arg(&self.consumer_group)
.arg(&stream_id.id)
.query_async(&mut redis_conn)
.await?;
for (k, v) in &stream_id.map {
let res = || -> Result<()> {
info!(key = %k, "Event received from Redis stream");
match k.as_ref() {
"up" => {
if let redis::Value::Data(b) = v {
let pl = integration_pb::UplinkEvent::decode(
&mut Cursor::new(b),
)?;
tokio::spawn(uplink_event(pl));
}
}
"join" => {
if let redis::Value::Data(b) = v {
let pl =
integration_pb::JoinEvent::decode(&mut Cursor::new(b))?;
tokio::spawn(join_event(pl));
}
}
"ack" => {
if let redis::Value::Data(b) = v {
let pl =
integration_pb::AckEvent::decode(&mut Cursor::new(b))?;
tokio::spawn(ack_event(pl));
}
}
"txack" => {
if let redis::Value::Data(b) = v {
let pl = integration_pb::TxAckEvent::decode(
&mut Cursor::new(b),
)?;
tokio::spawn(txack_event(pl));
}
}
"status" => {
if let redis::Value::Data(b) = v {
let pl = integration_pb::StatusEvent::decode(
&mut Cursor::new(b),
)?;
tokio::spawn(status_event(pl));
}
}
"log" => {
if let redis::Value::Data(b) = v {
let pl =
integration_pb::LogEvent::decode(&mut Cursor::new(b))?;
tokio::spawn(log_event(pl));
}
}
"location" => {
if let redis::Value::Data(b) = v {
let pl = integration_pb::LocationEvent::decode(
&mut Cursor::new(b),
)?;
tokio::spawn(location_event(pl));
}
}
"integration" => {
if let redis::Value::Data(b) = v {
let pl = integration_pb::IntegrationEvent::decode(
&mut Cursor::new(b),
)?;
tokio::spawn(integration_event(pl));
}
}
_ => {
error!(key = %k, "Unexpected event key");
}
}
Ok(())
}();
if let Err(e) = res {
error!(key = %k, error = %e, "Parsing event error");
}
}
}
}
}
}
}
pub fn setup_log(conf: &Configuration) -> Result<()> {
let filter = filter::LevelFilter::from_level(Level::from_str(&conf.logging.level).unwrap());
tracing_subscriber::registry()
.with(tracing_subscriber::fmt::layer())
.with(filter)
.init();
info!(
"Starting {} (version: {}, docs: {})",
env!("CARGO_PKG_DESCRIPTION"),
env!("CARGO_PKG_VERSION"),
env!("CARGO_PKG_HOMEPAGE")
);
Ok(())
}
pub async fn register(integration: Box<dyn IntegrationTrait + Sync + Send>) {
let mut int = INTEGRATION.write().await;
*int = Some(integration);
}
pub async fn start(conf: Configuration) -> Result<()> {
let int = Integration::new(&conf)?;
int.start().await
}
async fn uplink_event(pl: integration_pb::UplinkEvent) {
let integration = INTEGRATION.read().await;
if let Err(e) = integration.as_ref().unwrap().uplink_event(&pl).await {
error!(error = %e, "Uplink event error");
}
}
async fn join_event(pl: integration_pb::JoinEvent) {
let integration = INTEGRATION.read().await;
if let Err(e) = integration.as_ref().unwrap().join_event(&pl).await {
error!(error = %e, "Join event error");
}
}
async fn ack_event(pl: integration_pb::AckEvent) {
let integration = INTEGRATION.read().await;
if let Err(e) = integration.as_ref().unwrap().ack_event(&pl).await {
error!(error = %e, "Ack event error");
}
}
async fn txack_event(pl: integration_pb::TxAckEvent) {
let integration = INTEGRATION.read().await;
if let Err(e) = integration.as_ref().unwrap().txack_event(&pl).await {
error!(error = %e, "Tx ack event error");
}
}
async fn status_event(pl: integration_pb::StatusEvent) {
let integration = INTEGRATION.read().await;
if let Err(e) = integration.as_ref().unwrap().status_event(&pl).await {
error!(error = %e, "Status event error");
}
}
async fn log_event(pl: integration_pb::LogEvent) {
let integration = INTEGRATION.read().await;
if let Err(e) = integration.as_ref().unwrap().log_event(&pl).await {
error!(error = %e, "Log event error");
}
}
async fn location_event(pl: integration_pb::LocationEvent) {
let integration = INTEGRATION.read().await;
if let Err(e) = integration.as_ref().unwrap().location_event(&pl).await {
error!(error = %e, "Location event error");
}
}
async fn integration_event(pl: integration_pb::IntegrationEvent) {
let integration = INTEGRATION.read().await;
if let Err(e) = integration.as_ref().unwrap().integration_event(&pl).await {
error!(error = %e, "Integration event error");
}
}
#[cfg(test)]
mod test {
use super::*;
use std::env;
use std::time::Duration;
use tokio::sync::RwLock;
use tokio::time::sleep;
lazy_static! {
static ref UPLINK_EVENTS: RwLock<Vec<integration_pb::UplinkEvent>> =
RwLock::new(Vec::new());
static ref JOIN_EVENTS: RwLock<Vec<integration_pb::JoinEvent>> = RwLock::new(Vec::new());
static ref ACK_EVENTS: RwLock<Vec<integration_pb::AckEvent>> = RwLock::new(Vec::new());
static ref TXACK_EVENTS: RwLock<Vec<integration_pb::TxAckEvent>> = RwLock::new(Vec::new());
static ref LOG_EVENTS: RwLock<Vec<integration_pb::LogEvent>> = RwLock::new(Vec::new());
static ref STATUS_EVENTS: RwLock<Vec<integration_pb::StatusEvent>> =
RwLock::new(Vec::new());
static ref LOCATION_EVENTS: RwLock<Vec<integration_pb::LocationEvent>> =
RwLock::new(Vec::new());
static ref INTEGRATION_EVENTS: RwLock<Vec<integration_pb::IntegrationEvent>> =
RwLock::new(Vec::new());
}
struct MockIntegration {}
#[async_trait]
impl IntegrationTrait for MockIntegration {
async fn uplink_event(&self, pl: &integration_pb::UplinkEvent) -> Result<()> {
UPLINK_EVENTS.write().await.push(pl.clone());
Ok(())
}
async fn join_event(&self, pl: &integration_pb::JoinEvent) -> Result<()> {
JOIN_EVENTS.write().await.push(pl.clone());
Ok(())
}
async fn ack_event(&self, pl: &integration_pb::AckEvent) -> Result<()> {
ACK_EVENTS.write().await.push(pl.clone());
Ok(())
}
async fn txack_event(&self, pl: &integration_pb::TxAckEvent) -> Result<()> {
TXACK_EVENTS.write().await.push(pl.clone());
Ok(())
}
async fn log_event(&self, pl: &integration_pb::LogEvent) -> Result<()> {
LOG_EVENTS.write().await.push(pl.clone());
Ok(())
}
async fn status_event(&self, pl: &integration_pb::StatusEvent) -> Result<()> {
STATUS_EVENTS.write().await.push(pl.clone());
Ok(())
}
async fn location_event(&self, pl: &integration_pb::LocationEvent) -> Result<()> {
LOCATION_EVENTS.write().await.push(pl.clone());
Ok(())
}
async fn integration_event(&self, pl: &integration_pb::IntegrationEvent) -> Result<()> {
INTEGRATION_EVENTS.write().await.push(pl.clone());
Ok(())
}
}
#[tokio::test]
async fn test_integration() {
let redis_url = env::var("TEST_REDIS_URL").unwrap_or("redis://127.0.0.1/1".to_string());
setup_log(&Configuration::default()).unwrap();
register(Box::new(MockIntegration {})).await;
let conf = Configuration {
redis: Redis {
servers: vec![redis_url.clone()],
consumer_group: "test_group".into(),
consumer_name: "test_consumer".into(),
..Default::default()
},
..Default::default()
};
tokio::spawn(start(conf));
sleep(Duration::from_millis(100)).await;
let redis_client = redis::Client::open(redis_url).unwrap();
let mut redis_conn = redis_client.get_async_connection().await.unwrap();
println!("Uplink");
// uplink
let pl = integration_pb::UplinkEvent::default();
let _: String = redis::cmd("XADD")
.arg("device:stream:event")
.arg("MAXLEN")
.arg(1)
.arg("*")
.arg("up")
.arg(pl.encode_to_vec())
.query_async(&mut redis_conn)
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
let pl_recv = UPLINK_EVENTS
.write()
.await
.drain(0..1)
.collect::<Vec<integration_pb::UplinkEvent>>()
.first()
.cloned()
.unwrap();
assert_eq!(pl, pl_recv);
println!("Join");
// join
let pl = integration_pb::JoinEvent::default();
let _: String = redis::cmd("XADD")
.arg("device:stream:event")
.arg("MAXLEN")
.arg(1)
.arg("*")
.arg("join")
.arg(pl.encode_to_vec())
.query_async(&mut redis_conn)
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
let pl_recv = JOIN_EVENTS
.write()
.await
.drain(0..1)
.collect::<Vec<integration_pb::JoinEvent>>()
.first()
.cloned()
.unwrap();
assert_eq!(pl, pl_recv);
println!("Ack");
// ack
let pl = integration_pb::AckEvent::default();
let _: String = redis::cmd("XADD")
.arg("device:stream:event")
.arg("MAXLEN")
.arg(1)
.arg("*")
.arg("ack")
.arg(pl.encode_to_vec())
.query_async(&mut redis_conn)
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
let pl_recv = ACK_EVENTS
.write()
.await
.drain(0..1)
.collect::<Vec<integration_pb::AckEvent>>()
.first()
.cloned()
.unwrap();
assert_eq!(pl, pl_recv);
println!("TxAck");
// txack
let pl = integration_pb::TxAckEvent::default();
let _: String = redis::cmd("XADD")
.arg("device:stream:event")
.arg("MAXLEN")
.arg(1)
.arg("*")
.arg("txack")
.arg(pl.encode_to_vec())
.query_async(&mut redis_conn)
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
let pl_recv = TXACK_EVENTS
.write()
.await
.drain(0..1)
.collect::<Vec<integration_pb::TxAckEvent>>()
.first()
.cloned()
.unwrap();
assert_eq!(pl, pl_recv);
println!("Log");
// log
let pl = integration_pb::LogEvent::default();
let _: String = redis::cmd("XADD")
.arg("device:stream:event")
.arg("MAXLEN")
.arg(1)
.arg("*")
.arg("log")
.arg(pl.encode_to_vec())
.query_async(&mut redis_conn)
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
let pl_recv = LOG_EVENTS
.write()
.await
.drain(0..1)
.collect::<Vec<integration_pb::LogEvent>>()
.first()
.cloned()
.unwrap();
assert_eq!(pl, pl_recv);
println!("Status");
// status
let pl = integration_pb::StatusEvent::default();
let _: String = redis::cmd("XADD")
.arg("device:stream:event")
.arg("MAXLEN")
.arg(1)
.arg("*")
.arg("status")
.arg(pl.encode_to_vec())
.query_async(&mut redis_conn)
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
let pl_recv = STATUS_EVENTS
.write()
.await
.drain(0..1)
.collect::<Vec<integration_pb::StatusEvent>>()
.first()
.cloned()
.unwrap();
assert_eq!(pl, pl_recv);
println!("Location");
// location
let pl = integration_pb::LocationEvent::default();
let _: String = redis::cmd("XADD")
.arg("device:stream:event")
.arg("MAXLEN")
.arg(1)
.arg("*")
.arg("location")
.arg(pl.encode_to_vec())
.query_async(&mut redis_conn)
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
let pl_recv = LOCATION_EVENTS
.write()
.await
.drain(0..1)
.collect::<Vec<integration_pb::LocationEvent>>()
.first()
.cloned()
.unwrap();
assert_eq!(pl, pl_recv);
println!("Integration");
// integration
let pl = integration_pb::IntegrationEvent::default();
let _: String = redis::cmd("XADD")
.arg("device:stream:event")
.arg("MAXLEN")
.arg(1)
.arg("*")
.arg("integration")
.arg(pl.encode_to_vec())
.query_async(&mut redis_conn)
.await
.unwrap();
sleep(Duration::from_millis(100)).await;
let pl_recv = INTEGRATION_EVENTS
.write()
.await
.drain(0..1)
.collect::<Vec<integration_pb::IntegrationEvent>>()
.first()
.cloned()
.unwrap();
assert_eq!(pl, pl_recv);
}
}