Implement sending gw config for Concentratord based gws.

This commit is contained in:
Orne Brocaar 2022-04-07 21:29:13 +01:00
parent 519a899398
commit ecbfef26db
8 changed files with 189 additions and 20 deletions

View File

@ -15,7 +15,7 @@ fn _rquickjs_to_struct_val(val: &rquickjs::Value) -> Option<pbjson_types::value:
val.as_int().unwrap().into(),
)),
rquickjs::Type::Float => Some(pbjson_types::value::Kind::NumberValue(
val.as_float().unwrap().into(),
val.as_float().unwrap(),
)),
rquickjs::Type::String => Some(pbjson_types::value::Kind::StringValue(
val.as_string().unwrap().to_string().unwrap(),

View File

@ -372,6 +372,7 @@ impl Default for Region {
},
gateway: RegionGateway {
force_gws_private: false,
channels: vec![],
backend: GatewayBackend {
enabled: "mqtt".into(),
mqtt: GatewayBackendMqtt {
@ -443,6 +444,7 @@ pub struct ExtraChannel {
pub struct RegionGateway {
pub force_gws_private: bool,
pub backend: GatewayBackend,
pub channels: Vec<GatewayChannel>,
}
#[derive(Default, Serialize, Deserialize, Clone)]
@ -468,6 +470,36 @@ pub struct GatewayBackendMqtt {
pub tls_key: String,
}
#[derive(Serialize, Deserialize, Clone, Hash)]
#[allow(non_camel_case_types)]
#[allow(clippy::upper_case_acronyms)]
pub enum GatewayChannelModulation {
LORA,
FSK,
}
#[derive(Serialize, Deserialize, Clone, Hash)]
#[serde(default)]
pub struct GatewayChannel {
pub frequency: u32,
pub bandwidth: u32,
pub modulation: GatewayChannelModulation,
pub spreading_factors: Vec<u32>,
pub datarate: u32,
}
impl Default for GatewayChannel {
fn default() -> Self {
GatewayChannel {
frequency: 0,
bandwidth: 0,
modulation: GatewayChannelModulation::LORA,
spreading_factors: vec![],
datarate: 0,
}
}
}
pub fn load(config_dir: &Path) -> Result<()> {
let mut content: String = String::new();
@ -523,6 +555,17 @@ pub fn get_region_network(region_name: &str) -> Result<RegionNetwork> {
Err(anyhow!("region_name not found"))
}
pub fn get_region_gateway(region_name: &str) -> Result<RegionGateway> {
let conf = get();
for region in &conf.regions {
if region.name == region_name {
return Ok(region.gateway.clone());
}
}
Err(anyhow!("region_name not found"))
}
pub fn get_required_snr_for_sf(sf: u8) -> Result<f32> {
Ok(match sf {
6 => -5.0,

View File

@ -26,7 +26,11 @@ impl GatewayBackend for Backend {
Ok(())
}
async fn send_configuration(&self) -> Result<()> {
async fn send_configuration(
&self,
gw_conf: &chirpstack_api::gw::GatewayConfiguration,
) -> Result<()> {
GATEWAY_CONFIGURATIONS.write().await.push(gw_conf.clone());
Ok(())
}
}

View File

@ -3,7 +3,7 @@ use std::collections::HashMap;
use anyhow::{Context, Result};
use async_trait::async_trait;
use tokio::sync::RwLock;
use tracing::info;
use tracing::{info, warn};
use crate::config;
@ -19,7 +19,10 @@ lazy_static! {
#[async_trait]
pub trait GatewayBackend {
async fn send_downlink(&self, df: &chirpstack_api::gw::DownlinkFrame) -> Result<()>;
async fn send_configuration(&self) -> Result<()>;
async fn send_configuration(
&self,
gw_conf: &chirpstack_api::gw::GatewayConfiguration,
) -> Result<()>;
}
pub async fn setup() -> Result<()> {
@ -27,6 +30,11 @@ pub async fn setup() -> Result<()> {
info!("Setting up gateway backends for the different regions");
for region in &conf.regions {
if !conf.network.enabled_regions.contains(&region.name) {
warn!("Config exists, but region is not enabled. To enable it, add it to 'network.enabled_regions'");
continue;
}
info!(
region_name = %region.name,
region_common_name = %region.common_name,
@ -66,13 +74,16 @@ pub async fn send_downlink(
Ok(())
}
pub async fn send_configuration(region_name: &str) -> Result<()> {
pub async fn send_configuration(
region_name: &str,
gw_conf: &chirpstack_api::gw::GatewayConfiguration,
) -> Result<()> {
let b_r = BACKENDS.read().await;
let b = b_r
.get(region_name)
.ok_or_else(|| anyhow!("region_name '{}' does not exist in BACKENDS", region_name))?;
b.send_configuration().await?;
b.send_configuration(gw_conf).await?;
Ok(())
}

View File

@ -164,7 +164,19 @@ impl GatewayBackend for MqttBackend<'_> {
Ok(())
}
async fn send_configuration(&self) -> Result<()> {
async fn send_configuration(
&self,
gw_conf: &chirpstack_api::gw::GatewayConfiguration,
) -> Result<()> {
let gateway_id = EUI64::from_slice(&gw_conf.gateway_id)?;
let topic = self.get_command_topic(&gateway_id.to_string(), "config")?;
let b = gw_conf.encode_to_vec();
info!(gateway_id = %gateway_id, topic = %topic, "Sending gateway configuration");
let msg = mqtt::Message::new(topic, b, self.qos as i32);
self.client.publish(msg).await?;
trace!("Message sent");
Ok(())
}
}

View File

@ -61,15 +61,17 @@ async fn main() -> Result<()> {
)
.subcommand(App::new("configfile").about("Print the configuration template"))
.subcommand(
App::new("print-ds").about("Print the device-session").arg(
Arg::with_name("dev-eui")
.required(true)
.long("dev-eui")
.value_name("DEV_EUI")
.multiple(false)
.help("Device EUI")
.takes_value(true),
),
App::new("print-ds")
.about("Print the device-session for debugging")
.arg(
Arg::with_name("dev-eui")
.required(true)
.long("dev-eui")
.value_name("DEV_EUI")
.multiple(false)
.help("Device EUI")
.takes_value(true),
),
)
.get_matches();

View File

@ -34,6 +34,12 @@ pub fn setup() -> Result<()> {
);
for ec in &r.network.extra_channels {
trace!(
frequency = ec.frequency,
min_dr = ec.min_dr,
max_dr = ec.max_dr,
"Adding extra channel"
);
region_conf
.add_channel(ec.frequency, ec.min_dr, ec.max_dr)
.context("Add channel")?;

View File

@ -1,13 +1,16 @@
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};
use anyhow::{Context, Result};
use chrono::{DateTime, Local};
use tracing::{error, span, trace, Instrument, Level};
use tracing::{error, info, span, trace, Instrument, Level};
use uuid::Uuid;
use crate::region;
use crate::gateway::backend as gateway_backend;
use crate::storage::{gateway, metrics};
use chirpstack_api::gw;
use crate::{config, region};
use chirpstack_api::{common, gw};
use lrwn::EUI64;
pub struct Stats {
@ -41,7 +44,7 @@ impl Stats {
ctx.update_gateway_state().await?;
ctx.save_stats().await?;
// ctx.update_gateway_configuration().await?;
ctx.update_gateway_configuration().await?;
Ok(())
}
@ -130,6 +133,94 @@ impl Stats {
Ok(())
}
async fn update_gateway_configuration(&self) -> Result<()> {
trace!("Updating gateway configuration");
if !self.stats.meta_data.contains_key("concentratord_version") {
trace!("Gateway configuration only works with Concentratord, skipping");
return Ok(());
}
let gw = self.gateway.as_ref().unwrap();
let region_name = self
.stats
.meta_data
.get("region_name")
.cloned()
.unwrap_or_default();
let gateway_conf = config::get_region_gateway(&region_name)?;
if gateway_conf.channels.is_empty() {
trace!("Skipping gateway configuration, channels is empty");
return Ok(());
}
// get gw config version
let gw_config_version = self
.stats
.meta_data
.get("config_version")
.cloned()
.unwrap_or_default();
// We use the Hash trait to generate the config version.
let mut hasher = DefaultHasher::new();
gw.stats_interval_secs.hash(&mut hasher);
gateway_conf.channels.hash(&mut hasher);
let hash = format!("{:x}", hasher.finish());
if gw_config_version == hash {
trace!(config_version = %hash, "Config version is equal, no need for config update");
return Ok(());
}
info!(current_config_version = %gw_config_version, desired_config_version = %hash, "Updating gateway configuration");
let gw_conf = gw::GatewayConfiguration {
gateway_id: self.stats.gateway_id.clone(),
version: hash,
channels: gateway_conf
.channels
.iter()
.map(|c| gw::ChannelConfiguration {
frequency: c.frequency,
modulation: match c.modulation {
config::GatewayChannelModulation::LORA => common::Modulation::Lora,
config::GatewayChannelModulation::FSK => common::Modulation::Fsk,
}
.into(),
modulation_config: Some(match c.modulation {
config::GatewayChannelModulation::LORA => {
gw::channel_configuration::ModulationConfig::LoraModulationConfig(
gw::LoRaModulationConfig {
bandwidth: c.bandwidth / 1000,
spreading_factors: c.spreading_factors.clone(),
},
)
}
config::GatewayChannelModulation::FSK => {
gw::channel_configuration::ModulationConfig::FskModulationConfig(
gw::FskModulationConfig {
bandwidth: c.bandwidth / 1000,
bitrate: c.datarate,
},
)
}
}),
..Default::default()
})
.collect(),
stats_interval: Some(pbjson_types::Duration {
nanos: 0,
seconds: gw.stats_interval_secs.into(),
}),
};
gateway_backend::send_configuration(&region_name, &gw_conf)
.await
.context("Send gateway configuration")
}
}
fn per_modultation_to_per_dr(