use async_trait::async_trait; use futures_util::StreamExt; use redis::aio::ConnectionManager; use redis::cluster::ClusterClient; use redis::cluster_async::ClusterConnection; use redis::{Client, FromRedisValue}; use tokio::sync::mpsc; use tokio::time::{Duration, timeout}; use crate::socket::message_bus::{MessageBus, MessageBusError}; const REDIS_CONNECT_TIMEOUT_SECS: u64 = 5; #[derive(Clone)] pub enum RedisCommandClient { Single(ConnectionManager), Cluster(ClusterConnection), } impl RedisCommandClient { pub async fn query(&self, cmd: &mut redis::Cmd) -> redis::RedisResult { match self { Self::Single(conn) => { let mut conn = conn.clone(); cmd.query_async(&mut conn).await } Self::Cluster(conn) => { let mut conn = conn.clone(); cmd.query_async(&mut conn).await } } } } pub struct RedisMessageBus { command_client: RedisCommandClient, pubsub_client: Client, } impl RedisMessageBus { /// Connect to Redis using the same `redis` crate as appks. /// /// Supports both single-node `redis://host:port` and cluster /// `redis-cluster://host1:6379?node=host2:6379` URLs. pub async fn new(redis_url: &str) -> Result { tracing::info!("Connecting to Redis"); let connect_timeout = Duration::from_secs(REDIS_CONNECT_TIMEOUT_SECS); let parsed = parse_redis_url(redis_url)?; let (command_client, pubsub_url) = match parsed { ParsedRedisConfig::Single(url) => { let client = Client::open(url.as_str()) .map_err(|e| MessageBusError::Redis(e.to_string()))?; let conn = timeout(connect_timeout, client.get_connection_manager()) .await .map_err(|_| { MessageBusError::Redis(format!( "Redis connection timeout after {REDIS_CONNECT_TIMEOUT_SECS}s" )) })? .map_err(|e| MessageBusError::Redis(e.to_string()))?; (RedisCommandClient::Single(conn), url) } ParsedRedisConfig::Cluster(nodes) => { let cluster_client = ClusterClient::new(nodes.iter().map(String::as_str).collect::>()) .map_err(|e| MessageBusError::Redis(e.to_string()))?; let conn = timeout(connect_timeout, cluster_client.get_async_connection()) .await .map_err(|_| { MessageBusError::Redis(format!( "Redis cluster connection timeout after {REDIS_CONNECT_TIMEOUT_SECS}s" )) })? .map_err(|e| MessageBusError::Redis(e.to_string()))?; let pubsub_url = nodes .first() .cloned() .ok_or_else(|| MessageBusError::Redis("Redis cluster nodes are empty".into()))?; (RedisCommandClient::Cluster(conn), pubsub_url) } }; let pubsub_client = Client::open(pubsub_url.as_str()) .map_err(|e| MessageBusError::Redis(e.to_string()))?; tracing::info!("Redis connected"); Ok(Self { command_client, pubsub_client, }) } pub fn client(&self) -> RedisCommandClient { self.command_client.clone() } } #[async_trait] impl MessageBus for RedisMessageBus { async fn publish(&self, channel: &str, message: &[u8]) -> Result<(), MessageBusError> { self.command_client .query::<()>(redis::cmd("PUBLISH").arg(channel).arg(message)) .await .map_err(|e| MessageBusError::Redis(e.to_string()))?; Ok(()) } async fn subscribe(&self, channel: &str) -> Result>, MessageBusError> { let (tx, rx) = mpsc::channel::>(256); let mut pubsub = self .pubsub_client .get_async_pubsub() .await .map_err(|e| MessageBusError::Redis(e.to_string()))?; pubsub .subscribe(channel) .await .map_err(|e| MessageBusError::Redis(e.to_string()))?; let channel_owned = channel.to_string(); tokio::spawn(async move { let mut stream = pubsub.on_message(); while let Some(message) = stream.next().await { if message.get_channel_name() != channel_owned { continue; } let payload = message.get_payload::>().unwrap_or_default(); if tx.send(payload).await.is_err() { break; } } }); Ok(rx) } async fn unsubscribe(&self, _channel: &str) -> Result<(), MessageBusError> { // Each subscription owns its dedicated async PubSub connection inside // the spawned listener task. Dropping the receiver stops local delivery. Ok(()) } async fn close(&self) -> Result<(), MessageBusError> { Ok(()) } } enum ParsedRedisConfig { Single(String), Cluster(Vec), } fn parse_redis_url(redis_url: &str) -> Result { let Some(rest) = redis_url.strip_prefix("redis-cluster://") else { return Ok(ParsedRedisConfig::Single(redis_url.to_string())); }; let (first, query) = rest.split_once('?').unwrap_or((rest, "")); let (auth, first_node) = split_auth(first); let mut nodes = Vec::new(); if !first_node.is_empty() { nodes.push(to_redis_node_url(auth, first_node)); } for part in query.split('&') { let Some(node) = part.strip_prefix("node=") else { continue; }; if !node.is_empty() { nodes.push(to_redis_node_url(auth, node)); } } if nodes.is_empty() { return Err(MessageBusError::Redis("Redis cluster URL has no nodes".into())); } Ok(ParsedRedisConfig::Cluster(nodes)) } fn split_auth(value: &str) -> (&str, &str) { value .rsplit_once('@') .map(|(auth, host)| (auth, host)) .unwrap_or(("", value)) } fn to_redis_node_url(auth: &str, node: &str) -> String { if node.starts_with("redis://") || node.starts_with("rediss://") { return node.to_string(); } if auth.is_empty() { format!("redis://{node}") } else { format!("redis://{auth}@{node}") } } #[cfg(test)] mod tests { use super::*; #[test] fn parses_single_redis_url() { match parse_redis_url("redis://127.0.0.1:6379").unwrap() { ParsedRedisConfig::Single(url) => assert_eq!(url, "redis://127.0.0.1:6379"), ParsedRedisConfig::Cluster(_) => panic!("expected single redis config"), } } #[test] fn parses_cluster_url_for_redis_rs() { match parse_redis_url("redis-cluster://:pass@127.0.0.1:6380?node=127.0.0.1:6381").unwrap() { ParsedRedisConfig::Cluster(nodes) => assert_eq!( nodes, vec![ "redis://:pass@127.0.0.1:6380".to_string(), "redis://:pass@127.0.0.1:6381".to_string() ] ), ParsedRedisConfig::Single(_) => panic!("expected cluster redis config"), } } }