Refactor gateway MQTT backend to use rumqttc.

This commit is contained in:
Orne Brocaar 2023-11-29 13:29:27 +00:00
parent ed06de231e
commit 5b6d037469
4 changed files with 123 additions and 195 deletions

43
Cargo.lock generated
View File

@ -783,7 +783,6 @@ dependencies = [
"mime_guess",
"openidconnect",
"openssl",
"paho-mqtt",
"pbjson-types",
"pbkdf2",
"petgraph",
@ -1031,16 +1030,6 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
[[package]]
name = "crossbeam-channel"
version = "0.5.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a33c2bf77f2df06183c3aa30d1e96c0695a313d4f9c453cc3762a6db39f99200"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.16"
@ -1729,12 +1718,6 @@ version = "0.3.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2"
[[package]]
name = "futures-timer"
version = "3.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e64b03909df88034c26dc1547e8970b91f98bdb65165d6a4e9110d94263dbb2c"
[[package]]
name = "futures-util"
version = "0.3.29"
@ -2866,32 +2849,6 @@ dependencies = [
"sha2",
]
[[package]]
name = "paho-mqtt"
version = "0.12.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "19e405de34b835fb6457d8b0169eda21949f855472b3e346556af9e29fac6eb2"
dependencies = [
"async-channel 1.9.0",
"crossbeam-channel",
"futures",
"futures-timer",
"libc",
"log",
"paho-mqtt-sys",
"thiserror",
]
[[package]]
name = "paho-mqtt-sys"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5e482419d847af4ec43c07eed70f5f94f87dc712d267aecc91ab940944ab6bf4"
dependencies = [
"cmake",
"openssl-sys",
]
[[package]]
name = "parking"
version = "2.2.0"

View File

@ -109,7 +109,6 @@ openssl = { version = "0.10" }
openidconnect = { version = "3.3", features = ["accept-rfc3339-timestamps"] }
# MQTT
paho-mqtt = { version = "0.12", features = ["ssl"] }
rumqttc = { version = "0.23", features = ["url"] }
hex = "0.4"

View File

@ -1,28 +1,31 @@
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::env::temp_dir;
use std::hash::Hasher;
use std::io::Cursor;
use std::sync::RwLock;
use std::time::Duration;
use anyhow::{Context, Result};
use anyhow::Result;
use async_trait::async_trait;
use chrono::Utc;
use futures::stream::StreamExt;
use handlebars::Handlebars;
use paho_mqtt as mqtt;
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prost::Message;
use rand::Rng;
use rumqttc::tokio_rustls::rustls;
use rumqttc::v5::mqttbytes::v5::{ConnectReturnCode, Publish};
use rumqttc::v5::{mqttbytes::QoS, AsyncClient, Event, Incoming, MqttOptions};
use rumqttc::Transport;
use serde::Serialize;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::{error, info, trace};
use super::GatewayBackend;
use crate::config::GatewayBackendMqtt;
use crate::helpers::tls::{get_root_certs, load_cert, load_key};
use crate::monitoring::prometheus;
use crate::storage::{get_async_redis_conn, redis_key};
use crate::{downlink, uplink};
@ -60,15 +63,12 @@ lazy_static! {
static ref GATEWAY_JSON: RwLock<HashMap<String, bool>> = RwLock::new(HashMap::new());
}
struct MqttContext {
region_config_id: String,
}
pub struct MqttBackend<'a> {
client: mqtt::AsyncClient,
client: AsyncClient,
templates: handlebars::Handlebars<'a>,
qos: usize,
qos: QoS,
v4_migrate: bool,
region_config_id: String,
}
#[derive(Serialize)]
@ -109,126 +109,73 @@ impl<'a> MqttBackend<'a> {
conf.client_id.clone()
};
// Create subscribe channel
// This is needed as we can't subscribe within the set_connected_callback as this would
// block the callback (we want to wait for success or error), which would create a
// deadlock. We need to re-subscribe on (re)connect to be sure we have a subscription. Even
// Get QoS
let qos = match conf.qos {
0 => QoS::AtMostOnce,
1 => QoS::AtLeastOnce,
2 => QoS::ExactlyOnce,
_ => return Err(anyhow!("Invalid QoS: {}", conf.qos)),
};
// Create connect channel
// We need to re-subscribe on (re)connect to be sure we have a subscription. Even
// in case of a persistent MQTT session, there is no guarantee that the MQTT persisted the
// session and that a re-connect would recover the subscription.
let (subscribe_tx, mut subscribe_rx) = mpsc::channel(10);
let (connect_tx, mut connect_rx) = mpsc::channel(10);
// create client
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(&conf.server)
.client_id(&client_id)
.user_data(Box::new(MqttContext {
region_config_id: region_config_id.to_string(),
}))
.persistence(mqtt::create_options::PersistenceType::FilePath(temp_dir()))
.finalize();
let mut client = mqtt::AsyncClient::new(create_opts).context("Create MQTT client")?;
client.set_connected_callback(move |client| {
let ctx = client
.user_data()
.unwrap()
.downcast_ref::<MqttContext>()
.unwrap();
info!(region_config_id = %ctx.region_config_id, "Connected to MQTT broker");
if let Err(e) = subscribe_tx.try_send(()) {
error!(region_id = %ctx.region_config_id, error = %e, "Send to subscribe channel error");
}
});
client.set_connection_lost_callback(|client| {
let ctx = client
.user_data()
.unwrap()
.downcast_ref::<MqttContext>()
.unwrap();
info!(region_id = %ctx.region_config_id, "MQTT connection to broker lost");
});
// connection options
let mut conn_opts_b = mqtt::ConnectOptionsBuilder::new();
conn_opts_b.automatic_reconnect(Duration::from_secs(1), Duration::from_secs(30));
conn_opts_b.clean_session(conf.clean_session);
conn_opts_b.keep_alive_interval(conf.keep_alive_interval);
if !conf.username.is_empty() {
conn_opts_b.user_name(&conf.username);
}
if !conf.password.is_empty() {
conn_opts_b.password(&conf.password);
// Create client
let mut mqtt_opts =
MqttOptions::parse_url(format!("{}?client_id={}", conf.server, client_id))?;
mqtt_opts.set_clean_start(conf.clean_session);
mqtt_opts.set_keep_alive(conf.keep_alive_interval);
if !conf.username.is_empty() || !conf.password.is_empty() {
mqtt_opts.set_credentials(&conf.username, &conf.password);
}
if !conf.ca_cert.is_empty() || !conf.tls_cert.is_empty() || !conf.tls_key.is_empty() {
info!(
region_id = %region_config_id,
ca_cert = conf.ca_cert.as_str(),
tls_cert = conf.tls_cert.as_str(),
tls_key = conf.tls_key.as_str(),
"Configuring connection with TLS certificate"
"Configuring client with TLS certificate, ca_cert: {}, tls_cert: {}, tls_key: {}",
conf.ca_cert, conf.tls_cert, conf.tls_key
);
let mut ssl_opts_b = mqtt::SslOptionsBuilder::new();
let root_certs = get_root_certs(if conf.ca_cert.is_empty() {
None
} else {
Some(conf.ca_cert.clone())
})?;
if !conf.ca_cert.is_empty() {
ssl_opts_b
.trust_store(&conf.ca_cert)
.context("Failed to set gateway ca_cert")?;
}
let client_conf = if conf.tls_cert.is_empty() && conf.tls_key.is_empty() {
rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_certs.clone())
.with_no_client_auth()
} else {
rustls::ClientConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_certs.clone())
.with_client_auth_cert(load_cert(&conf.tls_cert)?, load_key(&conf.tls_key)?)?
};
if !conf.tls_cert.is_empty() {
ssl_opts_b
.key_store(&conf.tls_cert)
.context("Failed to set gateway tls_cert")?;
}
if !conf.tls_key.is_empty() {
ssl_opts_b
.private_key(&conf.tls_key)
.context("Failed to set gateway tls_key")?;
}
conn_opts_b.ssl_options(ssl_opts_b.finalize());
mqtt_opts.set_transport(Transport::tls_with_config(client_conf.into()));
}
let conn_opts = conn_opts_b.finalize();
// get message stream
let mut stream = client.get_stream(None);
let (client, mut eventloop) = AsyncClient::new(mqtt_opts, 100);
let b = MqttBackend {
client,
qos,
templates,
qos: conf.qos,
v4_migrate: conf.v4_migrate,
region_config_id: region_config_id.to_string(),
};
// connect
info!(region_id = %region_config_id, server_uri = %conf.server, clean_session = conf.clean_session, client_id = %client_id, "Connecting to MQTT broker");
b.client
.connect(conn_opts)
.await
.context("Connect to MQTT broker")?;
// Consumer loop.
tokio::spawn({
let region_config_id = region_config_id.to_string();
let v4_migrate = conf.v4_migrate;
async move {
info!(region_id = %region_config_id, "Starting MQTT consumer loop");
while let Some(msg_opt) = stream.next().await {
if let Some(msg) = msg_opt {
message_callback(v4_migrate, &region_config_id, region_common_name, msg)
.await;
}
}
}
});
// (Re)subscribe loop.
// (Re)subscribe loop
tokio::spawn({
let client = b.client.clone();
let qos = b.qos;
let region_config_id = region_config_id.to_string();
let event_topic = if conf.event_topic.is_empty() {
let event_topic = "gateway/+/event/+".to_string();
@ -240,11 +187,9 @@ impl<'a> MqttBackend<'a> {
} else {
conf.event_topic.clone()
};
let client = b.client.clone();
let qos = conf.qos as i32;
async move {
while subscribe_rx.recv().await.is_some() {
while connect_rx.recv().await.is_some() {
info!(region_id = %region_config_id, event_topic = %event_topic, "Subscribing to gateway event topic");
if let Err(e) = client.subscribe(&event_topic, qos).await {
error!(region_id = %region_config_id, event_topic = %event_topic, error = %e, "MQTT subscribe error");
@ -253,6 +198,51 @@ impl<'a> MqttBackend<'a> {
}
});
// Eventloop
tokio::spawn({
let region_config_id = region_config_id.to_string();
let v4_migrate = conf.v4_migrate;
async move {
info!("Starting MQTT event loop");
loop {
match eventloop.poll().await {
Ok(v) => {
trace!(event = ?v, "MQTT event");
match v {
Event::Incoming(Incoming::Publish(p)) => {
message_callback(
v4_migrate,
&region_config_id,
region_common_name,
p,
)
.await
}
Event::Incoming(Incoming::ConnAck(v)) => {
if v.code == ConnectReturnCode::Success {
if let Err(e) = connect_tx.try_send(()) {
error!(error = %e, "Send to subscribe channel error");
}
} else {
error!(code = ?v.code, "Connection error");
sleep(Duration::from_secs(1)).await
}
}
_ => {}
}
}
Err(e) => {
error!(error = %e, "MQTT error");
sleep(Duration::from_secs(1)).await
}
}
}
}
});
// return backend
Ok(b)
}
@ -271,13 +261,6 @@ impl<'a> MqttBackend<'a> {
#[async_trait]
impl GatewayBackend for MqttBackend<'_> {
async fn send_downlink(&self, df: &chirpstack_api::gw::DownlinkFrame) -> Result<()> {
let ctx = self
.client
.user_data()
.unwrap()
.downcast_ref::<MqttContext>()
.unwrap();
COMMAND_COUNTER
.get_or_create(&CommandLabels {
command: "down".to_string(),
@ -296,10 +279,9 @@ impl GatewayBackend for MqttBackend<'_> {
false => df.encode_to_vec(),
};
info!(region_id = %ctx.region_config_id, gateway_id = %df.gateway_id, topic = %topic, json = json, "Sending downlink frame");
let msg = mqtt::Message::new(topic, b, self.qos as i32);
self.client.publish(msg).await?;
trace!("Message sent");
info!(region_id = %self.region_config_id, gateway_id = %df.gateway_id, topic = %topic, json = json, "Sending downlink frame");
self.client.publish(topic, self.qos, false, b).await?;
trace!("Message published");
Ok(())
}
@ -308,13 +290,6 @@ impl GatewayBackend for MqttBackend<'_> {
&self,
gw_conf: &chirpstack_api::gw::GatewayConfiguration,
) -> Result<()> {
let ctx = self
.client
.user_data()
.unwrap()
.downcast_ref::<MqttContext>()
.unwrap();
COMMAND_COUNTER
.get_or_create(&CommandLabels {
command: "config".to_string(),
@ -327,10 +302,9 @@ impl GatewayBackend for MqttBackend<'_> {
false => gw_conf.encode_to_vec(),
};
info!(region_id = %ctx.region_config_id, gateway_id = %gw_conf.gateway_id, topic = %topic, json = json, "Sending gateway configuration");
let msg = mqtt::Message::new(topic, b, self.qos as i32);
self.client.publish(msg).await?;
trace!("Message sent");
info!(region_id = %self.region_config_id, gateway_id = %gw_conf.gateway_id, topic = %topic, json = json, "Sending gateway configuration");
self.client.publish(topic, self.qos, false, b).await?;
trace!("Message published");
Ok(())
}
@ -340,14 +314,12 @@ async fn message_callback(
v4_migrate: bool,
region_config_id: &str,
region_common_name: CommonName,
msg: mqtt::Message,
p: Publish,
) {
let topic = msg.topic();
let qos = msg.qos();
let b = msg.payload();
let topic = String::from_utf8_lossy(&p.topic);
let mut hasher = DefaultHasher::new();
hasher.write(b);
hasher.write(&p.payload);
let key = redis_key(format!("gw:mqtt:lock:{:x}", hasher.finish()));
let locked = is_locked(key).await;
@ -355,19 +327,19 @@ async fn message_callback(
if locked? {
trace!(
region_id = region_config_id,
topic = topic,
qos = qos,
topic = %topic,
qos = ?p.qos,
"Message is already handled by different instance"
);
return Ok(());
}
let json = payload_is_json(b);
let json = payload_is_json(&p.payload);
info!(
region_id = region_config_id,
topic = topic,
qos = qos,
topic = %topic,
qos = ?p.qos,
json = json,
"Message received from gateway"
);
@ -379,8 +351,8 @@ async fn message_callback(
})
.inc();
let mut event = match json {
true => serde_json::from_slice(b)?,
false => chirpstack_api::gw::UplinkFrame::decode(&mut Cursor::new(b))?,
true => serde_json::from_slice(&p.payload)?,
false => chirpstack_api::gw::UplinkFrame::decode(&mut Cursor::new(&p.payload))?,
};
if v4_migrate {
@ -407,8 +379,8 @@ async fn message_callback(
})
.inc();
let mut event = match json {
true => serde_json::from_slice(b)?,
false => chirpstack_api::gw::GatewayStats::decode(&mut Cursor::new(b))?,
true => serde_json::from_slice(&p.payload)?,
false => chirpstack_api::gw::GatewayStats::decode(&mut Cursor::new(&p.payload))?,
};
if v4_migrate {
@ -431,8 +403,8 @@ async fn message_callback(
})
.inc();
let mut event = match json {
true => serde_json::from_slice(b)?,
false => chirpstack_api::gw::DownlinkTxAck::decode(&mut Cursor::new(b))?,
true => serde_json::from_slice(&p.payload)?,
false => chirpstack_api::gw::DownlinkTxAck::decode(&mut Cursor::new(&p.payload))?,
};
if v4_migrate {
@ -452,8 +424,8 @@ async fn message_callback(
if err.is_some() {
error!(
region_id = %region_config_id,
topic = topic,
qos = qos,
topic = %topic,
qos = ?p.qos,
"Processing gateway event error: {}",
err.as_ref().unwrap()
);

View File

@ -85,7 +85,7 @@ impl<'a> Integration<'a> {
// We need to re-subscribe on (re)connect to be sure we have a subscription. Even
// in case of a persistent MQTT session, there is no guarantee that the MQTT persisted the
// session and that a re-connect would recover the subscription.
let (connect_tx, mut connect_rx) = mpsc::channel(1);
let (connect_tx, mut connect_rx) = mpsc::channel(10);
// Create client
let mut mqtt_opts =