diff --git a/chirpstack/src/gateway/backend/mqtt.rs b/chirpstack/src/gateway/backend/mqtt.rs index d2e52c93..9fe7b7ff 100644 --- a/chirpstack/src/gateway/backend/mqtt.rs +++ b/chirpstack/src/gateway/backend/mqtt.rs @@ -185,10 +185,16 @@ impl<'a> MqttBackend<'a> { } else { conf.event_topic.clone() }; - let event_topic = format!("$share/{}/{}", conf.share_name, event_topic); + let share_name = conf.share_name.clone(); async move { - while connect_rx.recv().await.is_some() { + while let Some(shared_sub_support) = connect_rx.recv().await { + let event_topic = if shared_sub_support { + format!("$share/{}/{}", share_name, event_topic) + } else { + event_topic.clone() + }; + info!(region_id = %region_config_id, event_topic = %event_topic, "Subscribing to gateway event topic"); if let Err(e) = client.subscribe(&event_topic, qos).await { error!(region_id = %region_config_id, event_topic = %event_topic, error = %e, "MQTT subscribe error"); @@ -222,7 +228,18 @@ impl<'a> MqttBackend<'a> { } Event::Incoming(Incoming::ConnAck(v)) => { if v.code == ConnectReturnCode::Success { - if let Err(e) = connect_tx.try_send(()) { + // Per specification: + // A value of 1 means Shared Subscriptions are supported. If not present, then Shared Subscriptions are supported. + let shared_sub_support = v + .properties + .map(|v| { + v.shared_subscription_available + .map(|v| v == 1) + .unwrap_or(true) + }) + .unwrap_or(true); + + if let Err(e) = connect_tx.try_send(shared_sub_support) { error!(error = %e, "Send to subscribe channel error"); } } else {