From 3ab830f5a07efdbaa3b81b5428329a4de89297c1 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Sat, 30 Jul 2022 13:23:54 +0100 Subject: [PATCH] Change clean_session default + fix re-subscribe. This changes the clean_session default to false, as only in case of a persistent session, qos > 0 would be effective. If the client_id is not set, then ChirpStack will generate a random client_id, which stays the same during the lifetime of the chirpstack process. This also implements a subscribe loop, as the client re-connect feature does not re-subscribe. Even in case of a persistent session there is no guarantee that the subscription is recovered, as it might have been a MQTT broker restart. In case the broker stores the sessions in-memory, the client would re-connect, but without subscriptions. The (re)subscribe logic is placed outside the on-connected callback, as the callback function must not block, thus can not wait for the subscribe result. No the (re)subscribe happens async from the on-connected. --- Cargo.lock | 24 +++-- chirpstack/Cargo.toml | 2 +- chirpstack/configuration/region_as923.toml | 6 +- chirpstack/configuration/region_as923_2.toml | 6 +- chirpstack/configuration/region_as923_3.toml | 6 +- chirpstack/configuration/region_as923_4.toml | 6 +- chirpstack/configuration/region_au915_0.toml | 6 +- chirpstack/configuration/region_au915_1.toml | 6 +- chirpstack/configuration/region_au915_2.toml | 6 +- chirpstack/configuration/region_au915_3.toml | 6 +- chirpstack/configuration/region_au915_4.toml | 6 +- chirpstack/configuration/region_au915_5.toml | 6 +- chirpstack/configuration/region_au915_6.toml | 6 +- chirpstack/configuration/region_au915_7.toml | 6 +- chirpstack/configuration/region_cn779.toml | 6 +- chirpstack/configuration/region_eu433.toml | 6 +- chirpstack/configuration/region_eu868.toml | 6 +- chirpstack/configuration/region_in865.toml | 6 +- chirpstack/configuration/region_ism2400.toml | 6 +- chirpstack/configuration/region_kr920.toml | 6 +- chirpstack/configuration/region_ru864.toml | 6 +- chirpstack/configuration/region_us915_0.toml | 6 +- chirpstack/configuration/region_us915_1.toml | 6 +- chirpstack/configuration/region_us915_2.toml | 6 +- chirpstack/configuration/region_us915_3.toml | 6 +- chirpstack/configuration/region_us915_4.toml | 6 +- chirpstack/configuration/region_us915_5.toml | 6 +- chirpstack/configuration/region_us915_6.toml | 6 +- chirpstack/configuration/region_us915_7.toml | 6 +- chirpstack/src/cmd/configfile.rs | 4 +- chirpstack/src/config.rs | 4 +- chirpstack/src/gateway/backend/mqtt.rs | 104 +++++++++++-------- chirpstack/src/integration/mqtt.rs | 85 +++++++++------ 33 files changed, 222 insertions(+), 163 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index fc5e7e8b..c642638f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1068,9 +1068,9 @@ dependencies = [ [[package]] name = "cmake" -version = "0.1.48" +version = "0.1.45" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8ad8cef104ac57b68b89df3208164d228503abbdce70f6880ffa3d970e7443a" +checksum = "eb6210b637171dfba4cda12e579ac6dc73f5165ad56133e5d72ef3131f320855" dependencies = [ "cc", ] @@ -1131,6 +1131,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "338089f42c427b86394a5ee60ff321da23a5c89c9d89514c829687b26359fcff" +[[package]] +name = "crossbeam-channel" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" +dependencies = [ + "cfg-if", + "crossbeam-utils", +] + [[package]] name = "crossbeam-utils" version = "0.8.10" @@ -2600,10 +2610,12 @@ dependencies = [ [[package]] name = "paho-mqtt" -version = "0.9.1" +version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d82fea0990fe54e75d575bbd9bc2ee5919fd10cc0b4a95f1967528083129fc4b" +checksum = "9fac58bae33ba9679bb4908ffa7c3950114345860d3f9b98340c4943f18ff324" dependencies = [ + "async-channel", + "crossbeam-channel", "futures", "futures-timer", "libc", @@ -2614,9 +2626,9 @@ dependencies = [ [[package]] name = "paho-mqtt-sys" -version = "0.5.0" +version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1ad9ac6a77a7e7c70cd51262b94ab666c9e4c38fb0f4201dba8d7f8589aa8ce4" +checksum = "10e6244f27644eed5709e318a3ad7f785906fbb6030f0a9b9ba50923b456c0c5" dependencies = [ "cmake", "openssl-sys", diff --git a/chirpstack/Cargo.toml b/chirpstack/Cargo.toml index 4e47f18f..4f6870db 100644 --- a/chirpstack/Cargo.toml +++ b/chirpstack/Cargo.toml @@ -90,7 +90,7 @@ openssl = { version = "0.10", features = ["vendored"] } openidconnect = { version = "2.3.1", features = ["accept-rfc3339-timestamps"] } # MQTT -paho-mqtt = { version = "0.9", features = ["vendored-ssl"] } +paho-mqtt = { version = "0.11", features = ["vendored-ssl"] } hex = "0.4" # Codecs diff --git a/chirpstack/configuration/region_as923.toml b/chirpstack/configuration/region_as923.toml index 12b839a5..a8267645 100644 --- a/chirpstack/configuration/region_as923.toml +++ b/chirpstack/configuration/region_as923.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_as923_2.toml b/chirpstack/configuration/region_as923_2.toml index f71140eb..75f08257 100644 --- a/chirpstack/configuration/region_as923_2.toml +++ b/chirpstack/configuration/region_as923_2.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_as923_3.toml b/chirpstack/configuration/region_as923_3.toml index 992a8ac4..8665e9cf 100644 --- a/chirpstack/configuration/region_as923_3.toml +++ b/chirpstack/configuration/region_as923_3.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_as923_4.toml b/chirpstack/configuration/region_as923_4.toml index 6372794b..703d1676 100644 --- a/chirpstack/configuration/region_as923_4.toml +++ b/chirpstack/configuration/region_as923_4.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_au915_0.toml b/chirpstack/configuration/region_au915_0.toml index 67b7969b..4323e7c0 100644 --- a/chirpstack/configuration/region_au915_0.toml +++ b/chirpstack/configuration/region_au915_0.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_au915_1.toml b/chirpstack/configuration/region_au915_1.toml index f6b238c5..7323a2a6 100644 --- a/chirpstack/configuration/region_au915_1.toml +++ b/chirpstack/configuration/region_au915_1.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_au915_2.toml b/chirpstack/configuration/region_au915_2.toml index 6d377379..161a753a 100644 --- a/chirpstack/configuration/region_au915_2.toml +++ b/chirpstack/configuration/region_au915_2.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_au915_3.toml b/chirpstack/configuration/region_au915_3.toml index 8b82252c..74a13a57 100644 --- a/chirpstack/configuration/region_au915_3.toml +++ b/chirpstack/configuration/region_au915_3.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_au915_4.toml b/chirpstack/configuration/region_au915_4.toml index 5d367a9c..57c15ec8 100644 --- a/chirpstack/configuration/region_au915_4.toml +++ b/chirpstack/configuration/region_au915_4.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_au915_5.toml b/chirpstack/configuration/region_au915_5.toml index a0ee5d22..2d2ce029 100644 --- a/chirpstack/configuration/region_au915_5.toml +++ b/chirpstack/configuration/region_au915_5.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_au915_6.toml b/chirpstack/configuration/region_au915_6.toml index 3a7832cd..476befa1 100644 --- a/chirpstack/configuration/region_au915_6.toml +++ b/chirpstack/configuration/region_au915_6.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_au915_7.toml b/chirpstack/configuration/region_au915_7.toml index 0ca4f232..e51ffcc0 100644 --- a/chirpstack/configuration/region_au915_7.toml +++ b/chirpstack/configuration/region_au915_7.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_cn779.toml b/chirpstack/configuration/region_cn779.toml index dd2747f9..ced99342 100644 --- a/chirpstack/configuration/region_cn779.toml +++ b/chirpstack/configuration/region_cn779.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_eu433.toml b/chirpstack/configuration/region_eu433.toml index 5dcabe3b..c15367cb 100644 --- a/chirpstack/configuration/region_eu433.toml +++ b/chirpstack/configuration/region_eu433.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_eu868.toml b/chirpstack/configuration/region_eu868.toml index a8535b5d..8812ebb3 100644 --- a/chirpstack/configuration/region_eu868.toml +++ b/chirpstack/configuration/region_eu868.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_in865.toml b/chirpstack/configuration/region_in865.toml index 86c3c190..609b55d3 100644 --- a/chirpstack/configuration/region_in865.toml +++ b/chirpstack/configuration/region_in865.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_ism2400.toml b/chirpstack/configuration/region_ism2400.toml index c6aa02fd..50d2ce23 100644 --- a/chirpstack/configuration/region_ism2400.toml +++ b/chirpstack/configuration/region_ism2400.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_kr920.toml b/chirpstack/configuration/region_kr920.toml index d736d9f9..ebdfff09 100644 --- a/chirpstack/configuration/region_kr920.toml +++ b/chirpstack/configuration/region_kr920.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_ru864.toml b/chirpstack/configuration/region_ru864.toml index 8e1164b7..d7578bdd 100644 --- a/chirpstack/configuration/region_ru864.toml +++ b/chirpstack/configuration/region_ru864.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_us915_0.toml b/chirpstack/configuration/region_us915_0.toml index d561a235..9953c49d 100644 --- a/chirpstack/configuration/region_us915_0.toml +++ b/chirpstack/configuration/region_us915_0.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_us915_1.toml b/chirpstack/configuration/region_us915_1.toml index 8fcc5d74..30611769 100644 --- a/chirpstack/configuration/region_us915_1.toml +++ b/chirpstack/configuration/region_us915_1.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_us915_2.toml b/chirpstack/configuration/region_us915_2.toml index 9baffa98..94620139 100644 --- a/chirpstack/configuration/region_us915_2.toml +++ b/chirpstack/configuration/region_us915_2.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_us915_3.toml b/chirpstack/configuration/region_us915_3.toml index bd50ceaf..e507cf0c 100644 --- a/chirpstack/configuration/region_us915_3.toml +++ b/chirpstack/configuration/region_us915_3.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_us915_4.toml b/chirpstack/configuration/region_us915_4.toml index fdf57a4b..f9e23be2 100644 --- a/chirpstack/configuration/region_us915_4.toml +++ b/chirpstack/configuration/region_us915_4.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_us915_5.toml b/chirpstack/configuration/region_us915_5.toml index 19be6f32..11948c7d 100644 --- a/chirpstack/configuration/region_us915_5.toml +++ b/chirpstack/configuration/region_us915_5.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_us915_6.toml b/chirpstack/configuration/region_us915_6.toml index ddb754c6..aee3b346 100644 --- a/chirpstack/configuration/region_us915_6.toml +++ b/chirpstack/configuration/region_us915_6.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/configuration/region_us915_7.toml b/chirpstack/configuration/region_us915_7.toml index 3fb4ea63..86818aa9 100644 --- a/chirpstack/configuration/region_us915_7.toml +++ b/chirpstack/configuration/region_us915_7.toml @@ -57,13 +57,13 @@ # Set the "clean session" flag in the connect message when this client # connects to an MQTT broker. By setting this flag you are indicating # that no messages saved by the broker for this client should be delivered. - clean_session=true + clean_session=false # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="" # CA certificate file (optional) diff --git a/chirpstack/src/cmd/configfile.rs b/chirpstack/src/cmd/configfile.rs index 74bdee0a..6675237a 100644 --- a/chirpstack/src/cmd/configfile.rs +++ b/chirpstack/src/cmd/configfile.rs @@ -316,8 +316,8 @@ pub fn run() { # Client ID # # Set the client id to be used by this client when connecting to the MQTT - # broker. A client id must be no longer than 23 characters. When left blank, - # a random id will be generated. This requires clean_session=true. + # broker. A client id must be no longer than 23 characters. If left blank, + # a random id will be generated by ChirpStack. client_id="{{ integration.mqtt.client_id }}" # CA certificate file (optional) diff --git a/chirpstack/src/config.rs b/chirpstack/src/config.rs index 7db032f7..44999141 100644 --- a/chirpstack/src/config.rs +++ b/chirpstack/src/config.rs @@ -280,7 +280,7 @@ impl Default for MqttIntegration { username: "".into(), password: "".into(), qos: 0, - clean_session: true, + clean_session: false, client_id: "".into(), ca_cert: "".into(), tls_cert: "".into(), @@ -532,7 +532,7 @@ impl Default for Region { command_topic: "eu868/gateway/{{ gateway_id }}/command/{{ command }}" .into(), server: "tcp://127.0.0.1:1883".into(), - clean_session: true, + clean_session: false, ..Default::default() }, }, diff --git a/chirpstack/src/gateway/backend/mqtt.rs b/chirpstack/src/gateway/backend/mqtt.rs index dab6d5f9..c7e0ae57 100644 --- a/chirpstack/src/gateway/backend/mqtt.rs +++ b/chirpstack/src/gateway/backend/mqtt.rs @@ -12,7 +12,9 @@ use prometheus_client::encoding::text::Encode; use prometheus_client::metrics::counter::Counter; use prometheus_client::metrics::family::Family; use prost::Message; +use rand::Rng; use serde::Serialize; +use tokio::sync::mpsc; use tokio::task; use tracing::{error, info, trace}; @@ -81,18 +83,56 @@ impl<'a> MqttBackend<'a> { let mut templates = Handlebars::new(); templates.register_template_string("command_topic", &conf.command_topic)?; + // get client id, this will generate a random client_id when no client_id has been + // configured. + let client_id = if conf.client_id.is_empty() { + let mut rnd = rand::thread_rng(); + let client_id: u64 = rnd.gen(); + format!("{:x}", client_id) + } else { + conf.client_id.clone() + }; + + // Create subscribe channel + // This is needed as we can't subscribe within the set_connected_callback as this would + // block the callback (we want to wait for success or error), which would create a + // deadlock. We need to re-subscribe on (re)connect to be sure we have a subscription. Even + // in case of a persistent MQTT session, there is no guarantee that the MQTT persisted the + // session and that a re-connect would recover the subscription. + let (subscribe_tx, mut subscribe_rx) = mpsc::channel(10); + // create client let create_opts = mqtt::CreateOptionsBuilder::new() .server_uri(&conf.server) - .client_id(&conf.client_id) + .client_id(&client_id) .user_data(Box::new(MqttContext { region_name: region_name.to_string(), region_common_name, })) .finalize(); let mut client = mqtt::AsyncClient::new(create_opts).context("Create MQTT client")?; - client.set_connected_callback(connected_callback); - client.set_connection_lost_callback(connection_lost_callback); + client.set_connected_callback(move |client| { + let ctx = client + .user_data() + .unwrap() + .downcast_ref::() + .unwrap(); + + info!(region_name = %ctx.region_name, "Connected to MQTT broker"); + + if let Err(e) = subscribe_tx.try_send(()) { + error!(region_name = %ctx.region_name, error = %e, "Send to subscribe channel error"); + } + }); + client.set_connection_lost_callback(|client| { + let ctx = client + .user_data() + .unwrap() + .downcast_ref::() + .unwrap(); + + info!(region_name = %ctx.region_name, "MQTT connection to broker lost"); + }); // connection options let mut conn_opts_b = mqtt::ConnectOptionsBuilder::new(); @@ -142,24 +182,13 @@ impl<'a> MqttBackend<'a> { }; // connect - info!( - server_uri = conf.server.as_str(), - "Connecting to MQTT broker" - ); + info!(region_name = %region_name, server_uri = %conf.server, clean_session = conf.clean_session, client_id = %client_id, "Connecting to MQTT broker"); b.client .connect(conn_opts) .await .context("Connect to MQTT broker")?; - info!( - event_topic = conf.event_topic.as_str(), - "Subscribing to gateway event topic" - ); - b.client - .subscribe(&conf.event_topic, conf.qos as i32) - .await - .context("MQTT subscribe error")?; - + // Consumer loop. tokio::spawn({ let region_name = region_name.to_string(); @@ -173,6 +202,23 @@ impl<'a> MqttBackend<'a> { } }); + // (Re)subscribe loop. + tokio::spawn({ + let region_name = region_name.to_string(); + let event_topic = conf.event_topic.clone(); + let client = b.client.clone(); + let qos = conf.qos as i32; + + async move { + while subscribe_rx.recv().await.is_some() { + info!(region_name = %region_name, event_topic = %event_topic, "Subscribing to gateway event topic"); + if let Err(e) = client.subscribe(&event_topic, qos).await { + error!(region_name = %region_name, event_topic = %event_topic, error = %e, "MQTT subscribe error"); + } + } + } + }); + // return backend Ok(b) } @@ -315,32 +361,6 @@ async fn message_callback(region_name: &str, region_common_name: CommonName, msg } } -fn connected_callback(client: &mqtt::AsyncClient) { - let ctx = client - .user_data() - .unwrap() - .downcast_ref::() - .unwrap(); - - info!( - region_name = ctx.region_name.as_str(), - "Connected to MQTT broker" - ); -} - -fn connection_lost_callback(client: &mqtt::AsyncClient) { - let ctx = client - .user_data() - .unwrap() - .downcast_ref::() - .unwrap(); - - info!( - region_name = ctx.region_name.as_str(), - "MQTT connection to broker lost" - ); -} - async fn is_locked(key: String) -> Result { task::spawn_blocking({ move || -> Result { diff --git a/chirpstack/src/integration/mqtt.rs b/chirpstack/src/integration/mqtt.rs index 1cc4c985..cef57708 100644 --- a/chirpstack/src/integration/mqtt.rs +++ b/chirpstack/src/integration/mqtt.rs @@ -8,8 +8,10 @@ use futures::stream::StreamExt; use handlebars::Handlebars; use paho_mqtt as mqtt; use prost::Message; +use rand::Rng; use regex::Regex; use serde::Serialize; +use tokio::sync::mpsc; use tracing::{error, info}; use super::Integration as IntegrationTrait; @@ -48,14 +50,48 @@ impl<'a> Integration<'a> { templates.register_template_string("event_topic", &conf.event_topic)?; templates.register_template_string("command_topic", &conf.command_topic)?; + let command_topic = templates.render( + "command_topic", + &CommandTopicContext { + application_id: "+".into(), + dev_eui: "+".into(), + command: "+".into(), + }, + )?; + + // get client id, this will generate a random client_id when no client_id has been + // configured. + let client_id = if conf.client_id.is_empty() { + let mut rnd = rand::thread_rng(); + let client_id: u64 = rnd.gen(); + format!("{:x}", client_id) + } else { + conf.client_id.clone() + }; + + // Create subscribe channel + // This is needed as we can't subscribe within the set_connected_callback as this would + // block the callback (we want to wait for success or error), which would create a + // deadlock. We need to re-subscribe on (re)connect to be sure we have a subscription. Even + // in case of a persistent MQTT session, there is no guarantee that the MQTT persisted the + // session and that a re-connect would recover the subscription. + let (subscribe_tx, mut subscribe_rx) = mpsc::channel(10); + // create client let create_opts = mqtt::CreateOptionsBuilder::new() .server_uri(&conf.server) - .client_id(&conf.client_id) + .client_id(&client_id) .finalize(); let mut client = mqtt::AsyncClient::new(create_opts).context("Create MQTT client")?; - client.set_connected_callback(connected_callback); - client.set_connection_lost_callback(connection_lost_callback); + client.set_connected_callback(move |_client| { + info!("Connected to MQTT broker"); + if let Err(e) = subscribe_tx.try_send(()) { + error!(error = %e, "Send to subscribe channel error"); + } + }); + client.set_connection_lost_callback(|_client| { + error!("MQTT connection to broker lost"); + }); // connection options let mut conn_opts_b = mqtt::ConnectOptionsBuilder::new(); @@ -114,29 +150,13 @@ impl<'a> Integration<'a> { }; // connect - info!(server_uri = %conf.server, "Connecting to MQTT broker"); + info!(server_uri = %conf.server, client_id = %client_id, clean_session = conf.clean_session, "Connecting to MQTT broker"); i.client .connect(conn_opts) .await .context("Connect to MQTT broker")?; - let command_topic = i.templates.render( - "command_topic", - &CommandTopicContext { - application_id: "+".into(), - dev_eui: "+".into(), - command: "+".into(), - }, - )?; - info!( - command_topic = %command_topic, - "Subscribing to command topic" - ); - i.client - .subscribe(&command_topic, conf.qos as i32) - .await - .context("MQTT subscribe")?; - + // Command consume loop. tokio::spawn({ let command_regex = i.command_regex.clone(); @@ -169,6 +189,21 @@ impl<'a> Integration<'a> { } }); + // (Re)subscribe loop. + tokio::spawn({ + let client = i.client.clone(); + let qos = conf.qos as i32; + + async move { + while subscribe_rx.recv().await.is_some() { + info!(command_topic = %command_topic, "Subscribing to command topic"); + if let Err(e) = client.subscribe(&command_topic, qos).await { + error!(error = %e, "MQTT subscribe error"); + } + } + } + }); + // Return integration. Ok(i) } @@ -349,14 +384,6 @@ impl IntegrationTrait for Integration<'_> { } } -fn connected_callback(_: &mqtt::AsyncClient) { - info!("Connected to MQTT broker"); -} - -fn connection_lost_callback(_: &mqtt::AsyncClient) { - info!("Connection to MQTT broker lost"); -} - async fn message_callback( application_id: String, dev_eui: String,