Add Kafka integration.

This commit is contained in:
Orne Brocaar 2022-07-12 09:37:15 +01:00
parent 8350494846
commit 4131ed38ec
7 changed files with 452 additions and 1 deletions

61
Cargo.lock generated
View File

@ -937,6 +937,7 @@ dependencies = [
"r2d2",
"rand",
"rand_core",
"rdkafka",
"redis",
"regex",
"reqwest",
@ -2455,6 +2456,27 @@ dependencies = [
"libc",
]
[[package]]
name = "num_enum"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf5395665662ef45796a4ff5486c5d41d29e0c09640af4c5f17fd94ee2c119c9"
dependencies = [
"num_enum_derive",
]
[[package]]
name = "num_enum_derive"
version = "0.5.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b0498641e53dd6ac1a4f22547548caa6864cc4933784319cd1775271c5a46ce"
dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "num_threads"
version = "0.1.3"
@ -2943,6 +2965,16 @@ dependencies = [
"syn",
]
[[package]]
name = "proc-macro-crate"
version = "1.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e17d47ce914bf4de440332250b0edd23ce48c005f59fab39d3335866b114f11a"
dependencies = [
"thiserror",
"toml",
]
[[package]]
name = "proc-macro2"
version = "1.0.36"
@ -3120,6 +3152,35 @@ dependencies = [
"cipher 0.4.3",
]
[[package]]
name = "rdkafka"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1de127f294f2dba488ed46760b129d5ecbeabbd337ccbf3739cb29d50db2161c"
dependencies = [
"futures",
"libc",
"log",
"rdkafka-sys",
"serde",
"serde_derive",
"serde_json",
"slab",
"tokio",
]
[[package]]
name = "rdkafka-sys"
version = "4.2.0+1.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e542c6863b04ce0fa0c5719bc6b7b348cf8dd21af1bb03c9db5f9805b2a6473"
dependencies = [
"libc",
"libz-sys",
"num_enum",
"pkg-config",
]
[[package]]
name = "reactor-trait"
version = "1.1.0"

View File

@ -54,6 +54,7 @@ gcp_auth = "0.7.2"
lapin = "2.1.1"
tokio-executor-trait = "2.1.0"
tokio-reactor-trait = "1.1.0"
rdkafka = "0.28.0"
# gRPC and Protobuf
tonic = "0.7"

View File

@ -323,7 +323,8 @@ pub fn run() {
# pool (0 = equal to max_open_connections).
min_idle_connections={{ integration.postgresql.min_idle_connections }}
# AMQP / RabbitMQ.
# AMQP / RabbitMQ integration configuration.
[integration.amqp]
# Server URL.
@ -342,6 +343,50 @@ pub fn run() {
json={{ integration.amqp.json }}
# Kafka integration configuration.
[integration.kafka]
# Brokers.
brokers=[
{{#each integration.kafka.brokers}}
"{{this}}",
{{/each}}
]
# TLS.
#
# Set this to true when the Kafka client must connect using TLS to the Broker.
tls={{ integration.kafka.tls }}
# Topic for events.
topic="{{ integration.kafka.topic }}"
# Template for keys included in Kafka messages.
# Kafka uses the key for distributing messages over partitions. You can use
# this to ensure some subset of messages end up in the same partition, so
# they can be consumed in-order. And Kafka can use the key for data retention
# decisions. A header "event" with the event type is included in each
# message. There is no need to parse it from the key.
event_key="{{ integration.kafka.event_key }}"
# Username (optional).
username="{{ integration.kafka.username }}"
# Password.
password="{{ integration.kafka.password }}"
# Mechanism.
#
# Valid options are:
# * PLAIN
# * SCRAM-SHA-256
# * SCRAM-SHA-512
mechanism="{{ integration.kafka.mechanism }}"
# Use JSON encoding instead of Protobuf (binary).
json={{ integration.kafka.json }}
# Codec configuration.
[codec]

View File

@ -244,6 +244,7 @@ pub struct Integration {
pub mqtt: MqttIntegration,
pub postgresql: PostgresqlIntegration,
pub amqp: AmqpIntegration,
pub kafka: KafkaIntegration,
}
#[derive(Serialize, Deserialize, Clone)]
@ -341,6 +342,35 @@ impl Default for AmqpIntegration {
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct KafkaIntegration {
pub brokers: Vec<String>,
pub tls: bool,
pub topic: String,
pub event_key: String,
pub username: String,
pub password: String,
pub mechanism: String,
pub json: bool,
}
impl Default for KafkaIntegration {
fn default() -> Self {
KafkaIntegration {
brokers: vec!["localhost:9092".to_string()],
tls: false,
topic: "chirpstack".to_string(),
event_key: "application.{{application_id}}.device.{{dev_eui}}.event.{{event}}"
.to_string(),
username: "".to_string(),
password: "".to_string(),
mechanism: "PLAIN".to_string(),
json: true,
}
}
}
#[derive(Serialize, Deserialize, Clone, Default)]
#[serde(default)]
pub struct Codec {

View File

@ -0,0 +1,296 @@
use std::collections::HashMap;
use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use handlebars::Handlebars;
use prost::Message;
use rdkafka::config::ClientConfig;
use rdkafka::message::OwnedHeaders;
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde::Serialize;
use tracing::{error, info};
use super::Integration as IntegrationTrait;
use crate::config::KafkaIntegration as Config;
use chirpstack_api::integration;
pub struct Integration<'a> {
templates: Handlebars<'a>,
topic: String,
json: bool,
producer: FutureProducer,
}
#[derive(Serialize)]
struct EventKeyContext {
pub application_id: String,
pub dev_eui: String,
pub event: String,
}
impl<'a> Integration<'a> {
pub fn new(conf: &Config) -> Result<Integration<'a>> {
info!("Initializing Kafka integration");
// event-key template.
let mut templates = Handlebars::new();
templates.register_escape_fn(handlebars::no_escape);
templates.register_template_string("event_key", &conf.event_key)?;
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", &conf.brokers.join(","))
.set("message.timeout.ms", "5000")
.set("allow.auto.create.topics", "true")
.set(
"sasl.mechanism",
match conf.mechanism.as_ref() {
"PLAIN" => "PLAIN",
"SCRAM-SHA-256" => "SCRAM-SHA-256",
"SCRAM-SHA-512" => "SCRAM-SHA-512",
_ => {
return Err(anyhow!(
"mechanism must be PLAIN, SCRAM-SHA-256 or SCRAM-SHA-512"
));
}
},
)
.set("sasl.username", &conf.username)
.set("sasl.password", &conf.password)
.create()?;
let i = Integration {
templates,
producer,
json: conf.json,
topic: conf.topic.clone(),
};
Ok(i)
}
async fn publish_event(&self, event: &str, event_key: String, b: &[u8]) -> Result<()> {
info!(topic = %self.topic, event_key = %event_key, "Publishing event");
let res = self
.producer
.send(
FutureRecord::to(&self.topic)
.key(&event_key)
.headers(OwnedHeaders::new().add("event", event))
.payload(b),
Duration::from_secs(0),
)
.await;
if let Err(e) = res {
error!(error = ?e, "Publishing event error");
return Err(anyhow!("{:?}", e));
}
Ok(())
}
fn get_event_key(&self, application_id: &str, dev_eui: &str, event: &str) -> Result<String> {
Ok(self.templates.render(
"event_key",
&EventKeyContext {
application_id: application_id.to_string(),
dev_eui: dev_eui.to_string(),
event: event.to_string(),
},
)?)
}
}
#[async_trait]
impl<'a> IntegrationTrait for Integration<'a> {
async fn uplink_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::UplinkEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_event_key(&di.application_id, &di.dev_eui, "up")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event("up", key, &b).await
}
async fn join_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::JoinEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_event_key(&di.application_id, &di.dev_eui, "join")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event("join", key, &b).await
}
async fn ack_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::AckEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_event_key(&di.application_id, &di.dev_eui, "ack")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event("ack", key, &b).await
}
async fn txack_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::TxAckEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_event_key(&di.application_id, &di.dev_eui, "txack")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event("txack", key, &b).await
}
async fn log_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::LogEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_event_key(&di.application_id, &di.dev_eui, "log")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event("log", key, &b).await
}
async fn status_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::StatusEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_event_key(&di.application_id, &di.dev_eui, "status")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event("status", key, &b).await
}
async fn location_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::LocationEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_event_key(&di.application_id, &di.dev_eui, "location")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event("location", key, &b).await
}
async fn integration_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::IntegrationEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_event_key(&di.application_id, &di.dev_eui, "integration")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event("integration", key, &b).await
}
}
#[cfg(test)]
pub mod test {
use super::*;
use crate::test;
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::Consumer;
use rdkafka::message::Headers;
use rdkafka::Message;
use std::time::Duration;
use tokio::time::sleep;
use tracing::trace;
use uuid::Uuid;
#[tokio::test]
async fn test_kafka() {
let _guard = test::prepare().await;
let conf = Config {
brokers: vec!["kafka:9092".to_string()],
topic: "chirpstack".to_string(),
json: true,
..Default::default()
};
let consumer: StreamConsumer = loop {
match ClientConfig::new()
.set("group.id", "testgroup")
.set("bootstrap.servers", "kafka:9092")
.set("allow.auto.create.topics", "true")
.set("auto.offset.reset", "beginning")
.create()
{
Ok(v) => {
break v;
}
Err(e) => {
error!("Kafka connect error: {:?}", e);
sleep(Duration::from_secs(1)).await;
}
}
};
trace!("Consumer created");
consumer.subscribe(&["chirpstack"]).unwrap();
trace!("Subscription created");
let i = Integration::new(&conf).unwrap();
trace!("Integration created");
let pl = integration::UplinkEvent {
device_info: Some(integration::DeviceInfo {
application_id: Uuid::nil().to_string(),
dev_eui: "0102030405060708".to_string(),
..Default::default()
}),
..Default::default()
};
i.uplink_event(&HashMap::new(), &pl).await.unwrap();
trace!("Event published");
let msg = consumer.recv().await.unwrap();
trace!("Event received");
assert_eq!(
"application.00000000-0000-0000-0000-000000000000.device.0102030405060708.event.up"
.as_bytes(),
msg.key().unwrap()
);
assert_eq!(serde_json::to_vec(&pl).unwrap(), msg.payload().unwrap());
assert_eq!(
("event", "up".as_bytes()),
msg.headers().unwrap().get(0).unwrap()
);
}
}

View File

@ -20,6 +20,7 @@ mod gcp_pub_sub;
mod http;
mod ifttt;
mod influxdb;
mod kafka;
mod loracloud;
#[cfg(test)]
pub mod mock;

View File

@ -9,6 +9,7 @@ services:
- redis
- mosquitto
- rabbitmq
- kafka
environment:
- DATABASE_URL=postgres://chirpstack_test:chirpstack_test@postgres/chirpstack_test?sslmode=disable
- REDIS_HOST=redis
@ -58,3 +59,19 @@ services:
image: rabbitmq:3-management-alpine
ports:
- "15672:15672"
zookeeper:
image: 'bitnami/zookeeper:3'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:3'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://kafka:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper