Merge d34ac49ee62a55341e824122488cd747dbe3a7ff into 3e7f09db626e46092181714ccd5cef1aba507daa

This commit is contained in:
Eric de Ruiter 2025-03-13 09:41:17 +00:00 committed by GitHub
commit 4265e4bb49
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 32 additions and 2 deletions

1
Cargo.lock generated
View File

@ -1258,6 +1258,7 @@ dependencies = [
"deadpool",
"redis",
"serde",
"tokio",
]
[[package]]

View File

@ -38,7 +38,7 @@
tokio-postgres-rustls = { version = "0.13", optional = true }
bigdecimal = "0.4"
redis = { version = "0.27", features = ["tls-rustls", "tokio-rustls-comp"] }
deadpool-redis = { version = "0.18", features = ["cluster", "serde"] }
deadpool-redis = { version = "0.18", features = ["cluster", "serde", "sentinel"] }
# Logging
tracing = "0.1"

View File

@ -47,7 +47,7 @@ dist:
test:
cargo fmt --check
cargo clippy --no-deps --no-default-features --features="$(DATABASE)"
TZ=UTC cargo test --no-default-features --features="$(DATABASE)"
RUST_MIN_STACK=2621440 TZ=UTC cargo test --no-default-features --features="$(DATABASE)"
test-all:
cargo fmt --check

View File

@ -105,6 +105,12 @@ r#"
# instance.
cluster={{ redis.cluster }}
# Redis sentinel master name.
#
# Set the master name when the provided URLs are pointing to a Redis Sentinel
# instance.
master_name="{{ redis.master_name }}"
# Key prefix.
#
# A key prefix can be used to avoid key collisions when multiple deployments

View File

@ -75,6 +75,7 @@ pub struct Redis {
pub servers: Vec<String>,
pub cluster: bool,
pub key_prefix: String,
pub master_name: String,
pub max_open_connections: u32,
pub min_idle_connections: u32,
}
@ -85,6 +86,7 @@ impl Default for Redis {
servers: vec!["redis://127.0.0.1/".into()],
cluster: false,
key_prefix: "".into(),
master_name: "".into(),
max_open_connections: 100,
min_idle_connections: 0,
}

View File

@ -78,11 +78,13 @@ pub use sqlite::{
pub enum AsyncRedisPool {
Client(deadpool_redis::Pool),
ClusterClient(deadpool_redis::cluster::Pool),
SentinelClient(deadpool_redis::sentinel::Pool),
}
pub enum AsyncRedisPoolConnection {
Client(deadpool_redis::Connection),
ClusterClient(deadpool_redis::cluster::Connection),
SentinelClient(deadpool_redis::sentinel::Connection),
}
impl ConnectionLike for AsyncRedisPoolConnection {
@ -93,6 +95,7 @@ impl ConnectionLike for AsyncRedisPoolConnection {
match self {
AsyncRedisPoolConnection::Client(v) => v.req_packed_command(cmd),
AsyncRedisPoolConnection::ClusterClient(v) => v.req_packed_command(cmd),
AsyncRedisPoolConnection::SentinelClient(v) => v.req_packed_command(cmd),
}
}
fn req_packed_commands<'a>(
@ -104,12 +107,16 @@ impl ConnectionLike for AsyncRedisPoolConnection {
match self {
AsyncRedisPoolConnection::Client(v) => v.req_packed_commands(cmd, offset, count),
AsyncRedisPoolConnection::ClusterClient(v) => v.req_packed_commands(cmd, offset, count),
AsyncRedisPoolConnection::SentinelClient(v) => {
v.req_packed_commands(cmd, offset, count)
}
}
}
fn get_db(&self) -> i64 {
match self {
AsyncRedisPoolConnection::Client(v) => v.get_db(),
AsyncRedisPoolConnection::ClusterClient(v) => v.get_db(),
AsyncRedisPoolConnection::SentinelClient(v) => v.get_db(),
}
}
}
@ -134,6 +141,17 @@ pub async fn setup() -> Result<()> {
.max_size(conf.redis.max_open_connections as usize)
.build()?;
set_async_redis_pool(AsyncRedisPool::ClusterClient(pool)).await;
} else if !conf.redis.master_name.is_empty() {
info!("Setting up Redis sentinel client");
let pool = deadpool_redis::sentinel::Config::from_urls(
conf.redis.servers.clone(),
conf.redis.master_name.clone(),
deadpool_redis::sentinel::SentinelServerType::Master,
)
.builder()?
.max_size(conf.redis.max_open_connections as usize)
.build()?;
set_async_redis_pool(AsyncRedisPool::SentinelClient(pool)).await;
} else {
let pool = deadpool_redis::Config::from_url(conf.redis.servers[0].clone())
.builder()?
@ -171,6 +189,9 @@ pub async fn get_async_redis_conn() -> Result<AsyncRedisPoolConnection> {
AsyncRedisPool::ClusterClient(v) => {
AsyncRedisPoolConnection::ClusterClient(v.clone().get().await?)
}
AsyncRedisPool::SentinelClient(v) => {
AsyncRedisPoolConnection::SentinelClient(v.clone().get().await?)
}
};
STORAGE_REDIS_CONN_GET.observe(start.elapsed().as_secs_f64());