From ecbfef26dbca6ca5d4ff21eb0cdc5b49604c9ef6 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Thu, 7 Apr 2022 21:29:13 +0100 Subject: [PATCH] Implement sending gw config for Concentratord based gws. --- chirpstack/src/codec/convert.rs | 2 +- chirpstack/src/config.rs | 43 +++++++++++ chirpstack/src/gateway/backend/mock.rs | 6 +- chirpstack/src/gateway/backend/mod.rs | 19 +++-- chirpstack/src/gateway/backend/mqtt.rs | 14 +++- chirpstack/src/main.rs | 20 +++--- chirpstack/src/region.rs | 6 ++ chirpstack/src/uplink/stats.rs | 99 ++++++++++++++++++++++++-- 8 files changed, 189 insertions(+), 20 deletions(-) diff --git a/chirpstack/src/codec/convert.rs b/chirpstack/src/codec/convert.rs index d5408070..f929ffa3 100644 --- a/chirpstack/src/codec/convert.rs +++ b/chirpstack/src/codec/convert.rs @@ -15,7 +15,7 @@ fn _rquickjs_to_struct_val(val: &rquickjs::Value) -> Option Some(pbjson_types::value::Kind::NumberValue( - val.as_float().unwrap().into(), + val.as_float().unwrap(), )), rquickjs::Type::String => Some(pbjson_types::value::Kind::StringValue( val.as_string().unwrap().to_string().unwrap(), diff --git a/chirpstack/src/config.rs b/chirpstack/src/config.rs index 2d2a5d93..af812a77 100644 --- a/chirpstack/src/config.rs +++ b/chirpstack/src/config.rs @@ -372,6 +372,7 @@ impl Default for Region { }, gateway: RegionGateway { force_gws_private: false, + channels: vec![], backend: GatewayBackend { enabled: "mqtt".into(), mqtt: GatewayBackendMqtt { @@ -443,6 +444,7 @@ pub struct ExtraChannel { pub struct RegionGateway { pub force_gws_private: bool, pub backend: GatewayBackend, + pub channels: Vec, } #[derive(Default, Serialize, Deserialize, Clone)] @@ -468,6 +470,36 @@ pub struct GatewayBackendMqtt { pub tls_key: String, } +#[derive(Serialize, Deserialize, Clone, Hash)] +#[allow(non_camel_case_types)] +#[allow(clippy::upper_case_acronyms)] +pub enum GatewayChannelModulation { + LORA, + FSK, +} + +#[derive(Serialize, Deserialize, Clone, Hash)] +#[serde(default)] +pub struct GatewayChannel { + pub frequency: u32, + pub bandwidth: u32, + pub modulation: GatewayChannelModulation, + pub spreading_factors: Vec, + pub datarate: u32, +} + +impl Default for GatewayChannel { + fn default() -> Self { + GatewayChannel { + frequency: 0, + bandwidth: 0, + modulation: GatewayChannelModulation::LORA, + spreading_factors: vec![], + datarate: 0, + } + } +} + pub fn load(config_dir: &Path) -> Result<()> { let mut content: String = String::new(); @@ -523,6 +555,17 @@ pub fn get_region_network(region_name: &str) -> Result { Err(anyhow!("region_name not found")) } +pub fn get_region_gateway(region_name: &str) -> Result { + let conf = get(); + for region in &conf.regions { + if region.name == region_name { + return Ok(region.gateway.clone()); + } + } + + Err(anyhow!("region_name not found")) +} + pub fn get_required_snr_for_sf(sf: u8) -> Result { Ok(match sf { 6 => -5.0, diff --git a/chirpstack/src/gateway/backend/mock.rs b/chirpstack/src/gateway/backend/mock.rs index ec937be5..d54fdd21 100644 --- a/chirpstack/src/gateway/backend/mock.rs +++ b/chirpstack/src/gateway/backend/mock.rs @@ -26,7 +26,11 @@ impl GatewayBackend for Backend { Ok(()) } - async fn send_configuration(&self) -> Result<()> { + async fn send_configuration( + &self, + gw_conf: &chirpstack_api::gw::GatewayConfiguration, + ) -> Result<()> { + GATEWAY_CONFIGURATIONS.write().await.push(gw_conf.clone()); Ok(()) } } diff --git a/chirpstack/src/gateway/backend/mod.rs b/chirpstack/src/gateway/backend/mod.rs index 1b675547..119dffe9 100644 --- a/chirpstack/src/gateway/backend/mod.rs +++ b/chirpstack/src/gateway/backend/mod.rs @@ -3,7 +3,7 @@ use std::collections::HashMap; use anyhow::{Context, Result}; use async_trait::async_trait; use tokio::sync::RwLock; -use tracing::info; +use tracing::{info, warn}; use crate::config; @@ -19,7 +19,10 @@ lazy_static! { #[async_trait] pub trait GatewayBackend { async fn send_downlink(&self, df: &chirpstack_api::gw::DownlinkFrame) -> Result<()>; - async fn send_configuration(&self) -> Result<()>; + async fn send_configuration( + &self, + gw_conf: &chirpstack_api::gw::GatewayConfiguration, + ) -> Result<()>; } pub async fn setup() -> Result<()> { @@ -27,6 +30,11 @@ pub async fn setup() -> Result<()> { info!("Setting up gateway backends for the different regions"); for region in &conf.regions { + if !conf.network.enabled_regions.contains(®ion.name) { + warn!("Config exists, but region is not enabled. To enable it, add it to 'network.enabled_regions'"); + continue; + } + info!( region_name = %region.name, region_common_name = %region.common_name, @@ -66,13 +74,16 @@ pub async fn send_downlink( Ok(()) } -pub async fn send_configuration(region_name: &str) -> Result<()> { +pub async fn send_configuration( + region_name: &str, + gw_conf: &chirpstack_api::gw::GatewayConfiguration, +) -> Result<()> { let b_r = BACKENDS.read().await; let b = b_r .get(region_name) .ok_or_else(|| anyhow!("region_name '{}' does not exist in BACKENDS", region_name))?; - b.send_configuration().await?; + b.send_configuration(gw_conf).await?; Ok(()) } diff --git a/chirpstack/src/gateway/backend/mqtt.rs b/chirpstack/src/gateway/backend/mqtt.rs index 1f9140a7..953fe081 100644 --- a/chirpstack/src/gateway/backend/mqtt.rs +++ b/chirpstack/src/gateway/backend/mqtt.rs @@ -164,7 +164,19 @@ impl GatewayBackend for MqttBackend<'_> { Ok(()) } - async fn send_configuration(&self) -> Result<()> { + async fn send_configuration( + &self, + gw_conf: &chirpstack_api::gw::GatewayConfiguration, + ) -> Result<()> { + let gateway_id = EUI64::from_slice(&gw_conf.gateway_id)?; + let topic = self.get_command_topic(&gateway_id.to_string(), "config")?; + let b = gw_conf.encode_to_vec(); + + info!(gateway_id = %gateway_id, topic = %topic, "Sending gateway configuration"); + let msg = mqtt::Message::new(topic, b, self.qos as i32); + self.client.publish(msg).await?; + trace!("Message sent"); + Ok(()) } } diff --git a/chirpstack/src/main.rs b/chirpstack/src/main.rs index 29ef0eda..a8f3016a 100644 --- a/chirpstack/src/main.rs +++ b/chirpstack/src/main.rs @@ -61,15 +61,17 @@ async fn main() -> Result<()> { ) .subcommand(App::new("configfile").about("Print the configuration template")) .subcommand( - App::new("print-ds").about("Print the device-session").arg( - Arg::with_name("dev-eui") - .required(true) - .long("dev-eui") - .value_name("DEV_EUI") - .multiple(false) - .help("Device EUI") - .takes_value(true), - ), + App::new("print-ds") + .about("Print the device-session for debugging") + .arg( + Arg::with_name("dev-eui") + .required(true) + .long("dev-eui") + .value_name("DEV_EUI") + .multiple(false) + .help("Device EUI") + .takes_value(true), + ), ) .get_matches(); diff --git a/chirpstack/src/region.rs b/chirpstack/src/region.rs index d455c6fe..1f8eaa38 100644 --- a/chirpstack/src/region.rs +++ b/chirpstack/src/region.rs @@ -34,6 +34,12 @@ pub fn setup() -> Result<()> { ); for ec in &r.network.extra_channels { + trace!( + frequency = ec.frequency, + min_dr = ec.min_dr, + max_dr = ec.max_dr, + "Adding extra channel" + ); region_conf .add_channel(ec.frequency, ec.min_dr, ec.max_dr) .context("Add channel")?; diff --git a/chirpstack/src/uplink/stats.rs b/chirpstack/src/uplink/stats.rs index d4b2dd85..0f1c7a1e 100644 --- a/chirpstack/src/uplink/stats.rs +++ b/chirpstack/src/uplink/stats.rs @@ -1,13 +1,16 @@ +use std::collections::hash_map::DefaultHasher; use std::collections::HashMap; +use std::hash::{Hash, Hasher}; use anyhow::{Context, Result}; use chrono::{DateTime, Local}; -use tracing::{error, span, trace, Instrument, Level}; +use tracing::{error, info, span, trace, Instrument, Level}; use uuid::Uuid; -use crate::region; +use crate::gateway::backend as gateway_backend; use crate::storage::{gateway, metrics}; -use chirpstack_api::gw; +use crate::{config, region}; +use chirpstack_api::{common, gw}; use lrwn::EUI64; pub struct Stats { @@ -41,7 +44,7 @@ impl Stats { ctx.update_gateway_state().await?; ctx.save_stats().await?; - // ctx.update_gateway_configuration().await?; + ctx.update_gateway_configuration().await?; Ok(()) } @@ -130,6 +133,94 @@ impl Stats { Ok(()) } + + async fn update_gateway_configuration(&self) -> Result<()> { + trace!("Updating gateway configuration"); + + if !self.stats.meta_data.contains_key("concentratord_version") { + trace!("Gateway configuration only works with Concentratord, skipping"); + return Ok(()); + } + + let gw = self.gateway.as_ref().unwrap(); + let region_name = self + .stats + .meta_data + .get("region_name") + .cloned() + .unwrap_or_default(); + + let gateway_conf = config::get_region_gateway(®ion_name)?; + if gateway_conf.channels.is_empty() { + trace!("Skipping gateway configuration, channels is empty"); + return Ok(()); + } + + // get gw config version + let gw_config_version = self + .stats + .meta_data + .get("config_version") + .cloned() + .unwrap_or_default(); + + // We use the Hash trait to generate the config version. + let mut hasher = DefaultHasher::new(); + gw.stats_interval_secs.hash(&mut hasher); + gateway_conf.channels.hash(&mut hasher); + let hash = format!("{:x}", hasher.finish()); + + if gw_config_version == hash { + trace!(config_version = %hash, "Config version is equal, no need for config update"); + return Ok(()); + } + + info!(current_config_version = %gw_config_version, desired_config_version = %hash, "Updating gateway configuration"); + + let gw_conf = gw::GatewayConfiguration { + gateway_id: self.stats.gateway_id.clone(), + version: hash, + channels: gateway_conf + .channels + .iter() + .map(|c| gw::ChannelConfiguration { + frequency: c.frequency, + modulation: match c.modulation { + config::GatewayChannelModulation::LORA => common::Modulation::Lora, + config::GatewayChannelModulation::FSK => common::Modulation::Fsk, + } + .into(), + modulation_config: Some(match c.modulation { + config::GatewayChannelModulation::LORA => { + gw::channel_configuration::ModulationConfig::LoraModulationConfig( + gw::LoRaModulationConfig { + bandwidth: c.bandwidth / 1000, + spreading_factors: c.spreading_factors.clone(), + }, + ) + } + config::GatewayChannelModulation::FSK => { + gw::channel_configuration::ModulationConfig::FskModulationConfig( + gw::FskModulationConfig { + bandwidth: c.bandwidth / 1000, + bitrate: c.datarate, + }, + ) + } + }), + ..Default::default() + }) + .collect(), + stats_interval: Some(pbjson_types::Duration { + nanos: 0, + seconds: gw.stats_interval_secs.into(), + }), + }; + + gateway_backend::send_configuration(®ion_name, &gw_conf) + .await + .context("Send gateway configuration") + } } fn per_modultation_to_per_dr(