mirror of
https://github.com/chirpstack/chirpstack.git
synced 2025-02-21 01:21:21 +00:00
backend: Add optional logger func to client config.
This commit is contained in:
parent
6931e9adb5
commit
2020732459
@ -17,6 +17,7 @@ aes-kw = "0.2"
|
||||
reqwest = { version = "0.11", features = ["json", "rustls-tls"], default-features = false }
|
||||
chrono = { version = "0.4", features = ["serde"] }
|
||||
tokio = { version = "1.32", features = ["macros" ] }
|
||||
chirpstack_api = { path = "../api/rust", default-features = false, features = ["json"] }
|
||||
|
||||
# Development and testing
|
||||
[dev-dependencies]
|
||||
|
@ -2,7 +2,9 @@
|
||||
extern crate anyhow;
|
||||
|
||||
use std::fs::File;
|
||||
use std::future::Future;
|
||||
use std::io::Read;
|
||||
use std::pin::Pin;
|
||||
use std::time::Duration;
|
||||
|
||||
use aes_kw::Kek;
|
||||
@ -14,6 +16,8 @@ use serde::{Deserialize, Serialize};
|
||||
use tokio::sync::oneshot::Receiver;
|
||||
use tracing::{debug, error, info, trace};
|
||||
|
||||
use chirpstack_api::stream;
|
||||
|
||||
const PROTOCOL_VERSION: &str = "1.0";
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, Debug)]
|
||||
@ -31,6 +35,14 @@ pub trait BasePayloadResultProvider {
|
||||
fn base_payload(&self) -> &BasePayloadResult;
|
||||
}
|
||||
|
||||
pub type RequestLogFn = Box<
|
||||
dyn Fn(
|
||||
stream::BackendInterfacesRequest,
|
||||
) -> Pin<Box<dyn Future<Output = Result<()>> + Sync + Send>>
|
||||
+ Sync
|
||||
+ Send,
|
||||
>;
|
||||
|
||||
pub struct ClientConfig {
|
||||
pub sender_id: Vec<u8>,
|
||||
pub receiver_id: Vec<u8>,
|
||||
@ -49,6 +61,9 @@ pub struct ClientConfig {
|
||||
|
||||
// Use target-role URL suffix (e.g. /fns, /sns, ...).
|
||||
pub use_target_role_suffix: bool,
|
||||
|
||||
// Request log function.
|
||||
pub request_log_fn: Option<RequestLogFn>,
|
||||
}
|
||||
|
||||
impl Default for ClientConfig {
|
||||
@ -63,6 +78,7 @@ impl Default for ClientConfig {
|
||||
authorization: None,
|
||||
async_timeout: Duration::from_secs(0),
|
||||
use_target_role_suffix: false,
|
||||
request_log_fn: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -303,6 +319,39 @@ impl Client {
|
||||
ans: &mut D,
|
||||
async_resp: Option<Receiver<Vec<u8>>>,
|
||||
) -> Result<()>
|
||||
where
|
||||
S: ?Sized + serde::ser::Serialize + BasePayloadProvider,
|
||||
D: serde::de::DeserializeOwned + BasePayloadResultProvider,
|
||||
{
|
||||
let mut be_req_log = stream::BackendInterfacesRequest {
|
||||
time: Some(Utc::now().into()),
|
||||
..Default::default()
|
||||
};
|
||||
let res = self
|
||||
._request(target_role, pl, ans, async_resp, &mut be_req_log)
|
||||
.await;
|
||||
|
||||
if let Err(e) = &res {
|
||||
be_req_log.request_error = format!("{:#}", e);
|
||||
}
|
||||
|
||||
if let Some(log_fn) = &self.config.request_log_fn {
|
||||
if let Err(e) = log_fn(be_req_log).await {
|
||||
error!(error = %e, "Log request error");
|
||||
}
|
||||
}
|
||||
|
||||
res
|
||||
}
|
||||
|
||||
async fn _request<S, D>(
|
||||
&self,
|
||||
target_role: Option<Role>,
|
||||
pl: &S,
|
||||
ans: &mut D,
|
||||
async_resp: Option<Receiver<Vec<u8>>>,
|
||||
be_req_log: &mut stream::BackendInterfacesRequest,
|
||||
) -> Result<()>
|
||||
where
|
||||
S: ?Sized + serde::ser::Serialize + BasePayloadProvider,
|
||||
D: serde::de::DeserializeOwned + BasePayloadResultProvider,
|
||||
@ -319,10 +368,15 @@ impl Client {
|
||||
};
|
||||
|
||||
let bp = pl.base_payload().clone();
|
||||
be_req_log.sender_id = hex::encode(&bp.sender_id);
|
||||
be_req_log.receiver_id = hex::encode(&bp.receiver_id);
|
||||
be_req_log.transaction_id = bp.transaction_id;
|
||||
be_req_log.message_type = format!("{:?}", bp.message_type);
|
||||
|
||||
let body = serde_json::to_string(&pl)?;
|
||||
be_req_log.request_body = body.clone();
|
||||
|
||||
info!(receiver_id = %hex::encode(&bp.receiver_id), transaction_id = bp.transaction_id, message_type = ?bp.message_type, server = %server, async_interface = %async_resp.is_some(), "Making request");
|
||||
debug!("JSON: {}", body);
|
||||
|
||||
let res = self
|
||||
.client
|
||||
@ -350,8 +404,11 @@ impl Client {
|
||||
None => res.text().await?,
|
||||
};
|
||||
|
||||
debug!("JSON: {}", resp_json);
|
||||
be_req_log.response_body = resp_json.clone();
|
||||
|
||||
let base: BasePayloadResult = serde_json::from_str(&resp_json)?;
|
||||
be_req_log.result_code = format!("{:?}", base.result.result_code);
|
||||
|
||||
if base.result.result_code != ResultCode::Success {
|
||||
error!(result_code = ?base.result.result_code, description = %base.result.description, receiver_id = %hex::encode(&bp.receiver_id), transaction_id = bp.transaction_id, message_type = ?bp.message_type, "Response error");
|
||||
return Err(anyhow!(
|
||||
@ -362,6 +419,7 @@ impl Client {
|
||||
}
|
||||
|
||||
*ans = serde_json::from_str(&resp_json)?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
@ -1148,7 +1206,7 @@ mod hex_encode {
|
||||
pub mod test {
|
||||
use super::*;
|
||||
use httpmock::prelude::*;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio::sync::{mpsc, oneshot};
|
||||
|
||||
#[test]
|
||||
fn test_key_envelope() {
|
||||
@ -1373,4 +1431,118 @@ pub mod test {
|
||||
mock.delete();
|
||||
assert!(resp.is_err());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_log_fn_ok() {
|
||||
let (tx, mut rx) = mpsc::channel(1);
|
||||
let server = MockServer::start();
|
||||
|
||||
let c = Client::new(ClientConfig {
|
||||
sender_id: vec![1, 2, 3],
|
||||
server: server.url("/"),
|
||||
request_log_fn: Some(Box::new(move |log| {
|
||||
let tx = tx.clone();
|
||||
Box::pin(async move { tx.send(log).await.map_err(|e| anyhow!("{}", e)) })
|
||||
})),
|
||||
..Default::default()
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let mut req = HomeNSReqPayload {
|
||||
base: BasePayload {
|
||||
sender_id: vec![1, 2, 3],
|
||||
receiver_id: vec![1, 2, 3, 4, 5, 6, 7, 8],
|
||||
message_type: MessageType::HomeNSReq,
|
||||
transaction_id: 1234,
|
||||
..Default::default()
|
||||
},
|
||||
dev_eui: vec![8, 7, 6, 5, 4, 3, 2, 1],
|
||||
};
|
||||
|
||||
let ans = HomeNSAnsPayload {
|
||||
base: BasePayloadResult {
|
||||
base: BasePayload {
|
||||
sender_id: vec![1, 2, 3, 4, 5, 6, 7, 8],
|
||||
receiver_id: vec![1, 2, 3],
|
||||
message_type: MessageType::HomeNSAns,
|
||||
transaction_id: 1234,
|
||||
..Default::default()
|
||||
},
|
||||
result: ResultPayload {
|
||||
result_code: ResultCode::Success,
|
||||
description: "".into(),
|
||||
},
|
||||
},
|
||||
h_net_id: vec![3, 2, 1],
|
||||
};
|
||||
|
||||
// OK
|
||||
let mut mock = server.mock(|when, then| {
|
||||
when.method(POST)
|
||||
.path("/")
|
||||
.body(serde_json::to_string(&req).unwrap());
|
||||
then.body(serde_json::to_vec(&ans).unwrap()).status(200);
|
||||
});
|
||||
|
||||
c.home_ns_req(vec![1, 2, 3, 4, 5, 6, 7, 8], &mut req, None)
|
||||
.await
|
||||
.unwrap();
|
||||
mock.assert();
|
||||
mock.delete();
|
||||
|
||||
let be_req_log = rx.recv().await.unwrap();
|
||||
assert_eq!("010203", be_req_log.sender_id);
|
||||
assert_eq!("0102030405060708", be_req_log.receiver_id);
|
||||
assert_eq!(1234, be_req_log.transaction_id);
|
||||
assert!(be_req_log.request_error.is_empty());
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_log_fn_error() {
|
||||
let (tx, mut rx) = mpsc::channel(1);
|
||||
let server = MockServer::start();
|
||||
|
||||
let c = Client::new(ClientConfig {
|
||||
sender_id: vec![1, 2, 3],
|
||||
server: server.url("/"),
|
||||
request_log_fn: Some(Box::new(move |log| {
|
||||
let tx = tx.clone();
|
||||
Box::pin(async move { tx.send(log).await.map_err(|e| anyhow!("{}", e)) })
|
||||
})),
|
||||
..Default::default()
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let mut req = HomeNSReqPayload {
|
||||
base: BasePayload {
|
||||
sender_id: vec![1, 2, 3],
|
||||
receiver_id: vec![1, 2, 3, 4, 5, 6, 7, 8],
|
||||
message_type: MessageType::HomeNSReq,
|
||||
transaction_id: 1234,
|
||||
..Default::default()
|
||||
},
|
||||
dev_eui: vec![8, 7, 6, 5, 4, 3, 2, 1],
|
||||
};
|
||||
|
||||
// OK
|
||||
let mut mock = server.mock(|when, then| {
|
||||
when.method(POST)
|
||||
.path("/")
|
||||
.body(serde_json::to_string(&req).unwrap());
|
||||
then.status(500);
|
||||
});
|
||||
|
||||
assert!(c
|
||||
.home_ns_req(vec![1, 2, 3, 4, 5, 6, 7, 8], &mut req, None)
|
||||
.await
|
||||
.is_err());
|
||||
mock.assert();
|
||||
mock.delete();
|
||||
|
||||
let be_req_log = rx.recv().await.unwrap();
|
||||
assert_eq!("010203", be_req_log.sender_id);
|
||||
assert_eq!("0102030405060708", be_req_log.receiver_id);
|
||||
assert_eq!(1234, be_req_log.transaction_id);
|
||||
assert!(!be_req_log.request_error.is_empty());
|
||||
}
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user