Merge deduplication_put and _locked queries.

This merges the deduplication_put and deduplication_locked Redis queries
into a single pipelined query. Simulating 400 uplinks / second through
3 gateways (thus 1200 messages / second), and an artificial TCP latency
of 5ms between ChirpStack <> Redis, this reduces the
storage_redis_conn_get_duration_seconds_sum metric by about 50%.

What we are trying to solve here is that under high (simulated) load, we
exhaust the Redis connection pool. This situation gets worse when there
is a significant latency between ChirpStack <> Redis, because the query
takes longer to complete and the connection stays reserved from the pool for a longer
time. The result can be that during the de-duplication process, the key
containing the uplink set has already expired before the
deduplicate_collect function is able to get a Redis connection from the
pool.

This query merge is a quick win, because each uplink can be received by
N gateways, thus this merge saves N Redis queries per uplink.
This commit is contained in:
Orne Brocaar 2024-01-31 11:06:03 +00:00
parent 9de86ffdec
commit b65faf7b98

View File

@ -169,8 +169,8 @@ async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> {
None => "".to_string(), None => "".to_string(),
}; };
let key = redis_key(format!("up:collect:{}:{}", tx_info_str, phy_str)); let key = redis_key(format!("up:collect:{{{}:{}}}", tx_info_str, phy_str));
let lock_key = redis_key(format!("up:collect:{}:{}:lock", tx_info_str, phy_str)); let lock_key = redis_key(format!("up:collect:{{{}:{}}}:lock", tx_info_str, phy_str));
let dedup_delay = config::get().network.deduplication_delay; let dedup_delay = config::get().network.deduplication_delay;
let mut dedup_ttl = dedup_delay * 2; let mut dedup_ttl = dedup_delay * 2;
@ -180,15 +180,10 @@ async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> {
trace!( trace!(
key = key.as_str(), key = key.as_str(),
"Adding uplink event to deduplication set" "Adding uplink event to deduplication set and getting lock"
); );
deduplicate_put(&key, dedup_ttl, &event).await?; let locked = deduplicate_put(&key, &lock_key, dedup_ttl, &event).await?;
if locked {
trace!(
lock_key = lock_key.as_str(),
"Requesting deduplication lock"
);
if deduplicate_locked(&lock_key, dedup_ttl).await? {
trace!( trace!(
lock_key = lock_key.as_str(), lock_key = lock_key.as_str(),
"Deduplication is already locked by an other process" "Deduplication is already locked by an other process"
@ -219,38 +214,36 @@ async fn _deduplicate_uplink(event: gw::UplinkFrame) -> Result<()> {
Ok(()) Ok(())
} }
/// Adds the uplink event to the deduplication set and attempts to acquire the
/// deduplication lock in a single atomic (MULTI/EXEC) pipelined Redis query.
///
/// This merges what used to be two round-trips (`deduplicate_put` +
/// `deduplicate_locked`) into one, halving the per-gateway Redis queries
/// during deduplication.
///
/// * `collect_key` - Redis key of the set collecting the uplink frames.
/// * `lock_key`    - Redis key used as the deduplication lock.
/// * `ttl`         - expiry applied to both the set and the lock.
/// * `event`       - uplink frame to add to the set (protobuf-encoded).
///
/// Returns `Ok(true)` if the deduplication was **already locked** by another
/// process, `Ok(false)` if this call acquired the lock.
async fn deduplicate_put(
    collect_key: &str,
    lock_key: &str,
    ttl: Duration,
    event: &gw::UplinkFrame,
) -> Result<bool> {
    let event_b = event.encode_to_vec();

    // Only the SET result is returned from the pipeline (SADD / PEXPIRE are
    // .ignore()'d), hence the single-element tuple destructuring.
    let (lock_set,): (bool,) = redis::pipe()
        .atomic()
        .cmd("SADD")
        .arg(collect_key)
        .arg(event_b)
        .ignore()
        .cmd("PEXPIRE")
        .arg(collect_key)
        .arg(ttl.as_millis() as usize)
        .ignore()
        // SET .. PX .. NX: acquire the lock only if it does not yet exist;
        // redis-rs maps the OK / nil reply to true / false.
        .cmd("SET")
        .arg(lock_key)
        .arg("lock")
        .arg("PX")
        .arg(ttl.as_millis() as usize)
        .arg("NX")
        .query_async(&mut get_async_redis_conn().await?)
        .await
        .context("Deduplication put and get lock")?;

    // We get true if we were able to set the lock, thus true == not yet locked.
    Ok(!lock_set)
}
async fn deduplicate_collect(key: &str) -> Result<gw::UplinkFrameSet> { async fn deduplicate_collect(key: &str) -> Result<gw::UplinkFrameSet> {