Fix remaining blocking code after async Redis refactor.

This code still used channel.blocking_send(..) and sleep from std::time.
As async anonymous functions are not (yet) supported, this moves the
stream handling into a separate async function.
This commit is contained in:
Orne Brocaar 2023-12-04 14:19:01 +00:00
parent 53a570b0b0
commit a0d4e80720
2 changed files with 301 additions and 290 deletions

View File

@ -1,11 +1,11 @@
use std::io::Cursor;
use std::thread::sleep;
use std::time::Duration;
use anyhow::{Context, Result};
use prost::Message;
use redis::streams::StreamReadReply;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::{debug, error, trace};
use crate::config;
@ -85,201 +85,7 @@ pub async fn get_event_logs(
for stream_id in &stream_key.ids {
last_id = stream_id.id.clone();
for (k, v) in &stream_id.map {
let res = || -> Result<()> {
match k.as_ref() {
"up" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::UplinkEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("DR".to_string(), pl.dr.to_string()),
("FPort".to_string(), pl.f_port.to_string()),
("FCnt".to_string(), pl.f_cnt.to_string()),
("Data".to_string(), hex::encode(&pl.data)),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"join" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::JoinEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [("DevAddr".to_string(), pl.dev_addr)]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"ack" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::AckEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
channel.blocking_send(pl)?;
}
}
"txack" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::TxAckEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
channel.blocking_send(pl)?;
}
}
"status" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::StatusEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Margin".into(), format!("{}", pl.margin)),
(
"Battery level".into(),
format!("{:.0}%", pl.battery_level),
),
(
"Battery level unavailable".into(),
format!("{}", pl.battery_level_unavailable),
),
(
"External power source".into(),
format!("{}", pl.external_power_source),
),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"log" => {
trace!(key = %k, id =%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::LogEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Level".into(), pl.level().into()),
("Code".into(), pl.code().into()),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"location" => {
trace!(key = %k, id=%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl =
integration::LocationEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
channel.blocking_send(pl)?;
}
}
"integration" => {
trace!(key = %k, id=%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl =
integration::IntegrationEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Integration".into(), pl.integration_name.clone()),
("Event".into(), pl.event_type.clone()),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
_ => {
error!(key = %k, "Unexpected key in in event-log stream");
}
}
Ok(())
}();
let res = handle_stream(&last_id, &channel, k, v).await;
if let Err(e) = res {
// Return in case of channel error, in any other case we just log
@ -296,6 +102,204 @@ pub async fn get_event_logs(
}
}
sleep(Duration::from_secs(1));
// If we use xread with block=0, the connection can't be used by other requests. Now we
// check every 1 second if there are new messages, which should be sufficient.
sleep(Duration::from_secs(1)).await;
}
}
async fn handle_stream(
stream_id: &str,
channel: &mpsc::Sender<api::LogItem>,
k: &str,
v: &redis::Value,
) -> Result<()> {
match k.as_ref() {
"up" => {
trace!(key = %k, id = %stream_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::UplinkEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.to_string(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.to_string(),
body: serde_json::to_string(&pl)?,
properties: [
("DR".to_string(), pl.dr.to_string()),
("FPort".to_string(), pl.f_port.to_string()),
("FCnt".to_string(), pl.f_cnt.to_string()),
("Data".to_string(), hex::encode(&pl.data)),
]
.iter()
.cloned()
.collect(),
};
channel.send(pl).await?;
}
}
"join" => {
trace!(key = %k, id = %stream_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::JoinEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.to_string(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.to_string(),
body: serde_json::to_string(&pl)?,
properties: [("DevAddr".to_string(), pl.dev_addr)]
.iter()
.cloned()
.collect(),
};
channel.send(pl).await?;
}
}
"ack" => {
trace!(key = %k, id = %stream_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::AckEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.to_string(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.to_string(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
channel.send(pl).await?;
}
}
"txack" => {
trace!(key = %k, id = %stream_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::TxAckEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.to_string(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.to_string(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
channel.send(pl).await?;
}
}
"status" => {
trace!(key = %k, id = %stream_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::StatusEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.to_string(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.to_string(),
body: serde_json::to_string(&pl)?,
properties: [
("Margin".into(), format!("{}", pl.margin)),
("Battery level".into(), format!("{:.0}%", pl.battery_level)),
(
"Battery level unavailable".into(),
format!("{}", pl.battery_level_unavailable),
),
(
"External power source".into(),
format!("{}", pl.external_power_source),
),
]
.iter()
.cloned()
.collect(),
};
channel.send(pl).await?;
}
}
"log" => {
trace!(key = %k, id =%stream_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::LogEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.to_string(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.to_string(),
body: serde_json::to_string(&pl)?,
properties: [
("Level".into(), pl.level().into()),
("Code".into(), pl.code().into()),
]
.iter()
.cloned()
.collect(),
};
channel.send(pl).await?;
}
}
"location" => {
trace!(key = %k, id=%stream_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::LocationEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.to_string(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.to_string(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
channel.send(pl).await?;
}
}
"integration" => {
trace!(key = %k, id=%stream_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::IntegrationEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.to_string(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp {
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.to_string(),
body: serde_json::to_string(&pl)?,
properties: [
("Integration".into(), pl.integration_name.clone()),
("Event".into(), pl.event_type.clone()),
]
.iter()
.cloned()
.collect(),
};
channel.send(pl).await?;
}
}
_ => {
error!(key = %k, "Unexpected key in in event-log stream");
}
}
Ok(())
}

View File

@ -1,6 +1,5 @@
use std::io::Cursor;
use std::str::FromStr;
use std::thread::sleep;
use std::time::Duration;
use anyhow::{Context, Result};
@ -8,6 +7,7 @@ use prost::Message;
use redis::streams::StreamReadReply;
use serde_json::json;
use tokio::sync::mpsc;
use tokio::time::sleep;
use tracing::{debug, error, trace, warn};
use lrwn::EUI64;
@ -249,97 +249,7 @@ pub async fn get_frame_logs(
for stream_id in &stream_key.ids {
last_id = stream_id.id.clone();
for (k, v) in &stream_id.map {
let res = || -> Result<()> {
match k.as_ref() {
"up" => {
trace!(key = %k, id = %last_id, "Frame-log received from stream");
if let redis::Value::Data(b) = v {
let pl = stream::UplinkFrameLog::decode(&mut Cursor::new(b))?;
let mut phy = lrwn::PhyPayload::from_slice(&pl.phy_payload)?;
if pl.plaintext_f_opts {
if let Err(e) = phy.decode_f_opts_to_mac_commands() {
warn!(error = %e.full(), "Decode f_opts to mac-commands error");
}
}
if pl.plaintext_frm_payload {
if let Err(e) = phy.decode_frm_payload() {
warn!(error = %e.full(), "Decode frm_payload error");
}
}
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|t| prost_types::Timestamp {
seconds: t.seconds,
nanos: t.nanos,
}),
description: pl.m_type().into(),
body: json!({
"phy_payload": phy,
"tx_info": pl.tx_info,
"rx_info": pl.rx_info,
})
.to_string(),
properties: [
("DevAddr".to_string(), pl.dev_addr),
("DevEUI".to_string(), pl.dev_eui),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
"down" => {
trace!(key = %k, id = %last_id, "frame-log received from stream");
if let redis::Value::Data(b) = v {
let pl = stream::DownlinkFrameLog::decode(&mut Cursor::new(b))?;
let mut phy = lrwn::PhyPayload::from_slice(&pl.phy_payload)?;
if pl.plaintext_f_opts {
if let Err(e) = phy.decode_f_opts_to_mac_commands() {
warn!(error = %e.full(), "Decode f_opts to mac-commands error");
}
}
if pl.plaintext_frm_payload {
if let Err(e) = phy.decode_frm_payload() {
warn!(error = %e.full(), "Decode frm_payload error");
}
}
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|t| prost_types::Timestamp {
seconds: t.seconds,
nanos: t.nanos,
}),
description: pl.m_type().into(),
body: json!({
"phy_payload": phy,
"tx_info": pl.tx_info,
})
.to_string(),
properties: [
("DevAddr".to_string(), pl.dev_addr),
("DevEUI".to_string(), pl.dev_eui),
("Gateway ID".to_string(), pl.gateway_id),
]
.iter()
.cloned()
.collect(),
};
channel.blocking_send(pl)?;
}
}
_ => {
error!(key = %k, "Unexpected key in frame-log stream");
}
}
Ok(())
}();
let res = handle_stream(&last_id, &channel, k, v).await;
if let Err(e) = res {
// Return in case of channel error, in any other case we just log
@ -358,6 +268,103 @@ pub async fn get_frame_logs(
// If we use xread with block=0, the connection can't be used by other requests. Now we
// check every 1 second if there are new messages, which should be sufficient.
sleep(Duration::from_secs(1));
sleep(Duration::from_secs(1)).await;
}
}
async fn handle_stream(
stream_id: &str,
channel: &mpsc::Sender<api::LogItem>,
k: &str,
v: &redis::Value,
) -> Result<()> {
match k.as_ref() {
"up" => {
trace!(key = %k, id = %stream_id, "Frame-log received from stream");
if let redis::Value::Data(b) = v {
let pl = stream::UplinkFrameLog::decode(&mut Cursor::new(b))?;
let mut phy = lrwn::PhyPayload::from_slice(&pl.phy_payload)?;
if pl.plaintext_f_opts {
if let Err(e) = phy.decode_f_opts_to_mac_commands() {
warn!(error = %e.full(), "Decode f_opts to mac-commands error");
}
}
if pl.plaintext_frm_payload {
if let Err(e) = phy.decode_frm_payload() {
warn!(error = %e.full(), "Decode frm_payload error");
}
}
let pl = api::LogItem {
id: stream_id.to_string(),
time: pl.time.as_ref().map(|t| prost_types::Timestamp {
seconds: t.seconds,
nanos: t.nanos,
}),
description: pl.m_type().into(),
body: json!({
"phy_payload": phy,
"tx_info": pl.tx_info,
"rx_info": pl.rx_info,
})
.to_string(),
properties: [
("DevAddr".to_string(), pl.dev_addr),
("DevEUI".to_string(), pl.dev_eui),
]
.iter()
.cloned()
.collect(),
};
channel.send(pl).await?;
}
}
"down" => {
trace!(key = %k, id = %stream_id, "frame-log received from stream");
if let redis::Value::Data(b) = v {
let pl = stream::DownlinkFrameLog::decode(&mut Cursor::new(b))?;
let mut phy = lrwn::PhyPayload::from_slice(&pl.phy_payload)?;
if pl.plaintext_f_opts {
if let Err(e) = phy.decode_f_opts_to_mac_commands() {
warn!(error = %e.full(), "Decode f_opts to mac-commands error");
}
}
if pl.plaintext_frm_payload {
if let Err(e) = phy.decode_frm_payload() {
warn!(error = %e.full(), "Decode frm_payload error");
}
}
let pl = api::LogItem {
id: stream_id.to_string(),
time: pl.time.as_ref().map(|t| prost_types::Timestamp {
seconds: t.seconds,
nanos: t.nanos,
}),
description: pl.m_type().into(),
body: json!({
"phy_payload": phy,
"tx_info": pl.tx_info,
})
.to_string(),
properties: [
("DevAddr".to_string(), pl.dev_addr),
("DevEUI".to_string(), pl.dev_eui),
("Gateway ID".to_string(), pl.gateway_id),
]
.iter()
.cloned()
.collect(),
};
channel.send(pl).await?;
}
}
_ => {
error!(key = %k, "Unexpected key in frame-log stream");
}
}
Ok(())
}