Implement support for JSON for GW<>NS.

This auto-detects the encoding used by the gateway and stores this
setting such that the commands will be encoded using the same setting.

Closes #2.
This commit is contained in:
Orne Brocaar 2022-08-03 15:19:51 +01:00
parent 3348ccf67d
commit 672511c696

View File

@ -1,7 +1,9 @@
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::env::temp_dir;
use std::hash::Hasher;
use std::io::Cursor;
use std::sync::RwLock;
use std::time::Duration;
use anyhow::{Context, Result};
@ -55,6 +57,7 @@ lazy_static! {
);
counter
};
static ref GATEWAY_JSON: RwLock<HashMap<String, bool>> = RwLock::new(HashMap::new());
}
struct MqttContext {
@ -247,9 +250,14 @@ impl GatewayBackend for MqttBackend<'_> {
let topic = self.get_command_topic(&df.gateway_id, "down")?;
let mut df = df.clone();
df.v4_migrate();
let b = df.encode_to_vec();
info!(gateway_id = %df.gateway_id, topic = %topic, "Sending downlink frame");
let json = gateway_is_json(&df.gateway_id);
let b = match json {
true => serde_json::to_vec(&df)?,
false => df.encode_to_vec(),
};
info!(gateway_id = %df.gateway_id, topic = %topic, json = json, "Sending downlink frame");
let msg = mqtt::Message::new(topic, b, self.qos as i32);
self.client.publish(msg).await?;
trace!("Message sent");
@ -267,9 +275,13 @@ impl GatewayBackend for MqttBackend<'_> {
})
.inc();
let topic = self.get_command_topic(&gw_conf.gateway_id, "config")?;
let b = gw_conf.encode_to_vec();
let json = gateway_is_json(&gw_conf.gateway_id);
let b = match json {
true => serde_json::to_vec(&gw_conf)?,
false => gw_conf.encode_to_vec(),
};
info!(gateway_id = %gw_conf.gateway_id, topic = %topic, "Sending gateway configuration");
info!(gateway_id = %gw_conf.gateway_id, topic = %topic, json = json, "Sending gateway configuration");
let msg = mqtt::Message::new(topic, b, self.qos as i32);
self.client.publish(msg).await?;
trace!("Message sent");
@ -299,10 +311,13 @@ async fn message_callback(region_name: &str, region_common_name: CommonName, msg
return Ok(());
}
let json = payload_is_json(&b);
info!(
region_name = region_name,
topic = topic,
qos = qos,
json = json,
"Message received from gateway"
);
@ -312,10 +327,14 @@ async fn message_callback(region_name: &str, region_common_name: CommonName, msg
event: "up".to_string(),
})
.inc();
let mut event = chirpstack_api::gw::UplinkFrame::decode(&mut Cursor::new(b))?;
let mut event = match json {
true => serde_json::from_slice(&b)?,
false => chirpstack_api::gw::UplinkFrame::decode(&mut Cursor::new(b))?,
};
event.v4_migrate();
if let Some(rx_info) = &mut event.rx_info {
set_gateway_json(&rx_info.gateway_id, json);
rx_info.set_metadata_string("region_name", region_name);
rx_info.set_metadata_string("region_common_name", &region_common_name.to_string());
}
@ -327,7 +346,10 @@ async fn message_callback(region_name: &str, region_common_name: CommonName, msg
event: "stats".to_string(),
})
.inc();
let mut event = chirpstack_api::gw::GatewayStats::decode(&mut Cursor::new(b))?;
let mut event = match json {
true => serde_json::from_slice(&b)?,
false => chirpstack_api::gw::GatewayStats::decode(&mut Cursor::new(b))?,
};
event.v4_migrate();
event
.meta_data
@ -336,6 +358,7 @@ async fn message_callback(region_name: &str, region_common_name: CommonName, msg
"region_common_name".to_string(),
region_common_name.to_string(),
);
set_gateway_json(&event.gateway_id, json);
tokio::spawn(uplink::stats::Stats::handle(event));
} else if topic.ends_with("/ack") {
EVENT_COUNTER
@ -343,8 +366,12 @@ async fn message_callback(region_name: &str, region_common_name: CommonName, msg
event: "ack".to_string(),
})
.inc();
let mut event = chirpstack_api::gw::DownlinkTxAck::decode(&mut Cursor::new(b))?;
let mut event = match json {
true => serde_json::from_slice(&b)?,
false => chirpstack_api::gw::DownlinkTxAck::decode(&mut Cursor::new(b))?,
};
event.v4_migrate();
set_gateway_json(&event.gateway_id, json);
tokio::spawn(downlink::tx_ack::TxAck::handle(event));
} else {
return Err(anyhow!("Unknown event type"));
@ -382,3 +409,17 @@ async fn is_locked(key: String) -> Result<bool> {
})
.await?
}
fn gateway_is_json(gateway_id: &str) -> bool {
let gw_json_r = GATEWAY_JSON.read().unwrap();
gw_json_r.get(gateway_id).cloned().unwrap_or(false)
}
fn set_gateway_json(gateway_id: &str, is_json: bool) {
let mut gw_json_w = GATEWAY_JSON.write().unwrap();
gw_json_w.insert(gateway_id.to_string(), is_json);
}
fn payload_is_json(b: &[u8]) -> bool {
String::from_utf8_lossy(b).contains("gatewayId")
}