From 347ac6fcfaf1f1f52e100fe73485da64d45e8411 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Fri, 4 Nov 2022 14:29:04 +0000 Subject: [PATCH] Check if channel is closed. Fix blocking_send error. In order to properly handle the SendError, we must return the original error, instead of creating a new anyhow::Error. --- chirpstack/src/api/internal.rs | 1 + chirpstack/src/eventlog.rs | 39 ++++++++++++---------------------- 2 files changed, 15 insertions(+), 25 deletions(-) diff --git a/chirpstack/src/api/internal.rs b/chirpstack/src/api/internal.rs index 32527b0f..c8fb63bc 100644 --- a/chirpstack/src/api/internal.rs +++ b/chirpstack/src/api/internal.rs @@ -743,6 +743,7 @@ impl InternalService for Internal { // detect client disconnect _ = close_rx.recv() => { debug!("Client disconnected"); + redis_rx.close(); break; }, // detect get_event_logs function return diff --git a/chirpstack/src/eventlog.rs b/chirpstack/src/eventlog.rs index 5f504f4d..6b4f82a8 100644 --- a/chirpstack/src/eventlog.rs +++ b/chirpstack/src/eventlog.rs @@ -7,7 +7,7 @@ use prost::Message; use redis::streams::StreamReadReply; use tokio::sync::mpsc; use tokio::task; -use tracing::{error, trace}; +use tracing::{debug, error, trace}; use crate::config; use crate::storage::{get_redis_conn, redis_key}; @@ -83,6 +83,11 @@ pub async fn get_event_logs( let mut c = get_redis_conn()?; loop { + if channel.is_closed() { + debug!("Channel has been closed, returning"); + return Ok(()); + } + let srr: StreamReadReply = redis::cmd("XREAD") .arg("COUNT") .arg(count) @@ -120,9 +125,7 @@ pub async fn get_event_logs( .collect(), }; - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } + channel.blocking_send(pl)?; } } "join" => { @@ -143,9 +146,7 @@ pub async fn get_event_logs( .collect(), }; - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } + channel.blocking_send(pl)?; } } "ack" => { @@ -163,9 +164,7 @@ pub async fn get_event_logs( properties: [].iter().cloned().collect(), }; - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } + channel.blocking_send(pl)?; } } "txack" => { @@ -183,9 +182,7 @@ pub async fn get_event_logs( properties: [].iter().cloned().collect(), }; - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } + channel.blocking_send(pl)?; } } "status" => { @@ -220,9 +217,7 @@ pub async fn get_event_logs( .collect(), }; - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } + channel.blocking_send(pl)?; } } "log" => { @@ -246,9 +241,7 @@ pub async fn get_event_logs( .collect(), }; - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } + channel.blocking_send(pl)?; } } "location" => { @@ -266,9 +259,7 @@ pub async fn get_event_logs( properties: [].iter().cloned().collect(), }; - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } + channel.blocking_send(pl)?; } } "integration" => { @@ -293,9 +284,7 @@ pub async fn get_event_logs( .collect(), }; - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } + channel.blocking_send(pl)?; } } _ => {