mirror of
https://github.com/chirpstack/chirpstack.git
synced 2024-12-19 13:17:55 +00:00
Add global PostgreSQL integration.
This commit is contained in:
parent
a46b0a9469
commit
777b95ef1d
@ -303,6 +303,33 @@ pub fn run() {
|
||||
tls_key="{{ integration.mqtt.tls_key }}"
|
||||
|
||||
|
||||
# PostgreSQL integration configuration.
|
||||
[integration.postgresql]
|
||||
|
||||
# PostgreSQL DSN.
|
||||
#
|
||||
# Format example: postgres://<USERNAME>:<PASSWORD>@<HOSTNAME>/<DATABASE>?sslmode=<SSLMODE>.
|
||||
#
|
||||
# SSL mode options:
|
||||
# * disable - no SSL
|
||||
# * require - Always SSL (skip verification)
|
||||
# * verify-ca - Always SSL (verify that the certificate presented by the server was signed by a trusted CA)
|
||||
# * verify-full - Always SSL (verify that the certification presented by the server was signed by a trusted CA and the server host name matches the one in the certificate)
|
||||
dsn="{{ integration.postgresql.dsn }}"
|
||||
|
||||
# Max open connections.
|
||||
#
|
||||
# This sets the max. number of open connections that are allowed in the
|
||||
# PostgreSQL connection pool.
|
||||
max_open_connections={{ integration.postgresql.max_open_connections }}
|
||||
|
||||
# Min idle connections.
|
||||
#
|
||||
# This sets the min. number of idle connections in the PostgreSQL connection
|
||||
# pool (0 = equal to max_open_connections).
|
||||
min_idle_connections={{ integration.postgresql.min_idle_connections }}
|
||||
|
||||
|
||||
# Codec configuration.
|
||||
[codec]
|
||||
|
||||
|
@ -240,6 +240,7 @@ impl Default for Monitoring {
|
||||
pub struct Integration {
|
||||
pub enabled: Vec<String>,
|
||||
pub mqtt: MqttIntegration,
|
||||
pub postgresql: PostgresqlIntegration,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
@ -302,6 +303,24 @@ impl Default for MqttIntegrationClient {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone)]
|
||||
#[serde(default)]
|
||||
pub struct PostgresqlIntegration {
|
||||
pub dsn: String,
|
||||
pub max_open_connections: u32,
|
||||
pub min_idle_connections: u32,
|
||||
}
|
||||
|
||||
impl Default for PostgresqlIntegration {
|
||||
fn default() -> Self {
|
||||
PostgresqlIntegration {
|
||||
dsn: "postgresql://chirpstack_integration:chirpstack_integration@localhost/chirpstack_integration?sslmode=disable".into(),
|
||||
max_open_connections: 10,
|
||||
min_idle_connections: 0,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize, Clone, Default)]
|
||||
#[serde(default)]
|
||||
pub struct Codec {
|
||||
|
@ -24,6 +24,7 @@ pub mod mock;
|
||||
mod mqtt;
|
||||
mod mydevices;
|
||||
mod pilot_things;
|
||||
mod postgresql;
|
||||
mod redis;
|
||||
mod thingsboard;
|
||||
|
||||
@ -49,6 +50,10 @@ pub async fn setup() -> Result<()> {
|
||||
.context("Setup MQTT integration")?,
|
||||
));
|
||||
}
|
||||
"postgresql" => integrations.push(Box::new(
|
||||
postgresql::Integration::new(&conf.integration.postgresql)
|
||||
.context("Setup PostgreSQL integration")?,
|
||||
)),
|
||||
_ => {
|
||||
return Err(anyhow!("Unexpected integration: {}", name));
|
||||
}
|
||||
|
5
chirpstack/src/integration/postgresql/diesel.toml
Normal file
5
chirpstack/src/integration/postgresql/diesel.toml
Normal file
@ -0,0 +1,5 @@
|
||||
# For documentation on how to configure this file,
|
||||
# see diesel.rs/guides/configuring-diesel-cli
|
||||
|
||||
[print_schema]
|
||||
file = "schema.rs"
|
@ -0,0 +1,8 @@
|
||||
drop table event_integration;
|
||||
drop table event_location;
|
||||
drop table event_status;
|
||||
drop table event_log;
|
||||
drop table event_tx_ack;
|
||||
drop table event_ack;
|
||||
drop table event_join;
|
||||
drop table event_up;
|
@ -0,0 +1,145 @@
|
||||
create table event_up (
|
||||
deduplication_id uuid primary key,
|
||||
time timestamp with time zone not null,
|
||||
tenant_id uuid not null,
|
||||
tenant_name text not null,
|
||||
application_id uuid not null,
|
||||
application_name text not null,
|
||||
device_profile_id uuid not null,
|
||||
device_profile_name text not null,
|
||||
device_name text not null,
|
||||
dev_eui char(16) not null,
|
||||
tags jsonb not null,
|
||||
dev_addr char(8) not null,
|
||||
adr boolean not null,
|
||||
dr smallint not null,
|
||||
f_cnt bigint not null,
|
||||
f_port smallint not null,
|
||||
confirmed boolean not null,
|
||||
data bytea not null,
|
||||
object jsonb not null,
|
||||
rx_info jsonb not null,
|
||||
tx_info jsonb not null
|
||||
);
|
||||
|
||||
create table event_join (
|
||||
deduplication_id uuid primary key,
|
||||
time timestamp with time zone not null,
|
||||
tenant_id uuid not null,
|
||||
tenant_name text not null,
|
||||
application_id uuid not null,
|
||||
application_name text not null,
|
||||
device_profile_id uuid not null,
|
||||
device_profile_name text not null,
|
||||
device_name text not null,
|
||||
dev_eui char(16) not null,
|
||||
tags jsonb not null,
|
||||
dev_addr char(8) not null
|
||||
);
|
||||
|
||||
create table event_ack (
|
||||
queue_item_id uuid primary key,
|
||||
deduplication_id uuid not null,
|
||||
time timestamp with time zone not null,
|
||||
tenant_id uuid not null,
|
||||
tenant_name text not null,
|
||||
application_id uuid not null,
|
||||
application_name text not null,
|
||||
device_profile_id uuid not null,
|
||||
device_profile_name text not null,
|
||||
device_name text not null,
|
||||
dev_eui char(16) not null,
|
||||
tags jsonb not null,
|
||||
acknowledged boolean not null,
|
||||
f_cnt_down bigint not null
|
||||
);
|
||||
|
||||
create table event_tx_ack (
|
||||
queue_item_id uuid primary key,
|
||||
downlink_id bigint not null,
|
||||
time timestamp with time zone not null,
|
||||
tenant_id uuid not null,
|
||||
tenant_name text not null,
|
||||
application_id uuid not null,
|
||||
application_name text not null,
|
||||
device_profile_id uuid not null,
|
||||
device_profile_name text not null,
|
||||
device_name text not null,
|
||||
dev_eui char(16) not null,
|
||||
tags jsonb not null,
|
||||
f_cnt_down bigint not null,
|
||||
gateway_id char(16) not null,
|
||||
tx_info jsonb not null
|
||||
);
|
||||
|
||||
create table event_log (
|
||||
id bigserial primary key,
|
||||
time timestamp with time zone not null,
|
||||
tenant_id uuid not null,
|
||||
tenant_name text not null,
|
||||
application_id uuid not null,
|
||||
application_name text not null,
|
||||
device_profile_id uuid not null,
|
||||
device_profile_name text not null,
|
||||
device_name text not null,
|
||||
dev_eui char(16) not null,
|
||||
tags jsonb not null,
|
||||
level text not null,
|
||||
code text not null,
|
||||
description text not null,
|
||||
context jsonb not null
|
||||
);
|
||||
|
||||
create table event_status (
|
||||
deduplication_id uuid primary key,
|
||||
time timestamp with time zone not null,
|
||||
tenant_id uuid not null,
|
||||
tenant_name text not null,
|
||||
application_id uuid not null,
|
||||
application_name text not null,
|
||||
device_profile_id uuid not null,
|
||||
device_profile_name text not null,
|
||||
device_name text not null,
|
||||
dev_eui char(16) not null,
|
||||
tags jsonb not null,
|
||||
margin smallint not null,
|
||||
external_power_source boolean not null,
|
||||
battery_level_unavailable boolean not null,
|
||||
battery_level real not null
|
||||
);
|
||||
|
||||
create table event_location (
|
||||
deduplication_id uuid primary key,
|
||||
time timestamp with time zone not null,
|
||||
tenant_id uuid not null,
|
||||
tenant_name text not null,
|
||||
application_id uuid not null,
|
||||
application_name text not null,
|
||||
device_profile_id uuid not null,
|
||||
device_profile_name text not null,
|
||||
device_name text not null,
|
||||
dev_eui char(16) not null,
|
||||
tags jsonb not null,
|
||||
latitude double precision not null,
|
||||
longitude double precision not null,
|
||||
altitude double precision not null,
|
||||
source text not null,
|
||||
accuracy real not null
|
||||
);
|
||||
|
||||
create table event_integration (
|
||||
deduplication_id uuid primary key,
|
||||
time timestamp with time zone not null,
|
||||
tenant_id uuid not null,
|
||||
tenant_name text not null,
|
||||
application_id uuid not null,
|
||||
application_name text not null,
|
||||
device_profile_id uuid not null,
|
||||
device_profile_name text not null,
|
||||
device_name text not null,
|
||||
dev_eui char(16) not null,
|
||||
tags jsonb not null,
|
||||
integration_name text not null,
|
||||
event_type text not null,
|
||||
object jsonb not null
|
||||
);
|
522
chirpstack/src/integration/postgresql/mod.rs
Normal file
522
chirpstack/src/integration/postgresql/mod.rs
Normal file
@ -0,0 +1,522 @@
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use diesel::pg::PgConnection;
|
||||
use diesel::prelude::*;
|
||||
use diesel::r2d2::{ConnectionManager, Pool, PooledConnection};
|
||||
use diesel_migrations::embed_migrations;
|
||||
use tokio::task;
|
||||
use tracing::info;
|
||||
use uuid::Uuid;
|
||||
|
||||
use super::Integration as IntegrationTrait;
|
||||
use crate::config::PostgresqlIntegration as Config;
|
||||
use chirpstack_api::integration;
|
||||
use schema::{
|
||||
event_ack, event_integration, event_join, event_location, event_log, event_status,
|
||||
event_tx_ack, event_up,
|
||||
};
|
||||
|
||||
mod schema;
|
||||
|
||||
embed_migrations!("./src/integration/postgresql/migrations");
|
||||
|
||||
type PgPool = Pool<ConnectionManager<PgConnection>>;
|
||||
type PgPoolConnection = PooledConnection<ConnectionManager<PgConnection>>;
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[table_name = "event_up"]
|
||||
struct EventUp {
|
||||
pub deduplication_id: Uuid,
|
||||
pub time: DateTime<Utc>,
|
||||
pub tenant_id: Uuid,
|
||||
pub tenant_name: String,
|
||||
pub application_id: Uuid,
|
||||
pub application_name: String,
|
||||
pub device_profile_id: Uuid,
|
||||
pub device_profile_name: String,
|
||||
pub device_name: String,
|
||||
pub dev_eui: String,
|
||||
pub tags: serde_json::Value,
|
||||
pub dev_addr: String,
|
||||
pub adr: bool,
|
||||
pub dr: i16,
|
||||
pub f_cnt: i64,
|
||||
pub f_port: i16,
|
||||
pub confirmed: bool,
|
||||
pub data: Vec<u8>,
|
||||
pub object: serde_json::Value,
|
||||
pub rx_info: serde_json::Value,
|
||||
pub tx_info: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[table_name = "event_join"]
|
||||
struct EventJoin {
|
||||
pub deduplication_id: Uuid,
|
||||
pub time: DateTime<Utc>,
|
||||
pub tenant_id: Uuid,
|
||||
pub tenant_name: String,
|
||||
pub application_id: Uuid,
|
||||
pub application_name: String,
|
||||
pub device_profile_id: Uuid,
|
||||
pub device_profile_name: String,
|
||||
pub device_name: String,
|
||||
pub dev_eui: String,
|
||||
pub tags: serde_json::Value,
|
||||
pub dev_addr: String,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[table_name = "event_ack"]
|
||||
struct EventAck {
|
||||
pub queue_item_id: Uuid,
|
||||
pub deduplication_id: Uuid,
|
||||
pub time: DateTime<Utc>,
|
||||
pub tenant_id: Uuid,
|
||||
pub tenant_name: String,
|
||||
pub application_id: Uuid,
|
||||
pub application_name: String,
|
||||
pub device_profile_id: Uuid,
|
||||
pub device_profile_name: String,
|
||||
pub device_name: String,
|
||||
pub dev_eui: String,
|
||||
pub tags: serde_json::Value,
|
||||
pub acknowledged: bool,
|
||||
pub f_cnt_down: i64,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[table_name = "event_tx_ack"]
|
||||
struct EventTxAck {
|
||||
pub queue_item_id: Uuid,
|
||||
pub downlink_id: i64,
|
||||
pub time: DateTime<Utc>,
|
||||
pub tenant_id: Uuid,
|
||||
pub tenant_name: String,
|
||||
pub application_id: Uuid,
|
||||
pub application_name: String,
|
||||
pub device_profile_id: Uuid,
|
||||
pub device_profile_name: String,
|
||||
pub device_name: String,
|
||||
pub dev_eui: String,
|
||||
pub tags: serde_json::Value,
|
||||
pub f_cnt_down: i64,
|
||||
pub gateway_id: String,
|
||||
pub tx_info: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[table_name = "event_log"]
|
||||
struct EventLog {
|
||||
pub time: DateTime<Utc>,
|
||||
pub tenant_id: Uuid,
|
||||
pub tenant_name: String,
|
||||
pub application_id: Uuid,
|
||||
pub application_name: String,
|
||||
pub device_profile_id: Uuid,
|
||||
pub device_profile_name: String,
|
||||
pub device_name: String,
|
||||
pub dev_eui: String,
|
||||
pub tags: serde_json::Value,
|
||||
pub level: String,
|
||||
pub code: String,
|
||||
pub description: String,
|
||||
pub context: serde_json::Value,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[table_name = "event_status"]
|
||||
struct EventStatus {
|
||||
pub deduplication_id: Uuid,
|
||||
pub time: DateTime<Utc>,
|
||||
pub tenant_id: Uuid,
|
||||
pub tenant_name: String,
|
||||
pub application_id: Uuid,
|
||||
pub application_name: String,
|
||||
pub device_profile_id: Uuid,
|
||||
pub device_profile_name: String,
|
||||
pub device_name: String,
|
||||
pub dev_eui: String,
|
||||
pub tags: serde_json::Value,
|
||||
pub margin: i16,
|
||||
pub external_power_source: bool,
|
||||
pub battery_level_unavailable: bool,
|
||||
pub battery_level: f32,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[table_name = "event_location"]
|
||||
struct EventLocation {
|
||||
pub deduplication_id: Uuid,
|
||||
pub time: DateTime<Utc>,
|
||||
pub tenant_id: Uuid,
|
||||
pub tenant_name: String,
|
||||
pub application_id: Uuid,
|
||||
pub application_name: String,
|
||||
pub device_profile_id: Uuid,
|
||||
pub device_profile_name: String,
|
||||
pub device_name: String,
|
||||
pub dev_eui: String,
|
||||
pub tags: serde_json::Value,
|
||||
pub latitude: f64,
|
||||
pub longitude: f64,
|
||||
pub altitude: f64,
|
||||
pub source: String,
|
||||
pub accuracy: f32,
|
||||
}
|
||||
|
||||
#[derive(Insertable)]
|
||||
#[table_name = "event_integration"]
|
||||
struct EventIntegration {
|
||||
pub deduplication_id: Uuid,
|
||||
pub time: DateTime<Utc>,
|
||||
pub tenant_id: Uuid,
|
||||
pub tenant_name: String,
|
||||
pub application_id: Uuid,
|
||||
pub application_name: String,
|
||||
pub device_profile_id: Uuid,
|
||||
pub device_profile_name: String,
|
||||
pub device_name: String,
|
||||
pub dev_eui: String,
|
||||
pub tags: serde_json::Value,
|
||||
pub integration_name: String,
|
||||
pub event_type: String,
|
||||
pub object: serde_json::Value,
|
||||
}
|
||||
|
||||
pub struct Integration {
|
||||
pg_pool: PgPool,
|
||||
}
|
||||
|
||||
impl Integration {
|
||||
pub fn new(conf: &Config) -> Result<Integration> {
|
||||
info!("Initializing PostgreSQL integration");
|
||||
|
||||
let pg_pool = PgPool::builder()
|
||||
.max_size(conf.max_open_connections)
|
||||
.min_idle(match conf.min_idle_connections {
|
||||
0 => None,
|
||||
_ => Some(conf.min_idle_connections),
|
||||
})
|
||||
.build(ConnectionManager::new(&conf.dsn))
|
||||
.context("Setup PostgreSQL connection pool error")?;
|
||||
let db_conn = pg_pool.get()?;
|
||||
|
||||
info!("Applying schema migrations");
|
||||
embedded_migrations::run(&db_conn).context("Run migrations error")?;
|
||||
|
||||
Ok(Integration { pg_pool: pg_pool })
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl IntegrationTrait for Integration {
|
||||
async fn uplink_event(
|
||||
&self,
|
||||
_vars: &HashMap<String, String>,
|
||||
pl: &integration::UplinkEvent,
|
||||
) -> Result<()> {
|
||||
let di = pl.device_info.as_ref().unwrap();
|
||||
info!(dev_eui = %di.dev_eui, event = "up", "Inserting event");
|
||||
|
||||
let e = EventUp {
|
||||
deduplication_id: Uuid::from_str(&pl.deduplication_id)?,
|
||||
time: pl.time.as_ref().unwrap().clone().try_into()?,
|
||||
tenant_id: Uuid::from_str(&di.tenant_id)?,
|
||||
tenant_name: di.tenant_name.clone(),
|
||||
application_id: Uuid::from_str(&di.application_id)?,
|
||||
application_name: di.application_name.clone(),
|
||||
device_profile_id: Uuid::from_str(&di.device_profile_id)?,
|
||||
device_profile_name: di.device_profile_name.clone(),
|
||||
device_name: di.device_name.clone(),
|
||||
dev_eui: di.dev_eui.clone(),
|
||||
tags: serde_json::to_value(&di.tags)?,
|
||||
dev_addr: pl.dev_addr.clone(),
|
||||
adr: pl.adr,
|
||||
dr: pl.dr as i16,
|
||||
f_cnt: pl.f_cnt_up as i64,
|
||||
f_port: pl.f_port as i16,
|
||||
confirmed: pl.confirmed,
|
||||
data: pl.data.clone(),
|
||||
object: serde_json::to_value(&pl.object)?,
|
||||
rx_info: serde_json::to_value(&pl.rx_info)?,
|
||||
tx_info: serde_json::to_value(&pl.tx_info)?,
|
||||
};
|
||||
let c = self.pg_pool.get()?;
|
||||
|
||||
task::spawn_blocking(move || -> Result<()> {
|
||||
diesel::insert_into(event_up::table)
|
||||
.values(&e)
|
||||
.execute(&c)?;
|
||||
Ok(())
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn join_event(
|
||||
&self,
|
||||
_vars: &HashMap<String, String>,
|
||||
pl: &integration::JoinEvent,
|
||||
) -> Result<()> {
|
||||
let di = pl.device_info.as_ref().unwrap();
|
||||
info!(dev_eui = %di.dev_eui, event = "join", "Inserting event");
|
||||
|
||||
let e = EventJoin {
|
||||
deduplication_id: Uuid::from_str(&pl.deduplication_id)?,
|
||||
time: pl.time.as_ref().unwrap().clone().try_into()?,
|
||||
tenant_id: Uuid::from_str(&di.tenant_id)?,
|
||||
tenant_name: di.tenant_name.clone(),
|
||||
application_id: Uuid::from_str(&di.application_id)?,
|
||||
application_name: di.application_name.clone(),
|
||||
device_profile_id: Uuid::from_str(&di.device_profile_id)?,
|
||||
device_profile_name: di.device_profile_name.clone(),
|
||||
device_name: di.device_name.clone(),
|
||||
dev_eui: di.dev_eui.clone(),
|
||||
tags: serde_json::to_value(&di.tags)?,
|
||||
dev_addr: pl.dev_addr.clone(),
|
||||
};
|
||||
let c = self.pg_pool.get()?;
|
||||
|
||||
task::spawn_blocking(move || -> Result<()> {
|
||||
diesel::insert_into(event_join::table)
|
||||
.values(&e)
|
||||
.execute(&c)?;
|
||||
Ok(())
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn ack_event(
|
||||
&self,
|
||||
_vars: &HashMap<String, String>,
|
||||
pl: &integration::AckEvent,
|
||||
) -> Result<()> {
|
||||
let di = pl.device_info.as_ref().unwrap();
|
||||
info!(dev_eui = %di.dev_eui, event = "ack", "Inserting event");
|
||||
|
||||
let e = EventAck {
|
||||
queue_item_id: Uuid::from_str(&pl.queue_item_id)?,
|
||||
deduplication_id: Uuid::from_str(&pl.deduplication_id)?,
|
||||
time: pl.time.as_ref().unwrap().clone().try_into()?,
|
||||
tenant_id: Uuid::from_str(&di.tenant_id)?,
|
||||
tenant_name: di.tenant_name.clone(),
|
||||
application_id: Uuid::from_str(&di.application_id)?,
|
||||
application_name: di.application_name.clone(),
|
||||
device_profile_id: Uuid::from_str(&di.device_profile_id)?,
|
||||
device_profile_name: di.device_profile_name.clone(),
|
||||
device_name: di.device_name.clone(),
|
||||
dev_eui: di.dev_eui.clone(),
|
||||
tags: serde_json::to_value(&di.tags)?,
|
||||
acknowledged: pl.acknowledged,
|
||||
f_cnt_down: pl.f_cnt_down as i64,
|
||||
};
|
||||
let c = self.pg_pool.get()?;
|
||||
|
||||
task::spawn_blocking(move || -> Result<()> {
|
||||
diesel::insert_into(event_ack::table)
|
||||
.values(&e)
|
||||
.execute(&c)?;
|
||||
Ok(())
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn txack_event(
|
||||
&self,
|
||||
_vars: &HashMap<String, String>,
|
||||
pl: &integration::TxAckEvent,
|
||||
) -> Result<()> {
|
||||
let di = pl.device_info.as_ref().unwrap();
|
||||
info!(dev_eui = %di.dev_eui, event = "txack", "Inserting event");
|
||||
|
||||
let e = EventTxAck {
|
||||
queue_item_id: Uuid::from_str(&pl.queue_item_id)?,
|
||||
downlink_id: pl.downlink_id as i64,
|
||||
time: pl.time.as_ref().unwrap().clone().try_into()?,
|
||||
tenant_id: Uuid::from_str(&di.tenant_id)?,
|
||||
tenant_name: di.tenant_name.clone(),
|
||||
application_id: Uuid::from_str(&di.application_id)?,
|
||||
application_name: di.application_name.clone(),
|
||||
device_profile_id: Uuid::from_str(&di.device_profile_id)?,
|
||||
device_profile_name: di.device_profile_name.clone(),
|
||||
device_name: di.device_name.clone(),
|
||||
dev_eui: di.dev_eui.clone(),
|
||||
tags: serde_json::to_value(&di.tags)?,
|
||||
f_cnt_down: pl.f_cnt_down as i64,
|
||||
gateway_id: pl.gateway_id.clone(),
|
||||
tx_info: serde_json::to_value(&pl.tx_info)?,
|
||||
};
|
||||
let c = self.pg_pool.get()?;
|
||||
|
||||
task::spawn_blocking(move || -> Result<()> {
|
||||
diesel::insert_into(event_tx_ack::table)
|
||||
.values(&e)
|
||||
.execute(&c)?;
|
||||
Ok(())
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn log_event(
|
||||
&self,
|
||||
_vars: &HashMap<String, String>,
|
||||
pl: &integration::LogEvent,
|
||||
) -> Result<()> {
|
||||
let di = pl.device_info.as_ref().unwrap();
|
||||
info!(dev_eui = %di.dev_eui, event = "log", "Inserting event");
|
||||
|
||||
let e = EventLog {
|
||||
time: pl.time.as_ref().unwrap().clone().try_into()?,
|
||||
tenant_id: Uuid::from_str(&di.tenant_id)?,
|
||||
tenant_name: di.tenant_name.clone(),
|
||||
application_id: Uuid::from_str(&di.application_id)?,
|
||||
application_name: di.application_name.clone(),
|
||||
device_profile_id: Uuid::from_str(&di.device_profile_id)?,
|
||||
device_profile_name: di.device_profile_name.clone(),
|
||||
device_name: di.device_name.clone(),
|
||||
dev_eui: di.dev_eui.clone(),
|
||||
tags: serde_json::to_value(&di.tags)?,
|
||||
level: pl.level.to_string(),
|
||||
code: pl.code.to_string(),
|
||||
description: pl.description.clone(),
|
||||
context: serde_json::to_value(&pl.context)?,
|
||||
};
|
||||
let c = self.pg_pool.get()?;
|
||||
|
||||
task::spawn_blocking(move || -> Result<()> {
|
||||
diesel::insert_into(event_log::table)
|
||||
.values(&e)
|
||||
.execute(&c)?;
|
||||
Ok(())
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn status_event(
|
||||
&self,
|
||||
_vars: &HashMap<String, String>,
|
||||
pl: &integration::StatusEvent,
|
||||
) -> Result<()> {
|
||||
let di = pl.device_info.as_ref().unwrap();
|
||||
info!(dev_eui = %di.dev_eui, event = "status", "Inserting event");
|
||||
|
||||
let e = EventStatus {
|
||||
deduplication_id: Uuid::from_str(&pl.deduplication_id)?,
|
||||
time: pl.time.as_ref().unwrap().clone().try_into()?,
|
||||
tenant_id: Uuid::from_str(&di.tenant_id)?,
|
||||
tenant_name: di.tenant_name.clone(),
|
||||
application_id: Uuid::from_str(&di.application_id)?,
|
||||
application_name: di.application_name.clone(),
|
||||
device_profile_id: Uuid::from_str(&di.device_profile_id)?,
|
||||
device_profile_name: di.device_profile_name.clone(),
|
||||
device_name: di.device_name.clone(),
|
||||
dev_eui: di.dev_eui.clone(),
|
||||
tags: serde_json::to_value(&di.tags)?,
|
||||
margin: pl.margin as i16,
|
||||
external_power_source: pl.external_power_source,
|
||||
battery_level_unavailable: pl.battery_level_unavailable,
|
||||
battery_level: pl.battery_level,
|
||||
};
|
||||
let c = self.pg_pool.get()?;
|
||||
|
||||
task::spawn_blocking(move || -> Result<()> {
|
||||
diesel::insert_into(event_status::table)
|
||||
.values(&e)
|
||||
.execute(&c)?;
|
||||
Ok(())
|
||||
})
|
||||
.await??;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn location_event(
|
||||
&self,
|
||||
_vars: &HashMap<String, String>,
|
||||
pl: &integration::LocationEvent,
|
||||
) -> Result<()> {
|
||||
let di = pl.device_info.as_ref().unwrap();
|
||||
let loc = pl.location.as_ref().unwrap();
|
||||
info!(dev_eui = %di.dev_eui, event = "location", "Inserting event");
|
||||
|
||||
let e = EventLocation {
|
||||
deduplication_id: Uuid::from_str(&pl.deduplication_id)?,
|
||||
time: pl.time.as_ref().unwrap().clone().try_into()?,
|
||||
tenant_id: Uuid::from_str(&di.tenant_id)?,
|
||||
tenant_name: di.tenant_name.clone(),
|
||||
application_id: Uuid::from_str(&di.application_id)?,
|
||||
application_name: di.application_name.clone(),
|
||||
device_profile_id: Uuid::from_str(&di.device_profile_id)?,
|
||||
device_profile_name: di.device_profile_name.clone(),
|
||||
device_name: di.device_name.clone(),
|
||||
dev_eui: di.dev_eui.clone(),
|
||||
tags: serde_json::to_value(&di.tags)?,
|
||||
latitude: loc.latitude,
|
||||
longitude: loc.longitude,
|
||||
altitude: loc.altitude,
|
||||
source: loc.source.to_string(),
|
||||
accuracy: loc.accuracy,
|
||||
};
|
||||
let c = self.pg_pool.get()?;
|
||||
|
||||
task::spawn_blocking(move || -> Result<()> {
|
||||
diesel::insert_into(event_location::table)
|
||||
.values(&e)
|
||||
.execute(&c)?;
|
||||
Ok(())
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn integration_event(
|
||||
&self,
|
||||
_vars: &HashMap<String, String>,
|
||||
pl: &integration::IntegrationEvent,
|
||||
) -> Result<()> {
|
||||
let di = pl.device_info.as_ref().unwrap();
|
||||
info!(dev_eui = %di.dev_eui, event = "integration", "Inserting event");
|
||||
|
||||
let e = EventIntegration {
|
||||
deduplication_id: Uuid::from_str(&pl.deduplication_id)?,
|
||||
time: pl.time.as_ref().unwrap().clone().try_into()?,
|
||||
tenant_id: Uuid::from_str(&di.tenant_id)?,
|
||||
tenant_name: di.tenant_name.clone(),
|
||||
application_id: Uuid::from_str(&di.application_id)?,
|
||||
application_name: di.application_name.clone(),
|
||||
device_profile_id: Uuid::from_str(&di.device_profile_id)?,
|
||||
device_profile_name: di.device_profile_name.clone(),
|
||||
device_name: di.device_name.clone(),
|
||||
dev_eui: di.dev_eui.clone(),
|
||||
tags: serde_json::to_value(&di.tags)?,
|
||||
integration_name: pl.integration_name.clone(),
|
||||
event_type: pl.event_type.clone(),
|
||||
object: serde_json::to_value(&pl.object)?,
|
||||
};
|
||||
let c = self.pg_pool.get()?;
|
||||
|
||||
task::spawn_blocking(move || -> Result<()> {
|
||||
diesel::insert_into(event_integration::table)
|
||||
.values(&e)
|
||||
.execute(&c)?;
|
||||
Ok(())
|
||||
})
|
||||
.await??;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
172
chirpstack/src/integration/postgresql/schema.rs
Normal file
172
chirpstack/src/integration/postgresql/schema.rs
Normal file
@ -0,0 +1,172 @@
|
||||
table! {
|
||||
event_ack (queue_item_id) {
|
||||
queue_item_id -> Uuid,
|
||||
deduplication_id -> Uuid,
|
||||
time -> Timestamptz,
|
||||
tenant_id -> Uuid,
|
||||
tenant_name -> Text,
|
||||
application_id -> Uuid,
|
||||
application_name -> Text,
|
||||
device_profile_id -> Uuid,
|
||||
device_profile_name -> Text,
|
||||
device_name -> Text,
|
||||
dev_eui -> Bpchar,
|
||||
tags -> Jsonb,
|
||||
acknowledged -> Bool,
|
||||
f_cnt_down -> Int8,
|
||||
}
|
||||
}
|
||||
|
||||
table! {
|
||||
event_integration (deduplication_id) {
|
||||
deduplication_id -> Uuid,
|
||||
time -> Timestamptz,
|
||||
tenant_id -> Uuid,
|
||||
tenant_name -> Text,
|
||||
application_id -> Uuid,
|
||||
application_name -> Text,
|
||||
device_profile_id -> Uuid,
|
||||
device_profile_name -> Text,
|
||||
device_name -> Text,
|
||||
dev_eui -> Bpchar,
|
||||
tags -> Jsonb,
|
||||
integration_name -> Text,
|
||||
event_type -> Text,
|
||||
object -> Jsonb,
|
||||
}
|
||||
}
|
||||
|
||||
table! {
|
||||
event_join (deduplication_id) {
|
||||
deduplication_id -> Uuid,
|
||||
time -> Timestamptz,
|
||||
tenant_id -> Uuid,
|
||||
tenant_name -> Text,
|
||||
application_id -> Uuid,
|
||||
application_name -> Text,
|
||||
device_profile_id -> Uuid,
|
||||
device_profile_name -> Text,
|
||||
device_name -> Text,
|
||||
dev_eui -> Bpchar,
|
||||
tags -> Jsonb,
|
||||
dev_addr -> Bpchar,
|
||||
}
|
||||
}
|
||||
|
||||
table! {
|
||||
event_location (deduplication_id) {
|
||||
deduplication_id -> Uuid,
|
||||
time -> Timestamptz,
|
||||
tenant_id -> Uuid,
|
||||
tenant_name -> Text,
|
||||
application_id -> Uuid,
|
||||
application_name -> Text,
|
||||
device_profile_id -> Uuid,
|
||||
device_profile_name -> Text,
|
||||
device_name -> Text,
|
||||
dev_eui -> Bpchar,
|
||||
tags -> Jsonb,
|
||||
latitude -> Float8,
|
||||
longitude -> Float8,
|
||||
altitude -> Float8,
|
||||
source -> Text,
|
||||
accuracy -> Float4,
|
||||
}
|
||||
}
|
||||
|
||||
table! {
|
||||
event_log (id) {
|
||||
id -> Int8,
|
||||
time -> Timestamptz,
|
||||
tenant_id -> Uuid,
|
||||
tenant_name -> Text,
|
||||
application_id -> Uuid,
|
||||
application_name -> Text,
|
||||
device_profile_id -> Uuid,
|
||||
device_profile_name -> Text,
|
||||
device_name -> Text,
|
||||
dev_eui -> Bpchar,
|
||||
tags -> Jsonb,
|
||||
level -> Text,
|
||||
code -> Text,
|
||||
description -> Text,
|
||||
context -> Jsonb,
|
||||
}
|
||||
}
|
||||
|
||||
table! {
|
||||
event_status (deduplication_id) {
|
||||
deduplication_id -> Uuid,
|
||||
time -> Timestamptz,
|
||||
tenant_id -> Uuid,
|
||||
tenant_name -> Text,
|
||||
application_id -> Uuid,
|
||||
application_name -> Text,
|
||||
device_profile_id -> Uuid,
|
||||
device_profile_name -> Text,
|
||||
device_name -> Text,
|
||||
dev_eui -> Bpchar,
|
||||
tags -> Jsonb,
|
||||
margin -> Int2,
|
||||
external_power_source -> Bool,
|
||||
battery_level_unavailable -> Bool,
|
||||
battery_level -> Float4,
|
||||
}
|
||||
}
|
||||
|
||||
table! {
|
||||
event_tx_ack (queue_item_id) {
|
||||
queue_item_id -> Uuid,
|
||||
downlink_id -> Int8,
|
||||
time -> Timestamptz,
|
||||
tenant_id -> Uuid,
|
||||
tenant_name -> Text,
|
||||
application_id -> Uuid,
|
||||
application_name -> Text,
|
||||
device_profile_id -> Uuid,
|
||||
device_profile_name -> Text,
|
||||
device_name -> Text,
|
||||
dev_eui -> Bpchar,
|
||||
tags -> Jsonb,
|
||||
f_cnt_down -> Int8,
|
||||
gateway_id -> Bpchar,
|
||||
tx_info -> Jsonb,
|
||||
}
|
||||
}
|
||||
|
||||
table! {
|
||||
event_up (deduplication_id) {
|
||||
deduplication_id -> Uuid,
|
||||
time -> Timestamptz,
|
||||
tenant_id -> Uuid,
|
||||
tenant_name -> Text,
|
||||
application_id -> Uuid,
|
||||
application_name -> Text,
|
||||
device_profile_id -> Uuid,
|
||||
device_profile_name -> Text,
|
||||
device_name -> Text,
|
||||
dev_eui -> Bpchar,
|
||||
tags -> Jsonb,
|
||||
dev_addr -> Bpchar,
|
||||
adr -> Bool,
|
||||
dr -> Int2,
|
||||
f_cnt -> Int8,
|
||||
f_port -> Int2,
|
||||
confirmed -> Bool,
|
||||
data -> Bytea,
|
||||
object -> Jsonb,
|
||||
rx_info -> Jsonb,
|
||||
tx_info -> Jsonb,
|
||||
}
|
||||
}
|
||||
|
||||
allow_tables_to_appear_in_same_query!(
|
||||
event_ack,
|
||||
event_integration,
|
||||
event_join,
|
||||
event_location,
|
||||
event_log,
|
||||
event_status,
|
||||
event_tx_ack,
|
||||
event_up,
|
||||
);
|
Loading…
Reference in New Issue
Block a user