From 51b622e8e2c56bf3de9a997fab883844860468a0 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Wed, 28 Sep 2022 12:25:33 +0100 Subject: [PATCH] Fix terminating stream loop on client disconnect. As the spawn_blocking thread is not terminated when the frame_log_future is dropped, it would keep the Redis connection open. By closing the redis_rx channel on client-disconnect, we can check if the channel has been closed inside the spawn_blocking thread, and return if this is the case. Closes #40. --- chirpstack/src/api/internal.rs | 1 + chirpstack/src/framelog.rs | 8 +++++++- 2 files changed, 8 insertions(+), 1 deletion(-) diff --git a/chirpstack/src/api/internal.rs b/chirpstack/src/api/internal.rs index 1cf5e1c6..32527b0f 100644 --- a/chirpstack/src/api/internal.rs +++ b/chirpstack/src/api/internal.rs @@ -675,6 +675,7 @@ impl InternalService for Internal { // detect client disconnect _ = close_rx.recv() => { debug!("Client disconnected"); + redis_rx.close(); break; } // detect get_frame_logs function return diff --git a/chirpstack/src/framelog.rs b/chirpstack/src/framelog.rs index 04aaa32a..97eff9b6 100644 --- a/chirpstack/src/framelog.rs +++ b/chirpstack/src/framelog.rs @@ -9,7 +9,7 @@ use redis::streams::StreamReadReply; use serde_json::json; use tokio::sync::mpsc; use tokio::task; -use tracing::{error, trace}; +use tracing::{debug, error, trace}; use lrwn::EUI64; @@ -256,6 +256,11 @@ pub async fn get_frame_logs( let mut c = get_redis_conn()?; loop { + if channel.is_closed() { + debug!("Channel has been closed, returning"); + return Ok(()); + } + let srr: StreamReadReply = redis::cmd("XREAD") .arg("COUNT") .arg(count) @@ -362,6 +367,7 @@ pub async fn get_frame_logs( // check every 1 second if there are new messages, which should be sufficient. sleep(Duration::from_secs(1)); } + } }).await? }