Disable v3-v4 GW<>NS message migration and make config option.

By making this opt-in, we can reduce the overhead of supporting both v3
and v4 gateway messages. In case of v3 to v4 migration, one would
upgrade all ChirpStack Gateway Bridge instances to the latest v3
version, migrate ChirpStack as described here + enable this config flag:
https://www.chirpstack.io/docs/v3-v4-migration.html

Then upgrade ChirpStack Gateway Bridge to the latest v4 version followed
by disabling / removing this config flag (`v4_migrate`) again.
This commit is contained in:
Orne Brocaar 2023-04-11 14:19:07 +01:00
parent 0a3fcaeb2d
commit 31e359b314
2 changed files with 26 additions and 5 deletions

View File

@ -637,6 +637,7 @@ pub struct GatewayBackendMqtt {
pub tls_key: String,
#[serde(with = "humantime_serde")]
pub keep_alive_interval: Duration,
pub v4_migrate: bool,
}
impl Default for GatewayBackendMqtt {
@ -655,6 +656,7 @@ impl Default for GatewayBackendMqtt {
tls_cert: "".into(),
tls_key: "".into(),
keep_alive_interval: Duration::from_secs(30),
v4_migrate: false,
}
}
}

View File

@ -69,6 +69,7 @@ pub struct MqttBackend<'a> {
client: mqtt::AsyncClient,
templates: handlebars::Handlebars<'a>,
qos: usize,
v4_migrate: bool,
}
#[derive(Serialize)]
@ -197,6 +198,7 @@ impl<'a> MqttBackend<'a> {
client,
templates,
qos: conf.qos,
v4_migrate: conf.v4_migrate,
};
// connect
@ -209,12 +211,14 @@ impl<'a> MqttBackend<'a> {
// Consumer loop.
tokio::spawn({
let region_config_id = region_config_id.to_string();
let v4_migrate = conf.v4_migrate;
async move {
info!("Starting MQTT consumer loop");
while let Some(msg_opt) = stream.next().await {
if let Some(msg) = msg_opt {
message_callback(&region_config_id, region_common_name, msg).await;
message_callback(v4_migrate, &region_config_id, region_common_name, msg)
.await;
}
}
}
@ -271,7 +275,10 @@ impl GatewayBackend for MqttBackend<'_> {
.inc();
let topic = self.get_command_topic(&df.gateway_id, "down")?;
let mut df = df.clone();
df.v4_migrate();
if self.v4_migrate {
df.v4_migrate();
}
let json = gateway_is_json(&df.gateway_id);
let b = match json {
@ -313,6 +320,7 @@ impl GatewayBackend for MqttBackend<'_> {
}
async fn message_callback(
v4_migrate: bool,
region_config_id: &str,
region_common_name: CommonName,
msg: mqtt::Message,
@ -357,7 +365,10 @@ async fn message_callback(
true => serde_json::from_slice(b)?,
false => chirpstack_api::gw::UplinkFrame::decode(&mut Cursor::new(b))?,
};
event.v4_migrate();
if v4_migrate {
event.v4_migrate();
}
if let Some(rx_info) = &mut event.rx_info {
set_gateway_json(&rx_info.gateway_id, json);
@ -381,7 +392,11 @@ async fn message_callback(
true => serde_json::from_slice(b)?,
false => chirpstack_api::gw::GatewayStats::decode(&mut Cursor::new(b))?,
};
event.v4_migrate();
if v4_migrate {
event.v4_migrate();
}
event
.metadata
.insert("region_config_id".to_string(), region_config_id.to_string());
@ -401,7 +416,11 @@ async fn message_callback(
true => serde_json::from_slice(b)?,
false => chirpstack_api::gw::DownlinkTxAck::decode(&mut Cursor::new(b))?,
};
event.v4_migrate();
if v4_migrate {
event.v4_migrate();
}
set_gateway_json(&event.gateway_id, json);
tokio::spawn(downlink::tx_ack::TxAck::handle(event));
} else {