Implement MQTT shared-subscription for gw backend.

This commit is contained in:
Orne Brocaar 2023-11-29 14:36:13 +00:00
parent 5b6d037469
commit 5108f4451c
41 changed files with 393 additions and 33 deletions

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="as923"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="as923_2"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="as923_3"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="as923_4"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="au915_0"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="au915_1"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="au915_2"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="au915_3"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="au915_4"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="au915_5"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="au915_6"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="au915_7"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn470_0"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn470_1"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn470_10"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn470_11"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn470_2"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn470_3"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn470_4"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn470_5"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn470_6"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn470_7"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn470_8"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn470_9"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="cn779"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="eu433"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="eu868"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="in865"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="ism2400"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="kr920"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="ru864"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="us915_0"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="us915_1"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="us915_2"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="us915_3"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="us915_4"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="us915_5"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="us915_6"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -37,6 +37,16 @@
# '/' is automatically added to the prefix if it is configured.
topic_prefix="us915_7"
# Shared subscription name.
#
# In case there are multiple ChirpStack instances sharing the same
# subscription name, then the MQTT broker will deliver a gateway event
# only to one subscriber. In case you have a production and
# test-environment connected to the same MQTT broker, make sure that
# each environment has its own subscription name, for example:
# chirpstack_prod and chirpstack_tst.
share_name="chirpstack"
# MQTT server (e.g. scheme://host:port where scheme is tcp, ssl or ws)
server="tcp://localhost:1883"

View File

@ -624,6 +624,7 @@ pub struct GatewayBackendMqtt {
#[serde(with = "humantime_serde")]
pub keep_alive_interval: Duration,
pub v4_migrate: bool,
pub share_name: String,
}
impl Default for GatewayBackendMqtt {
@ -643,6 +644,7 @@ impl Default for GatewayBackendMqtt {
tls_key: "".into(),
keep_alive_interval: Duration::from_secs(30),
v4_migrate: false,
share_name: "chirpstack".into(),
}
}
}

View File

@ -1,6 +1,4 @@
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::Hasher;
use std::io::Cursor;
use std::sync::RwLock;
use std::time::Duration;
@ -27,7 +25,6 @@ use super::GatewayBackend;
use crate::config::GatewayBackendMqtt;
use crate::helpers::tls::{get_root_certs, load_cert, load_key};
use crate::monitoring::prometheus;
use crate::storage::{get_async_redis_conn, redis_key};
use crate::{downlink, uplink};
use lrwn::region::CommonName;
@ -187,6 +184,7 @@ impl<'a> MqttBackend<'a> {
} else {
conf.event_topic.clone()
};
let event_topic = format!("$share/{}/{}", conf.share_name, event_topic);
async move {
while connect_rx.recv().await.is_some() {
@ -318,22 +316,7 @@ async fn message_callback(
) {
let topic = String::from_utf8_lossy(&p.topic);
let mut hasher = DefaultHasher::new();
hasher.write(&p.payload);
let key = redis_key(format!("gw:mqtt:lock:{:x}", hasher.finish()));
let locked = is_locked(key).await;
let err = || -> Result<()> {
if locked? {
trace!(
region_id = region_config_id,
topic = %topic,
qos = ?p.qos,
"Message is already handled by different instance"
);
return Ok(());
}
let json = payload_is_json(&p.payload);
info!(
@ -432,21 +415,6 @@ async fn message_callback(
}
}
async fn is_locked(key: String) -> Result<bool> {
let mut c = get_async_redis_conn().await?;
let set: bool = redis::cmd("SET")
.arg(key)
.arg("lock")
.arg("PX")
.arg(5000)
.arg("NX")
.query_async(&mut c)
.await?;
Ok(!set)
}
fn gateway_is_json(gateway_id: &str) -> bool {
let gw_json_r = GATEWAY_JSON.read().unwrap();
gw_json_r.get(gateway_id).cloned().unwrap_or(false)