From f49fdf1c2c986032a188299dd0b5e7c80e95b471 Mon Sep 17 00:00:00 2001 From: Orne Brocaar Date: Thu, 2 Nov 2023 14:07:13 +0000 Subject: [PATCH] Log incoming Backend Interfaces requests. --- chirpstack/src/api/backend/mod.rs | 95 +++++++++++++++++++++++++------ 1 file changed, 78 insertions(+), 17 deletions(-) diff --git a/chirpstack/src/api/backend/mod.rs b/chirpstack/src/api/backend/mod.rs index 2d950bd7..10250ad5 100644 --- a/chirpstack/src/api/backend/mod.rs +++ b/chirpstack/src/api/backend/mod.rs @@ -5,10 +5,12 @@ use std::sync::Arc; use std::time::Duration; use anyhow::Result; +use chrono::Utc; use redis::streams::StreamReadReply; +use serde::Serialize; use tokio::sync::oneshot; use tokio::task; -use tracing::{debug, error, info, span, warn, Instrument, Level}; +use tracing::{error, info, span, warn, Instrument, Level}; use uuid::Uuid; use warp::{http::StatusCode, Filter, Reply}; @@ -19,8 +21,9 @@ use crate::storage::{ device_session, error::Error as StorageError, get_redis_conn, passive_roaming, redis_key, }; use crate::uplink::{data_sns, helpers, join_sns, RoamingMetaData, UplinkFrameSet}; -use crate::{config, region}; -use backend::{BasePayload, MessageType}; +use crate::{config, region, stream}; +use backend::{BasePayload, BasePayloadResultProvider, MessageType}; +use chirpstack_api::stream as stream_pb; use lrwn::region::CommonName; use lrwn::{AES128Key, NetID, EUI64}; @@ -70,9 +73,6 @@ pub async fn handle_request(mut body: impl warp::Buf) -> http::Response v, Err(e) => { @@ -97,6 +97,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec) -> http::Response) -> http::Response) -> http::Response) -> http::Response) -> http::Response) -> http::Response http::Response { +fn err_to_response(e: anyhow::Error, bp: &backend::BasePayload) -> backend::BasePayloadResult { let msg = format!("{}", e); - let pl = bp.to_base_payload_result(err_to_result_code(e), &msg); - warp::reply::json(&pl).into_response() + bp.to_base_payload_result(err_to_result_code(e), &msg) } fn err_to_result_code(e: anyhow::Error) -> backend::ResultCode { @@ -203,16 +207,25 @@ async fn handle_pr_start_req( } }; + log_request_response(&bp, &b, &ans).await; + if let Err(e) = sender_client.pr_start_ans(backend::Role::FNS, &ans).await { - error!(error = %e.full(), "Send async PRStartAns error"); + error!(error = %e.full(), transaction_id = bp.transaction_id, "Send async PRStartAns error"); } }); warp::reply::with_status("", StatusCode::OK).into_response() } else { match _handle_pr_start_req(b).await { - Ok(v) => warp::reply::json(&v).into_response(), - Err(e) => err_to_response(e, &bp), + Ok(ans) => { + log_request_response(&bp, b, &ans).await; + warp::reply::json(&ans).into_response() + } + Err(e) => { + let ans = err_to_response(e, &bp); + log_request_response(&bp, b, &ans).await; + warp::reply::json(&ans).into_response() + } } } } @@ -351,6 +364,8 @@ async fn handle_pr_stop_req( } }; + log_request_response(&bp, &b, &ans).await; + if let Err(e) = sender_client.pr_stop_ans(backend::Role::SNS, &ans).await { error!(error = %e.full(), "Send async PRStopAns error"); } @@ -359,8 +374,15 @@ async fn handle_pr_stop_req( warp::reply::with_status("", StatusCode::OK).into_response() } else { match _handle_pr_stop_req(b).await { - Ok(v) => warp::reply::json(&v).into_response(), - Err(e) => err_to_response(e, &bp), + Ok(ans) => { + log_request_response(&bp, b, &ans).await; + warp::reply::json(&ans).into_response() + } + Err(e) => { + let ans = err_to_response(e, &bp); + log_request_response(&bp, b, &ans).await; + warp::reply::json(&ans).into_response() + } } } } @@ -399,11 +421,14 @@ async fn handle_xmit_data_req( let pl: backend::XmitDataReqPayload = match serde_json::from_slice(b) { Ok(v) => v, Err(e) => { - return err_to_response(anyhow::Error::new(e), &bp); + let ans = err_to_response(anyhow::Error::new(e), &bp); + log_request_response(&bp, b, &ans).await; + return warp::reply::json(&ans).into_response(); } }; if sender_client.is_async() { + let b = b.to_vec(); task::spawn(async move { let sender_role = if pl.ul_meta_data.is_some() { backend::Role::FNS @@ -421,6 +446,8 @@ async fn handle_xmit_data_req( } }; + log_request_response(&bp, &b, &ans).await; + if let Err(e) = sender_client.xmit_data_ans(sender_role, &ans).await { error!(error = %e.full(), "Send async XmitDataAns error"); } @@ -429,8 +456,15 @@ async fn handle_xmit_data_req( warp::reply::with_status("", StatusCode::OK).into_response() } else { match _handle_xmit_data_req(pl).await { - Ok(v) => warp::reply::json(&v).into_response(), - Err(e) => err_to_response(e, &bp), + Ok(ans) => { + log_request_response(&bp, b, &ans).await; + warp::reply::json(&ans).into_response() + } + Err(e) => { + let ans = err_to_response(e, &bp); + log_request_response(&bp, b, &ans).await; + warp::reply::json(&ans).into_response() + } } } } @@ -558,6 +592,33 @@ pub async fn get_async_receiver( Ok(rx) } +async fn log_request_response(bp: &backend::BasePayload, req_body: &[u8], resp: &T) +where + T: Serialize + BasePayloadResultProvider, +{ + // The incoming request is an async answer. + // This is already logged by the backend client. + if bp.is_answer() { + return; + } + + let be_req_log = stream_pb::BackendInterfacesRequest { + sender_id: hex::encode(&bp.sender_id), + receiver_id: hex::encode(&bp.receiver_id), + transaction_id: bp.transaction_id, + message_type: format!("{:?}", bp.message_type), + request_body: String::from_utf8(req_body.to_vec()).unwrap_or_default(), + response_body: serde_json::to_string(resp).unwrap_or_default(), + result_code: format!("{:?}", resp.base_payload().result.result_code), + time: Some(Utc::now().into()), + ..Default::default() + }; + + if let Err(e) = stream::backend_interfaces::log_request(be_req_log).await { + error!(error = %e.full(), "Log Backend Interfaces request error"); + } +} + #[cfg(test)] pub mod test { use super::*;