Files
zhenyi 1000f8a80d chore(infra): add gRPC layer, update protobufs, remove immediate module
- Add gRPC service modules: auth, channel, channel settings, member,
  permission
- Update protobuf definitions and generated code
- Remove immediate/ real-time module (superseded by IM service)
- Update etcd discovery and registration
- Update cache, error, config, and build infrastructure
- Add ADR documentation
- Update OpenAPI spec
2026-06-10 18:49:42 +08:00

103 lines
3.0 KiB
Rust

use crate::config::AppConfig;
use crate::error::{AppError, AppResult};
use futures_util::future::BoxFuture;
use redis::cluster::ClusterClient;
use redis::{Client, FromRedisValue};
#[derive(Clone)]
enum RedisBackend {
Single(redis::aio::ConnectionManager),
Cluster(redis::cluster_async::ClusterConnection),
}
#[derive(Clone)]
pub struct AppRedis {
backend: RedisBackend,
}
impl AppRedis {
pub async fn from_config(config: &AppConfig) -> AppResult<Self> {
let backend = if config.redis_cluster_enabled()? {
let nodes = config.redis_cluster_nodes()?;
let cluster_client =
ClusterClient::new(nodes.iter().map(|s| s.as_str()).collect::<Vec<_>>())?;
let conn = cluster_client.get_async_connection().await?;
RedisBackend::Cluster(conn)
} else {
let url = config
.redis_url()?
.ok_or_else(|| AppError::Config("APP_REDIS_URL is not set".into()))?;
let client = Client::open(url.as_str())?;
let conn = client.get_connection_manager().await?;
RedisBackend::Single(conn)
};
Ok(Self { backend })
}
pub fn get_connection(&self) -> RedisConnection {
match &self.backend {
RedisBackend::Single(cm) => RedisConnection::Single(cm.clone()),
RedisBackend::Cluster(cc) => RedisConnection::Cluster(cc.clone()),
}
}
}
pub enum RedisConnection {
Single(redis::aio::ConnectionManager),
Cluster(redis::cluster_async::ClusterConnection),
}
impl redis::aio::ConnectionLike for RedisConnection {
fn req_packed_command<'a>(
&'a mut self,
cmd: &'a redis::Cmd,
) -> BoxFuture<'a, redis::RedisResult<redis::Value>> {
match self {
Self::Single(c) => Box::pin(c.req_packed_command(cmd)),
Self::Cluster(c) => Box::pin(c.req_packed_command(cmd)),
}
}
fn req_packed_commands<'a>(
&'a mut self,
cmd: &'a redis::Pipeline,
offset: usize,
count: usize,
) -> BoxFuture<'a, redis::RedisResult<Vec<redis::Value>>> {
match self {
Self::Single(c) => Box::pin(c.req_packed_commands(cmd, offset, count)),
Self::Cluster(c) => Box::pin(c.req_packed_commands(cmd, offset, count)),
}
}
fn get_db(&self) -> i64 {
match self {
Self::Single(c) => c.get_db(),
Self::Cluster(c) => c.get_db(),
}
}
}
impl RedisConnection {
pub async fn query_async<T: FromRedisValue>(
&mut self,
cmd: &mut redis::Cmd,
) -> redis::RedisResult<T> {
match self {
Self::Single(c) => cmd.query_async(c).await,
Self::Cluster(c) => cmd.query_async(c).await,
}
}
pub async fn query_pipeline_async<T: FromRedisValue>(
&mut self,
pipe: &mut redis::Pipeline,
) -> redis::RedisResult<T> {
match self {
Self::Single(c) => pipe.query_async(c).await,
Self::Cluster(c) => pipe.query_async(c).await,
}
}
}