Change clean_session default + fix re-subscribe.

This changes the clean_session default to false, as only in case of a
persistent session, qos > 0 would be effective. If the client_id is not
set, then ChirpStack will generate a random client_id, which stays the
same during the lifetime of the chirpstack process.

This also implements a subscribe loop, as the client re-connect feature
does not re-subscribe. Even in case of a persistent session there is no
guarantee that the subscription is recovered, as it might have been a
MQTT broker restart. In case the broker stores the sessions in-memory,
the client would re-connect, but without subscriptions.

The (re)subscribe logic is placed outside the on-connected callback, as
the callback function must not block, thus can not wait for the
subscribe result. No the (re)subscribe happens async from the
on-connected.
This commit is contained in:
Orne Brocaar 2022-07-30 13:23:54 +01:00
parent f58e39e503
commit 3ab830f5a0
33 changed files with 222 additions and 163 deletions

24
Cargo.lock generated
View File

@ -1068,9 +1068,9 @@ dependencies = [
[[package]]
name = "cmake"
version = "0.1.48"
version = "0.1.45"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a"
checksum = "eb6210b637171dfba4cda12e579ac6dc73f5165ad56133e5d72ef3131f320855"
dependencies = [
"cc",
]
@ -1131,6 +1131,16 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff"
[[package]]
name = "crossbeam-channel"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
dependencies = [
"cfg-if",
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.10"
@ -2600,10 +2610,12 @@ dependencies = [
[[package]]
name = "paho-mqtt"
version = "0.9.1"
version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d82fea0990fe54e75d575bbd9bc2ee5919fd10cc0b4a95f1967528083129fc4b"
checksum = "9fac58bae33ba9679bb4908ffa7c3950114345860d3f9b98340c4943f18ff324"
dependencies = [
"async-channel",
"crossbeam-channel",
"futures",
"futures-timer",
"libc",
@ -2614,9 +2626,9 @@ dependencies = [
[[package]]
name = "paho-mqtt-sys"
version = "0.5.0"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ad9ac6a77a7e7c70cd51262b94ab666c9e4c38fb0f4201dba8d7f8589aa8ce4"
checksum = "10e6244f27644eed5709e318a3ad7f785906fbb6030f0a9b9ba50923b456c0c5"
dependencies = [
"cmake",
"openssl-sys",

View File

@ -90,7 +90,7 @@ openssl = { version = "0.10", features = ["vendored"] }
openidconnect = { version = "2.3.1", features = ["accept-rfc3339-timestamps"] }
# MQTT
paho-mqtt = { version = "0.9", features = ["vendored-ssl"] }
paho-mqtt = { version = "0.11", features = ["vendored-ssl"] }
hex = "0.4"
# Codecs

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -57,13 +57,13 @@
# Set the "clean session" flag in the connect message when this client
# connects to an MQTT broker. By setting this flag you are indicating
# that no messages saved by the broker for this client should be delivered.
clean_session=true
clean_session=false
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id=""
# CA certificate file (optional)

View File

@ -316,8 +316,8 @@ pub fn run() {
# Client ID
#
# Set the client id to be used by this client when connecting to the MQTT
# broker. A client id must be no longer than 23 characters. When left blank,
# a random id will be generated. This requires clean_session=true.
# broker. A client id must be no longer than 23 characters. If left blank,
# a random id will be generated by ChirpStack.
client_id="{{ integration.mqtt.client_id }}"
# CA certificate file (optional)

View File

@ -280,7 +280,7 @@ impl Default for MqttIntegration {
username: "".into(),
password: "".into(),
qos: 0,
clean_session: true,
clean_session: false,
client_id: "".into(),
ca_cert: "".into(),
tls_cert: "".into(),
@ -532,7 +532,7 @@ impl Default for Region {
command_topic: "eu868/gateway/{{ gateway_id }}/command/{{ command }}"
.into(),
server: "tcp://127.0.0.1:1883".into(),
clean_session: true,
clean_session: false,
..Default::default()
},
},

View File

@ -12,7 +12,9 @@ use prometheus_client::encoding::text::Encode;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prost::Message;
use rand::Rng;
use serde::Serialize;
use tokio::sync::mpsc;
use tokio::task;
use tracing::{error, info, trace};
@ -81,18 +83,56 @@ impl<'a> MqttBackend<'a> {
let mut templates = Handlebars::new();
templates.register_template_string("command_topic", &conf.command_topic)?;
// get client id, this will generate a random client_id when no client_id has been
// configured.
let client_id = if conf.client_id.is_empty() {
let mut rnd = rand::thread_rng();
let client_id: u64 = rnd.gen();
format!("{:x}", client_id)
} else {
conf.client_id.clone()
};
// Create subscribe channel
// This is needed as we can't subscribe within the set_connected_callback as this would
// block the callback (we want to wait for success or error), which would create a
// deadlock. We need to re-subscribe on (re)connect to be sure we have a subscription. Even
// in case of a persistent MQTT session, there is no guarantee that the MQTT persisted the
// session and that a re-connect would recover the subscription.
let (subscribe_tx, mut subscribe_rx) = mpsc::channel(10);
// create client
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(&conf.server)
.client_id(&conf.client_id)
.client_id(&client_id)
.user_data(Box::new(MqttContext {
region_name: region_name.to_string(),
region_common_name,
}))
.finalize();
let mut client = mqtt::AsyncClient::new(create_opts).context("Create MQTT client")?;
client.set_connected_callback(connected_callback);
client.set_connection_lost_callback(connection_lost_callback);
client.set_connected_callback(move |client| {
let ctx = client
.user_data()
.unwrap()
.downcast_ref::<MqttContext>()
.unwrap();
info!(region_name = %ctx.region_name, "Connected to MQTT broker");
if let Err(e) = subscribe_tx.try_send(()) {
error!(region_name = %ctx.region_name, error = %e, "Send to subscribe channel error");
}
});
client.set_connection_lost_callback(|client| {
let ctx = client
.user_data()
.unwrap()
.downcast_ref::<MqttContext>()
.unwrap();
info!(region_name = %ctx.region_name, "MQTT connection to broker lost");
});
// connection options
let mut conn_opts_b = mqtt::ConnectOptionsBuilder::new();
@ -142,24 +182,13 @@ impl<'a> MqttBackend<'a> {
};
// connect
info!(
server_uri = conf.server.as_str(),
"Connecting to MQTT broker"
);
info!(region_name = %region_name, server_uri = %conf.server, clean_session = conf.clean_session, client_id = %client_id, "Connecting to MQTT broker");
b.client
.connect(conn_opts)
.await
.context("Connect to MQTT broker")?;
info!(
event_topic = conf.event_topic.as_str(),
"Subscribing to gateway event topic"
);
b.client
.subscribe(&conf.event_topic, conf.qos as i32)
.await
.context("MQTT subscribe error")?;
// Consumer loop.
tokio::spawn({
let region_name = region_name.to_string();
@ -173,6 +202,23 @@ impl<'a> MqttBackend<'a> {
}
});
// (Re)subscribe loop.
tokio::spawn({
let region_name = region_name.to_string();
let event_topic = conf.event_topic.clone();
let client = b.client.clone();
let qos = conf.qos as i32;
async move {
while subscribe_rx.recv().await.is_some() {
info!(region_name = %region_name, event_topic = %event_topic, "Subscribing to gateway event topic");
if let Err(e) = client.subscribe(&event_topic, qos).await {
error!(region_name = %region_name, event_topic = %event_topic, error = %e, "MQTT subscribe error");
}
}
}
});
// return backend
Ok(b)
}
@ -315,32 +361,6 @@ async fn message_callback(region_name: &str, region_common_name: CommonName, msg
}
}
fn connected_callback(client: &mqtt::AsyncClient) {
let ctx = client
.user_data()
.unwrap()
.downcast_ref::<MqttContext>()
.unwrap();
info!(
region_name = ctx.region_name.as_str(),
"Connected to MQTT broker"
);
}
fn connection_lost_callback(client: &mqtt::AsyncClient) {
let ctx = client
.user_data()
.unwrap()
.downcast_ref::<MqttContext>()
.unwrap();
info!(
region_name = ctx.region_name.as_str(),
"MQTT connection to broker lost"
);
}
async fn is_locked(key: String) -> Result<bool> {
task::spawn_blocking({
move || -> Result<bool> {

View File

@ -8,8 +8,10 @@ use futures::stream::StreamExt;
use handlebars::Handlebars;
use paho_mqtt as mqtt;
use prost::Message;
use rand::Rng;
use regex::Regex;
use serde::Serialize;
use tokio::sync::mpsc;
use tracing::{error, info};
use super::Integration as IntegrationTrait;
@ -48,14 +50,48 @@ impl<'a> Integration<'a> {
templates.register_template_string("event_topic", &conf.event_topic)?;
templates.register_template_string("command_topic", &conf.command_topic)?;
let command_topic = templates.render(
"command_topic",
&CommandTopicContext {
application_id: "+".into(),
dev_eui: "+".into(),
command: "+".into(),
},
)?;
// get client id, this will generate a random client_id when no client_id has been
// configured.
let client_id = if conf.client_id.is_empty() {
let mut rnd = rand::thread_rng();
let client_id: u64 = rnd.gen();
format!("{:x}", client_id)
} else {
conf.client_id.clone()
};
// Create subscribe channel
// This is needed as we can't subscribe within the set_connected_callback as this would
// block the callback (we want to wait for success or error), which would create a
// deadlock. We need to re-subscribe on (re)connect to be sure we have a subscription. Even
// in case of a persistent MQTT session, there is no guarantee that the MQTT persisted the
// session and that a re-connect would recover the subscription.
let (subscribe_tx, mut subscribe_rx) = mpsc::channel(10);
// create client
let create_opts = mqtt::CreateOptionsBuilder::new()
.server_uri(&conf.server)
.client_id(&conf.client_id)
.client_id(&client_id)
.finalize();
let mut client = mqtt::AsyncClient::new(create_opts).context("Create MQTT client")?;
client.set_connected_callback(connected_callback);
client.set_connection_lost_callback(connection_lost_callback);
client.set_connected_callback(move |_client| {
info!("Connected to MQTT broker");
if let Err(e) = subscribe_tx.try_send(()) {
error!(error = %e, "Send to subscribe channel error");
}
});
client.set_connection_lost_callback(|_client| {
error!("MQTT connection to broker lost");
});
// connection options
let mut conn_opts_b = mqtt::ConnectOptionsBuilder::new();
@ -114,29 +150,13 @@ impl<'a> Integration<'a> {
};
// connect
info!(server_uri = %conf.server, "Connecting to MQTT broker");
info!(server_uri = %conf.server, client_id = %client_id, clean_session = conf.clean_session, "Connecting to MQTT broker");
i.client
.connect(conn_opts)
.await
.context("Connect to MQTT broker")?;
let command_topic = i.templates.render(
"command_topic",
&CommandTopicContext {
application_id: "+".into(),
dev_eui: "+".into(),
command: "+".into(),
},
)?;
info!(
command_topic = %command_topic,
"Subscribing to command topic"
);
i.client
.subscribe(&command_topic, conf.qos as i32)
.await
.context("MQTT subscribe")?;
// Command consume loop.
tokio::spawn({
let command_regex = i.command_regex.clone();
@ -169,6 +189,21 @@ impl<'a> Integration<'a> {
}
});
// (Re)subscribe loop.
tokio::spawn({
let client = i.client.clone();
let qos = conf.qos as i32;
async move {
while subscribe_rx.recv().await.is_some() {
info!(command_topic = %command_topic, "Subscribing to command topic");
if let Err(e) = client.subscribe(&command_topic, qos).await {
error!(error = %e, "MQTT subscribe error");
}
}
}
});
// Return integration.
Ok(i)
}
@ -349,14 +384,6 @@ impl IntegrationTrait for Integration<'_> {
}
}
fn connected_callback(_: &mqtt::AsyncClient) {
info!("Connected to MQTT broker");
}
fn connection_lost_callback(_: &mqtt::AsyncClient) {
info!("Connection to MQTT broker lost");
}
async fn message_callback(
application_id: String,
dev_eui: String,