Add global AMQP integration.

This commit is contained in:
Orne Brocaar 2022-07-07 10:16:55 +01:00
parent 96ab49944c
commit 0d4003a2be
8 changed files with 757 additions and 12 deletions

379
Cargo.lock generated
View File

@ -43,6 +43,54 @@ dependencies = [
"memchr",
]
[[package]]
name = "amq-protocol"
version = "7.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acc7cad07d1b4533fcb46f0819a6126fa201fd0385469aba75e405424f3fe009"
dependencies = [
"amq-protocol-tcp",
"amq-protocol-types",
"amq-protocol-uri",
"cookie-factory",
"nom",
"serde",
]
[[package]]
name = "amq-protocol-tcp"
version = "7.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5d8b20aba8c35a0b885e1e978eff456ced925730a4e012e63e4ff89a1deb602b"
dependencies = [
"amq-protocol-uri",
"tcp-stream",
"tracing",
]
[[package]]
name = "amq-protocol-types"
version = "7.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e245e0e9083b6a6db5f8c10013074cb382266eb9e2a37204d19c651b8d3b8114"
dependencies = [
"cookie-factory",
"nom",
"serde",
"serde_json",
]
[[package]]
name = "amq-protocol-uri"
version = "7.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "56987108bf48d2eb500cae8896cd9291564eedd8744776ecc5c3338a8b2ca5f8"
dependencies = [
"amq-protocol-types",
"percent-encoding",
"url",
]
[[package]]
name = "ansi_term"
version = "0.12.1"
@ -118,6 +166,17 @@ dependencies = [
"once_cell",
]
[[package]]
name = "async-global-executor-trait"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33dd14c5a15affd2abcff50d84efd4009ada28a860f01c14f9d654f3e81b3f75"
dependencies = [
"async-global-executor",
"async-trait",
"executor-trait",
]
[[package]]
name = "async-io"
version = "1.6.0"
@ -181,6 +240,18 @@ dependencies = [
"winapi",
]
[[package]]
name = "async-reactor-trait"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a6012d170ad00de56c9ee354aef2e358359deb1ec504254e0e5a3774771de0e"
dependencies = [
"async-io",
"async-trait",
"futures-core",
"reactor-trait",
]
[[package]]
name = "async-recursion"
version = "1.0.0"
@ -277,9 +348,9 @@ dependencies = [
[[package]]
name = "autocfg"
version = "1.0.1"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "aws-config"
@ -674,7 +745,7 @@ version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c0940dc441f31689269e10ac70eb1002a3a1d3ad1390e030043662eb7fe4688b"
dependencies = [
"block-padding",
"block-padding 0.1.5",
"byte-tools",
"byteorder",
"generic-array 0.12.4",
@ -707,6 +778,15 @@ dependencies = [
"byte-tools",
]
[[package]]
name = "block-padding"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a90ec2df9600c28a01c56c4784c9207a96d2451833aeceb8cc97e4c9548bb78"
dependencies = [
"generic-array 0.14.5",
]
[[package]]
name = "blocking"
version = "1.1.0"
@ -777,6 +857,15 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2698f953def977c68f935bb0dfa959375ad4638570e969e2f1e9f433cbf1af6"
[[package]]
name = "cbc"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b52a9543ae338f279b96b0b9fed9c8093744685043739079ce85cd58f289a6"
dependencies = [
"cipher 0.4.3",
]
[[package]]
name = "cc"
version = "1.0.72"
@ -830,6 +919,7 @@ dependencies = [
"humantime-serde",
"hyper",
"jsonwebtoken",
"lapin",
"lazy_static",
"lrwn",
"mime_guess",
@ -858,6 +948,8 @@ dependencies = [
"sha2 0.10.1",
"thiserror",
"tokio",
"tokio-executor-trait",
"tokio-reactor-trait",
"tokio-stream",
"toml",
"tonic",
@ -1011,6 +1103,12 @@ dependencies = [
"cache-padded",
]
[[package]]
name = "cookie-factory"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b"
[[package]]
name = "core-foundation"
version = "0.9.2"
@ -1138,6 +1236,15 @@ dependencies = [
"generic-array 0.14.5",
]
[[package]]
name = "des"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffdd80ce8ce993de27e9f063a444a4d53ce8e8db4c1f00cc03af5ad5a9867a1e"
dependencies = [
"cipher 0.4.3",
]
[[package]]
name = "diesel"
version = "1.4.8"
@ -1236,6 +1343,12 @@ dependencies = [
"winapi",
]
[[package]]
name = "doc-comment"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "dtoa"
version = "0.4.8"
@ -1291,6 +1404,15 @@ version = "2.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77f3309417938f28bf8228fcff79a4a37103981e3e186d2ccd19c74b38f4eb71"
[[package]]
name = "executor-trait"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a1052dd43212a7777ec6a69b117da52f5e52f07aec47d00c1a2b33b85d06b08"
dependencies = [
"async-trait",
]
[[package]]
name = "fake-simd"
version = "0.1.2"
@ -1318,6 +1440,18 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "279fb028e20b3c4c320317955b77c5e0c9701f05a1d309905d6fc702cdc5053e"
[[package]]
name = "flume"
version = "0.10.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ceeb589a3157cac0ab8cc585feb749bd2cea5cb55a6ee802ad72d9fd38303da"
dependencies = [
"futures-core",
"futures-sink",
"pin-project",
"spin 0.9.3",
]
[[package]]
name = "fnv"
version = "1.0.7"
@ -1849,6 +1983,7 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1f03d4ab4d5dc9ec2d219f86c15d2a15fc08239d1cd3b2d6a19717c0a2f443"
dependencies = [
"block-padding 0.3.2",
"generic-array 0.14.5",
]
@ -1979,6 +2114,28 @@ dependencies = [
"regex",
]
[[package]]
name = "lapin"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bd03ea5831b44775e296239a64851e2fd14a80a363d202ba147009ffc994ff0f"
dependencies = [
"amq-protocol",
"async-global-executor-trait",
"async-reactor-trait",
"async-trait",
"executor-trait",
"flume",
"futures-core",
"futures-io",
"parking_lot 0.12.1",
"pinky-swear",
"reactor-trait",
"serde",
"tracing",
"waker-fn",
]
[[package]]
name = "lazy_static"
version = "1.4.0"
@ -2049,10 +2206,11 @@ checksum = "7fb9b38af92608140b86b693604b9ffcc5824240a484d1ecd4795bacb2fe88f3"
[[package]]
name = "lock_api"
version = "0.4.5"
version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712a4d093c9976e24e7dbca41db895dabcbac38eb5f4045393d17a95bdfb1109"
checksum = "327fa5b6a6940e4699ec49a9beae1ea4845c6bab9314e4f84ac68742139d8c53"
dependencies = [
"autocfg",
"scopeguard",
]
@ -2435,6 +2593,23 @@ dependencies = [
"stable_deref_trait",
]
[[package]]
name = "p12"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4873306de53fe82e7e484df31e1e947d61514b6ea2ed6cd7b45d63006fd9224"
dependencies = [
"cbc",
"cipher 0.4.3",
"des",
"getrandom",
"hmac 0.12.0",
"lazy_static",
"rc2",
"sha1 0.10.0",
"yasna",
]
[[package]]
name = "paho-mqtt"
version = "0.9.1"
@ -2473,7 +2648,17 @@ checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core",
"parking_lot_core 0.8.5",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core 0.9.3",
]
[[package]]
@ -2490,6 +2675,19 @@ dependencies = [
"winapi",
]
[[package]]
name = "parking_lot_core"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09a279cbf25cb0757810394fbc1e359949b59e348145c643a939a525692e6929"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"smallvec",
"windows-sys",
]
[[package]]
name = "password-hash"
version = "0.2.3"
@ -2682,6 +2880,18 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]]
name = "pinky-swear"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d894b67aa7a4bf295db5e85349078c604edaa6fa5c8721e8eca3c7729a27f2ac"
dependencies = [
"doc-comment",
"flume",
"parking_lot 0.12.1",
"tracing",
]
[[package]]
name = "pkg-config"
version = "0.3.24"
@ -2857,7 +3067,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "545c5bc2b880973c9c10e4067418407a0ccaa3091781d1671d46eb35107cb26f"
dependencies = [
"log",
"parking_lot",
"parking_lot 0.11.2",
"scheduled-thread-pool",
]
@ -2901,6 +3111,26 @@ dependencies = [
"rand_core",
]
[[package]]
name = "rc2"
version = "0.8.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62c64daa8e9438b84aaae55010a93f396f8e60e3911590fcba770d04643fc1dd"
dependencies = [
"cipher 0.4.3",
]
[[package]]
name = "reactor-trait"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "438a4293e4d097556730f4711998189416232f009c137389e0f961d2bc0ddc58"
dependencies = [
"async-trait",
"futures-core",
"futures-io",
]
[[package]]
name = "redis"
version = "0.21.5"
@ -2915,7 +3145,7 @@ dependencies = [
"percent-encoding",
"r2d2",
"rand",
"sha1",
"sha1 0.6.1",
"url",
]
@ -3029,7 +3259,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi",
@ -3139,6 +3369,18 @@ dependencies = [
"webpki 0.22.0",
]
[[package]]
name = "rustls-connector"
version = "0.16.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c6a18f8d10f71bce9bca6eaeb80429460e652f3bcf0381f0c5f8954abf7b3b8"
dependencies = [
"log",
"rustls 0.20.2",
"rustls-native-certs 0.6.1",
"webpki 0.22.0",
]
[[package]]
name = "rustls-native-certs"
version = "0.5.0"
@ -3181,6 +3423,15 @@ dependencies = [
"base64",
]
[[package]]
name = "rustls-pemfile"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7522c9de787ff061458fe9a829dc790a3f5b22dc571694fc5883f448b94d9a9"
dependencies = [
"base64",
]
[[package]]
name = "rustversion"
version = "1.0.6"
@ -3224,7 +3475,7 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc6f74fd1204073fa02d5d5d68bec8021be4c38690b61264b2fdb48083d0e7d7"
dependencies = [
"parking_lot",
"parking_lot 0.11.2",
]
[[package]]
@ -3406,6 +3657,17 @@ dependencies = [
"sha1_smol",
]
[[package]]
name = "sha1"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04cc229fb94bcb689ffc39bd4ded842f6ff76885efede7c6d1ffb62582878bea"
dependencies = [
"cfg-if",
"cpufeatures",
"digest 0.10.1",
]
[[package]]
name = "sha1_smol"
version = "1.0.0"
@ -3533,6 +3795,15 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c530c2b0d0bf8b69304b39fe2001993e267461948b890cd037d8ad4293fa1a0d"
dependencies = [
"lock_api",
]
[[package]]
name = "stable_deref_trait"
version = "1.2.0"
@ -3547,7 +3818,7 @@ checksum = "923f0f39b6267d37d23ce71ae7235602134b250ace715dd2c90421998ddac0c6"
dependencies = [
"lazy_static",
"new_debug_unreachable",
"parking_lot",
"parking_lot 0.11.2",
"phf_shared",
"precomputed-hash",
]
@ -3587,6 +3858,18 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "20518fe4a4c9acf048008599e464deb21beeae3d3578418951a189c235a7a9a8"
[[package]]
name = "tcp-stream"
version = "0.24.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09a4b0a70bac0a58ca6a7659d1328e34ee462339c70b0fa49f72bad1f278910a"
dependencies = [
"cfg-if",
"p12",
"rustls-connector",
"rustls-pemfile 1.0.0",
]
[[package]]
name = "tempfile"
version = "3.3.0"
@ -3738,6 +4021,17 @@ dependencies = [
"winapi",
]
[[package]]
name = "tokio-executor-trait"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b78630814cbcdf383c949bd8f17c3cfe3142e93b0d5801bdcae4b2a5275db7e"
dependencies = [
"async-trait",
"executor-trait",
"tokio",
]
[[package]]
name = "tokio-io-timeout"
version = "1.2.0"
@ -3769,6 +4063,20 @@ dependencies = [
"tokio",
]
[[package]]
name = "tokio-reactor-trait"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9481a72f36bd9cbb8d6dd349227c4783e234e4332cfe806225bc929c4b92486"
dependencies = [
"async-trait",
"futures-core",
"futures-io",
"reactor-trait",
"tokio",
"tokio-stream",
]
[[package]]
name = "tokio-rustls"
version = "0.22.0"
@ -4490,6 +4798,49 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-sys"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
dependencies = [
"windows_aarch64_msvc",
"windows_i686_gnu",
"windows_i686_msvc",
"windows_x86_64_gnu",
"windows_x86_64_msvc",
]
[[package]]
name = "windows_aarch64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]]
name = "windows_i686_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]]
name = "windows_i686_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]]
name = "windows_x86_64_gnu"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_msvc"
version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "winreg"
version = "0.7.0"
@ -4514,6 +4865,12 @@ dependencies = [
"linked-hash-map",
]
[[package]]
name = "yasna"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "346d34a236c9d3e5f3b9b74563f238f955bbd05fa0b8b4efa53c130c43982f4c"
[[package]]
name = "zeroize"
version = "1.5.1"

View File

@ -51,6 +51,9 @@ sha2 = "0.10"
urlencoding = "2.1"
geohash = "0.12"
gcp_auth = "0.7.2"
lapin = "2.1.1"
tokio-executor-trait = "2.1.0"
tokio-reactor-trait = "1.1.0"
# gRPC and Protobuf
tonic = "0.7"

View File

@ -329,6 +329,24 @@ pub fn run() {
# pool (0 = equal to max_open_connections).
min_idle_connections={{ integration.postgresql.min_idle_connections }}
# AMQP / RabbitMQ.
[integration.amqp]
# Server URL.
#
# See for a specification of all the possible options:
# https://www.rabbitmq.com/uri-spec.html
url="{{ integration.amqp.url }}"
# Event routing key.
#
# This is the event routing-key template used when publishing device
# events. Messages will be published to the "amq.topic" exchange.
event_routing_key="{{ integration.amqp.event_routing_key }}"
# Use JSON encoding instead of Protobuf (binary).
json={{ integration.mqtt.json }}
# Codec configuration.
[codec]

View File

@ -243,6 +243,7 @@ pub struct Integration {
pub enabled: Vec<String>,
pub mqtt: MqttIntegration,
pub postgresql: PostgresqlIntegration,
pub amqp: AmqpIntegration,
}
#[derive(Serialize, Deserialize, Clone)]
@ -323,6 +324,25 @@ impl Default for PostgresqlIntegration {
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct AmqpIntegration {
pub url: String,
pub json: bool,
pub event_routing_key: String,
}
impl Default for AmqpIntegration {
fn default() -> Self {
AmqpIntegration {
url: "amqp://guest:guest@localhost:5672".to_string(),
json: true,
event_routing_key: "application.{{application_id}}.device.{{dev_eui}}.event.{{event}}"
.to_string(),
}
}
}
#[derive(Serialize, Deserialize, Clone, Default)]
#[serde(default)]
pub struct Codec {

View File

@ -0,0 +1,336 @@
use std::collections::HashMap;
use anyhow::Result;
use async_trait::async_trait;
use handlebars::Handlebars;
use lapin::{
options::BasicPublishOptions, BasicProperties, Channel, Connection, ConnectionProperties,
};
use prost::Message;
use serde::Serialize;
use tokio::sync::RwLock;
use tracing::{error, info};
use super::Integration as IntegrationTrait;
use crate::config::AmqpIntegration as Config;
use chirpstack_api::integration;
// We define the connection and channel outside the Integration struct as the AMQP client does not
// implement re-connect on error. To reconnect within the Integration struct would require
// mutability of the Integration struct, which is not possible without changing the
// IntegrationTrait as we would need to change the (&self, ...) signatures to (&mut self, ...).
lazy_static! {
static ref CONNECTION: RwLock<Option<Connection>> = RwLock::new(None);
static ref CHANNEL: RwLock<Option<Channel>> = RwLock::new(None);
}
pub struct Integration<'a> {
templates: Handlebars<'a>,
json: bool,
url: String,
}
#[derive(Serialize)]
struct EventRoutingKeyContext {
pub application_id: String,
pub dev_eui: String,
pub event: String,
}
impl<'a> Integration<'a> {
pub async fn new(conf: &Config) -> Result<Integration<'a>> {
info!("Initializing AMQP integration");
// routing-key template
let mut templates = Handlebars::new();
templates.register_escape_fn(handlebars::no_escape);
templates.register_template_string("event_routing_key", &conf.event_routing_key)?;
let i = Integration {
templates,
url: conf.url.clone(),
json: conf.json,
};
i.connect().await?;
Ok(i)
}
async fn connect(&self) -> Result<()> {
info!("(Re)connecting to AMQP broker");
let mut conn_w = CONNECTION.write().await;
let mut chan_w = CHANNEL.write().await;
let options = ConnectionProperties::default()
// Use tokio executor and reactor.
// At the moment the reactor is only available for unix.
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio);
let conn = Connection::connect(&self.url, options).await?;
let chan = conn.create_channel().await?;
*conn_w = Some(conn);
*chan_w = Some(chan);
Ok(())
}
async fn publish_event(&self, routing_key: String, b: &[u8]) -> Result<()> {
info!(routing_key = %routing_key, "Publishing event");
// The publishing code is scoped, to make sure that when the scope returns, the channel
// mutex has been released. This is important since in case of an error, we will attempt to
// reconnect. Trying to aquire a write lock on the mutex while still having a read lock
// would cause a deadlock.
let res = {
let chan_r = CHANNEL.read().await;
match chan_r
.as_ref()
.unwrap()
.basic_publish(
"amq.topic",
&routing_key,
BasicPublishOptions::default(),
b,
BasicProperties::default().with_content_type(match self.json {
true => "application/json".into(),
false => "application/octet-stream".into(),
}),
)
.await
{
Ok(v) => v.await,
Err(e) => Err(e),
}
};
if let Err(e) = res {
error!(error = %e, "Publishing event error");
self.connect().await?;
return Err(anyhow::Error::new(e));
}
Ok(())
}
fn get_routing_key(&self, application_id: &str, dev_eui: &str, event: &str) -> Result<String> {
Ok(self.templates.render(
"event_routing_key",
&EventRoutingKeyContext {
application_id: application_id.to_string(),
dev_eui: dev_eui.to_string(),
event: event.to_string(),
},
)?)
}
}
#[async_trait]
impl<'a> IntegrationTrait for Integration<'a> {
async fn uplink_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::UplinkEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_routing_key(&di.application_id, &di.dev_eui, "up")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event(key, &b).await
}
async fn join_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::JoinEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_routing_key(&di.application_id, &di.dev_eui, "join")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event(key, &b).await
}
async fn ack_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::AckEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_routing_key(&di.application_id, &di.dev_eui, "ack")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event(key, &b).await
}
async fn txack_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::TxAckEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_routing_key(&di.application_id, &di.dev_eui, "txack")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event(key, &b).await
}
async fn log_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::LogEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_routing_key(&di.application_id, &di.dev_eui, "log")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event(key, &b).await
}
async fn status_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::StatusEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_routing_key(&di.application_id, &di.dev_eui, "status")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event(key, &b).await
}
async fn location_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::LocationEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_routing_key(&di.application_id, &di.dev_eui, "location")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event(key, &b).await
}
async fn integration_event(
&self,
_vars: &HashMap<String, String>,
pl: &integration::IntegrationEvent,
) -> Result<()> {
let di = pl.device_info.as_ref().unwrap();
let key = self.get_routing_key(&di.application_id, &di.dev_eui, "integration")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_event(key, &b).await
}
}
#[cfg(test)]
pub mod test {
use super::*;
use futures::stream::StreamExt;
use lapin::options::{
BasicAckOptions, BasicConsumeOptions, QueueBindOptions, QueueDeclareOptions,
};
use lapin::types::FieldTable;
use std::time::Duration;
use tokio::time::sleep;
use uuid::Uuid;
#[tokio::test]
async fn test_amqp() {
let conf = Config {
url: "amqp://guest:guest@rabbitmq:5672".to_string(),
json: true,
event_routing_key: "application.{{application_id}}.device.{{dev_eui}}.event.{{event}}"
.to_string(),
};
let conn = loop {
match Connection::connect(
&conf.url,
ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio),
)
.await
{
Ok(v) => {
break v;
}
Err(e) => {
println!("AMQP connect error: {:?}", e);
sleep(Duration::from_secs(1)).await;
}
}
};
let chan = conn.create_channel().await.unwrap();
let _queue = chan
.queue_declare(
"test-queue",
QueueDeclareOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
chan.queue_bind(
"test-queue",
"amq.topic",
"*.*.*.*.*.*",
QueueBindOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
let mut consumer = chan
.basic_consume(
"test-queue",
"test-consumer",
BasicConsumeOptions::default(),
FieldTable::default(),
)
.await
.unwrap();
let i = Integration::new(&conf).await.unwrap();
let pl = integration::UplinkEvent {
device_info: Some(integration::DeviceInfo {
application_id: Uuid::nil().to_string(),
dev_eui: "0102030405060708".to_string(),
..Default::default()
}),
..Default::default()
};
i.uplink_event(&HashMap::new(), &pl).await.unwrap();
let delivery = consumer.next().await.unwrap().unwrap();
delivery.ack(BasicAckOptions::default()).await.unwrap();
assert_eq!(
"application.00000000-0000-0000-0000-000000000000.device.0102030405060708.event.up",
delivery.routing_key.to_string()
);
assert_eq!(serde_json::to_vec(&pl).unwrap(), delivery.data);
}
}

View File

@ -13,6 +13,7 @@ use crate::{codec, config};
use chirpstack_api::integration;
use lrwn::EUI64;
mod amqp;
mod aws_sns;
mod azure_service_bus;
mod gcp_pub_sub;
@ -55,6 +56,11 @@ pub async fn setup() -> Result<()> {
postgresql::Integration::new(&conf.integration.postgresql)
.context("Setup PostgreSQL integration")?,
)),
"amqp" => integrations.push(Box::new(
amqp::Integration::new(&conf.integration.amqp)
.await
.context("Setup AMQP integration")?,
)),
_ => {
return Err(anyhow!("Unexpected integration: {}", name));
}

View File

@ -444,7 +444,6 @@ pub mod test {
#[tokio::test]
async fn test_mqtt() {
// to avoid race-conditions with other tests using MQTT
let _guard = test::prepare().await;
// setup base objects

View File

@ -8,6 +8,7 @@ services:
- postgres
- redis
- mosquitto
- rabbitmq
environment:
- DATABASE_URL=postgres://chirpstack_test:chirpstack_test@postgres/chirpstack_test?sslmode=disable
- REDIS_HOST=redis
@ -52,3 +53,8 @@ services:
image: eclipse-mosquitto:1.6
ports:
- "1883:1883"
rabbitmq:
image: rabbitmq:3-management-alpine
ports:
- "15672:15672"