From 4131ed38ec627379aa7ca37bf70fa2dd6707a30c Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Tue, 12 Jul 2022 09:37:15 +0100 Subject: [PATCH] Add Kafka integration. --- Cargo.lock | 61 ++++++ chirpstack/Cargo.toml | 1 + chirpstack/src/cmd/configfile.rs | 47 ++++- chirpstack/src/config.rs | 30 +++ chirpstack/src/integration/kafka.rs | 296 ++++++++++++++++++++++++++++ chirpstack/src/integration/mod.rs | 1 + docker-compose.yml | 17 ++ 7 files changed, 452 insertions(+), 1 deletion(-) create mode 100644 chirpstack/src/integration/kafka.rs diff --git a/Cargo.lock b/Cargo.lock index 905e02ad..82b39dd7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -937,6 +937,7 @@ dependencies = [ "r2d2", "rand", "rand_core", + "rdkafka", "redis", "regex", "reqwest", @@ -2455,6 +2456,27 @@ dependencies = [ "libc", ] +[[package]] +name = "num_enum" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9" +dependencies = [ + "num_enum_derive", +] + +[[package]] +name = "num_enum_derive" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce" +dependencies = [ + "proc-macro-crate", + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "num_threads" version = "0.1.3" @@ -2943,6 +2965,16 @@ dependencies = [ "syn", ] +[[package]] +name = "proc-macro-crate" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e17d47ce914bf4de440332250b0edd23ce48c005f59fab39d3335866b114f11a" +dependencies = [ + "thiserror", + "toml", +] + [[package]] name = "proc-macro2" version = "1.0.36" @@ -3120,6 +3152,35 @@ dependencies = [ "cipher 0.4.3", ] +[[package]] +name = "rdkafka" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1de127f294f2dba488ed46760b129d5ecbeabbd337ccbf3739cb29d50db2161c" +dependencies = [ + "futures", + "libc", + "log", + "rdkafka-sys", + "serde", + "serde_derive", + "serde_json", + "slab", + "tokio", +] + +[[package]] +name = "rdkafka-sys" +version = "4.2.0+1.8.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e542c6863b04ce0fa0c5719bc6b7b348cf8dd21af1bb03c9db5f9805b2a6473" +dependencies = [ + "libc", + "libz-sys", + "num_enum", + "pkg-config", +] + [[package]] name = "reactor-trait" version = "1.1.0" diff --git a/chirpstack/Cargo.toml b/chirpstack/Cargo.toml index 831b4841..fb939142 100644 --- a/chirpstack/Cargo.toml +++ b/chirpstack/Cargo.toml @@ -54,6 +54,7 @@ gcp_auth = "0.7.2" lapin = "2.1.1" tokio-executor-trait = "2.1.0" tokio-reactor-trait = "1.1.0" +rdkafka = "0.28.0" # gRPC and Protobuf tonic = "0.7" diff --git a/chirpstack/src/cmd/configfile.rs b/chirpstack/src/cmd/configfile.rs index 28bdd6a6..90a4cb01 100644 --- a/chirpstack/src/cmd/configfile.rs +++ b/chirpstack/src/cmd/configfile.rs @@ -323,7 +323,8 @@ pub fn run() { # pool (0 = equal to max_open_connections). min_idle_connections={{ integration.postgresql.min_idle_connections }} - # AMQP / RabbitMQ. + + # AMQP / RabbitMQ integration configuration. [integration.amqp] # Server URL. @@ -342,6 +343,50 @@ pub fn run() { json={{ integration.amqp.json }} + # Kafka integration configuration. + [integration.kafka] + + # Brokers. + brokers=[ + {{#each integration.kafka.brokers}} + "{{this}}", + {{/each}} + ] + + # TLS. + # + # Set this to true when the Kafka client must connect using TLS to the Broker. + tls={{ integration.kafka.tls }} + + # Topic for events. + topic="{{ integration.kafka.topic }}" + + # Template for keys included in Kafka messages. + # Kafka uses the key for distributing messages over partitions. You can use + # this to ensure some subset of messages end up in the same partition, so + # they can be consumed in-order. And Kafka can use the key for data retention + # decisions. A header "event" with the event type is included in each + # message. There is no need to parse it from the key. + event_key="{{ integration.kafka.event_key }}" + + # Username (optional). + username="{{ integration.kafka.username }}" + + # Password. + password="{{ integration.kafka.password }}" + + # Mechanism. + # + # Valid options are: + # * PLAIN + # * SCRAM-SHA-256 + # * SCRAM-SHA-512 + mechanism="{{ integration.kafka.mechanism }}" + + # Use JSON encoding instead of Protobuf (binary). + json={{ integration.kafka.json }} + + # Codec configuration. [codec] diff --git a/chirpstack/src/config.rs b/chirpstack/src/config.rs index 96c0bbb9..85640beb 100644 --- a/chirpstack/src/config.rs +++ b/chirpstack/src/config.rs @@ -244,6 +244,7 @@ pub struct Integration { pub mqtt: MqttIntegration, pub postgresql: PostgresqlIntegration, pub amqp: AmqpIntegration, + pub kafka: KafkaIntegration, } #[derive(Serialize, Deserialize, Clone)] @@ -341,6 +342,35 @@ impl Default for AmqpIntegration { } } +#[derive(Serialize, Deserialize, Clone)] +#[serde(default)] +pub struct KafkaIntegration { + pub brokers: Vec, + pub tls: bool, + pub topic: String, + pub event_key: String, + pub username: String, + pub password: String, + pub mechanism: String, + pub json: bool, +} + +impl Default for KafkaIntegration { + fn default() -> Self { + KafkaIntegration { + brokers: vec!["localhost:9092".to_string()], + tls: false, + topic: "chirpstack".to_string(), + event_key: "application.{{application_id}}.device.{{dev_eui}}.event.{{event}}" + .to_string(), + username: "".to_string(), + password: "".to_string(), + mechanism: "PLAIN".to_string(), + json: true, + } + } +} + #[derive(Serialize, Deserialize, Clone, Default)] #[serde(default)] pub struct Codec { diff --git a/chirpstack/src/integration/kafka.rs b/chirpstack/src/integration/kafka.rs new file mode 100644 index 00000000..db367b8d --- /dev/null +++ b/chirpstack/src/integration/kafka.rs @@ -0,0 +1,296 @@ +use std::collections::HashMap; +use std::time::Duration; + +use anyhow::Result; +use async_trait::async_trait; +use handlebars::Handlebars; +use prost::Message; +use rdkafka::config::ClientConfig; +use rdkafka::message::OwnedHeaders; +use rdkafka::producer::{FutureProducer, FutureRecord}; +use serde::Serialize; +use tracing::{error, info}; + +use super::Integration as IntegrationTrait; +use crate::config::KafkaIntegration as Config; +use chirpstack_api::integration; + +pub struct Integration<'a> { + templates: Handlebars<'a>, + topic: String, + json: bool, + producer: FutureProducer, +} + +#[derive(Serialize)] +struct EventKeyContext { + pub application_id: String, + pub dev_eui: String, + pub event: String, +} + +impl<'a> Integration<'a> { + pub fn new(conf: &Config) -> Result> { + info!("Initializing Kafka integration"); + + // event-key template. + let mut templates = Handlebars::new(); + templates.register_escape_fn(handlebars::no_escape); + templates.register_template_string("event_key", &conf.event_key)?; + + let producer: FutureProducer = ClientConfig::new() + .set("bootstrap.servers", &conf.brokers.join(",")) + .set("message.timeout.ms", "5000") + .set("allow.auto.create.topics", "true") + .set( + "sasl.mechanism", + match conf.mechanism.as_ref() { + "PLAIN" => "PLAIN", + "SCRAM-SHA-256" => "SCRAM-SHA-256", + "SCRAM-SHA-512" => "SCRAM-SHA-512", + _ => { + return Err(anyhow!( + "mechanism must be PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512" + )); + } + }, + ) + .set("sasl.username", &conf.username) + .set("sasl.password", &conf.password) + .create()?; + + let i = Integration { + templates, + producer, + json: conf.json, + topic: conf.topic.clone(), + }; + + Ok(i) + } + + async fn publish_event(&self, event: &str, event_key: String, b: &[u8]) -> Result<()> { + info!(topic = %self.topic, event_key = %event_key, "Publishing event"); + + let res = self + .producer + .send( + FutureRecord::to(&self.topic) + .key(&event_key) + .headers(OwnedHeaders::new().add("event", event)) + .payload(b), + Duration::from_secs(0), + ) + .await; + + if let Err(e) = res { + error!(error = ?e, "Publishing event error"); + return Err(anyhow!("{:?}", e)); + } + + Ok(()) + } + + fn get_event_key(&self, application_id: &str, dev_eui: &str, event: &str) -> Result { + Ok(self.templates.render( + "event_key", + &EventKeyContext { + application_id: application_id.to_string(), + dev_eui: dev_eui.to_string(), + event: event.to_string(), + }, + )?) + } +} + +#[async_trait] +impl<'a> IntegrationTrait for Integration<'a> { + async fn uplink_event( + &self, + _vars: &HashMap, + pl: &integration::UplinkEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + let key = self.get_event_key(&di.application_id, &di.dev_eui, "up")?; + let b = match self.json { + true => serde_json::to_vec(&pl)?, + false => pl.encode_to_vec(), + }; + self.publish_event("up", key, &b).await + } + + async fn join_event( + &self, + _vars: &HashMap, + pl: &integration::JoinEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + let key = self.get_event_key(&di.application_id, &di.dev_eui, "join")?; + let b = match self.json { + true => serde_json::to_vec(&pl)?, + false => pl.encode_to_vec(), + }; + self.publish_event("join", key, &b).await + } + + async fn ack_event( + &self, + _vars: &HashMap, + pl: &integration::AckEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + let key = self.get_event_key(&di.application_id, &di.dev_eui, "ack")?; + let b = match self.json { + true => serde_json::to_vec(&pl)?, + false => pl.encode_to_vec(), + }; + self.publish_event("ack", key, &b).await + } + + async fn txack_event( + &self, + _vars: &HashMap, + pl: &integration::TxAckEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + let key = self.get_event_key(&di.application_id, &di.dev_eui, "txack")?; + let b = match self.json { + true => serde_json::to_vec(&pl)?, + false => pl.encode_to_vec(), + }; + self.publish_event("txack", key, &b).await + } + + async fn log_event( + &self, + _vars: &HashMap, + pl: &integration::LogEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + let key = self.get_event_key(&di.application_id, &di.dev_eui, "log")?; + let b = match self.json { + true => serde_json::to_vec(&pl)?, + false => pl.encode_to_vec(), + }; + self.publish_event("log", key, &b).await + } + + async fn status_event( + &self, + _vars: &HashMap, + pl: &integration::StatusEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + let key = self.get_event_key(&di.application_id, &di.dev_eui, "status")?; + let b = match self.json { + true => serde_json::to_vec(&pl)?, + false => pl.encode_to_vec(), + }; + self.publish_event("status", key, &b).await + } + + async fn location_event( + &self, + _vars: &HashMap, + pl: &integration::LocationEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + let key = self.get_event_key(&di.application_id, &di.dev_eui, "location")?; + let b = match self.json { + true => serde_json::to_vec(&pl)?, + false => pl.encode_to_vec(), + }; + self.publish_event("location", key, &b).await + } + + async fn integration_event( + &self, + _vars: &HashMap, + pl: &integration::IntegrationEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + let key = self.get_event_key(&di.application_id, &di.dev_eui, "integration")?; + let b = match self.json { + true => serde_json::to_vec(&pl)?, + false => pl.encode_to_vec(), + }; + self.publish_event("integration", key, &b).await + } +} + +#[cfg(test)] +pub mod test { + use super::*; + use crate::test; + use rdkafka::consumer::stream_consumer::StreamConsumer; + use rdkafka::consumer::Consumer; + use rdkafka::message::Headers; + use rdkafka::Message; + use std::time::Duration; + use tokio::time::sleep; + use tracing::trace; + + use uuid::Uuid; + + #[tokio::test] + async fn test_kafka() { + let _guard = test::prepare().await; + + let conf = Config { + brokers: vec!["kafka:9092".to_string()], + topic: "chirpstack".to_string(), + json: true, + ..Default::default() + }; + + let consumer: StreamConsumer = loop { + match ClientConfig::new() + .set("group.id", "testgroup") + .set("bootstrap.servers", "kafka:9092") + .set("allow.auto.create.topics", "true") + .set("auto.offset.reset", "beginning") + .create() + { + Ok(v) => { + break v; + } + Err(e) => { + error!("Kafka connect error: {:?}", e); + sleep(Duration::from_secs(1)).await; + } + } + }; + trace!("Consumer created"); + + consumer.subscribe(&["chirpstack"]).unwrap(); + trace!("Subscription created"); + + let i = Integration::new(&conf).unwrap(); + trace!("Integration created"); + + let pl = integration::UplinkEvent { + device_info: Some(integration::DeviceInfo { + application_id: Uuid::nil().to_string(), + dev_eui: "0102030405060708".to_string(), + ..Default::default() + }), + ..Default::default() + }; + i.uplink_event(&HashMap::new(), &pl).await.unwrap(); + trace!("Event published"); + + let msg = consumer.recv().await.unwrap(); + trace!("Event received"); + + assert_eq!( + "application.00000000-0000-0000-0000-000000000000.device.0102030405060708.event.up" + .as_bytes(), + msg.key().unwrap() + ); + assert_eq!(serde_json::to_vec(&pl).unwrap(), msg.payload().unwrap()); + assert_eq!( + ("event", "up".as_bytes()), + msg.headers().unwrap().get(0).unwrap() + ); + } +} diff --git a/chirpstack/src/integration/mod.rs b/chirpstack/src/integration/mod.rs index 1ed0ac84..94cb96d7 100644 --- a/chirpstack/src/integration/mod.rs +++ b/chirpstack/src/integration/mod.rs @@ -20,6 +20,7 @@ mod gcp_pub_sub; mod http; mod ifttt; mod influxdb; +mod kafka; mod loracloud; #[cfg(test)] pub mod mock; diff --git a/docker-compose.yml b/docker-compose.yml index 5743d277..70decf74 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -9,6 +9,7 @@ services: - redis - mosquitto - rabbitmq + - kafka environment: - DATABASE_URL=postgres://chirpstack_test:chirpstack_test@postgres/chirpstack_test?sslmode=disable - REDIS_HOST=redis @@ -58,3 +59,19 @@ services: image: rabbitmq:3-management-alpine ports: - "15672:15672" + + zookeeper: + image: 'bitnami/zookeeper:3' + environment: + - ALLOW_ANONYMOUS_LOGIN=yes + + kafka: + image: 'bitnami/kafka:3' + environment: + - KAFKA_BROKER_ID=1 + - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092 + - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092 + - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181 + - ALLOW_PLAINTEXT_LISTENER=yes + depends_on: + - zookeeper