Auto-detect if MQTT broker supports shared-subscriptions.

Closes #413.
This commit is contained in:
Orne Brocaar 2024-05-02 13:48:36 +01:00
parent 0d11ad609a
commit 5f6ccc35fb

View File

@ -185,10 +185,16 @@ impl<'a> MqttBackend<'a> {
} else {
conf.event_topic.clone()
};
let event_topic = format!("$share/{}/{}", conf.share_name, event_topic);
let share_name = conf.share_name.clone();
async move {
while connect_rx.recv().await.is_some() {
while let Some(shared_sub_support) = connect_rx.recv().await {
let event_topic = if shared_sub_support {
format!("$share/{}/{}", share_name, event_topic)
} else {
event_topic.clone()
};
info!(region_id = %region_config_id, event_topic = %event_topic, "Subscribing to gateway event topic");
if let Err(e) = client.subscribe(&event_topic, qos).await {
error!(region_id = %region_config_id, event_topic = %event_topic, error = %e, "MQTT subscribe error");
@ -222,7 +228,18 @@ impl<'a> MqttBackend<'a> {
}
Event::Incoming(Incoming::ConnAck(v)) => {
if v.code == ConnectReturnCode::Success {
if let Err(e) = connect_tx.try_send(()) {
// Per specification:
// A value of 1 means Shared Subscriptions are supported. If not present, then Shared Subscriptions are supported.
let shared_sub_support = v
.properties
.map(|v| {
v.shared_subscription_available
.map(|v| v == 1)
.unwrap_or(true)
})
.unwrap_or(true);
if let Err(e) = connect_tx.try_send(shared_sub_support) {
error!(error = %e, "Send to subscribe channel error");
}
} else {