Remove mqtt state topic.

Adding this was probably not a good idea. For a number of reasons:

Persisted messages do not expire, so the database of messages will grow
over time, even when devices are removed.

Other integrations have nothing similar.

Decoded uplink events will probably data that could be seen as a certain
state (e.g. parking spot occupied or not), but are published as events.
This commit is contained in:
Orne Brocaar 2022-07-11 13:59:54 +01:00
parent 9ecfbab5b5
commit 8350494846
3 changed files with 10 additions and 57 deletions

View File

@ -242,12 +242,6 @@ pub fn run() {
# Event topic template.
event_topic="{{ integration.mqtt.event_topic }}"
# State topic template.
#
# Events that expose a certain state of the device, are published as retained messages
# to the state topic.
state_topic="{{ integration.mqtt.state_topic }}"
# Command topic.
#
# This is the topic on which the MQTT subscribes for receiving (enqueue) commands.
@ -345,7 +339,7 @@ pub fn run() {
event_routing_key="{{ integration.amqp.event_routing_key }}"
# Use JSON encoding instead of Protobuf (binary).
json={{ integration.mqtt.json }}
json={{ integration.amqp.json }}
# Codec configuration.

View File

@ -251,7 +251,6 @@ pub struct Integration {
pub struct MqttIntegration {
pub client: MqttIntegrationClient,
pub event_topic: String,
pub state_topic: String,
pub command_topic: String,
pub json: bool,
pub server: String,
@ -270,7 +269,6 @@ impl Default for MqttIntegration {
MqttIntegration {
client: Default::default(),
event_topic: "application/{{application_id}}/device/{{dev_eui}}/event/{{event}}".into(),
state_topic: "application/{{application_id}}/device/{{dev_eui}}/state/{{state}}".into(),
command_topic: "application/{{application_id}}/device/{{dev_eui}}/command/{{command}}"
.into(),
json: true,

View File

@ -31,13 +31,6 @@ struct EventTopicContext {
pub event: String,
}
#[derive(Serialize)]
struct StateTopicContext {
pub application_id: String,
pub dev_eui: String,
pub state: String,
}
#[derive(Serialize)]
struct CommandTopicContext {
pub application_id: String,
@ -53,7 +46,6 @@ impl<'a> Integration<'a> {
let mut templates = Handlebars::new();
templates.register_escape_fn(handlebars::no_escape);
templates.register_template_string("event_topic", &conf.event_topic)?;
templates.register_template_string("state_topic", &conf.state_topic)?;
templates.register_template_string("command_topic", &conf.command_topic)?;
// create client
@ -192,30 +184,12 @@ impl<'a> Integration<'a> {
)?)
}
fn get_state_topic(&self, application_id: &str, dev_eui: &str, state: &str) -> Result<String> {
Ok(self.templates.render(
"state_topic",
&StateTopicContext {
application_id: application_id.to_string(),
dev_eui: dev_eui.to_string(),
state: state.to_string(),
},
)?)
}
async fn publish_event(&self, topic: &str, b: &[u8]) -> Result<()> {
info!(topic = %topic, "Publishing event");
let msg = mqtt::Message::new(topic, b, self.qos as i32);
self.client.publish(msg).await?;
Ok(())
}
async fn publish_state(&self, topic: &str, b: &[u8]) -> Result<()> {
info!(topic = %topic, "Publishing state");
let msg = mqtt::Message::new_retained(topic, b, self.qos as i32);
self.client.publish(msg).await?;
Ok(())
}
}
#[async_trait]
@ -249,13 +223,13 @@ impl IntegrationTrait for Integration<'_> {
.as_ref()
.ok_or(anyhow!("device_info is None"))?;
let topic = self.get_state_topic(&dev_info.application_id, &dev_info.dev_eui, "join")?;
let topic = self.get_event_topic(&dev_info.application_id, &dev_info.dev_eui, "join")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_state(&topic, &b).await
self.publish_event(&topic, &b).await
}
async fn ack_event(
@ -325,13 +299,13 @@ impl IntegrationTrait for Integration<'_> {
.as_ref()
.ok_or(anyhow!("device_info is None"))?;
let topic = self.get_state_topic(&dev_info.application_id, &dev_info.dev_eui, "status")?;
let topic = self.get_event_topic(&dev_info.application_id, &dev_info.dev_eui, "status")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_state(&topic, &b).await
self.publish_event(&topic, &b).await
}
async fn location_event(
@ -345,13 +319,13 @@ impl IntegrationTrait for Integration<'_> {
.ok_or(anyhow!("device_info is None"))?;
let topic =
self.get_state_topic(&dev_info.application_id, &dev_info.dev_eui, "location")?;
self.get_event_topic(&dev_info.application_id, &dev_info.dev_eui, "location")?;
let b = match self.json {
true => serde_json::to_vec(&pl)?,
false => pl.encode_to_vec(),
};
self.publish_state(&topic, &b).await
self.publish_event(&topic, &b).await
}
async fn integration_event(
@ -482,7 +456,6 @@ pub mod test {
// setup of integration and MQTT client
let conf = MqttIntegration {
event_topic: "application/{{application_id}}/device/{{dev_eui}}/event/{{event}}".into(),
state_topic: "application/{{application_id}}/device/{{dev_eui}}/state/{{state}}".into(),
json: true,
server: "tcp://mosquitto:1883/".into(),
clean_session: true,
@ -500,11 +473,6 @@ pub mod test {
let mut stream = client.get_stream(10);
client.connect(conn_opts).await.unwrap();
// remove retained messages by sending empty payloads
client.publish(mqtt::Message::new_retained("application/00000000-0000-0000-0000-000000000000/device/0102030405060708/state/join", vec![], mqtt::QOS_0)).await.unwrap();
client.publish(mqtt::Message::new_retained("application/00000000-0000-0000-0000-000000000000/device/0102030405060708/state/status", vec![], mqtt::QOS_0)).await.unwrap();
client.publish(mqtt::Message::new_retained("application/00000000-0000-0000-0000-000000000000/device/0102030405060708/state/location", vec![], mqtt::QOS_0)).await.unwrap();
client
.subscribe(
"application/00000000-0000-0000-0000-000000000000/device/+/event/+",
@ -512,13 +480,6 @@ pub mod test {
)
.await
.unwrap();
client
.subscribe(
"application/00000000-0000-0000-0000-000000000000/device/+/state/+",
mqtt::QOS_0,
)
.await
.unwrap();
// uplink event
let pl = integration::UplinkEvent {
@ -549,7 +510,7 @@ pub mod test {
i.join_event(&HashMap::new(), &pl).await.unwrap();
let msg = stream.next().await.unwrap().unwrap();
assert_eq!(
"application/00000000-0000-0000-0000-000000000000/device/0102030405060708/state/join",
"application/00000000-0000-0000-0000-000000000000/device/0102030405060708/event/join",
msg.topic()
);
assert_eq!(serde_json::to_string(&pl).unwrap(), msg.payload_str());
@ -617,7 +578,7 @@ pub mod test {
i.status_event(&HashMap::new(), &pl).await.unwrap();
let msg = stream.next().await.unwrap().unwrap();
assert_eq!(
"application/00000000-0000-0000-0000-000000000000/device/0102030405060708/state/status",
"application/00000000-0000-0000-0000-000000000000/device/0102030405060708/event/status",
msg.topic()
);
assert_eq!(serde_json::to_string(&pl).unwrap(), msg.payload_str());
@ -634,7 +595,7 @@ pub mod test {
i.location_event(&HashMap::new(), &pl).await.unwrap();
let msg = stream.next().await.unwrap().unwrap();
assert_eq!(
"application/00000000-0000-0000-0000-000000000000/device/0102030405060708/state/location",
"application/00000000-0000-0000-0000-000000000000/device/0102030405060708/event/location",
msg.topic()
);
assert_eq!(serde_json::to_string(&pl).unwrap(), msg.payload_str());