This commit is contained in:
Orne Brocaar 2024-06-14 12:57:13 +01:00
parent 9fb3165173
commit 4f5b14eeb8
10 changed files with 961 additions and 21 deletions

View File

@ -75,6 +75,35 @@ service GatewayService {
get: "/api/gateways/{gateway_id}/duty-cycle-metrics"
};
}
// Get the given Relay Gateway.
rpc GetRelayGateway(GetRelayGatewayRequest) returns (GetRelayGatewayResponse) {
option(google.api.http) = {
get: "/api/gateways/relay-gateways/{tenant_id}/{relay_id}"
};
}
// List the detected Relay Gateways.
rpc ListRelayGateways(ListRelayGatewaysRequest) returns (ListRelayGatewaysResponse) {
option(google.api.http) = {
get: "/api/gateways/relay-gateways"
};
}
// Update the given Relay Gateway.
rpc UpdateRelayGateway(UpdateRelayGatewayRequest) returns (google.protobuf.Empty) {
option(google.api.http) = {
put: "/api/gateways/relay-gateways/{relay_gateway.tenant_id}/{relay_gateway.relay_id}"
body: "*"
};
}
// Delete the given Relay Gateway.
rpc DeleteRelayGateway(DeleteRelayGatewayRequest) returns (google.protobuf.Empty) {
option(google.api.http) = {
delete: "/api/gateways/relay-gateways/{tenant_id}/{relay_id}"
};
}
}
enum GatewayState {
@ -135,11 +164,11 @@ message GatewayListItem {
// Gateway properties.
map<string, string> properties = 6;
// Created at timestamp.
google.protobuf.Timestamp created_at = 7;
// Created at timestamp.
google.protobuf.Timestamp created_at = 7;
// Last update timestamp.
google.protobuf.Timestamp updated_at = 8;
// Last update timestamp.
google.protobuf.Timestamp updated_at = 8;
// Last seen at timestamp.
google.protobuf.Timestamp last_seen_at = 9;
@ -284,3 +313,105 @@ message GetGatewayDutyCycleMetricsResponse {
// Percentage relative to tracking window.
common.Metric window_percentage = 2;
}
message GetRelayGatewayRequest {
// Tenant ID (UUID).
string tenant_id = 1;
// Relay ID (4 byte HEX).
string relay_id = 2;
}
message GetRelayGatewayResponse {
// Relay Gateway object.
RelayGateway relay_gateway = 1;
// Created at timestamp.
google.protobuf.Timestamp created_at = 2;
// Last update timestamp.
google.protobuf.Timestamp updated_at = 3;
// Last seen at timestamp.
google.protobuf.Timestamp last_seen_at = 4;
}
message ListRelayGatewaysRequest {
// Max number of relay-gateways to return in the result-set.
uint32 limit = 1;
// Offset in the result-set (for pagination).
uint32 offset = 2;
// Tenant ID (UUID) to filter relay-gateways on.
// To list all relay-gateways as a global admin user, this field can be left blank.
string tenant_id = 3;
}
message ListRelayGatewaysResponse {
// Total number of relay-gateways.
uint32 total_count = 1;
// Result-set.
repeated RelayGatewayListItem result = 2;
}
message RelayGatewayListItem {
// Tenant ID.
string tenant_id = 1;
// Relay ID (4 byte HEX).
string relay_id = 2;
// Name.
string name = 3;
// Description.
string description = 4;
// Created at timestamp.
google.protobuf.Timestamp created_at = 5;
// Last update timestamp.
google.protobuf.Timestamp updated_at = 6;
// Last seen at timestamp.
google.protobuf.Timestamp last_seen_at = 7;
// Gateway state.
// Please note that the state of the relay is driven by the last
// received stats packet sent by the relay-gateway.
GatewayState state = 10;
}
message UpdateRelayGatewayRequest {
// Relay Gateway object.
RelayGateway relay_gateway = 1;
}
message DeleteRelayGatewayRequest {
// Tenant ID.
string tenant_id = 1;
// Relay ID (4 byte HEX).
string relay_id = 2;
}
message RelayGateway {
// Tenant ID.
string tenant_id = 1;
// Relay ID (4 byte HEX).
string relay_id = 2;
// Name.
string name = 3;
// Description.
string description = 4;
// Stats interval (seconds).
// This defines the expected interval in which the gateway sends its
// statistics.
uint32 stats_interval = 5;
}

View File

@ -73,6 +73,35 @@ service GatewayService {
get: "/api/gateways/{gateway_id}/duty-cycle-metrics"
};
}
// Get the given Relay Gateway.
rpc GetRelayGateway(GetRelayGatewayRequest) returns (GetRelayGatewayResponse) {
option(google.api.http) = {
get: "/api/gateways/relay-gateways/{tenant_id}/{relay_id}"
};
}
// List the detected Relay Gateways.
rpc ListRelayGateways(ListRelayGatewaysRequest) returns (ListRelayGatewaysResponse) {
option(google.api.http) = {
get: "/api/gateways/relay-gateways"
};
}
// Update the given Relay Gateway.
rpc UpdateRelayGateway(UpdateRelayGatewayRequest) returns (google.protobuf.Empty) {
option(google.api.http) = {
put: "/api/gateways/relay-gateways/{relay_gateway.tenant_id}/{relay_gateway.relay_id}"
body: "*"
};
}
// Delete the given Relay Gateway.
rpc DeleteRelayGateway(DeleteRelayGatewayRequest) returns (google.protobuf.Empty) {
option(google.api.http) = {
delete: "/api/gateways/relay-gateways/{tenant_id}/{relay_id}"
};
}
}
enum GatewayState {
@ -133,11 +162,11 @@ message GatewayListItem {
// Gateway properties.
map<string, string> properties = 6;
// Created at timestamp.
google.protobuf.Timestamp created_at = 7;
// Created at timestamp.
google.protobuf.Timestamp created_at = 7;
// Last update timestamp.
google.protobuf.Timestamp updated_at = 8;
// Last update timestamp.
google.protobuf.Timestamp updated_at = 8;
// Last seen at timestamp.
google.protobuf.Timestamp last_seen_at = 9;
@ -282,3 +311,105 @@ message GetGatewayDutyCycleMetricsResponse {
// Percentage relative to tracking window.
common.Metric window_percentage = 2;
}
message GetRelayGatewayRequest {
// Tenant ID (UUID).
string tenant_id = 1;
// Relay ID (4 byte HEX).
string relay_id = 2;
}
message GetRelayGatewayResponse {
// Relay Gateway object.
RelayGateway relay_gateway = 1;
// Created at timestamp.
google.protobuf.Timestamp created_at = 2;
// Last update timestamp.
google.protobuf.Timestamp updated_at = 3;
// Last seen at timestamp.
google.protobuf.Timestamp last_seen_at = 4;
}
message ListRelayGatewaysRequest {
// Max number of relay-gateways to return in the result-set.
uint32 limit = 1;
// Offset in the result-set (for pagination).
uint32 offset = 2;
// Tenant ID (UUID) to filter relay-gateways on.
// To list all relay-gateways as a global admin user, this field can be left blank.
string tenant_id = 3;
}
message ListRelayGatewaysResponse {
// Total number of relay-gateways.
uint32 total_count = 1;
// Result-set.
repeated RelayGatewayListItem result = 2;
}
message RelayGatewayListItem {
// Tenant ID.
string tenant_id = 1;
// Relay ID (4 byte HEX).
string relay_id = 2;
// Name.
string name = 3;
// Description.
string description = 4;
// Created at timestamp.
google.protobuf.Timestamp created_at = 5;
// Last update timestamp.
google.protobuf.Timestamp updated_at = 6;
// Last seen at timestamp.
google.protobuf.Timestamp last_seen_at = 7;
// Gateway state.
// Please note that the state of the relay is driven by the last
// received stats packet sent by the relay-gateway.
GatewayState state = 10;
}
message UpdateRelayGatewayRequest {
// Relay Gateway object.
RelayGateway relay_gateway = 1;
}
message DeleteRelayGatewayRequest {
// Tenant ID.
string tenant_id = 1;
// Relay ID (4 byte HEX).
string relay_id = 2;
}
message RelayGateway {
// Tenant ID.
string tenant_id = 1;
// Relay ID (4 byte HEX).
string relay_id = 2;
// Name.
string name = 3;
// Description.
string description = 4;
// Stats interval (seconds).
// This defines the expected interval in which the gateway sends its
// statistics.
uint32 stats_interval = 5;
}

View File

@ -0,0 +1 @@
drop table relay_gateway;

View File

@ -0,0 +1,12 @@
create table relay_gateway (
tenant_id uuid not null references tenant on delete cascade,
relay_id bytea not null,
created_at timestamp with time zone not null,
updated_at timestamp with time zone not null,
last_seen_at timestamp with time zone,
name varchar(100) not null,
description text not null,
stats_interval_secs integer not null,
primary key (tenant_id, relay_id)
);

View File

@ -14,7 +14,11 @@ use super::auth::validator;
use super::error::ToStatus;
use super::helpers::{self, FromProto};
use crate::certificate;
use crate::storage::{fields, gateway, metrics};
use crate::storage::{
fields,
gateway::{self, RelayId},
metrics,
};
pub struct Gateway {
validator: validator::RequestValidator,
@ -783,6 +787,191 @@ impl GatewayService for Gateway {
Ok(resp)
}
async fn get_relay_gateway(
&self,
request: Request<api::GetRelayGatewayRequest>,
) -> Result<Response<api::GetRelayGatewayResponse>, Status> {
let req = request.get_ref();
let tenant_id = Uuid::from_str(&req.tenant_id).map_err(|e| e.status())?;
let relay_id = RelayId::from_str(&req.relay_id).map_err(|e| e.status())?;
// The tenant_id is part of the relay PK.
self.validator
.validate(
request.extensions(),
validator::ValidateGatewaysAccess::new(validator::Flag::List, tenant_id),
)
.await?;
let relay = gateway::get_relay_gateway(tenant_id, relay_id)
.await
.map_err(|e| e.status())?;
let mut resp = Response::new(api::GetRelayGatewayResponse {
relay_gateway: Some(api::RelayGateway {
tenant_id: relay.tenant_id.to_string(),
relay_id: relay.relay_id.to_string(),
name: relay.name,
description: relay.description,
stats_interval: relay.stats_interval_secs as u32,
}),
created_at: Some(helpers::datetime_to_prost_timestamp(&relay.created_at)),
updated_at: Some(helpers::datetime_to_prost_timestamp(&relay.updated_at)),
last_seen_at: relay
.last_seen_at
.as_ref()
.map(helpers::datetime_to_prost_timestamp),
});
resp.metadata_mut()
.insert("x-log-tenant_id", req.tenant_id.parse().unwrap());
resp.metadata_mut()
.insert("x-log-relay_id", req.relay_id.parse().unwrap());
Ok(resp)
}
async fn update_relay_gateway(
&self,
request: Request<api::UpdateRelayGatewayRequest>,
) -> Result<Response<()>, Status> {
let req_relay = match &request.get_ref().relay_gateway {
Some(v) => v,
None => {
return Err(Status::invalid_argument("relay_gateway is missing"));
}
};
let tenant_id = Uuid::from_str(&req_relay.tenant_id).map_err(|e| e.status())?;
let relay_id = RelayId::from_str(&req_relay.relay_id).map_err(|e| e.status())?;
// The tenant_id is part of the relay PK.
self.validator
.validate(
request.extensions(),
validator::ValidateGatewaysAccess::new(validator::Flag::List, tenant_id),
)
.await?;
let _ = gateway::update_relay_gateway(gateway::RelayGateway {
tenant_id,
relay_id,
name: req_relay.name.clone(),
description: req_relay.description.clone(),
stats_interval_secs: req_relay.stats_interval as i32,
..Default::default()
})
.await
.map_err(|e| e.status())?;
let mut resp = Response::new(());
resp.metadata_mut()
.insert("x-log-tenant_id", req_relay.tenant_id.parse().unwrap());
resp.metadata_mut()
.insert("x-log-relay_id", req_relay.relay_id.parse().unwrap());
Ok(resp)
}
async fn delete_relay_gateway(
&self,
request: Request<api::DeleteRelayGatewayRequest>,
) -> Result<Response<()>, Status> {
let req = request.get_ref();
let tenant_id = Uuid::from_str(&req.tenant_id).map_err(|e| e.status())?;
let relay_id = RelayId::from_str(&req.relay_id).map_err(|e| e.status())?;
// The tenant_id is part of the relay PK.
self.validator
.validate(
request.extensions(),
validator::ValidateGatewaysAccess::new(validator::Flag::List, tenant_id),
)
.await?;
gateway::delete_relay_gateway(tenant_id, relay_id)
.await
.map_err(|e| e.status())?;
let mut resp = Response::new(());
resp.metadata_mut()
.insert("x-log-tenant_id", req.tenant_id.parse().unwrap());
resp.metadata_mut()
.insert("x-log-relay_id", req.relay_id.parse().unwrap());
Ok(resp)
}
async fn list_relay_gateways(
&self,
request: Request<api::ListRelayGatewaysRequest>,
) -> Result<Response<api::ListRelayGatewaysResponse>, Status> {
let req = request.get_ref();
let tenant_id = if req.tenant_id.is_empty() {
None
} else {
Some(Uuid::from_str(&req.tenant_id).map_err(|e| e.status())?)
};
self.validator
.validate(
request.extensions(),
validator::ValidateGatewaysAccess::new(
validator::Flag::List,
tenant_id.unwrap_or(Uuid::nil()),
),
)
.await?;
let filters = gateway::RelayGatewayFilters { tenant_id };
let count = gateway::get_relay_gateway_count(&filters)
.await
.map_err(|e| e.status())?;
let result = gateway::list_relay_gateways(req.limit as i64, req.offset as i64, &filters)
.await
.map_err(|e| e.status())?;
let mut resp = Response::new(api::ListRelayGatewaysResponse {
total_count: count as u32,
result: result
.iter()
.map(|r| api::RelayGatewayListItem {
tenant_id: r.tenant_id.to_string(),
relay_id: r.relay_id.to_string(),
name: r.name.clone(),
description: r.description.clone(),
created_at: Some(helpers::datetime_to_prost_timestamp(&r.created_at)),
updated_at: Some(helpers::datetime_to_prost_timestamp(&r.updated_at)),
last_seen_at: r
.last_seen_at
.as_ref()
.map(helpers::datetime_to_prost_timestamp),
state: {
if let Some(ts) = r.last_seen_at {
if (Utc::now() - ts)
> Duration::try_seconds((r.stats_interval_secs * 2).into())
.unwrap_or_default()
{
api::GatewayState::Offline
} else {
api::GatewayState::Online
}
} else {
api::GatewayState::NeverSeen
}
}
.into(),
})
.collect(),
});
if !req.tenant_id.is_empty() {
resp.metadata_mut()
.insert("x-log-tenant_id", req.tenant_id.parse().unwrap());
}
Ok(resp)
}
}
#[cfg(test)]
@ -1162,5 +1351,99 @@ pub mod test {
}),
stats_resp.window_percentage
);
// create relay gateway
let _ = gateway::create_relay_gateway(gateway::RelayGateway {
tenant_id: t.id,
relay_id: gateway::RelayId::from_be_bytes([1, 2, 3, 4]),
name: "test-relay".into(),
description: "test relay".into(),
..Default::default()
})
.await
.unwrap();
// get relay gateway
let get_relay_req = api::GetRelayGatewayRequest {
tenant_id: t.id.to_string(),
relay_id: "01020304".into(),
};
let mut get_relay_req = Request::new(get_relay_req);
get_relay_req.extensions_mut().insert(AuthID::User(u.id));
let get_relay_resp = service.get_relay_gateway(get_relay_req).await.unwrap();
assert_eq!(
Some(api::RelayGateway {
tenant_id: t.id.to_string(),
relay_id: "01020304".into(),
name: "test-relay".into(),
description: "test relay".into(),
stats_interval: 900,
}),
get_relay_resp.get_ref().relay_gateway
);
// update
let up_relay_req = api::UpdateRelayGatewayRequest {
relay_gateway: Some(api::RelayGateway {
tenant_id: t.id.to_string(),
relay_id: "01020304".into(),
name: "updated-relay".into(),
description: "updated relay".into(),
stats_interval: 600,
}),
};
let mut up_relay_req = Request::new(up_relay_req);
up_relay_req.extensions_mut().insert(AuthID::User(u.id));
let _ = service.update_relay_gateway(up_relay_req).await.unwrap();
// get relay gateway
let get_relay_req = api::GetRelayGatewayRequest {
tenant_id: t.id.to_string(),
relay_id: "01020304".into(),
};
let mut get_relay_req = Request::new(get_relay_req);
get_relay_req.extensions_mut().insert(AuthID::User(u.id));
let get_relay_resp = service.get_relay_gateway(get_relay_req).await.unwrap();
assert_eq!(
Some(api::RelayGateway {
tenant_id: t.id.to_string(),
relay_id: "01020304".into(),
name: "updated-relay".into(),
description: "updated relay".into(),
stats_interval: 600,
}),
get_relay_resp.get_ref().relay_gateway
);
// list
let list_relay_req = api::ListRelayGatewaysRequest {
tenant_id: t.id.to_string(),
limit: 10,
offset: 0,
};
let mut list_relay_req = Request::new(list_relay_req);
list_relay_req.extensions_mut().insert(AuthID::User(u.id));
let list_relay_resp = service.list_relay_gateways(list_relay_req).await.unwrap();
assert_eq!(1, list_relay_resp.get_ref().total_count);
assert_eq!(1, list_relay_resp.get_ref().result.len());
// delete
let del_relay_req = api::DeleteRelayGatewayRequest {
tenant_id: t.id.to_string(),
relay_id: "01020304".into(),
};
let mut del_relay_req = Request::new(del_relay_req);
del_relay_req.extensions_mut().insert(AuthID::User(u.id));
let del_relay_resp = service.delete_relay_gateway(del_relay_req).await;
assert!(del_relay_resp.is_ok());
let del_relay_req = api::DeleteRelayGatewayRequest {
tenant_id: t.id.to_string(),
relay_id: "01020304".into(),
};
let mut del_relay_req = Request::new(del_relay_req);
del_relay_req.extensions_mut().insert(AuthID::User(u.id));
let del_relay_resp = service.delete_relay_gateway(del_relay_req).await;
assert!(del_relay_resp.is_err());
}
}

View File

@ -414,6 +414,18 @@ async fn message_callback(
set_gateway_json(&event.gateway_id, json);
tokio::spawn(downlink::tx_ack::TxAck::handle(event));
} else if topic.ends_with("/mesh-stats") {
EVENT_COUNTER
.get_or_create(&EventLabels {
event: "mesh-stats".to_string(),
})
.inc();
let event = match json {
true => serde_json::from_slice(&p.payload)?,
false => chirpstack_api::gw::MeshStats::decode(&mut Cursor::new(&p.payload))?,
};
tokio::spawn(uplink::mesh_stats::MeshStats::handle(event));
} else {
return Err(anyhow!("Unknown event type"));
}

View File

@ -7,11 +7,13 @@ use diesel_async::RunQueryDsl;
use tracing::info;
use uuid::Uuid;
use lrwn::EUI64;
use lrwn::{DevAddr, EUI64};
use super::schema::{gateway, multicast_group_gateway, tenant};
use super::schema::{gateway, multicast_group_gateway, relay_gateway, tenant};
use super::{error::Error, fields, get_async_db_conn};
pub type RelayId = DevAddr;
#[derive(Queryable, Insertable, PartialEq, Debug)]
#[diesel(table_name = gateway)]
pub struct Gateway {
@ -40,6 +42,29 @@ impl Gateway {
}
}
impl Default for Gateway {
fn default() -> Self {
let now = Utc::now();
Gateway {
gateway_id: EUI64::from_be_bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]),
tenant_id: Uuid::nil(),
created_at: now,
updated_at: now,
last_seen_at: None,
name: "".into(),
description: "".into(),
latitude: 0.0,
longitude: 0.0,
altitude: 0.0,
tls_certificate: None,
stats_interval_secs: 30,
tags: fields::KeyValue::new(HashMap::new()),
properties: fields::KeyValue::new(HashMap::new()),
}
}
}
#[derive(AsChangeset, Debug, Clone, Default)]
#[diesel(table_name = gateway)]
pub struct GatewayChangeset {
@ -95,29 +120,53 @@ pub struct GatewayCountsByState {
pub offline_count: i64,
}
impl Default for Gateway {
#[derive(Queryable, Insertable, PartialEq, Debug)]
#[diesel(table_name = relay_gateway)]
pub struct RelayGateway {
pub tenant_id: Uuid,
pub relay_id: RelayId,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub last_seen_at: Option<DateTime<Utc>>,
pub name: String,
pub description: String,
pub stats_interval_secs: i32,
}
impl Default for RelayGateway {
fn default() -> Self {
let now = Utc::now();
Gateway {
gateway_id: EUI64::from_be_bytes([0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00]),
RelayGateway {
relay_id: RelayId::from_be_bytes([1, 2, 3, 4]),
tenant_id: Uuid::nil(),
created_at: now,
updated_at: now,
last_seen_at: None,
name: "".into(),
description: "".into(),
latitude: 0.0,
longitude: 0.0,
altitude: 0.0,
tls_certificate: None,
stats_interval_secs: 30,
tags: fields::KeyValue::new(HashMap::new()),
properties: fields::KeyValue::new(HashMap::new()),
stats_interval_secs: 900,
}
}
}
#[derive(Default, Clone)]
pub struct RelayGatewayFilters {
pub tenant_id: Option<Uuid>,
}
#[derive(Queryable, PartialEq, Debug)]
pub struct RelayGatewayListItem {
pub relay_id: RelayId,
pub tenant_id: Uuid,
pub created_at: DateTime<Utc>,
pub updated_at: DateTime<Utc>,
pub last_seen_at: Option<DateTime<Utc>>,
pub name: String,
pub description: String,
pub stats_interval_secs: i32,
}
pub async fn create(gw: Gateway) -> Result<Gateway, Error> {
gw.validate()?;
let mut c = get_async_db_conn().await?;
@ -322,6 +371,101 @@ pub async fn get_counts_by_state(tenant_id: &Option<Uuid>) -> Result<GatewayCoun
Ok(counts)
}
pub async fn create_relay_gateway(relay: RelayGateway) -> Result<RelayGateway, Error> {
let relay: RelayGateway = diesel::insert_into(relay_gateway::table)
.values(&relay)
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, relay.relay_id.to_string()))?;
info!(relay_id = %relay.relay_id, "Relay Gateway created");
Ok(relay)
}
pub async fn get_relay_gateway(tenant_id: Uuid, relay_id: RelayId) -> Result<RelayGateway, Error> {
let relay = relay_gateway::dsl::relay_gateway
.find((&tenant_id, &relay_id))
.first(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, relay_id.to_string()))?;
Ok(relay)
}
pub async fn update_relay_gateway(relay: RelayGateway) -> Result<RelayGateway, Error> {
let relay: RelayGateway =
diesel::update(relay_gateway::dsl::relay_gateway.find((&relay.tenant_id, &relay.relay_id)))
.set((
relay_gateway::updated_at.eq(&relay.updated_at),
relay_gateway::name.eq(&relay.name),
relay_gateway::description.eq(&relay.description),
relay_gateway::stats_interval_secs.eq(&relay.stats_interval_secs),
))
.get_result(&mut get_async_db_conn().await?)
.await
.map_err(|e| Error::from_diesel(e, relay.relay_id.to_string()))?;
info!(relay_id = %relay.relay_id, "Relay Gateway updated");
Ok(relay)
}
pub async fn get_relay_gateway_count(filters: &RelayGatewayFilters) -> Result<i64, Error> {
let mut q = relay_gateway::dsl::relay_gateway
.select(dsl::count_star())
.into_boxed();
if let Some(tenant_id) = &filters.tenant_id {
q = q.filter(relay_gateway::dsl::tenant_id.eq(tenant_id));
}
Ok(q.first(&mut get_async_db_conn().await?).await?)
}
pub async fn delete_relay_gateway(tenant_id: Uuid, relay_id: RelayId) -> Result<(), Error> {
let ra = diesel::delete(relay_gateway::dsl::relay_gateway.find((&tenant_id, &relay_id)))
.execute(&mut get_async_db_conn().await?)
.await?;
if ra == 0 {
return Err(Error::NotFound(relay_id.to_string()));
}
info!(relay_id = %relay_id, "Relay Gateway deleted");
Ok(())
}
pub async fn list_relay_gateways(
limit: i64,
offset: i64,
filters: &RelayGatewayFilters,
) -> Result<Vec<RelayGatewayListItem>, Error> {
let mut q = relay_gateway::dsl::relay_gateway
.select((
relay_gateway::relay_id,
relay_gateway::tenant_id,
relay_gateway::created_at,
relay_gateway::updated_at,
relay_gateway::last_seen_at,
relay_gateway::name,
relay_gateway::description,
relay_gateway::stats_interval_secs,
))
.into_boxed();
if let Some(tenant_id) = &filters.tenant_id {
q = q.filter(relay_gateway::dsl::tenant_id.eq(tenant_id));
}
let items = q
.order_by(relay_gateway::dsl::name)
.limit(limit)
.offset(offset)
.load(&mut get_async_db_conn().await?)
.await?;
Ok(items)
}
#[cfg(test)]
pub mod test {
use super::*;
@ -337,6 +481,14 @@ pub mod test {
offset: i64,
}
struct RelayGatewayFilterTest<'a> {
filters: RelayGatewayFilters,
relay_gateways: Vec<&'a RelayGateway>,
count: usize,
limit: i64,
offset: i64,
}
pub async fn create_gateway(id: EUI64) -> Gateway {
let tenant_id = {
let t = storage::tenant::test::create_tenant().await;
@ -498,4 +650,91 @@ pub mod test {
delete(&gw.gateway_id).await.unwrap();
assert_eq!(true, delete(&gw.gateway_id).await.is_err());
}
#[tokio::test]
async fn test_relay_gateway() {
let _guard = test::prepare().await;
let gw = create_gateway(EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8])).await;
// create
let mut relay = create_relay_gateway(RelayGateway {
relay_id: RelayId::from_be_bytes([1, 2, 3, 4]),
tenant_id: gw.tenant_id,
name: "test-relay".into(),
description: "test relay".into(),
..Default::default()
})
.await
.unwrap();
// get
let relay_get = get_relay_gateway(relay.tenant_id, relay.relay_id)
.await
.unwrap();
assert_eq!(relay, relay_get);
// update
relay.name = "updated-relay".into();
relay = update_relay_gateway(relay).await.unwrap();
let relay_get = get_relay_gateway(relay.tenant_id, relay.relay_id)
.await
.unwrap();
assert_eq!(relay, relay_get);
// test count and list
let tests = vec![
RelayGatewayFilterTest {
filters: RelayGatewayFilters { tenant_id: None },
relay_gateways: vec![&relay],
count: 1,
limit: 10,
offset: 0,
},
RelayGatewayFilterTest {
filters: RelayGatewayFilters {
tenant_id: Some(gw.tenant_id),
},
relay_gateways: vec![&relay],
count: 1,
limit: 10,
offset: 0,
},
RelayGatewayFilterTest {
filters: RelayGatewayFilters {
tenant_id: Some(gw.tenant_id),
},
relay_gateways: vec![&relay],
count: 1,
limit: 10,
offset: 0,
},
];
for tst in tests {
let count = get_relay_gateway_count(&tst.filters).await.unwrap() as usize;
assert_eq!(tst.count, count);
let items = list_relay_gateways(tst.limit, tst.offset, &tst.filters)
.await
.unwrap();
assert_eq!(
tst.relay_gateways
.iter()
.map(|r| r.relay_id.to_string())
.collect::<String>(),
items
.iter()
.map(|r| r.relay_id.to_string())
.collect::<String>(),
);
}
// delete
delete_relay_gateway(relay.tenant_id, relay.relay_id)
.await
.unwrap();
assert!(delete_relay_gateway(relay.tenant_id, relay.relay_id)
.await
.is_err());
}
}

View File

@ -288,6 +288,20 @@ diesel::table! {
}
}
diesel::table! {
relay_gateway (tenant_id, relay_id) {
tenant_id -> Uuid,
relay_id -> Bytea,
created_at -> Timestamptz,
updated_at -> Timestamptz,
last_seen_at -> Nullable<Timestamptz>,
#[max_length = 100]
name -> Varchar,
description -> Text,
stats_interval_secs -> Int4,
}
}
diesel::table! {
tenant (id) {
id -> Uuid,
@ -349,6 +363,7 @@ diesel::joinable!(multicast_group_gateway -> gateway (gateway_id));
diesel::joinable!(multicast_group_gateway -> multicast_group (multicast_group_id));
diesel::joinable!(multicast_group_queue_item -> gateway (gateway_id));
diesel::joinable!(multicast_group_queue_item -> multicast_group (multicast_group_id));
diesel::joinable!(relay_gateway -> tenant (tenant_id));
diesel::joinable!(tenant_user -> tenant (tenant_id));
diesel::joinable!(tenant_user -> user (user_id));
@ -367,6 +382,7 @@ diesel::allow_tables_to_appear_in_same_query!(
multicast_group_gateway,
multicast_group_queue_item,
relay_device,
relay_gateway,
tenant,
tenant_user,
user,

View File

@ -0,0 +1,114 @@
use std::str::FromStr;
use anyhow::Result;
use chrono::{DateTime, Utc};
use tracing::{error, span, trace, warn, Instrument, Level};
use chirpstack_api::gw;
use crate::config;
use crate::helpers::errors::PrintFullError;
use crate::storage::{
error::Error,
gateway::{self, RelayId},
};
use lrwn::EUI64;
pub struct MeshStats {
gateway_id: EUI64,
relay_id: RelayId,
mesh_stats: gw::MeshStats,
}
impl MeshStats {
pub async fn handle(s: gw::MeshStats) {
let gateway_id = match EUI64::from_str(&s.gateway_id) {
Ok(v) => v,
Err(e) => {
warn!(error = %e.full(), "Decode gateway_id error");
return;
}
};
let relay_id = match RelayId::from_str(&s.relay_id) {
Ok(v) => v,
Err(e) => {
warn!(error = %e.full(), "Decode relay_id error");
return;
}
};
let span = span!(Level::INFO, "mesh_stats", gateway_id = %gateway_id, relay_id = %relay_id);
if let Err(e) = MeshStats::_handle(gateway_id, relay_id, s)
.instrument(span)
.await
{
match e.downcast_ref::<Error>() {
Some(Error::NotFound(_)) => {
let conf = config::get();
if !conf.gateway.allow_unknown_gateways {
error!(error = %e.full(), "Handle mesh-stats error");
}
}
Some(_) | None => {
error!(error = %e.full(), "Handle mesh-stats error");
}
}
}
}
async fn _handle(gateway_id: EUI64, relay_id: RelayId, s: gw::MeshStats) -> Result<()> {
let mut ctx = MeshStats {
gateway_id,
relay_id,
mesh_stats: s,
};
ctx.update_or_create_relay_gateway().await?;
Ok(())
}
async fn update_or_create_relay_gateway(&mut self) -> Result<()> {
trace!("Getting Border Gateway");
let border_gw = gateway::get(&self.gateway_id).await?;
let ts: DateTime<Utc> = match &self.mesh_stats.time {
Some(v) => v
.clone()
.try_into()
.map_err(|e| anyhow!("Convert time error: {}", e))?,
None => {
warn!("Stats message does not have time field set");
return Ok(());
}
};
match gateway::get_relay_gateway(border_gw.tenant_id, self.relay_id).await {
Ok(mut v) => {
if let Some(last_seen_at) = v.last_seen_at {
if last_seen_at > ts {
warn!("Time is less than last seen timestamp, ignoring stats");
return Ok(());
}
}
v.last_seen_at = Some(ts);
gateway::update_relay_gateway(v).await?;
}
Err(_) => {
let _ = gateway::create_relay_gateway(gateway::RelayGateway {
tenant_id: border_gw.tenant_id,
relay_id: self.relay_id,
name: self.relay_id.to_string(),
last_seen_at: Some(ts),
..Default::default()
})
.await?;
}
}
Ok(())
}
}

View File

@ -33,6 +33,7 @@ pub mod helpers;
pub mod join;
pub mod join_fns;
pub mod join_sns;
pub mod mesh_stats;
pub mod stats;
#[derive(Clone, Hash, PartialEq, Eq, EncodeLabelSet, Debug)]