Log incoming Backend Interfaces requests.

This commit is contained in:
Orne Brocaar 2023-11-02 14:07:13 +00:00
parent a4b775e75a
commit f49fdf1c2c

View File

@ -5,10 +5,12 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use chrono::Utc;
use redis::streams::StreamReadReply;
use serde::Serialize;
use tokio::sync::oneshot;
use tokio::task;
use tracing::{debug, error, info, span, warn, Instrument, Level};
use tracing::{error, info, span, warn, Instrument, Level};
use uuid::Uuid;
use warp::{http::StatusCode, Filter, Reply};
@ -19,8 +21,9 @@ use crate::storage::{
device_session, error::Error as StorageError, get_redis_conn, passive_roaming, redis_key,
};
use crate::uplink::{data_sns, helpers, join_sns, RoamingMetaData, UplinkFrameSet};
use crate::{config, region};
use backend::{BasePayload, MessageType};
use crate::{config, region, stream};
use backend::{BasePayload, BasePayloadResultProvider, MessageType};
use chirpstack_api::stream as stream_pb;
use lrwn::region::CommonName;
use lrwn::{AES128Key, NetID, EUI64};
@ -70,9 +73,6 @@ pub async fn handle_request(mut body: impl warp::Buf) -> http::Response<hyper::B
body.advance(cnt);
}
// TODO: should this be improved?
debug!("JSON: {}", String::from_utf8(b.clone()).unwrap_or_default());
let bp: BasePayload = match serde_json::from_slice(&b) {
Ok(v) => v,
Err(e) => {
@ -97,6 +97,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
warn!(error = %e.full(), "Error decoding SenderID as EUI64");
let msg = format!("Error decoding SenderID: {}", e);
let pl = bp.to_base_payload_result(backend::ResultCode::MalformedRequest, &msg);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
}
};
@ -107,6 +108,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
warn!("Unknown SenderID");
let msg = format!("Unknown SenderID: {}", sender_id);
let pl = bp.to_base_payload_result(backend::ResultCode::UnknownSender, &msg);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
}
}
@ -118,6 +120,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
warn!(error = %e.full(), "Error decoding SenderID as NetID");
let msg = format!("Error decoding SenderID: {}", e);
let pl = bp.to_base_payload_result(backend::ResultCode::MalformedRequest, &msg);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
}
};
@ -128,6 +131,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
warn!("Unknown SenderID");
let msg = format!("Unknown SenderID: {}", sender_id);
let pl = bp.to_base_payload_result(backend::ResultCode::UnknownSender, &msg);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
}
}
@ -138,6 +142,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
backend::ResultCode::MalformedRequest,
"Invalid SenderID length",
);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
}
};
@ -165,10 +170,9 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
}
}
fn err_to_response(e: anyhow::Error, bp: &backend::BasePayload) -> http::Response<hyper::Body> {
fn err_to_response(e: anyhow::Error, bp: &backend::BasePayload) -> backend::BasePayloadResult {
let msg = format!("{}", e);
let pl = bp.to_base_payload_result(err_to_result_code(e), &msg);
warp::reply::json(&pl).into_response()
bp.to_base_payload_result(err_to_result_code(e), &msg)
}
fn err_to_result_code(e: anyhow::Error) -> backend::ResultCode {
@ -203,16 +207,25 @@ async fn handle_pr_start_req(
}
};
log_request_response(&bp, &b, &ans).await;
if let Err(e) = sender_client.pr_start_ans(backend::Role::FNS, &ans).await {
error!(error = %e.full(), "Send async PRStartAns error");
error!(error = %e.full(), transaction_id = bp.transaction_id, "Send async PRStartAns error");
}
});
warp::reply::with_status("", StatusCode::OK).into_response()
} else {
match _handle_pr_start_req(b).await {
Ok(v) => warp::reply::json(&v).into_response(),
Err(e) => err_to_response(e, &bp),
Ok(ans) => {
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
}
Err(e) => {
let ans = err_to_response(e, &bp);
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
}
}
}
}
@ -351,6 +364,8 @@ async fn handle_pr_stop_req(
}
};
log_request_response(&bp, &b, &ans).await;
if let Err(e) = sender_client.pr_stop_ans(backend::Role::SNS, &ans).await {
error!(error = %e.full(), "Send async PRStopAns error");
}
@ -359,8 +374,15 @@ async fn handle_pr_stop_req(
warp::reply::with_status("", StatusCode::OK).into_response()
} else {
match _handle_pr_stop_req(b).await {
Ok(v) => warp::reply::json(&v).into_response(),
Err(e) => err_to_response(e, &bp),
Ok(ans) => {
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
}
Err(e) => {
let ans = err_to_response(e, &bp);
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
}
}
}
}
@ -399,11 +421,14 @@ async fn handle_xmit_data_req(
let pl: backend::XmitDataReqPayload = match serde_json::from_slice(b) {
Ok(v) => v,
Err(e) => {
return err_to_response(anyhow::Error::new(e), &bp);
let ans = err_to_response(anyhow::Error::new(e), &bp);
log_request_response(&bp, b, &ans).await;
return warp::reply::json(&ans).into_response();
}
};
if sender_client.is_async() {
let b = b.to_vec();
task::spawn(async move {
let sender_role = if pl.ul_meta_data.is_some() {
backend::Role::FNS
@ -421,6 +446,8 @@ async fn handle_xmit_data_req(
}
};
log_request_response(&bp, &b, &ans).await;
if let Err(e) = sender_client.xmit_data_ans(sender_role, &ans).await {
error!(error = %e.full(), "Send async XmitDataAns error");
}
@ -429,8 +456,15 @@ async fn handle_xmit_data_req(
warp::reply::with_status("", StatusCode::OK).into_response()
} else {
match _handle_xmit_data_req(pl).await {
Ok(v) => warp::reply::json(&v).into_response(),
Err(e) => err_to_response(e, &bp),
Ok(ans) => {
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
}
Err(e) => {
let ans = err_to_response(e, &bp);
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
}
}
}
}
@ -558,6 +592,33 @@ pub async fn get_async_receiver(
Ok(rx)
}
async fn log_request_response<T>(bp: &backend::BasePayload, req_body: &[u8], resp: &T)
where
T: Serialize + BasePayloadResultProvider,
{
// The incoming request is an async answer.
// This is already logged by the backend client.
if bp.is_answer() {
return;
}
let be_req_log = stream_pb::BackendInterfacesRequest {
sender_id: hex::encode(&bp.sender_id),
receiver_id: hex::encode(&bp.receiver_id),
transaction_id: bp.transaction_id,
message_type: format!("{:?}", bp.message_type),
request_body: String::from_utf8(req_body.to_vec()).unwrap_or_default(),
response_body: serde_json::to_string(resp).unwrap_or_default(),
result_code: format!("{:?}", resp.base_payload().result.result_code),
time: Some(Utc::now().into()),
..Default::default()
};
if let Err(e) = stream::backend_interfaces::log_request(be_req_log).await {
error!(error = %e.full(), "Log Backend Interfaces request error");
}
}
#[cfg(test)]
pub mod test {
use super::*;