diff --git a/chirpstack/src/cmd/configfile.rs b/chirpstack/src/cmd/configfile.rs index 3b3c5a34..1940dc39 100644 --- a/chirpstack/src/cmd/configfile.rs +++ b/chirpstack/src/cmd/configfile.rs @@ -303,6 +303,33 @@ pub fn run() { tls_key="{{ integration.mqtt.tls_key }}" + # PostgreSQL integration configuration. + [integration.postgresql] + + # PostgreSQL DSN. + # + # Format example: postgres://:@/?sslmode=. + # + # SSL mode options: + # * disable - no SSL + # * require - Always SSL (skip verification) + # * verify-ca - Always SSL (verify that the certificate presented by the server was signed by a trusted CA) + # * verify-full - Always SSL (verify that the certification presented by the server was signed by a trusted CA and the server host name matches the one in the certificate) + dsn="{{ integration.postgresql.dsn }}" + + # Max open connections. + # + # This sets the max. number of open connections that are allowed in the + # PostgreSQL connection pool. + max_open_connections={{ integration.postgresql.max_open_connections }} + + # Min idle connections. + # + # This sets the min. number of idle connections in the PostgreSQL connection + # pool (0 = equal to max_open_connections). + min_idle_connections={{ integration.postgresql.min_idle_connections }} + + # Codec configuration. [codec] diff --git a/chirpstack/src/config.rs b/chirpstack/src/config.rs index 7b120347..5e0ca8c0 100644 --- a/chirpstack/src/config.rs +++ b/chirpstack/src/config.rs @@ -240,6 +240,7 @@ impl Default for Monitoring { pub struct Integration { pub enabled: Vec, pub mqtt: MqttIntegration, + pub postgresql: PostgresqlIntegration, } #[derive(Serialize, Deserialize, Clone)] @@ -302,6 +303,24 @@ impl Default for MqttIntegrationClient { } } +#[derive(Serialize, Deserialize, Clone)] +#[serde(default)] +pub struct PostgresqlIntegration { + pub dsn: String, + pub max_open_connections: u32, + pub min_idle_connections: u32, +} + +impl Default for PostgresqlIntegration { + fn default() -> Self { + PostgresqlIntegration { + dsn: "postgresql://chirpstack_integration:chirpstack_integration@localhost/chirpstack_integration?sslmode=disable".into(), + max_open_connections: 10, + min_idle_connections: 0, + } + } +} + #[derive(Serialize, Deserialize, Clone, Default)] #[serde(default)] pub struct Codec { diff --git a/chirpstack/src/integration/mod.rs b/chirpstack/src/integration/mod.rs index 20624c30..db2047ca 100644 --- a/chirpstack/src/integration/mod.rs +++ b/chirpstack/src/integration/mod.rs @@ -24,6 +24,7 @@ pub mod mock; mod mqtt; mod mydevices; mod pilot_things; +mod postgresql; mod redis; mod thingsboard; @@ -49,6 +50,10 @@ pub async fn setup() -> Result<()> { .context("Setup MQTT integration")?, )); } + "postgresql" => integrations.push(Box::new( + postgresql::Integration::new(&conf.integration.postgresql) + .context("Setup PostgreSQL integration")?, + )), _ => { return Err(anyhow!("Unexpected integration: {}", name)); } diff --git a/chirpstack/src/integration/postgresql/diesel.toml b/chirpstack/src/integration/postgresql/diesel.toml new file mode 100644 index 00000000..56bd2de8 --- /dev/null +++ b/chirpstack/src/integration/postgresql/diesel.toml @@ -0,0 +1,5 @@ +# For documentation on how to configure this file, +# see diesel.rs/guides/configuring-diesel-cli + +[print_schema] +file = "schema.rs" diff --git a/chirpstack/src/integration/postgresql/migrations/00000000000000_initial/down.sql b/chirpstack/src/integration/postgresql/migrations/00000000000000_initial/down.sql new file mode 100644 index 00000000..45cb13ff --- /dev/null +++ b/chirpstack/src/integration/postgresql/migrations/00000000000000_initial/down.sql @@ -0,0 +1,8 @@ +drop table event_integration; +drop table event_location; +drop table event_status; +drop table event_log; +drop table event_tx_ack; +drop table event_ack; +drop table event_join; +drop table event_up; diff --git a/chirpstack/src/integration/postgresql/migrations/00000000000000_initial/up.sql b/chirpstack/src/integration/postgresql/migrations/00000000000000_initial/up.sql new file mode 100644 index 00000000..6d04a544 --- /dev/null +++ b/chirpstack/src/integration/postgresql/migrations/00000000000000_initial/up.sql @@ -0,0 +1,145 @@ +create table event_up ( + deduplication_id uuid primary key, + time timestamp with time zone not null, + tenant_id uuid not null, + tenant_name text not null, + application_id uuid not null, + application_name text not null, + device_profile_id uuid not null, + device_profile_name text not null, + device_name text not null, + dev_eui char(16) not null, + tags jsonb not null, + dev_addr char(8) not null, + adr boolean not null, + dr smallint not null, + f_cnt bigint not null, + f_port smallint not null, + confirmed boolean not null, + data bytea not null, + object jsonb not null, + rx_info jsonb not null, + tx_info jsonb not null +); + +create table event_join ( + deduplication_id uuid primary key, + time timestamp with time zone not null, + tenant_id uuid not null, + tenant_name text not null, + application_id uuid not null, + application_name text not null, + device_profile_id uuid not null, + device_profile_name text not null, + device_name text not null, + dev_eui char(16) not null, + tags jsonb not null, + dev_addr char(8) not null +); + +create table event_ack ( + queue_item_id uuid primary key, + deduplication_id uuid not null, + time timestamp with time zone not null, + tenant_id uuid not null, + tenant_name text not null, + application_id uuid not null, + application_name text not null, + device_profile_id uuid not null, + device_profile_name text not null, + device_name text not null, + dev_eui char(16) not null, + tags jsonb not null, + acknowledged boolean not null, + f_cnt_down bigint not null +); + +create table event_tx_ack ( + queue_item_id uuid primary key, + downlink_id bigint not null, + time timestamp with time zone not null, + tenant_id uuid not null, + tenant_name text not null, + application_id uuid not null, + application_name text not null, + device_profile_id uuid not null, + device_profile_name text not null, + device_name text not null, + dev_eui char(16) not null, + tags jsonb not null, + f_cnt_down bigint not null, + gateway_id char(16) not null, + tx_info jsonb not null +); + +create table event_log ( + id bigserial primary key, + time timestamp with time zone not null, + tenant_id uuid not null, + tenant_name text not null, + application_id uuid not null, + application_name text not null, + device_profile_id uuid not null, + device_profile_name text not null, + device_name text not null, + dev_eui char(16) not null, + tags jsonb not null, + level text not null, + code text not null, + description text not null, + context jsonb not null +); + +create table event_status ( + deduplication_id uuid primary key, + time timestamp with time zone not null, + tenant_id uuid not null, + tenant_name text not null, + application_id uuid not null, + application_name text not null, + device_profile_id uuid not null, + device_profile_name text not null, + device_name text not null, + dev_eui char(16) not null, + tags jsonb not null, + margin smallint not null, + external_power_source boolean not null, + battery_level_unavailable boolean not null, + battery_level real not null +); + +create table event_location ( + deduplication_id uuid primary key, + time timestamp with time zone not null, + tenant_id uuid not null, + tenant_name text not null, + application_id uuid not null, + application_name text not null, + device_profile_id uuid not null, + device_profile_name text not null, + device_name text not null, + dev_eui char(16) not null, + tags jsonb not null, + latitude double precision not null, + longitude double precision not null, + altitude double precision not null, + source text not null, + accuracy real not null +); + +create table event_integration ( + deduplication_id uuid primary key, + time timestamp with time zone not null, + tenant_id uuid not null, + tenant_name text not null, + application_id uuid not null, + application_name text not null, + device_profile_id uuid not null, + device_profile_name text not null, + device_name text not null, + dev_eui char(16) not null, + tags jsonb not null, + integration_name text not null, + event_type text not null, + object jsonb not null +); diff --git a/chirpstack/src/integration/postgresql/mod.rs b/chirpstack/src/integration/postgresql/mod.rs new file mode 100644 index 00000000..52f1d78e --- /dev/null +++ b/chirpstack/src/integration/postgresql/mod.rs @@ -0,0 +1,522 @@ +use std::collections::HashMap; +use std::str::FromStr; + +use anyhow::{Context, Result}; +use async_trait::async_trait; +use chrono::{DateTime, Utc}; +use diesel::pg::PgConnection; +use diesel::prelude::*; +use diesel::r2d2::{ConnectionManager, Pool, PooledConnection}; +use diesel_migrations::embed_migrations; +use tokio::task; +use tracing::info; +use uuid::Uuid; + +use super::Integration as IntegrationTrait; +use crate::config::PostgresqlIntegration as Config; +use chirpstack_api::integration; +use schema::{ + event_ack, event_integration, event_join, event_location, event_log, event_status, + event_tx_ack, event_up, +}; + +mod schema; + +embed_migrations!("./src/integration/postgresql/migrations"); + +type PgPool = Pool>; +type PgPoolConnection = PooledConnection>; + +#[derive(Insertable)] +#[table_name = "event_up"] +struct EventUp { + pub deduplication_id: Uuid, + pub time: DateTime, + pub tenant_id: Uuid, + pub tenant_name: String, + pub application_id: Uuid, + pub application_name: String, + pub device_profile_id: Uuid, + pub device_profile_name: String, + pub device_name: String, + pub dev_eui: String, + pub tags: serde_json::Value, + pub dev_addr: String, + pub adr: bool, + pub dr: i16, + pub f_cnt: i64, + pub f_port: i16, + pub confirmed: bool, + pub data: Vec, + pub object: serde_json::Value, + pub rx_info: serde_json::Value, + pub tx_info: serde_json::Value, +} + +#[derive(Insertable)] +#[table_name = "event_join"] +struct EventJoin { + pub deduplication_id: Uuid, + pub time: DateTime, + pub tenant_id: Uuid, + pub tenant_name: String, + pub application_id: Uuid, + pub application_name: String, + pub device_profile_id: Uuid, + pub device_profile_name: String, + pub device_name: String, + pub dev_eui: String, + pub tags: serde_json::Value, + pub dev_addr: String, +} + +#[derive(Insertable)] +#[table_name = "event_ack"] +struct EventAck { + pub queue_item_id: Uuid, + pub deduplication_id: Uuid, + pub time: DateTime, + pub tenant_id: Uuid, + pub tenant_name: String, + pub application_id: Uuid, + pub application_name: String, + pub device_profile_id: Uuid, + pub device_profile_name: String, + pub device_name: String, + pub dev_eui: String, + pub tags: serde_json::Value, + pub acknowledged: bool, + pub f_cnt_down: i64, +} + +#[derive(Insertable)] +#[table_name = "event_tx_ack"] +struct EventTxAck { + pub queue_item_id: Uuid, + pub downlink_id: i64, + pub time: DateTime, + pub tenant_id: Uuid, + pub tenant_name: String, + pub application_id: Uuid, + pub application_name: String, + pub device_profile_id: Uuid, + pub device_profile_name: String, + pub device_name: String, + pub dev_eui: String, + pub tags: serde_json::Value, + pub f_cnt_down: i64, + pub gateway_id: String, + pub tx_info: serde_json::Value, +} + +#[derive(Insertable)] +#[table_name = "event_log"] +struct EventLog { + pub time: DateTime, + pub tenant_id: Uuid, + pub tenant_name: String, + pub application_id: Uuid, + pub application_name: String, + pub device_profile_id: Uuid, + pub device_profile_name: String, + pub device_name: String, + pub dev_eui: String, + pub tags: serde_json::Value, + pub level: String, + pub code: String, + pub description: String, + pub context: serde_json::Value, +} + +#[derive(Insertable)] +#[table_name = "event_status"] +struct EventStatus { + pub deduplication_id: Uuid, + pub time: DateTime, + pub tenant_id: Uuid, + pub tenant_name: String, + pub application_id: Uuid, + pub application_name: String, + pub device_profile_id: Uuid, + pub device_profile_name: String, + pub device_name: String, + pub dev_eui: String, + pub tags: serde_json::Value, + pub margin: i16, + pub external_power_source: bool, + pub battery_level_unavailable: bool, + pub battery_level: f32, +} + +#[derive(Insertable)] +#[table_name = "event_location"] +struct EventLocation { + pub deduplication_id: Uuid, + pub time: DateTime, + pub tenant_id: Uuid, + pub tenant_name: String, + pub application_id: Uuid, + pub application_name: String, + pub device_profile_id: Uuid, + pub device_profile_name: String, + pub device_name: String, + pub dev_eui: String, + pub tags: serde_json::Value, + pub latitude: f64, + pub longitude: f64, + pub altitude: f64, + pub source: String, + pub accuracy: f32, +} + +#[derive(Insertable)] +#[table_name = "event_integration"] +struct EventIntegration { + pub deduplication_id: Uuid, + pub time: DateTime, + pub tenant_id: Uuid, + pub tenant_name: String, + pub application_id: Uuid, + pub application_name: String, + pub device_profile_id: Uuid, + pub device_profile_name: String, + pub device_name: String, + pub dev_eui: String, + pub tags: serde_json::Value, + pub integration_name: String, + pub event_type: String, + pub object: serde_json::Value, +} + +pub struct Integration { + pg_pool: PgPool, +} + +impl Integration { + pub fn new(conf: &Config) -> Result { + info!("Initializing PostgreSQL integration"); + + let pg_pool = PgPool::builder() + .max_size(conf.max_open_connections) + .min_idle(match conf.min_idle_connections { + 0 => None, + _ => Some(conf.min_idle_connections), + }) + .build(ConnectionManager::new(&conf.dsn)) + .context("Setup PostgreSQL connection pool error")?; + let db_conn = pg_pool.get()?; + + info!("Applying schema migrations"); + embedded_migrations::run(&db_conn).context("Run migrations error")?; + + Ok(Integration { pg_pool: pg_pool }) + } +} + +#[async_trait] +impl IntegrationTrait for Integration { + async fn uplink_event( + &self, + _vars: &HashMap, + pl: &integration::UplinkEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + info!(dev_eui = %di.dev_eui, event = "up", "Inserting event"); + + let e = EventUp { + deduplication_id: Uuid::from_str(&pl.deduplication_id)?, + time: pl.time.as_ref().unwrap().clone().try_into()?, + tenant_id: Uuid::from_str(&di.tenant_id)?, + tenant_name: di.tenant_name.clone(), + application_id: Uuid::from_str(&di.application_id)?, + application_name: di.application_name.clone(), + device_profile_id: Uuid::from_str(&di.device_profile_id)?, + device_profile_name: di.device_profile_name.clone(), + device_name: di.device_name.clone(), + dev_eui: di.dev_eui.clone(), + tags: serde_json::to_value(&di.tags)?, + dev_addr: pl.dev_addr.clone(), + adr: pl.adr, + dr: pl.dr as i16, + f_cnt: pl.f_cnt_up as i64, + f_port: pl.f_port as i16, + confirmed: pl.confirmed, + data: pl.data.clone(), + object: serde_json::to_value(&pl.object)?, + rx_info: serde_json::to_value(&pl.rx_info)?, + tx_info: serde_json::to_value(&pl.tx_info)?, + }; + let c = self.pg_pool.get()?; + + task::spawn_blocking(move || -> Result<()> { + diesel::insert_into(event_up::table) + .values(&e) + .execute(&c)?; + Ok(()) + }) + .await??; + + Ok(()) + } + + async fn join_event( + &self, + _vars: &HashMap, + pl: &integration::JoinEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + info!(dev_eui = %di.dev_eui, event = "join", "Inserting event"); + + let e = EventJoin { + deduplication_id: Uuid::from_str(&pl.deduplication_id)?, + time: pl.time.as_ref().unwrap().clone().try_into()?, + tenant_id: Uuid::from_str(&di.tenant_id)?, + tenant_name: di.tenant_name.clone(), + application_id: Uuid::from_str(&di.application_id)?, + application_name: di.application_name.clone(), + device_profile_id: Uuid::from_str(&di.device_profile_id)?, + device_profile_name: di.device_profile_name.clone(), + device_name: di.device_name.clone(), + dev_eui: di.dev_eui.clone(), + tags: serde_json::to_value(&di.tags)?, + dev_addr: pl.dev_addr.clone(), + }; + let c = self.pg_pool.get()?; + + task::spawn_blocking(move || -> Result<()> { + diesel::insert_into(event_join::table) + .values(&e) + .execute(&c)?; + Ok(()) + }) + .await??; + + Ok(()) + } + + async fn ack_event( + &self, + _vars: &HashMap, + pl: &integration::AckEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + info!(dev_eui = %di.dev_eui, event = "ack", "Inserting event"); + + let e = EventAck { + queue_item_id: Uuid::from_str(&pl.queue_item_id)?, + deduplication_id: Uuid::from_str(&pl.deduplication_id)?, + time: pl.time.as_ref().unwrap().clone().try_into()?, + tenant_id: Uuid::from_str(&di.tenant_id)?, + tenant_name: di.tenant_name.clone(), + application_id: Uuid::from_str(&di.application_id)?, + application_name: di.application_name.clone(), + device_profile_id: Uuid::from_str(&di.device_profile_id)?, + device_profile_name: di.device_profile_name.clone(), + device_name: di.device_name.clone(), + dev_eui: di.dev_eui.clone(), + tags: serde_json::to_value(&di.tags)?, + acknowledged: pl.acknowledged, + f_cnt_down: pl.f_cnt_down as i64, + }; + let c = self.pg_pool.get()?; + + task::spawn_blocking(move || -> Result<()> { + diesel::insert_into(event_ack::table) + .values(&e) + .execute(&c)?; + Ok(()) + }) + .await??; + + Ok(()) + } + + async fn txack_event( + &self, + _vars: &HashMap, + pl: &integration::TxAckEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + info!(dev_eui = %di.dev_eui, event = "txack", "Inserting event"); + + let e = EventTxAck { + queue_item_id: Uuid::from_str(&pl.queue_item_id)?, + downlink_id: pl.downlink_id as i64, + time: pl.time.as_ref().unwrap().clone().try_into()?, + tenant_id: Uuid::from_str(&di.tenant_id)?, + tenant_name: di.tenant_name.clone(), + application_id: Uuid::from_str(&di.application_id)?, + application_name: di.application_name.clone(), + device_profile_id: Uuid::from_str(&di.device_profile_id)?, + device_profile_name: di.device_profile_name.clone(), + device_name: di.device_name.clone(), + dev_eui: di.dev_eui.clone(), + tags: serde_json::to_value(&di.tags)?, + f_cnt_down: pl.f_cnt_down as i64, + gateway_id: pl.gateway_id.clone(), + tx_info: serde_json::to_value(&pl.tx_info)?, + }; + let c = self.pg_pool.get()?; + + task::spawn_blocking(move || -> Result<()> { + diesel::insert_into(event_tx_ack::table) + .values(&e) + .execute(&c)?; + Ok(()) + }) + .await??; + + Ok(()) + } + + async fn log_event( + &self, + _vars: &HashMap, + pl: &integration::LogEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + info!(dev_eui = %di.dev_eui, event = "log", "Inserting event"); + + let e = EventLog { + time: pl.time.as_ref().unwrap().clone().try_into()?, + tenant_id: Uuid::from_str(&di.tenant_id)?, + tenant_name: di.tenant_name.clone(), + application_id: Uuid::from_str(&di.application_id)?, + application_name: di.application_name.clone(), + device_profile_id: Uuid::from_str(&di.device_profile_id)?, + device_profile_name: di.device_profile_name.clone(), + device_name: di.device_name.clone(), + dev_eui: di.dev_eui.clone(), + tags: serde_json::to_value(&di.tags)?, + level: pl.level.to_string(), + code: pl.code.to_string(), + description: pl.description.clone(), + context: serde_json::to_value(&pl.context)?, + }; + let c = self.pg_pool.get()?; + + task::spawn_blocking(move || -> Result<()> { + diesel::insert_into(event_log::table) + .values(&e) + .execute(&c)?; + Ok(()) + }) + .await??; + + Ok(()) + } + + async fn status_event( + &self, + _vars: &HashMap, + pl: &integration::StatusEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + info!(dev_eui = %di.dev_eui, event = "status", "Inserting event"); + + let e = EventStatus { + deduplication_id: Uuid::from_str(&pl.deduplication_id)?, + time: pl.time.as_ref().unwrap().clone().try_into()?, + tenant_id: Uuid::from_str(&di.tenant_id)?, + tenant_name: di.tenant_name.clone(), + application_id: Uuid::from_str(&di.application_id)?, + application_name: di.application_name.clone(), + device_profile_id: Uuid::from_str(&di.device_profile_id)?, + device_profile_name: di.device_profile_name.clone(), + device_name: di.device_name.clone(), + dev_eui: di.dev_eui.clone(), + tags: serde_json::to_value(&di.tags)?, + margin: pl.margin as i16, + external_power_source: pl.external_power_source, + battery_level_unavailable: pl.battery_level_unavailable, + battery_level: pl.battery_level, + }; + let c = self.pg_pool.get()?; + + task::spawn_blocking(move || -> Result<()> { + diesel::insert_into(event_status::table) + .values(&e) + .execute(&c)?; + Ok(()) + }) + .await??; + Ok(()) + } + + async fn location_event( + &self, + _vars: &HashMap, + pl: &integration::LocationEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + let loc = pl.location.as_ref().unwrap(); + info!(dev_eui = %di.dev_eui, event = "location", "Inserting event"); + + let e = EventLocation { + deduplication_id: Uuid::from_str(&pl.deduplication_id)?, + time: pl.time.as_ref().unwrap().clone().try_into()?, + tenant_id: Uuid::from_str(&di.tenant_id)?, + tenant_name: di.tenant_name.clone(), + application_id: Uuid::from_str(&di.application_id)?, + application_name: di.application_name.clone(), + device_profile_id: Uuid::from_str(&di.device_profile_id)?, + device_profile_name: di.device_profile_name.clone(), + device_name: di.device_name.clone(), + dev_eui: di.dev_eui.clone(), + tags: serde_json::to_value(&di.tags)?, + latitude: loc.latitude, + longitude: loc.longitude, + altitude: loc.altitude, + source: loc.source.to_string(), + accuracy: loc.accuracy, + }; + let c = self.pg_pool.get()?; + + task::spawn_blocking(move || -> Result<()> { + diesel::insert_into(event_location::table) + .values(&e) + .execute(&c)?; + Ok(()) + }) + .await??; + + Ok(()) + } + + async fn integration_event( + &self, + _vars: &HashMap, + pl: &integration::IntegrationEvent, + ) -> Result<()> { + let di = pl.device_info.as_ref().unwrap(); + info!(dev_eui = %di.dev_eui, event = "integration", "Inserting event"); + + let e = EventIntegration { + deduplication_id: Uuid::from_str(&pl.deduplication_id)?, + time: pl.time.as_ref().unwrap().clone().try_into()?, + tenant_id: Uuid::from_str(&di.tenant_id)?, + tenant_name: di.tenant_name.clone(), + application_id: Uuid::from_str(&di.application_id)?, + application_name: di.application_name.clone(), + device_profile_id: Uuid::from_str(&di.device_profile_id)?, + device_profile_name: di.device_profile_name.clone(), + device_name: di.device_name.clone(), + dev_eui: di.dev_eui.clone(), + tags: serde_json::to_value(&di.tags)?, + integration_name: pl.integration_name.clone(), + event_type: pl.event_type.clone(), + object: serde_json::to_value(&pl.object)?, + }; + let c = self.pg_pool.get()?; + + task::spawn_blocking(move || -> Result<()> { + diesel::insert_into(event_integration::table) + .values(&e) + .execute(&c)?; + Ok(()) + }) + .await??; + + Ok(()) + } +} diff --git a/chirpstack/src/integration/postgresql/schema.rs b/chirpstack/src/integration/postgresql/schema.rs new file mode 100644 index 00000000..2e6989bb --- /dev/null +++ b/chirpstack/src/integration/postgresql/schema.rs @@ -0,0 +1,172 @@ +table! { + event_ack (queue_item_id) { + queue_item_id -> Uuid, + deduplication_id -> Uuid, + time -> Timestamptz, + tenant_id -> Uuid, + tenant_name -> Text, + application_id -> Uuid, + application_name -> Text, + device_profile_id -> Uuid, + device_profile_name -> Text, + device_name -> Text, + dev_eui -> Bpchar, + tags -> Jsonb, + acknowledged -> Bool, + f_cnt_down -> Int8, + } +} + +table! { + event_integration (deduplication_id) { + deduplication_id -> Uuid, + time -> Timestamptz, + tenant_id -> Uuid, + tenant_name -> Text, + application_id -> Uuid, + application_name -> Text, + device_profile_id -> Uuid, + device_profile_name -> Text, + device_name -> Text, + dev_eui -> Bpchar, + tags -> Jsonb, + integration_name -> Text, + event_type -> Text, + object -> Jsonb, + } +} + +table! { + event_join (deduplication_id) { + deduplication_id -> Uuid, + time -> Timestamptz, + tenant_id -> Uuid, + tenant_name -> Text, + application_id -> Uuid, + application_name -> Text, + device_profile_id -> Uuid, + device_profile_name -> Text, + device_name -> Text, + dev_eui -> Bpchar, + tags -> Jsonb, + dev_addr -> Bpchar, + } +} + +table! { + event_location (deduplication_id) { + deduplication_id -> Uuid, + time -> Timestamptz, + tenant_id -> Uuid, + tenant_name -> Text, + application_id -> Uuid, + application_name -> Text, + device_profile_id -> Uuid, + device_profile_name -> Text, + device_name -> Text, + dev_eui -> Bpchar, + tags -> Jsonb, + latitude -> Float8, + longitude -> Float8, + altitude -> Float8, + source -> Text, + accuracy -> Float4, + } +} + +table! { + event_log (id) { + id -> Int8, + time -> Timestamptz, + tenant_id -> Uuid, + tenant_name -> Text, + application_id -> Uuid, + application_name -> Text, + device_profile_id -> Uuid, + device_profile_name -> Text, + device_name -> Text, + dev_eui -> Bpchar, + tags -> Jsonb, + level -> Text, + code -> Text, + description -> Text, + context -> Jsonb, + } +} + +table! { + event_status (deduplication_id) { + deduplication_id -> Uuid, + time -> Timestamptz, + tenant_id -> Uuid, + tenant_name -> Text, + application_id -> Uuid, + application_name -> Text, + device_profile_id -> Uuid, + device_profile_name -> Text, + device_name -> Text, + dev_eui -> Bpchar, + tags -> Jsonb, + margin -> Int2, + external_power_source -> Bool, + battery_level_unavailable -> Bool, + battery_level -> Float4, + } +} + +table! { + event_tx_ack (queue_item_id) { + queue_item_id -> Uuid, + downlink_id -> Int8, + time -> Timestamptz, + tenant_id -> Uuid, + tenant_name -> Text, + application_id -> Uuid, + application_name -> Text, + device_profile_id -> Uuid, + device_profile_name -> Text, + device_name -> Text, + dev_eui -> Bpchar, + tags -> Jsonb, + f_cnt_down -> Int8, + gateway_id -> Bpchar, + tx_info -> Jsonb, + } +} + +table! { + event_up (deduplication_id) { + deduplication_id -> Uuid, + time -> Timestamptz, + tenant_id -> Uuid, + tenant_name -> Text, + application_id -> Uuid, + application_name -> Text, + device_profile_id -> Uuid, + device_profile_name -> Text, + device_name -> Text, + dev_eui -> Bpchar, + tags -> Jsonb, + dev_addr -> Bpchar, + adr -> Bool, + dr -> Int2, + f_cnt -> Int8, + f_port -> Int2, + confirmed -> Bool, + data -> Bytea, + object -> Jsonb, + rx_info -> Jsonb, + tx_info -> Jsonb, + } +} + +allow_tables_to_appear_in_same_query!( + event_ack, + event_integration, + event_join, + event_location, + event_log, + event_status, + event_tx_ack, + event_up, +);