Fix async receiver for passive-roaming HomeNSReq + add debug logs.

This commit is contained in:
Orne Brocaar 2022-08-03 15:18:31 +01:00
parent 7890dc7539
commit 3348ccf67d
3 changed files with 52 additions and 6 deletions
backend/src
chirpstack/src
api/backend
uplink

@ -12,7 +12,7 @@ use reqwest::header::{HeaderMap, AUTHORIZATION, CONTENT_TYPE};
use reqwest::{Certificate, Identity}; use reqwest::{Certificate, Identity};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::oneshot::Receiver; use tokio::sync::oneshot::Receiver;
use tracing::{error, info, trace}; use tracing::{debug, error, info, trace};
const PROTOCOL_VERSION: &str = "1.0"; const PROTOCOL_VERSION: &str = "1.0";
@ -279,13 +279,15 @@ impl Client {
}; };
let bp = pl.base_payload(); let bp = pl.base_payload();
let body = serde_json::to_string(&pl)?;
info!(receiver_id = %hex::encode(&bp.base.receiver_id), transaction_id = bp.base.transaction_id, message_type = ?bp.base.message_type, server = %server, "Making request"); info!(receiver_id = %hex::encode(&bp.base.receiver_id), transaction_id = bp.base.transaction_id, message_type = ?bp.base.message_type, server = %server, "Making request");
debug!("JSON: {}", body);
self.client self.client
.post(&server) .post(&server)
.headers(self.headers.clone()) .headers(self.headers.clone())
.json(pl) .body(body)
.send() .send()
.await? .await?
.error_for_status()?; .error_for_status()?;
@ -315,14 +317,16 @@ impl Client {
}; };
let bp = pl.base_payload().clone(); let bp = pl.base_payload().clone();
let body = serde_json::to_string(&pl)?;
info!(receiver_id = %hex::encode(&bp.receiver_id), transaction_id = bp.transaction_id, message_type = ?bp.message_type, server = %server, async_interface = %async_resp.is_some(), "Making request"); info!(receiver_id = %hex::encode(&bp.receiver_id), transaction_id = bp.transaction_id, message_type = ?bp.message_type, server = %server, async_interface = %async_resp.is_some(), "Making request");
debug!("JSON: {}", body);
let res = self let res = self
.client .client
.post(&server) .post(&server)
.headers(self.headers.clone()) .headers(self.headers.clone())
.json(pl) .body(body)
.send() .send()
.await? .await?
.error_for_status()?; .error_for_status()?;
@ -344,6 +348,7 @@ impl Client {
None => res.text().await?, None => res.text().await?,
}; };
debug!("JSON: {}", resp_json);
let base: BasePayloadResult = serde_json::from_str(&resp_json)?; let base: BasePayloadResult = serde_json::from_str(&resp_json)?;
if base.result.result_code != ResultCode::Success { if base.result.result_code != ResultCode::Success {
error!(result_code = ?base.result.result_code, description = %base.result.description, receiver_id = %hex::encode(&bp.receiver_id), transaction_id = bp.transaction_id, message_type = ?bp.message_type, "Response error"); error!(result_code = ?base.result.result_code, description = %base.result.description, receiver_id = %hex::encode(&bp.receiver_id), transaction_id = bp.transaction_id, message_type = ?bp.message_type, "Response error");
@ -974,7 +979,12 @@ pub struct GWInfoElement {
pub id: Vec<u8>, pub id: Vec<u8>,
#[serde(rename = "FineRecvTime")] #[serde(rename = "FineRecvTime")]
pub fine_recv_time: Option<usize>, pub fine_recv_time: Option<usize>,
#[serde(default, rename = "RFRegion", skip_serializing_if = "String::is_empty")] #[serde(
default,
rename = "RFRegion",
with = "rf_region_encode",
skip_serializing_if = "String::is_empty"
)]
pub rf_region: String, pub rf_region: String,
#[serde(rename = "RSSI")] #[serde(rename = "RSSI")]
pub rssi: Option<isize>, pub rssi: Option<isize>,
@ -1079,6 +1089,27 @@ pub struct DLMetaData {
pub hi_priority_flag: bool, pub hi_priority_flag: bool,
} }
mod rf_region_encode {
use serde::{Deserializer, Serializer};
pub fn serialize<S>(s: &str, serializer: S) -> std::result::Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&s.replace('_', "-"))
}
pub fn deserialize<'a, D>(deserializer: D) -> Result<String, D::Error>
where
D: Deserializer<'a>,
{
let s: &str = serde::de::Deserialize::deserialize(deserializer)?;
// Some implementations use lowercase.
Ok(s.to_uppercase())
}
}
mod hex_encode { mod hex_encode {
use serde::{Deserializer, Serializer}; use serde::{Deserializer, Serializer};

@ -8,7 +8,7 @@ use anyhow::Result;
use redis::streams::StreamReadReply; use redis::streams::StreamReadReply;
use tokio::sync::oneshot; use tokio::sync::oneshot;
use tokio::task; use tokio::task;
use tracing::{error, info, warn}; use tracing::{debug, error, info, warn};
use uuid::Uuid; use uuid::Uuid;
use warp::{http::StatusCode, Filter, Reply}; use warp::{http::StatusCode, Filter, Reply};
@ -69,6 +69,8 @@ pub async fn handle_request(mut body: impl warp::Buf) -> http::Response<hyper::B
body.advance(cnt); body.advance(cnt);
} }
debug!("JSON: {}", String::from_utf8(b.clone()).unwrap_or_default());
let bp: BasePayload = match serde_json::from_slice(&b) { let bp: BasePayload = match serde_json::from_slice(&b) {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {

@ -70,8 +70,21 @@ impl JoinRequest {
home_ns_req.base.transaction_id = 1234; home_ns_req.base.transaction_id = 1234;
} }
let async_receiver = match js_client.is_async() {
false => None,
true => Some(
get_async_receiver(
home_ns_req.base.transaction_id,
js_client.get_async_timeout(),
)
.await?,
),
};
trace!("Requesting home netid"); trace!("Requesting home netid");
let home_ns_ans = js_client.home_ns_req(&mut home_ns_req, None).await?; let home_ns_ans = js_client
.home_ns_req(&mut home_ns_req, async_receiver)
.await?;
self.home_net_id = Some(NetID::from_slice(&home_ns_ans.h_net_id)?); self.home_net_id = Some(NetID::from_slice(&home_ns_ans.h_net_id)?);
Ok(()) Ok(())