diff --git a/api/proto/gw/gw.proto b/api/proto/gw/gw.proto index 490b77a5..809693a1 100644 --- a/api/proto/gw/gw.proto +++ b/api/proto/gw/gw.proto @@ -104,6 +104,61 @@ enum TxAckStatus { DUTY_CYCLE_OVERFLOW = 11; } +// Gateway events as reported by the ChirpStack Concentratord ZMQ interface. +message Event { + oneof event { + // Uplink frame. + UplinkFrame uplink_frame = 1; + + // Gateway stats. + GatewayStats gateway_stats = 2; + + // Gateway Mesh Event. + Mesh mesh = 3; + } +} + +// Commands that can be sent to the ChirpStack Concentratord ZMQ interface. +message Command { + oneof command { + // Downlink frame. + DownlinkFrame send_downlink_frame = 1; + + // Gateway configuration. + GatewayConfiguration set_gateway_configuration = 2; + + // Get Gateway ID. + GetGatewayIdRequest get_gateway_id = 3; + + // Get location. + GetLocationRequest get_location = 4; + } +} + +message Mesh { + // Gateway ID (of the Border Gateway). + string gateway_id = 1; + + // Relay ID. + string relay_id = 2; + + // Timestamp (second precision). + google.protobuf.Timestamp time = 3; + + // Mesh events. + repeated MeshEvent events = 4; +} + +message MeshEvent { + oneof event { + // Proprietary Mesh event. + MeshEventProprietary proprietary = 1; + + // Mesh heartbeat. + MeshEventHeartbeat heartbeat = 2; + } +} + message Modulation { oneof parameters { // LoRa modulation information. @@ -611,6 +666,23 @@ message GatewayConfiguration { google.protobuf.Duration stats_interval = 4; } +message GetGatewayIdRequest {} + +message GetGatewayIdResponse { + // Gateway ID. + string gateway_id = 1; +} + +message GetLocationRequest {} + +message GetLocationResponse { + // Location. + common.Location location = 1; + + // Last updated at. + google.protobuf.Timestamp updated_at = 2; +} + message ChannelConfiguration { // Frequency (Hz). uint32 frequency = 1; @@ -751,21 +823,13 @@ message ConnState { } // Gateway Mesh heartbeat (sent periodically by the Relay Gateways). -message MeshHeartbeat { - // Gateway ID (of the Border Gateway). - string gateway_id = 1; - - // Relay ID. - string relay_id = 2; - - // Timestamp (second precision). - google.protobuf.Timestamp time = 3; +message MeshEventHeartbeat { // Relay path. - repeated MeshHeartbeatRelayPath relay_path = 4; + repeated MeshEventHeartbeatRelayPath relay_path = 4; } -message MeshHeartbeatRelayPath { +message MeshEventHeartbeatRelayPath { // Relay ID. string relay_id = 1; @@ -775,3 +839,12 @@ message MeshHeartbeatRelayPath { // SNR. int32 snr = 3; } + +// Proprietary mesh event. +message MeshEventProprietary { + // Event type. + uint32 event_type = 1; + + // Payload. + bytes payload = 2; +} diff --git a/api/rust/proto/chirpstack/gw/gw.proto b/api/rust/proto/chirpstack/gw/gw.proto index 490b77a5..809693a1 100644 --- a/api/rust/proto/chirpstack/gw/gw.proto +++ b/api/rust/proto/chirpstack/gw/gw.proto @@ -104,6 +104,61 @@ enum TxAckStatus { DUTY_CYCLE_OVERFLOW = 11; } +// Gateway events as reported by the ChirpStack Concentratord ZMQ interface. +message Event { + oneof event { + // Uplink frame. + UplinkFrame uplink_frame = 1; + + // Gateway stats. + GatewayStats gateway_stats = 2; + + // Gateway Mesh Event. + Mesh mesh = 3; + } +} + +// Commands that can be sent to the ChirpStack Concentratord ZMQ interface. +message Command { + oneof command { + // Downlink frame. + DownlinkFrame send_downlink_frame = 1; + + // Gateway configuration. + GatewayConfiguration set_gateway_configuration = 2; + + // Get Gateway ID. + GetGatewayIdRequest get_gateway_id = 3; + + // Get location. + GetLocationRequest get_location = 4; + } +} + +message Mesh { + // Gateway ID (of the Border Gateway). + string gateway_id = 1; + + // Relay ID. + string relay_id = 2; + + // Timestamp (second precision). + google.protobuf.Timestamp time = 3; + + // Mesh events. + repeated MeshEvent events = 4; +} + +message MeshEvent { + oneof event { + // Proprietary Mesh event. + MeshEventProprietary proprietary = 1; + + // Mesh heartbeat. + MeshEventHeartbeat heartbeat = 2; + } +} + message Modulation { oneof parameters { // LoRa modulation information. @@ -611,6 +666,23 @@ message GatewayConfiguration { google.protobuf.Duration stats_interval = 4; } +message GetGatewayIdRequest {} + +message GetGatewayIdResponse { + // Gateway ID. + string gateway_id = 1; +} + +message GetLocationRequest {} + +message GetLocationResponse { + // Location. + common.Location location = 1; + + // Last updated at. + google.protobuf.Timestamp updated_at = 2; +} + message ChannelConfiguration { // Frequency (Hz). uint32 frequency = 1; @@ -751,21 +823,13 @@ message ConnState { } // Gateway Mesh heartbeat (sent periodically by the Relay Gateways). -message MeshHeartbeat { - // Gateway ID (of the Border Gateway). - string gateway_id = 1; - - // Relay ID. - string relay_id = 2; - - // Timestamp (second precision). - google.protobuf.Timestamp time = 3; +message MeshEventHeartbeat { // Relay path. - repeated MeshHeartbeatRelayPath relay_path = 4; + repeated MeshEventHeartbeatRelayPath relay_path = 4; } -message MeshHeartbeatRelayPath { +message MeshEventHeartbeatRelayPath { // Relay ID. string relay_id = 1; @@ -775,3 +839,12 @@ message MeshHeartbeatRelayPath { // SNR. int32 snr = 3; } + +// Proprietary mesh event. +message MeshEventProprietary { + // Event type. + uint32 event_type = 1; + + // Payload. + bytes payload = 2; +} diff --git a/api/rust/src/lib.rs b/api/rust/src/lib.rs index 3c79fc71..6533e1a8 100644 --- a/api/rust/src/lib.rs +++ b/api/rust/src/lib.rs @@ -1,14 +1,17 @@ +pub use prost; +pub use prost_types; + #[cfg(feature = "json")] pub use pbjson_types; -pub use prost; #[cfg(feature = "api")] pub use tonic; #[cfg(feature = "api")] pub mod api; +#[cfg(feature = "internal")] +pub mod internal; + pub mod common; pub mod gw; pub mod integration; -#[cfg(feature = "internal")] -pub mod internal; pub mod stream; diff --git a/chirpstack/src/gateway/backend/mqtt.rs b/chirpstack/src/gateway/backend/mqtt.rs index 0017c8f2..1cb9a066 100644 --- a/chirpstack/src/gateway/backend/mqtt.rs +++ b/chirpstack/src/gateway/backend/mqtt.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::io::Cursor; use std::sync::{LazyLock, RwLock}; use std::time::Duration; @@ -352,7 +351,7 @@ async fn message_callback( .inc(); let mut event = match json { true => serde_json::from_slice(&p.payload)?, - false => chirpstack_api::gw::UplinkFrame::decode(&mut Cursor::new(&p.payload))?, + false => chirpstack_api::gw::UplinkFrame::decode(p.payload.as_ref())?, }; if v4_migrate { @@ -377,7 +376,7 @@ async fn message_callback( .inc(); let mut event = match json { true => serde_json::from_slice(&p.payload)?, - false => chirpstack_api::gw::GatewayStats::decode(&mut Cursor::new(&p.payload))?, + false => chirpstack_api::gw::GatewayStats::decode(p.payload.as_ref())?, }; if v4_migrate { @@ -401,7 +400,7 @@ async fn message_callback( .inc(); let mut event = match json { true => serde_json::from_slice(&p.payload)?, - false => chirpstack_api::gw::DownlinkTxAck::decode(&mut Cursor::new(&p.payload))?, + false => chirpstack_api::gw::DownlinkTxAck::decode(p.payload.as_ref())?, }; if v4_migrate { @@ -410,18 +409,18 @@ async fn message_callback( set_gateway_json(&event.gateway_id, json); tokio::spawn(downlink::tx_ack::TxAck::handle(event)); - } else if topic.ends_with("/mesh-heartbeat") { + } else if topic.ends_with("/mesh") { EVENT_COUNTER .get_or_create(&EventLabels { - event: "mesh-heartbeat".to_string(), + event: "mesh".to_string(), }) .inc(); let event = match json { true => serde_json::from_slice(&p.payload)?, - false => chirpstack_api::gw::MeshHeartbeat::decode(&mut Cursor::new(&p.payload))?, + false => chirpstack_api::gw::Mesh::decode(p.payload.as_ref())?, }; - tokio::spawn(uplink::mesh::MeshHeartbeat::handle(event)); + tokio::spawn(uplink::mesh::Mesh::handle(event)); } else { return Err(anyhow!("Unknown event type")); } diff --git a/chirpstack/src/uplink/mesh.rs b/chirpstack/src/uplink/mesh.rs index e9a2fb98..78eb199b 100644 --- a/chirpstack/src/uplink/mesh.rs +++ b/chirpstack/src/uplink/mesh.rs @@ -14,14 +14,15 @@ use crate::storage::{ }; use lrwn::EUI64; -pub struct MeshHeartbeat { +pub struct Mesh { gateway_id: EUI64, relay_id: RelayId, - mesh_stats: gw::MeshHeartbeat, + time: DateTime, + mesh_event: gw::Mesh, } -impl MeshHeartbeat { - pub async fn handle(s: gw::MeshHeartbeat) { +impl Mesh { + pub async fn handle(s: gw::Mesh) { let gateway_id = match EUI64::from_str(&s.gateway_id) { Ok(v) => v, Err(e) => { @@ -38,9 +39,9 @@ impl MeshHeartbeat { } }; - let span = span!(Level::INFO, "mesh_stats", gateway_id = %gateway_id, relay_id = %relay_id); + let span = span!(Level::INFO, "mesh", gateway_id = %gateway_id, relay_id = %relay_id); - if let Err(e) = MeshHeartbeat::_handle(gateway_id, relay_id, s) + if let Err(e) = Mesh::_handle(gateway_id, relay_id, s) .instrument(span) .await { @@ -48,52 +49,61 @@ impl MeshHeartbeat { Some(Error::NotFound(_)) => { let conf = config::get(); if !conf.gateway.allow_unknown_gateways { - error!(error = %e.full(), "Handle mesh-stats error"); + error!(error = %e.full(), "Handle mesh error"); } } Some(_) | None => { - error!(error = %e.full(), "Handle mesh-stats error"); + error!(error = %e.full(), "Handle mesh error"); } } } } - async fn _handle(gateway_id: EUI64, relay_id: RelayId, s: gw::MeshHeartbeat) -> Result<()> { - let mut ctx = MeshHeartbeat { + async fn _handle(gateway_id: EUI64, relay_id: RelayId, s: gw::Mesh) -> Result<()> { + let ctx = Mesh { gateway_id, relay_id, - mesh_stats: s, + time: s + .time + .ok_or_else(|| anyhow!("Time field is empty"))? + .try_into() + .map_err(|e| anyhow!("Covert time error: {}", e))?, + mesh_event: s, }; - ctx.update_or_create_relay_gateway().await?; + ctx.handle_events().await?; Ok(()) } - async fn update_or_create_relay_gateway(&mut self) -> Result<()> { - trace!("Getting Border Gateway"); - let border_gw = gateway::get(&self.gateway_id).await?; + async fn handle_events(&self) -> Result<()> { + trace!("Handling mesh events"); - let ts: DateTime = match &self.mesh_stats.time { - Some(v) => (*v) - .try_into() - .map_err(|e| anyhow!("Convert time error: {}", e))?, - None => { - warn!("Stats message does not have time field set"); - return Ok(()); + for event in &self.mesh_event.events { + match &event.event { + Some(gw::mesh_event::Event::Proprietary(_)) | None => continue, + Some(gw::mesh_event::Event::Heartbeat(v)) => self._handle_heartbeat(v).await?, } - }; + } + + Ok(()) + } + + async fn _handle_heartbeat(&self, _pl: &gw::MeshEventHeartbeat) -> Result<()> { + trace!("Handling heartbeat event"); + + let border_gw = gateway::get(&self.gateway_id).await?; match gateway::get_relay_gateway(border_gw.tenant_id.into(), self.relay_id).await { Ok(mut v) => { if let Some(last_seen_at) = v.last_seen_at { - if last_seen_at > ts { - warn!("Time is less than last seen timestamp, ignoring stats"); + if last_seen_at > self.time { + warn!("Time is less than last seen timestamp, ignoring heartbeat"); return Ok(()); } } - v.last_seen_at = Some(ts); + v.last_seen_at = Some(self.time); v.region_config_id = border_gw .properties .get("region_config_id") @@ -106,7 +116,7 @@ impl MeshHeartbeat { tenant_id: border_gw.tenant_id, relay_id: self.relay_id, name: self.relay_id.to_string(), - last_seen_at: Some(ts), + last_seen_at: Some(self.time), ..Default::default() }) .await?;