Update gw proto & refactor Mesh Heartbeat event.

This refactors the gateway protobuf payloads, such that the
Concentratord can publish an Event message, containing one of the
possible events published by the Concentratord (uplink, stats or mesh
event). It also combines the possible Concentratord commands into a
single Command message. This simplifies the ZMQ interface as it is no
longer needed to match the payload type by string.

This also refactors the MeshHeartbeat message into a Mesh message, which
can contain multiple events, of which the Heartbeat is one of the
possible events.

The future goal is to make it possible to send different types of events
from the Gateway Mesh Relay gateways (e.g. battery status, ...) and to
make it possible to also send proprietary event types.
This commit is contained in:
Orne Brocaar 2025-05-08 14:29:15 +01:00
parent d002f5c97b
commit bb53821aef
5 changed files with 218 additions and 60 deletions

95
api/proto/gw/gw.proto vendored
View File

@ -104,6 +104,61 @@ enum TxAckStatus {
DUTY_CYCLE_OVERFLOW = 11;
}
// Gateway events as reported by the ChirpStack Concentratord ZMQ interface.
message Event {
oneof event {
// Uplink frame.
UplinkFrame uplink_frame = 1;
// Gateway stats.
GatewayStats gateway_stats = 2;
// Gateway Mesh Event.
Mesh mesh = 3;
}
}
// Commands that can be sent to the ChirpStack Concentratord ZMQ interface.
message Command {
oneof command {
// Downlink frame.
DownlinkFrame send_downlink_frame = 1;
// Gateway configuration.
GatewayConfiguration set_gateway_configuration = 2;
// Get Gateway ID.
GetGatewayIdRequest get_gateway_id = 3;
// Get location.
GetLocationRequest get_location = 4;
}
}
message Mesh {
// Gateway ID (of the Border Gateway).
string gateway_id = 1;
// Relay ID.
string relay_id = 2;
// Timestamp (second precision).
google.protobuf.Timestamp time = 3;
// Mesh events.
repeated MeshEvent events = 4;
}
message MeshEvent {
oneof event {
// Proprietary Mesh event.
MeshEventProprietary proprietary = 1;
// Mesh heartbeat.
MeshEventHeartbeat heartbeat = 2;
}
}
message Modulation {
oneof parameters {
// LoRa modulation information.
@ -611,6 +666,23 @@ message GatewayConfiguration {
google.protobuf.Duration stats_interval = 4;
}
message GetGatewayIdRequest {}
message GetGatewayIdResponse {
// Gateway ID.
string gateway_id = 1;
}
message GetLocationRequest {}
message GetLocationResponse {
// Location.
common.Location location = 1;
// Last updated at.
google.protobuf.Timestamp updated_at = 2;
}
message ChannelConfiguration {
// Frequency (Hz).
uint32 frequency = 1;
@ -751,21 +823,13 @@ message ConnState {
}
// Gateway Mesh heartbeat (sent periodically by the Relay Gateways).
message MeshHeartbeat {
// Gateway ID (of the Border Gateway).
string gateway_id = 1;
// Relay ID.
string relay_id = 2;
// Timestamp (second precision).
google.protobuf.Timestamp time = 3;
message MeshEventHeartbeat {
// Relay path.
repeated MeshHeartbeatRelayPath relay_path = 4;
repeated MeshEventHeartbeatRelayPath relay_path = 4;
}
message MeshHeartbeatRelayPath {
message MeshEventHeartbeatRelayPath {
// Relay ID.
string relay_id = 1;
@ -775,3 +839,12 @@ message MeshHeartbeatRelayPath {
// SNR.
int32 snr = 3;
}
// Proprietary mesh event.
message MeshEventProprietary {
// Event type.
uint32 event_type = 1;
// Payload.
bytes payload = 2;
}

View File

@ -104,6 +104,61 @@ enum TxAckStatus {
DUTY_CYCLE_OVERFLOW = 11;
}
// Gateway events as reported by the ChirpStack Concentratord ZMQ interface.
message Event {
oneof event {
// Uplink frame.
UplinkFrame uplink_frame = 1;
// Gateway stats.
GatewayStats gateway_stats = 2;
// Gateway Mesh Event.
Mesh mesh = 3;
}
}
// Commands that can be sent to the ChirpStack Concentratord ZMQ interface.
message Command {
oneof command {
// Downlink frame.
DownlinkFrame send_downlink_frame = 1;
// Gateway configuration.
GatewayConfiguration set_gateway_configuration = 2;
// Get Gateway ID.
GetGatewayIdRequest get_gateway_id = 3;
// Get location.
GetLocationRequest get_location = 4;
}
}
message Mesh {
// Gateway ID (of the Border Gateway).
string gateway_id = 1;
// Relay ID.
string relay_id = 2;
// Timestamp (second precision).
google.protobuf.Timestamp time = 3;
// Mesh events.
repeated MeshEvent events = 4;
}
message MeshEvent {
oneof event {
// Proprietary Mesh event.
MeshEventProprietary proprietary = 1;
// Mesh heartbeat.
MeshEventHeartbeat heartbeat = 2;
}
}
message Modulation {
oneof parameters {
// LoRa modulation information.
@ -611,6 +666,23 @@ message GatewayConfiguration {
google.protobuf.Duration stats_interval = 4;
}
message GetGatewayIdRequest {}
message GetGatewayIdResponse {
// Gateway ID.
string gateway_id = 1;
}
message GetLocationRequest {}
message GetLocationResponse {
// Location.
common.Location location = 1;
// Last updated at.
google.protobuf.Timestamp updated_at = 2;
}
message ChannelConfiguration {
// Frequency (Hz).
uint32 frequency = 1;
@ -751,21 +823,13 @@ message ConnState {
}
// Gateway Mesh heartbeat (sent periodically by the Relay Gateways).
message MeshHeartbeat {
// Gateway ID (of the Border Gateway).
string gateway_id = 1;
// Relay ID.
string relay_id = 2;
// Timestamp (second precision).
google.protobuf.Timestamp time = 3;
message MeshEventHeartbeat {
// Relay path.
repeated MeshHeartbeatRelayPath relay_path = 4;
repeated MeshEventHeartbeatRelayPath relay_path = 4;
}
message MeshHeartbeatRelayPath {
message MeshEventHeartbeatRelayPath {
// Relay ID.
string relay_id = 1;
@ -775,3 +839,12 @@ message MeshHeartbeatRelayPath {
// SNR.
int32 snr = 3;
}
// Proprietary mesh event.
message MeshEventProprietary {
// Event type.
uint32 event_type = 1;
// Payload.
bytes payload = 2;
}

9
api/rust/src/lib.rs vendored
View File

@ -1,14 +1,17 @@
pub use prost;
pub use prost_types;
#[cfg(feature = "json")]
pub use pbjson_types;
pub use prost;
#[cfg(feature = "api")]
pub use tonic;
#[cfg(feature = "api")]
pub mod api;
#[cfg(feature = "internal")]
pub mod internal;
pub mod common;
pub mod gw;
pub mod integration;
#[cfg(feature = "internal")]
pub mod internal;
pub mod stream;

View File

@ -1,5 +1,4 @@
use std::collections::HashMap;
use std::io::Cursor;
use std::sync::{LazyLock, RwLock};
use std::time::Duration;
@ -352,7 +351,7 @@ async fn message_callback(
.inc();
let mut event = match json {
true => serde_json::from_slice(&p.payload)?,
false => chirpstack_api::gw::UplinkFrame::decode(&mut Cursor::new(&p.payload))?,
false => chirpstack_api::gw::UplinkFrame::decode(p.payload.as_ref())?,
};
if v4_migrate {
@ -377,7 +376,7 @@ async fn message_callback(
.inc();
let mut event = match json {
true => serde_json::from_slice(&p.payload)?,
false => chirpstack_api::gw::GatewayStats::decode(&mut Cursor::new(&p.payload))?,
false => chirpstack_api::gw::GatewayStats::decode(p.payload.as_ref())?,
};
if v4_migrate {
@ -401,7 +400,7 @@ async fn message_callback(
.inc();
let mut event = match json {
true => serde_json::from_slice(&p.payload)?,
false => chirpstack_api::gw::DownlinkTxAck::decode(&mut Cursor::new(&p.payload))?,
false => chirpstack_api::gw::DownlinkTxAck::decode(p.payload.as_ref())?,
};
if v4_migrate {
@ -410,18 +409,18 @@ async fn message_callback(
set_gateway_json(&event.gateway_id, json);
tokio::spawn(downlink::tx_ack::TxAck::handle(event));
} else if topic.ends_with("/mesh-heartbeat") {
} else if topic.ends_with("/mesh") {
EVENT_COUNTER
.get_or_create(&EventLabels {
event: "mesh-heartbeat".to_string(),
event: "mesh".to_string(),
})
.inc();
let event = match json {
true => serde_json::from_slice(&p.payload)?,
false => chirpstack_api::gw::MeshHeartbeat::decode(&mut Cursor::new(&p.payload))?,
false => chirpstack_api::gw::Mesh::decode(p.payload.as_ref())?,
};
tokio::spawn(uplink::mesh::MeshHeartbeat::handle(event));
tokio::spawn(uplink::mesh::Mesh::handle(event));
} else {
return Err(anyhow!("Unknown event type"));
}

View File

@ -14,14 +14,15 @@ use crate::storage::{
};
use lrwn::EUI64;
pub struct MeshHeartbeat {
pub struct Mesh {
gateway_id: EUI64,
relay_id: RelayId,
mesh_stats: gw::MeshHeartbeat,
time: DateTime<Utc>,
mesh_event: gw::Mesh,
}
impl MeshHeartbeat {
pub async fn handle(s: gw::MeshHeartbeat) {
impl Mesh {
pub async fn handle(s: gw::Mesh) {
let gateway_id = match EUI64::from_str(&s.gateway_id) {
Ok(v) => v,
Err(e) => {
@ -38,9 +39,9 @@ impl MeshHeartbeat {
}
};
let span = span!(Level::INFO, "mesh_stats", gateway_id = %gateway_id, relay_id = %relay_id);
let span = span!(Level::INFO, "mesh", gateway_id = %gateway_id, relay_id = %relay_id);
if let Err(e) = MeshHeartbeat::_handle(gateway_id, relay_id, s)
if let Err(e) = Mesh::_handle(gateway_id, relay_id, s)
.instrument(span)
.await
{
@ -48,52 +49,61 @@ impl MeshHeartbeat {
Some(Error::NotFound(_)) => {
let conf = config::get();
if !conf.gateway.allow_unknown_gateways {
error!(error = %e.full(), "Handle mesh-stats error");
error!(error = %e.full(), "Handle mesh error");
}
}
Some(_) | None => {
error!(error = %e.full(), "Handle mesh-stats error");
error!(error = %e.full(), "Handle mesh error");
}
}
}
}
async fn _handle(gateway_id: EUI64, relay_id: RelayId, s: gw::MeshHeartbeat) -> Result<()> {
let mut ctx = MeshHeartbeat {
async fn _handle(gateway_id: EUI64, relay_id: RelayId, s: gw::Mesh) -> Result<()> {
let ctx = Mesh {
gateway_id,
relay_id,
mesh_stats: s,
time: s
.time
.ok_or_else(|| anyhow!("Time field is empty"))?
.try_into()
.map_err(|e| anyhow!("Covert time error: {}", e))?,
mesh_event: s,
};
ctx.update_or_create_relay_gateway().await?;
ctx.handle_events().await?;
Ok(())
}
async fn update_or_create_relay_gateway(&mut self) -> Result<()> {
trace!("Getting Border Gateway");
let border_gw = gateway::get(&self.gateway_id).await?;
async fn handle_events(&self) -> Result<()> {
trace!("Handling mesh events");
let ts: DateTime<Utc> = match &self.mesh_stats.time {
Some(v) => (*v)
.try_into()
.map_err(|e| anyhow!("Convert time error: {}", e))?,
None => {
warn!("Stats message does not have time field set");
return Ok(());
for event in &self.mesh_event.events {
match &event.event {
Some(gw::mesh_event::Event::Proprietary(_)) | None => continue,
Some(gw::mesh_event::Event::Heartbeat(v)) => self._handle_heartbeat(v).await?,
}
};
}
Ok(())
}
async fn _handle_heartbeat(&self, _pl: &gw::MeshEventHeartbeat) -> Result<()> {
trace!("Handling heartbeat event");
let border_gw = gateway::get(&self.gateway_id).await?;
match gateway::get_relay_gateway(border_gw.tenant_id.into(), self.relay_id).await {
Ok(mut v) => {
if let Some(last_seen_at) = v.last_seen_at {
if last_seen_at > ts {
warn!("Time is less than last seen timestamp, ignoring stats");
if last_seen_at > self.time {
warn!("Time is less than last seen timestamp, ignoring heartbeat");
return Ok(());
}
}
v.last_seen_at = Some(ts);
v.last_seen_at = Some(self.time);
v.region_config_id = border_gw
.properties
.get("region_config_id")
@ -106,7 +116,7 @@ impl MeshHeartbeat {
tenant_id: border_gw.tenant_id,
relay_id: self.relay_id,
name: self.relay_id.to_string(),
last_seen_at: Some(ts),
last_seen_at: Some(self.time),
..Default::default()
})
.await?;