Check if channel is closed. Fix blocking_send error.

In order to properly handle the SendError, we must return the original
error, instead of creating a new anyhow::Error.
This commit is contained in:
Orne Brocaar
2022-11-04 14:29:04 +00:00
parent 0b7ee05c6f
commit 347ac6fcfa
2 changed files with 15 additions and 25 deletions

View File

@ -743,6 +743,7 @@ impl InternalService for Internal {
// detect client disconnect // detect client disconnect
_ = close_rx.recv() => { _ = close_rx.recv() => {
debug!("Client disconnected"); debug!("Client disconnected");
redis_rx.close();
break; break;
}, },
// detect get_event_logs function return // detect get_event_logs function return

View File

@ -7,7 +7,7 @@ use prost::Message;
use redis::streams::StreamReadReply; use redis::streams::StreamReadReply;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::task; use tokio::task;
use tracing::{error, trace}; use tracing::{debug, error, trace};
use crate::config; use crate::config;
use crate::storage::{get_redis_conn, redis_key}; use crate::storage::{get_redis_conn, redis_key};
@ -83,6 +83,11 @@ pub async fn get_event_logs(
let mut c = get_redis_conn()?; let mut c = get_redis_conn()?;
loop { loop {
if channel.is_closed() {
debug!("Channel has been closed, returning");
return Ok(());
}
let srr: StreamReadReply = redis::cmd("XREAD") let srr: StreamReadReply = redis::cmd("XREAD")
.arg("COUNT") .arg("COUNT")
.arg(count) .arg(count)
@ -120,9 +125,7 @@ pub async fn get_event_logs(
.collect(), .collect(),
}; };
if channel.blocking_send(pl).is_err() { channel.blocking_send(pl)?;
return Err(anyhow!("Channel send error"));
}
} }
} }
"join" => { "join" => {
@ -143,9 +146,7 @@ pub async fn get_event_logs(
.collect(), .collect(),
}; };
if channel.blocking_send(pl).is_err() { channel.blocking_send(pl)?;
return Err(anyhow!("Channel send error"));
}
} }
} }
"ack" => { "ack" => {
@ -163,9 +164,7 @@ pub async fn get_event_logs(
properties: [].iter().cloned().collect(), properties: [].iter().cloned().collect(),
}; };
if channel.blocking_send(pl).is_err() { channel.blocking_send(pl)?;
return Err(anyhow!("Channel send error"));
}
} }
} }
"txack" => { "txack" => {
@ -183,9 +182,7 @@ pub async fn get_event_logs(
properties: [].iter().cloned().collect(), properties: [].iter().cloned().collect(),
}; };
if channel.blocking_send(pl).is_err() { channel.blocking_send(pl)?;
return Err(anyhow!("Channel send error"));
}
} }
} }
"status" => { "status" => {
@ -220,9 +217,7 @@ pub async fn get_event_logs(
.collect(), .collect(),
}; };
if channel.blocking_send(pl).is_err() { channel.blocking_send(pl)?;
return Err(anyhow!("Channel send error"));
}
} }
} }
"log" => { "log" => {
@ -246,9 +241,7 @@ pub async fn get_event_logs(
.collect(), .collect(),
}; };
if channel.blocking_send(pl).is_err() { channel.blocking_send(pl)?;
return Err(anyhow!("Channel send error"));
}
} }
} }
"location" => { "location" => {
@ -266,9 +259,7 @@ pub async fn get_event_logs(
properties: [].iter().cloned().collect(), properties: [].iter().cloned().collect(),
}; };
if channel.blocking_send(pl).is_err() { channel.blocking_send(pl)?;
return Err(anyhow!("Channel send error"));
}
} }
} }
"integration" => { "integration" => {
@ -293,9 +284,7 @@ pub async fn get_event_logs(
.collect(), .collect(),
}; };
if channel.blocking_send(pl).is_err() { channel.blocking_send(pl)?;
return Err(anyhow!("Channel send error"));
}
} }
} }
_ => { _ => {