From e04f991e76e8cf896cd3f11398d4a16c05c09e63 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Thu, 6 Oct 2022 12:31:11 +0100 Subject: [PATCH] Fix 'Cannot serialize NaN as google.protobuf.Value.number_value'. In case the codec returns a NaN float, this is converted to None to avoid Protobuf serialization error. This also fixes the eventlog such that it does not break out of the loop (unless it is a channel error). Messages that can't be processed will be ignored and an error is printed. Fixes https://github.com/chirpstack/chirpstack-v3-to-v4/issues/4. --- chirpstack/src/codec/convert.rs | 12 +- chirpstack/src/eventlog.rs | 397 +++++++++++++++++--------------- 2 files changed, 215 insertions(+), 194 deletions(-) diff --git a/chirpstack/src/codec/convert.rs b/chirpstack/src/codec/convert.rs index 01c9c081..e3508e93 100644 --- a/chirpstack/src/codec/convert.rs +++ b/chirpstack/src/codec/convert.rs @@ -14,9 +14,15 @@ fn _rquickjs_to_struct_val(val: &rquickjs::Value) -> Option Some(pbjson_types::value::Kind::NumberValue( val.as_int().unwrap().into(), )), - rquickjs::Type::Float => Some(pbjson_types::value::Kind::NumberValue( - val.as_float().unwrap(), - )), + rquickjs::Type::Float => { + let v = val.as_float().unwrap(); + if v.is_nan() { + // Avoid Cannot serialize NaN as google.protobuf.Value.number_value error. + None + } else { + Some(pbjson_types::value::Kind::NumberValue(v)) + } + } rquickjs::Type::String => Some(pbjson_types::value::Kind::StringValue( val.as_string().unwrap().to_string().unwrap(), )), diff --git a/chirpstack/src/eventlog.rs b/chirpstack/src/eventlog.rs index 491653d9..5f504f4d 100644 --- a/chirpstack/src/eventlog.rs +++ b/chirpstack/src/eventlog.rs @@ -96,210 +96,225 @@ pub async fn get_event_logs( for stream_id in &stream_key.ids { last_id = stream_id.id.clone(); for (k, v) in &stream_id.map { - match k.as_ref() { - "up" => { - trace!(key = %k, id = %last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::UplinkEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [ - ("DR".to_string(), format!("{}", pl.dr)), - ("FPort".to_string(), format!("{}", pl.f_port)), - ("Data".to_string(), hex::encode(&pl.data)), - ] - .iter() - .cloned() - .collect(), - }; - - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } - } - } - "join" => { - trace!(key = %k, id = %last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::JoinEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [("DevAddr".to_string(), pl.dev_addr)] + let res = || -> Result<()> { + match k.as_ref() { + "up" => { + trace!(key = %k, id = %last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::UplinkEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp{ + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [ + ("DR".to_string(), format!("{}", pl.dr)), + ("FPort".to_string(), format!("{}", pl.f_port)), + ("Data".to_string(), hex::encode(&pl.data)), + ] .iter() .cloned() .collect(), - }; + }; - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); + if channel.blocking_send(pl).is_err() { + return Err(anyhow!("Channel send error")); + } } } - } - "ack" => { - trace!(key = %k, id = %last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::AckEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [].iter().cloned().collect(), - }; - - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } - } - } - "txack" => { - trace!(key = %k, id = %last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::TxAckEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [].iter().cloned().collect(), - }; - - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } - } - } - "status" => { - trace!(key = %k, id = %last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::StatusEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [ - ("Margin".into(), format!("{}", pl.margin)), - ( - "Battery level".into(), - format!("{:.0}%", pl.battery_level), - ), - ( - "Battery level unavailable".into(), - format!("{}", pl.battery_level_unavailable), - ), - ( - "External power source".into(), - format!("{}", pl.external_power_source), - ), - ] - .iter() - .cloned() - .collect(), - }; - - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } - } - } - "log" => { - trace!(key = %k, id =%last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::LogEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [ - ("Level".into(), pl.level().into()), - ("Code".into(), pl.code().into()), - ] - .iter() - .cloned() - .collect(), - }; - - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } - } - } - "location" => { - trace!(key = %k, id=%last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = integration::LocationEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ - seconds: v.seconds, - nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [].iter().cloned().collect(), - }; - - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); - } - } - } - "integration" => { - trace!(key = %k, id=%last_id, "Event-log received from stream"); - if let redis::Value::Data(b) = v { - let pl = - integration::IntegrationEvent::decode(&mut Cursor::new(b))?; - let pl = api::LogItem { - id: stream_id.id.clone(), - time: pl.time.as_ref().map(|v| prost_types::Timestamp{ + "join" => { + trace!(key = %k, id = %last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::JoinEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp{ seconds: v.seconds, nanos: v.nanos, - }), - description: k.clone(), - body: serde_json::to_string(&pl)?, - properties: [ - ("Integration".into(), pl.integration_name.clone()), - ("Event".into(), pl.event_type.clone()), - ] - .iter() - .cloned() - .collect(), - }; + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [("DevAddr".to_string(), pl.dev_addr)] + .iter() + .cloned() + .collect(), + }; - if channel.blocking_send(pl).is_err() { - return Err(anyhow!("Channel send error")); + if channel.blocking_send(pl).is_err() { + return Err(anyhow!("Channel send error")); + } } } + "ack" => { + trace!(key = %k, id = %last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::AckEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp{ + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [].iter().cloned().collect(), + }; + + if channel.blocking_send(pl).is_err() { + return Err(anyhow!("Channel send error")); + } + } + } + "txack" => { + trace!(key = %k, id = %last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::TxAckEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp{ + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [].iter().cloned().collect(), + }; + + if channel.blocking_send(pl).is_err() { + return Err(anyhow!("Channel send error")); + } + } + } + "status" => { + trace!(key = %k, id = %last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::StatusEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp{ + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [ + ("Margin".into(), format!("{}", pl.margin)), + ( + "Battery level".into(), + format!("{:.0}%", pl.battery_level), + ), + ( + "Battery level unavailable".into(), + format!("{}", pl.battery_level_unavailable), + ), + ( + "External power source".into(), + format!("{}", pl.external_power_source), + ), + ] + .iter() + .cloned() + .collect(), + }; + + if channel.blocking_send(pl).is_err() { + return Err(anyhow!("Channel send error")); + } + } + } + "log" => { + trace!(key = %k, id =%last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::LogEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp{ + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [ + ("Level".into(), pl.level().into()), + ("Code".into(), pl.code().into()), + ] + .iter() + .cloned() + .collect(), + }; + + if channel.blocking_send(pl).is_err() { + return Err(anyhow!("Channel send error")); + } + } + } + "location" => { + trace!(key = %k, id=%last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = integration::LocationEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp{ + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [].iter().cloned().collect(), + }; + + if channel.blocking_send(pl).is_err() { + return Err(anyhow!("Channel send error")); + } + } + } + "integration" => { + trace!(key = %k, id=%last_id, "Event-log received from stream"); + if let redis::Value::Data(b) = v { + let pl = + integration::IntegrationEvent::decode(&mut Cursor::new(b))?; + let pl = api::LogItem { + id: stream_id.id.clone(), + time: pl.time.as_ref().map(|v| prost_types::Timestamp{ + seconds: v.seconds, + nanos: v.nanos, + }), + description: k.clone(), + body: serde_json::to_string(&pl)?, + properties: [ + ("Integration".into(), pl.integration_name.clone()), + ("Event".into(), pl.event_type.clone()), + ] + .iter() + .cloned() + .collect(), + }; + + if channel.blocking_send(pl).is_err() { + return Err(anyhow!("Channel send error")); + } + } + } + _ => { + error!(key = %k, "Unexpected key in in event-log stream"); + } + } - _ => { - error!(key = %k, "Unexpected key in in event-log stream"); + + Ok(()) + }(); + + if let Err(e) = res { + // Return in case of channel error, in any other case we just log + // the error. + if e.downcast_ref::>().is_some() { + return Err(e); } + + error!(key = %k, error = %e, "Parsing frame-log error"); } } }