Fix 'Cannot serialize NaN as google.protobuf.Value.number_value'.

In case the codec returns a NaN float, this is converted to None to
avoid Protobuf serialization error. This also fixes the eventlog such
that it does not break out of the loop (unless it is a channel error).
Messages that can't be processed will be ignored and an error is
printed.

Fixes https://github.com/chirpstack/chirpstack-v3-to-v4/issues/4.
This commit is contained in:
Orne Brocaar 2022-10-06 12:31:11 +01:00
parent 5f4d03f3e4
commit e04f991e76
2 changed files with 215 additions and 194 deletions

View File

@ -14,9 +14,15 @@ fn _rquickjs_to_struct_val(val: &rquickjs::Value) -> Option<pbjson_types::value:
rquickjs::Type::Int => Some(pbjson_types::value::Kind::NumberValue(
val.as_int().unwrap().into(),
)),
rquickjs::Type::Float => Some(pbjson_types::value::Kind::NumberValue(
val.as_float().unwrap(),
)),
rquickjs::Type::Float => {
let v = val.as_float().unwrap();
if v.is_nan() {
// Avoid Cannot serialize NaN as google.protobuf.Value.number_value error.
None
} else {
Some(pbjson_types::value::Kind::NumberValue(v))
}
}
rquickjs::Type::String => Some(pbjson_types::value::Kind::StringValue(
val.as_string().unwrap().to_string().unwrap(),
)),

View File

@ -96,210 +96,225 @@ pub async fn get_event_logs(
for stream_id in &stream_key.ids {
last_id = stream_id.id.clone();
for (k, v) in &stream_id.map {
match k.as_ref() {
"up" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::UplinkEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("DR".to_string(), format!("{}", pl.dr)),
("FPort".to_string(), format!("{}", pl.f_port)),
("Data".to_string(), hex::encode(&pl.data)),
]
.iter()
.cloned()
.collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
"join" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::JoinEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [("DevAddr".to_string(), pl.dev_addr)]
let res = || -> Result<()> {
match k.as_ref() {
"up" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::UplinkEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("DR".to_string(), format!("{}", pl.dr)),
("FPort".to_string(), format!("{}", pl.f_port)),
("Data".to_string(), hex::encode(&pl.data)),
]
.iter()
.cloned()
.collect(),
};
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
}
"ack" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::AckEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
"txack" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::TxAckEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
"status" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::StatusEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Margin".into(), format!("{}", pl.margin)),
(
"Battery level".into(),
format!("{:.0}%", pl.battery_level),
),
(
"Battery level unavailable".into(),
format!("{}", pl.battery_level_unavailable),
),
(
"External power source".into(),
format!("{}", pl.external_power_source),
),
]
.iter()
.cloned()
.collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
"log" => {
trace!(key = %k, id =%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::LogEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Level".into(), pl.level().into()),
("Code".into(), pl.code().into()),
]
.iter()
.cloned()
.collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
"location" => {
trace!(key = %k, id=%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::LocationEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
"integration" => {
trace!(key = %k, id=%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl =
integration::IntegrationEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
"join" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::JoinEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Integration".into(), pl.integration_name.clone()),
("Event".into(), pl.event_type.clone()),
]
.iter()
.cloned()
.collect(),
};
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [("DevAddr".to_string(), pl.dev_addr)]
.iter()
.cloned()
.collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
"ack" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::AckEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
"txack" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::TxAckEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
"status" => {
trace!(key = %k, id = %last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::StatusEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Margin".into(), format!("{}", pl.margin)),
(
"Battery level".into(),
format!("{:.0}%", pl.battery_level),
),
(
"Battery level unavailable".into(),
format!("{}", pl.battery_level_unavailable),
),
(
"External power source".into(),
format!("{}", pl.external_power_source),
),
]
.iter()
.cloned()
.collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
"log" => {
trace!(key = %k, id =%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::LogEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Level".into(), pl.level().into()),
("Code".into(), pl.code().into()),
]
.iter()
.cloned()
.collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
"location" => {
trace!(key = %k, id=%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl = integration::LocationEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [].iter().cloned().collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
"integration" => {
trace!(key = %k, id=%last_id, "Event-log received from stream");
if let redis::Value::Data(b) = v {
let pl =
integration::IntegrationEvent::decode(&mut Cursor::new(b))?;
let pl = api::LogItem {
id: stream_id.id.clone(),
time: pl.time.as_ref().map(|v| prost_types::Timestamp{
seconds: v.seconds,
nanos: v.nanos,
}),
description: k.clone(),
body: serde_json::to_string(&pl)?,
properties: [
("Integration".into(), pl.integration_name.clone()),
("Event".into(), pl.event_type.clone()),
]
.iter()
.cloned()
.collect(),
};
if channel.blocking_send(pl).is_err() {
return Err(anyhow!("Channel send error"));
}
}
}
_ => {
error!(key = %k, "Unexpected key in in event-log stream");
}
}
_ => {
error!(key = %k, "Unexpected key in in event-log stream");
Ok(())
}();
if let Err(e) = res {
// Return in case of channel error, in any other case we just log
// the error.
if e.downcast_ref::<mpsc::error::SendError<api::LogItem>>().is_some() {
return Err(e);
}
error!(key = %k, error = %e, "Parsing frame-log error");
}
}
}