From c794b818fffdcf8a9434cd75cab9bc5305b73505 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Thu, 11 Jun 2026 22:50:38 +0800 Subject: [PATCH] feat(config): integrate etcd for service discovery and config management - Add etcd-client dependency for distributed configuration storage - Implement EtcdConfig with priority: etcd > environment variables > defaults - Add ServiceRegistry for service registration with lease keep-alive - Integrate etcd-based service discovery for appks gRPC connections - Add service watcher for real-time service instance updates - Migrate Redis configuration from single URL to cluster node list - Update Dockerfile with default IMKS_HOST and IMKS_PORT environment variables - Add etcd bootstrap configuration through environment variables - Implement Redis cluster URL building with optional authentication --- Cargo.lock | 163 ++--- Cargo.toml | 5 +- Dockerfile | 4 +- engine/session.rs | 5 +- etcd.rs | 135 ++-- main.rs | 626 ++++++++++++------ migrate/000_message_thread_base.sql | 23 - migrate/001_message_rich_content.sql | 115 ---- migrate/002_message_social.sql | 76 --- migrate/003_message_article.sql | 45 -- migrate/004_message_social_part2.sql | 98 --- migrate/005_message_misc.sql | 102 --- .../006_fix_uuid_defaults_and_draft_null.sql | 32 - rpc/mod.rs | 1 + socket/adapter/redis.rs | 23 +- socket/message_bus/redis.rs | 231 +++++-- socket/session_store/redis.rs | 68 +- svc/deploy.rs | 133 +++- svc/scheduled.rs | 3 +- telemetry/config.rs | 5 +- telemetry/health.rs | 11 +- telemetry/logs.rs | 14 +- telemetry/metrics.rs | 12 +- telemetry/traces.rs | 6 +- 24 files changed, 984 insertions(+), 952 deletions(-) delete mode 100644 migrate/000_message_thread_base.sql delete mode 100644 migrate/001_message_rich_content.sql delete mode 100644 migrate/002_message_social.sql delete mode 100644 migrate/003_message_article.sql delete mode 100644 migrate/004_message_social_part2.sql delete mode 100644 migrate/005_message_misc.sql delete mode 100644 migrate/006_fix_uuid_defaults_and_draft_null.sql diff --git a/Cargo.lock b/Cargo.lock index e285c1e..370a64a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -262,6 +262,12 @@ dependencies = [ "rustversion", ] +[[package]] +name = "arcstr" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03918c3dbd7701a85c6b9887732e2921175f26c350b4563841d0958c21d57e6d" + [[package]] name = "asn1-rs" version = "0.7.2" @@ -301,6 +307,17 @@ dependencies = [ "syn", ] +[[package]] +name = "async-lock" +version = "3.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "290f7f2596bd5b78a9fec8088ccd89180d7f9f55b94b0576823bbbdc72ee8311" +dependencies = [ + "event-listener", + "event-listener-strategy", + "pin-project-lite", +] + [[package]] name = "async-nats" version = "0.38.0" @@ -412,6 +429,15 @@ dependencies = [ "tower-service", ] +[[package]] +name = "backon" +version = "1.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cffb0e931875b666fc4fcb20fee52e9bbd1ef836fd9e9e04ec21555f9f85f7ef" +dependencies = [ + "fastrand", +] + [[package]] name = "base64" version = "0.22.1" @@ -502,16 +528,6 @@ dependencies = [ "serde", ] -[[package]] -name = "bytes-utils" -version = "0.1.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" -dependencies = [ - "bytes", - "either", -] - [[package]] name = "bytestring" version = "1.5.1" @@ -576,6 +592,20 @@ version = "0.5.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0c9ea0ac24bc397ab3c98583a3c9ba74fa56b09a4449bbe172b9b1ddb016027a" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "futures-core", + "memchr", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -617,12 +647,6 @@ dependencies = [ "version_check", ] -[[package]] -name = "cookie-factory" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "396de984970346b0d9e93d1415082923c679e5ae5c3ee3dcbd104f5610af126b" - [[package]] name = "core-foundation" version = "0.9.4" @@ -978,6 +1002,16 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "event-listener-strategy" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8be9f3dfaaffdae2972880079a491a1a8bb7cbed0b8dd7a347f668b4150a3b93" +dependencies = [ + "event-listener", + "pin-project-lite", +] + [[package]] name = "fastrand" version = "2.4.1" @@ -1012,15 +1046,6 @@ dependencies = [ "miniz_oxide", ] -[[package]] -name = "float-cmp" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b09cf3155332e944990140d967ff5eceb70df778b34f77d8075db46e4704e6d8" -dependencies = [ - "num-traits", -] - [[package]] name = "flume" version = "0.12.0" @@ -1059,43 +1084,6 @@ dependencies = [ "percent-encoding", ] -[[package]] -name = "fred" -version = "10.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a7b2fd0f08b23315c13b6156f971aeedb6f75fb16a29ac1872d2eabccc1490e" -dependencies = [ - "arc-swap", - "async-trait", - "bytes", - "bytes-utils", - "float-cmp", - "fred-macros", - "futures", - "log", - "parking_lot", - "rand 0.8.6", - "redis-protocol", - "semver", - "socket2 0.5.10", - "tokio", - "tokio-stream", - "tokio-util", - "url", - "urlencoding", -] - -[[package]] -name = "fred-macros" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1458c6e22d36d61507034d5afecc64f105c1d39712b7ac6ec3b352c423f715cc" -dependencies = [ - "proc-macro2", - "quote", - "syn", -] - [[package]] name = "futures" version = "0.3.32" @@ -1104,7 +1092,6 @@ checksum = "8b147ee9d1f6d097cef9ce628cd2ee62288d963e16fb287bd9286455b241382d" dependencies = [ "futures-channel", "futures-core", - "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -1630,8 +1617,8 @@ dependencies = [ "base64", "chrono", "dashmap", + "dotenvy", "etcd-client", - "fred", "futures-util", "jsonwebtoken", "opentelemetry", @@ -1643,6 +1630,7 @@ dependencies = [ "prost", "prost-types", "rand 0.9.4", + "redis", "serde", "serde_json", "sqlx", @@ -2534,17 +2522,34 @@ dependencies = [ ] [[package]] -name = "redis-protocol" -version = "6.0.0" +name = "redis" +version = "1.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdba59219406899220fc4cdfd17a95191ba9c9afb719b5fa5a083d63109a9f1" +checksum = "a12e6b5f4d8ef33944e833e2b1859ad478deab6e431d7337b30ee2efe21f7543" dependencies = [ + "arc-swap", + "arcstr", + "async-lock", + "backon", "bytes", - "bytes-utils", - "cookie-factory", + "cfg-if", + "combine", "crc16", + "futures-channel", + "futures-util", + "itoa", "log", - "nom", + "num-bigint", + "percent-encoding", + "pin-project-lite", + "rand 0.9.4", + "ryu", + "sha1_smol", + "socket2 0.6.4", + "tokio", + "tokio-util", + "url", + "xxhash-rust", ] [[package]] @@ -2918,6 +2923,12 @@ dependencies = [ "digest 0.11.3", ] +[[package]] +name = "sha1_smol" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbfa15b3dddfee50a0fff136974b3e1bde555604ba463834a7eb7deb6417705d" + [[package]] name = "sha2" version = "0.10.9" @@ -3813,12 +3824,6 @@ dependencies = [ "serde", ] -[[package]] -name = "urlencoding" -version = "2.1.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "daf8dba3b7eb870caf1ddeed7bc9d2a049f3cfdfae7cb521b087cc33ae4c49da" - [[package]] name = "utf8_iter" version = "1.0.4" @@ -4317,6 +4322,12 @@ dependencies = [ "time", ] +[[package]] +name = "xxhash-rust" +version = "0.8.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" + [[package]] name = "yasna" version = "0.6.0" diff --git a/Cargo.toml b/Cargo.toml index 7bf6cc3..fb31e47 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,7 +18,7 @@ tonic = { version = "0.14", features = ["tls-ring"] } prost = "0.14" prost-types = "0.14" tonic-build = "0.14" -tonic-health = "0.14" +tonic-health = "0.14.6" tonic-prost = "0.14" tokio = { version = "1", features = ["full"] } actix-web = { version = "4", features = [] } @@ -46,11 +46,12 @@ tracing-opentelemetry = "0.33" opentelemetry-appender-tracing = "0.32" opentelemetry-prometheus = "0.32" prometheus = "0.14" -fred = { version = "10", features = ["subscriber-client"] } +redis = { version = "1", features = ["cluster", "cluster-async", "aio", "tokio-comp", "connection-manager"] } async-nats = "0.38" futures-util = "0.3" jsonwebtoken = "9" arc-swap = "1" +dotenvy = "0.15" [build-dependencies] diff --git a/Dockerfile b/Dockerfile index e8c71d4..f1d0b6d 100644 --- a/Dockerfile +++ b/Dockerfile @@ -30,9 +30,9 @@ RUN useradd -m -u 1000 imks && chown -R imks:imks /app USER imks ENV IMKS_HOST=0.0.0.0 -ENV IMKS_PORT=3000 +ENV IMKS_PORT=50048 -EXPOSE 3000 +EXPOSE 50048 HEALTHCHECK --interval=15s --timeout=3s --start-period=10s --retries=3 \ CMD curl -sf http://localhost:3000/health || exit 1 diff --git a/engine/session.rs b/engine/session.rs index 706d2b0..c65954e 100644 --- a/engine/session.rs +++ b/engine/session.rs @@ -132,7 +132,10 @@ impl SessionStore { if let Some(m) = crate::telemetry::metrics::try_get() { m.engine_sessions_active.add( 1, - &[opentelemetry::KeyValue::new("transport", transport.as_str())], + &[opentelemetry::KeyValue::new( + "transport", + transport.as_str(), + )], ); } rx diff --git a/etcd.rs b/etcd.rs index 995d26e..ef108c1 100644 --- a/etcd.rs +++ b/etcd.rs @@ -1,5 +1,6 @@ -use etcd_client::{Client, PutOptions, GetOptions, WatchOptions}; use std::sync::Arc; + +use etcd_client::{Client, EventType, GetOptions, PutOptions, WatchOptions}; use tokio::sync::Mutex; use tokio_stream::StreamExt; @@ -14,7 +15,10 @@ impl EtcdConfig { let client = Client::connect(endpoints, None) .await .map_err(|e| format!("etcd connect: {e}"))?; - Ok(Self { client: Arc::new(Mutex::new(client)), prefix: prefix.to_string() }) + Ok(Self { + client: Arc::new(Mutex::new(client)), + prefix: prefix.to_string(), + }) } /// Get config value: etcd first, then env var, then default. @@ -41,7 +45,9 @@ impl EtcdConfig { } pub async fn get_parsed(&self, key: &str, default: T) -> T - where T::Err: std::fmt::Display, T: std::fmt::Display + where + T::Err: std::fmt::Display, + T: std::fmt::Display, { let s = self.get(key, &default.to_string()).await; s.parse().unwrap_or(default) @@ -71,22 +77,44 @@ impl EtcdConfig { } } } - tracing::info!(service = service_name, count = addrs.len(), "discovered instances"); + tracing::info!( + service = service_name, + count = addrs.len(), + "discovered instances" + ); Ok(addrs) } - /// Watch a service for live updates. - pub fn start_service_watcher(&self, service_name: &str) { + /// Watch a service prefix for live join/leave events. + /// + /// Calls `on_up(addr)` when a new instance appears and `on_down(addr)` + /// when one disappears. The watcher runs in a background task and + /// automatically reconnects on failure. + pub fn start_service_watcher( + &self, + service_name: &str, + on_up: impl Fn(String) + Send + Sync + 'static, + on_down: impl Fn(String) + Send + Sync + 'static, + ) { let client = self.client.clone(); let prefix = self.prefix.clone(); let svc = service_name.to_string(); let watch_prefix = format!("{}services/{}/", prefix, svc); + let on_up = Arc::new(on_up); + let on_down = Arc::new(on_down); + tokio::spawn(async move { loop { let mut stream = { let mut c = client.lock().await; - match c.watch(watch_prefix.as_str(), Some(WatchOptions::new().with_prefix())).await { + match c + .watch( + watch_prefix.as_str(), + Some(WatchOptions::new().with_prefix()), + ) + .await + { Ok(s) => s, Err(e) => { tracing::warn!(service = %svc, error = %e, "watch failed, retry in 3s"); @@ -96,23 +124,36 @@ impl EtcdConfig { } }; while let Some(resp) = stream.next().await { - if let Ok(resp) = resp { - for event in resp.events() { - if let Some(kv) = event.kv() { - let addr = kv.value_str().unwrap_or_default(); - let key = kv.key_str().unwrap_or_default(); - match event.event_type() { - etcd_client::EventType::Put => { - tracing::info!(service = %svc, key, addr, "service up"); - } - etcd_client::EventType::Delete => { - tracing::info!(service = %svc, key, "service down"); - } - } + let Ok(resp) = resp else { break }; + for event in resp.events() { + let Some(kv) = event.kv() else { continue }; + let raw = kv.value_str().unwrap_or_default(); + let key = kv.key_str().unwrap_or_default(); + + // Parse JSON to extract the actual address + let addr = serde_json::from_str::(raw) + .ok() + .and_then(|v| { + v.get("addr") + .and_then(|a| a.as_str()) + .map(|s| s.to_string()) + }) + .unwrap_or_else(|| raw.to_string()); + + match event.event_type() { + EventType::Put => { + tracing::info!(service = %svc, key, addr, "service up"); + on_up(addr); + } + EventType::Delete => { + tracing::info!(service = %svc, key, "service down"); + on_down(addr); } } } } + tracing::warn!(service = %svc, "watch stream ended, restarting in 3s"); + tokio::time::sleep(std::time::Duration::from_secs(3)).await; } }); } @@ -126,7 +167,10 @@ pub struct ServiceRegistry { impl ServiceRegistry { pub fn new(client: Arc>, prefix: &str) -> Self { - Self { client, prefix: prefix.to_string() } + Self { + client, + prefix: prefix.to_string(), + } } pub async fn register(&self, service_name: &str, addr: &str) -> Result<(), String> { @@ -143,43 +187,44 @@ impl ServiceRegistry { let lease = { let mut client = self.client.lock().await; - client.lease_grant(15, None).await.map_err(|e| format!("lease: {e}"))? + client + .lease_grant(60, None) + .await + .map_err(|e| format!("lease: {e}"))? }; { let mut client = self.client.lock().await; let opts = PutOptions::new().with_lease(lease.id()); - client.put(key.clone(), value, Some(opts)).await.map_err(|e| format!("put: {e}"))?; + client + .put(key.clone(), value, Some(opts)) + .await + .map_err(|e| format!("put: {e}"))?; } tracing::info!(service = service_name, instance = %instance_id, addr = %addr, "registered in etcd"); let c = self.client.clone(); + let lease_id = lease.id(); tokio::spawn(async move { + let (mut keeper, mut stream) = { + let mut client = c.lock().await; + match client.lease_keep_alive(lease_id).await { + Ok(pair) => pair, + Err(e) => { + tracing::error!(lease_id, error = %e, "failed to start lease keepalive"); + return; + } + } + }; + + let mut interval = tokio::time::interval(std::time::Duration::from_secs(10)); loop { - let result = { - let mut client = c.lock().await; - client.lease_keep_alive(lease.id()).await - }; - match result { - Ok((_keeper, mut stream)) => { - while stream.next().await.is_some() {} - } - Err(e) => tracing::warn!(lease_id = lease.id(), error = %e, "keepalive failed"), - } - tokio::time::sleep(std::time::Duration::from_secs(5)).await; - let new_lease = { - let mut client = c.lock().await; - client.lease_grant(15, None).await - }; - if let Ok(lr) = new_lease { - let instance = serde_json::json!({"addr": &addr, "port": 0, "version": env!("CARGO_PKG_VERSION")}); - if let Ok(v) = serde_json::to_string(&instance) { - let mut client = c.lock().await; - let opts = PutOptions::new().with_lease(lr.id()); - let _ = client.put(key.clone(), v, Some(opts)).await; - } + interval.tick().await; + if let Err(e) = keeper.keep_alive().await { + tracing::warn!(lease_id, error = %e, "lease keepalive failed"); } + let _ = stream.message().await; } }); diff --git a/main.rs b/main.rs index 41316f4..abb3ec4 100644 --- a/main.rs +++ b/main.rs @@ -1,4 +1,7 @@ use std::sync::{Arc, OnceLock}; +use std::time::Duration; + +use tokio::sync::RwLock; use imks::database::{Database, DatabaseConfig}; use imks::engine::server::EngineConfig; @@ -12,7 +15,10 @@ use imks::socket::server::SocketServerBuilder; use imks::svc::{DeployConfig, MessageService}; use imks::telemetry; -fn main() -> Result<(), Box> { +#[tokio::main] +async fn main() -> Result<(), Box> { + dotenvy::dotenv().ok(); + // Initialize observability stack (traces, metrics, logs, health) let telemetry_guard = telemetry::init(); telemetry::health::init_counters(); @@ -26,8 +32,7 @@ fn main() -> Result<(), Box> { .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) .collect(); - let etcd_prefix = std::env::var("ETCD_KEY_PREFIX") - .unwrap_or_else(|_| "/appks/".to_string()); + let etcd_prefix = std::env::var("ETCD_KEY_PREFIX").unwrap_or_else(|_| "/appks/".to_string()); tracing::info!( adapter = %deploy.adapter_mode, @@ -38,160 +43,359 @@ fn main() -> Result<(), Box> { let addr = "0.0.0.0:50048"; - let rt = tokio::runtime::Runtime::new()?; + // --- etcd: connect, register, discover appks --- + let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix) + .await + .unwrap_or_else(|e| { + tracing::error!(error = %e, "etcd required but unavailable"); + panic!("etcd required: {e}") + }); - rt.block_on(async { - // --- etcd: connect, register, discover appks --- - let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix).await - .unwrap_or_else(|e| { - tracing::error!(error = %e, "etcd required but unavailable"); - panic!("etcd required: {e}") - }); + // Register this service so others can discover us + let registry = ServiceRegistry::new(etcd.client(), &etcd_prefix); + let imks_addr = etcd.get("IMKS_ADDR", "0.0.0.0:3000").await; + registry.register("imks", &imks_addr).await.ok(); - // Register this service so others can discover us - let registry = ServiceRegistry::new(etcd.client(), &etcd_prefix); - let imks_addr = etcd.get("IMKS_ADDR", "0.0.0.0:3000").await; - registry.register("imks", &imks_addr).await.ok(); + // Discover appks from etcd (priority > env). + // etcd-registered addresses are bare "host:port" — prepend http:// for gRPC. + let appks_addr = etcd + .discover_service("appks") + .await + .ok() + .and_then(|addrs| addrs.into_iter().next()) + .map(|addr| format!("http://{}", addr)) + .unwrap_or_else(|| { + std::env::var("APPKS_GRPC_ADDR") + .unwrap_or_else(|_| "http://localhost:50051".to_string()) + }); + tracing::info!(appks_addr = %appks_addr, "appks discovered via etcd"); - // Discover appks from etcd (priority > env) - let appks_addr = etcd.discover_service("appks").await - .ok() - .and_then(|addrs| addrs.into_iter().next()) - .unwrap_or_else(|| { - std::env::var("APPKS_GRPC_ADDR").unwrap_or_else(|_| "http://localhost:50051".to_string()) - }); - tracing::info!(appks_addr = %appks_addr, "appks discovered via etcd"); - etcd.start_service_watcher("appks"); + // Track the currently active appks address for health checks. + // Updated on successful connect, cleared on health failure. + let current_appks_addr: Arc>> = Arc::new(RwLock::new(None)); - let engine_config = EngineConfig::default(); - let mut builder = SocketServerBuilder::new(engine_config); - let namespace_holder: Arc>> = - Arc::new(OnceLock::new()); + // Start imks's own gRPC health server early so Redis/NATS stalls do not hide it. + // Default: imks RPC health 50047, separate from HTTP (50048) and appks gRPC (50049). + { + let grpc_health_addr: std::net::SocketAddr = std::env::var("IMKS_GRPC_HEALTH_ADDR") + .unwrap_or_else(|_| "0.0.0.0:50047".to_string()) + .parse() + .expect("Invalid IMKS_GRPC_HEALTH_ADDR"); + tokio::spawn(async move { + if let Err(e) = imks::rpc::health::start_health_server(grpc_health_addr).await { + tracing::error!(error = %e, "imks gRPC health server failed"); + } + }); + } - // Pre-configure adapter for Redis/NATS mode. - match deploy.adapter_mode.as_str() { - "redis" => { - let cluster_url = deploy.redis_cluster_url(); - let message_bus = Arc::new( - RedisMessageBus::new(&cluster_url) - .await - .map_err(|e| format!("Failed to connect to Redis cluster: {e}"))?, - ); - let redis_client = message_bus.client().clone(); - let server_id = deploy.server_id.clone(); - let adapter = Arc::new(RedisAdapter::new( - message_bus.clone() as Arc<_>, - redis_client, - server_id, - "/".into(), - make_local_broadcast_fn(namespace_holder.clone()), - )); - adapter - .init() + let engine_config = EngineConfig::default(); + let mut builder = SocketServerBuilder::new(engine_config); + let namespace_holder: Arc>> = + Arc::new(OnceLock::new()); + + // Pre-configure adapter for Redis/NATS mode. + match deploy.adapter_mode.as_str() { + "redis" => { + let redis_url = deploy + .redis_url() + .map_err(|e| format!("Invalid Redis configuration: {e}"))?; + tracing::info!( + cluster_enabled = deploy.redis_cluster_enabled, + "Configuring Redis adapter" + ); + let message_bus = Arc::new( + RedisMessageBus::new(&redis_url) .await - .map_err(|e| format!("Failed to initialize Redis adapter: {e}"))?; - builder = builder.adapter(adapter); - tracing::info!("Redis adapter configured for multi-node"); - } - "nats" => { - let message_bus = Arc::new( - NatsMessageBus::new(&deploy.nats_url) - .await - .map_err(|e| format!("Failed to connect to NATS: {e}"))?, - ); - let server_id = deploy.server_id.clone(); - let adapter = Arc::new(NatsAdapter::new( - message_bus.clone() as Arc<_>, - server_id, - "/".into(), - make_local_broadcast_fn(namespace_holder.clone()), - )); - adapter - .init() + .map_err(|e| format!("Failed to connect to Redis: {e}"))?, + ); + let redis_client = message_bus.client().clone(); + let server_id = deploy.server_id.clone(); + let adapter = Arc::new(RedisAdapter::new( + message_bus.clone() as Arc<_>, + redis_client, + server_id, + "/".into(), + make_local_broadcast_fn(namespace_holder.clone()), + )); + adapter + .init() + .await + .map_err(|e| format!("Failed to initialize Redis adapter: {e}"))?; + builder = builder.adapter(adapter); + tracing::info!("Redis adapter configured for multi-node"); + } + "nats" => { + let message_bus = Arc::new( + NatsMessageBus::new(&deploy.nats_url) .await - .map_err(|e| format!("Failed to initialize NATS adapter: {e}"))?; - builder = builder.adapter(adapter); - tracing::info!("NATS adapter configured for multi-node"); - } - _ => { - tracing::info!("Local adapter (single-node mode)"); - } + .map_err(|e| format!("Failed to connect to NATS: {e}"))?, + ); + let server_id = deploy.server_id.clone(); + let adapter = Arc::new(NatsAdapter::new( + message_bus.clone() as Arc<_>, + server_id, + "/".into(), + make_local_broadcast_fn(namespace_holder.clone()), + )); + adapter + .init() + .await + .map_err(|e| format!("Failed to initialize NATS adapter: {e}"))?; + builder = builder.adapter(adapter); + tracing::info!("NATS adapter configured for multi-node"); + } + _ => { + tracing::info!("Local adapter (single-node mode)"); + } + }; + + let socket_server = Arc::new(builder.build()); + let _ = namespace_holder.set(socket_server.namespaces.clone()); + + // Connect to database (independent of appks gRPC availability) + let db_config = DatabaseConfig::from_env(); + let db = Database::connect(&db_config) + .await + .map_err(|e| format!("Database connection failed: {e}"))?; + imks::database::run_migrations(db.pool()) + .await + .map_err(|e| format!("Database migration failed: {e}"))?; + let repo = Arc::new(MessageRepo::new(db.pool().clone())); + tracing::info!("Database connected and migrated"); + + // Shared service handle — swapped atomically when appks comes/goes + let service: Arc>>> = Arc::new(RwLock::new(None)); + + // Helper to try creating a MessageService given an appks address. + // Returns true on success. + let try_connect_service = { + let service = service.clone(); + let repo = repo.clone(); + let namespaces = socket_server.namespaces.clone(); + let current_addr = current_appks_addr.clone(); + let rpc_config = RpcConfig { + appks_addr: appks_addr.clone(), + ..RpcConfig::from_env() }; - - let socket_server = Arc::new(builder.build()); - let _ = namespace_holder.set(socket_server.namespaces.clone()); - - // Initialize database + gRPC + service - let service: Option> = { - let rpc_config = RpcConfig { - appks_addr: appks_addr.clone(), - ..RpcConfig::from_env() + move |addr: String| { + let service = service.clone(); + let repo = repo.clone(); + let namespaces = namespaces.clone(); + let current_addr = current_addr.clone(); + let mut rpc = rpc_config.clone(); + // etcd-registered address is bare "host:port" — prepend scheme for gRPC + rpc.appks_addr = if addr.starts_with("http") { + addr + } else { + format!("http://{}", addr) }; - let db_config = DatabaseConfig::from_env(); + async move { + match AppksClients::connect(&rpc).await { + Ok(clients) => { + match MessageService::new((*repo).clone(), clients, namespaces.clone()) + .await + { + Ok(svc) => { + let svc = Arc::new(svc); + let mut guard = service.write().await; + *guard = Some(svc); - match AppksClients::connect(&rpc_config).await { - Ok(clients) => { - let db = Database::connect(&db_config) - .await - .map_err(|e| format!("Database connection failed: {e}"))?; + // Update the active appks address for health checker + let mut addr_guard = current_addr.write().await; + *addr_guard = Some(rpc.appks_addr.clone()); - imks::database::run_migrations(db.pool()) - .await - .map_err(|e| format!("Database migration failed: {e}"))?; - - let repo = MessageRepo::new(db.pool().clone()); - - let svc = MessageService::new(repo, clients, socket_server.namespaces.clone()) - .await - .map_err(|e| format!("Failed to initialize message service: {e}"))?; - - tracing::info!("Message service initialized with gRPC permission checks"); - Some(Arc::new(svc)) - } - Err(e) => { - tracing::warn!("gRPC unavailable: {e}. Running without permission checks."); - None + tracing::info!( + addr = %rpc.appks_addr, + "Message service initialized" + ); + true + } + Err(e) => { + tracing::warn!( + addr = %rpc.appks_addr, + error = %e, + "Failed to init message service" + ); + false + } + } + } + Err(e) => { + tracing::warn!( + addr = %rpc.appks_addr, + error = %e, + "gRPC connect failed" + ); + false + } } } - }; + } + }; - // Register connect handler - let namespace = socket_server.of("/"); - let svc_connect = service.clone(); - namespace - .on_connect(move |socket, auth_data| { - if let Some(ref svc) = svc_connect { - svc.authenticate_socket(socket, auth_data) - .map_err(|e| e.to_string())?; + // Try initial connection: etcd-discovered addr first, then env fallback + let env_fallback = + std::env::var("APPKS_GRPC_ADDR").unwrap_or_else(|_| "http://localhost:50051".to_string()); + if !try_connect_service(appks_addr.clone()).await { + if appks_addr != env_fallback { + tracing::info!( + etcd_addr = %appks_addr, + fallback = %env_fallback, + "etcd-discovered appks unreachable, trying env fallback" + ); + try_connect_service(env_fallback).await; + } else { + tracing::warn!( + %appks_addr, + "Initial connection to appks failed — will retry when etcd discovers new instances" + ); + } + } + + // Watch etcd for appks join/leave — reconnect dynamically + { + let try_connect = try_connect_service.clone(); + let svc_clear = service.clone(); + let addr_clear = current_appks_addr.clone(); + etcd.start_service_watcher( + "appks", + move |addr| { + tracing::info!(%addr, "etcd watcher: appks instance UP, attempting reconnection"); + let try_connect = try_connect.clone(); + tokio::spawn(async move { + let ok = try_connect(addr).await; + tracing::info!(ok, "etcd watcher: reconnection result"); + }); + }, + move |_addr| { + let svc_clear = svc_clear.clone(); + let addr_clear = addr_clear.clone(); + tokio::spawn(async move { + let mut guard = svc_clear.write().await; + *guard = None; + // Also clear the tracked address so health checker stops probing + let mut addr_guard = addr_clear.write().await; + *addr_guard = None; + tracing::warn!( + "etcd watcher: appks instance DOWN — running without permission checks" + ); + }); + }, + ); + } + + // Health check loop: probe appks gRPC health every 5 seconds. + // If appks becomes unhealthy, clear the MessageService (degraded mode) + // and trigger an immediate reconnection attempt. + { + let health_service = service.clone(); + let health_addr = current_appks_addr.clone(); + let health_reconnect = try_connect_service.clone(); + tokio::spawn(async move { + tracing::info!( + interval_secs = 5, + "Health check loop started — probing appks gRPC health every 5s" + ); + + let mut last_logged_ok = false; + let mut waiting_ticks = 0u64; + + loop { + tokio::time::sleep(Duration::from_secs(5)).await; + + // Snapshot the current appks address + let addr = health_addr.read().await.clone(); + let Some(ref addr) = addr else { + // No appks connected yet + waiting_ticks += 1; + if !last_logged_ok && waiting_ticks % 12 == 1 { + // Log every ~60s while waiting (12 × 5s) + tracing::info!( + waiting_secs = waiting_ticks * 5, + "No appks connection — health check paused, waiting for etcd discovery" + ); + } + continue; + }; + + waiting_ticks = 0; + + match imks::rpc::health::check_appks_health(addr).await { + Ok(true) => { + if !last_logged_ok { + tracing::info!(%addr, "appks health check OK"); + last_logged_ok = true; + } + } + Ok(false) | Err(_) => { + tracing::warn!(%addr, "appks health check FAILED — entering degraded mode"); + + // Clear the MessageService so event handlers drop requests + let mut guard = health_service.write().await; + *guard = None; + + // Clear the tracked address so the next cycle skips + // health checks until a new connection is established + let mut addr_guard = health_addr.write().await; + *addr_guard = None; + + last_logged_ok = false; + + // Trigger an immediate reconnection attempt. + // If the same address recovers, try_connect will restore + // the service. If not, we wait for etcd discovery. + let reconnect = health_reconnect.clone(); + let reconnect_addr = addr.clone(); + tokio::spawn(async move { + if !reconnect(reconnect_addr).await { + tracing::warn!( + "Immediate reconnection after health failure failed, waiting for etcd discovery" + ); + } + }); + } } + } + }); + } - // Increment connection metrics - let m = telemetry::metrics::get(); - m.connections_active.add( - 1, - &telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), - ); - m.connections_total.add( - 1, - &telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), - ); - telemetry::health::connection_connected(); + // Register connect handler + let namespace = socket_server.of("/"); + let svc_connect = service.clone(); + namespace + .on_connect(move |socket, auth_data| { + let svc = svc_connect.blocking_read(); + if let Some(ref svc) = *svc { + svc.authenticate_socket(socket, auth_data) + .map_err(|e| e.to_string())?; + } - tracing::info!( - socket_sid = %socket.sid, - engine_sid = %socket.engine_sid, - namespace = %socket.namespace, - "Socket connected" - ); - Ok(()) - }) - .await; + // Increment connection metrics + let m = telemetry::metrics::get(); + m.connections_active.add( + 1, + &telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), + ); + m.connections_total.add( + 1, + &telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), + ); + telemetry::health::connection_connected(); + tracing::info!( + socket_sid = %socket.sid, + engine_sid = %socket.engine_sid, + namespace = %socket.namespace, + "Socket connected" + ); + Ok(()) + }) + .await; - - // Register Socket.IO event handlers - if let Some(ref svc) = service { - macro_rules! register_event { + // Register Socket.IO event handlers (always register — each handler reads the latest service) + { + let svc = service.clone(); + macro_rules! register_event { ($svc:expr, $ns:expr, $event:expr, $method:ident) => { let s = $svc.clone(); let event_name = $event.to_string(); @@ -208,8 +412,13 @@ fn main() -> Result<(), Box> { ); let _enter = _span.enter(); + let svc_guard = s.read().await; + let Some(ref svc) = *svc_guard else { + tracing::warn!(event = %event, "No message service available, dropping event"); + return; + }; let start = std::time::Instant::now(); - if let Err(e) = s.$method(socket, &data).await { + if let Err(e) = svc.$method(socket, &data).await { tracing::error!(event = %event, error = %e, "Event handler failed"); } let elapsed = start.elapsed().as_secs_f64(); @@ -222,83 +431,82 @@ fn main() -> Result<(), Box> { }; } - register_event!(svc, namespace, "channel:join", join_channel); - register_event!(svc, namespace, "channel:leave", leave_channel); - register_event!(svc, namespace, "message:send", send_message); - register_event!(svc, namespace, "message:edit", edit_message); - register_event!(svc, namespace, "message:delete", delete_message); - register_event!(svc, namespace, "reaction:add", toggle_reaction); - register_event!(svc, namespace, "pin:add", pin_message); - register_event!(svc, namespace, "pin:remove", unpin_message); - register_event!(svc, namespace, "poll:vote", poll_vote); - register_event!(svc, namespace, "poll:vote:remove", poll_remove_vote); - register_event!(svc, namespace, "typing:start", typing_start); - register_event!(svc, namespace, "typing:stop", typing_stop); - register_event!(svc, namespace, "presence:update", presence_update); - register_event!(svc, namespace, "draft:save", save_draft); - register_event!(svc, namespace, "draft:get", get_draft); - register_event!(svc, namespace, "draft:delete", delete_draft); - register_event!(svc, namespace, "read_state:mark", mark_read); - register_event!(svc, namespace, "read_state:get", get_read_state); - register_event!(svc, namespace, "notification:list", list_notifications); - register_event!( - svc, - namespace, - "notification:mark_read", - mark_notification_read - ); - register_event!( - svc, - namespace, - "notification:mark_all_read", - mark_all_notifications_read - ); - register_event!(svc, namespace, "bookmark:add", add_bookmark); - register_event!(svc, namespace, "bookmark:remove", remove_bookmark); - register_event!(svc, namespace, "bookmark:list", list_bookmarks); - register_event!(svc, namespace, "thread:create", create_thread); - register_event!(svc, namespace, "thread:resolve", resolve_thread); - register_event!(svc, namespace, "thread:join", join_thread); - register_event!(svc, namespace, "thread:leave", leave_thread); - register_event!(svc, namespace, "thread:list", list_threads); - register_event!(svc, namespace, "article:create", create_article); - register_event!(svc, namespace, "article:update", update_article); - register_event!(svc, namespace, "article:list", list_articles); - register_event!(svc, namespace, "article:delete", delete_article); - register_event!(svc, namespace, "component:interact", interact_component); - register_event!(svc, namespace, "component:update", update_component); + register_event!(svc, namespace, "channel:join", join_channel); + register_event!(svc, namespace, "channel:leave", leave_channel); + register_event!(svc, namespace, "message:send", send_message); + register_event!(svc, namespace, "message:edit", edit_message); + register_event!(svc, namespace, "message:delete", delete_message); + register_event!(svc, namespace, "reaction:add", toggle_reaction); + register_event!(svc, namespace, "pin:add", pin_message); + register_event!(svc, namespace, "pin:remove", unpin_message); + register_event!(svc, namespace, "poll:vote", poll_vote); + register_event!(svc, namespace, "poll:vote:remove", poll_remove_vote); + register_event!(svc, namespace, "typing:start", typing_start); + register_event!(svc, namespace, "typing:stop", typing_stop); + register_event!(svc, namespace, "presence:update", presence_update); + register_event!(svc, namespace, "draft:save", save_draft); + register_event!(svc, namespace, "draft:get", get_draft); + register_event!(svc, namespace, "draft:delete", delete_draft); + register_event!(svc, namespace, "read_state:mark", mark_read); + register_event!(svc, namespace, "read_state:get", get_read_state); + register_event!(svc, namespace, "notification:list", list_notifications); + register_event!( + svc, + namespace, + "notification:mark_read", + mark_notification_read + ); + register_event!( + svc, + namespace, + "notification:mark_all_read", + mark_all_notifications_read + ); + register_event!(svc, namespace, "bookmark:add", add_bookmark); + register_event!(svc, namespace, "bookmark:remove", remove_bookmark); + register_event!(svc, namespace, "bookmark:list", list_bookmarks); + register_event!(svc, namespace, "thread:create", create_thread); + register_event!(svc, namespace, "thread:resolve", resolve_thread); + register_event!(svc, namespace, "thread:join", join_thread); + register_event!(svc, namespace, "thread:leave", leave_thread); + register_event!(svc, namespace, "thread:list", list_threads); + register_event!(svc, namespace, "article:create", create_article); + register_event!(svc, namespace, "article:update", update_article); + register_event!(svc, namespace, "article:list", list_articles); + register_event!(svc, namespace, "article:delete", delete_article); + register_event!(svc, namespace, "component:interact", interact_component); + register_event!(svc, namespace, "component:update", update_component); - // Start scheduled message dispatcher (background task) - svc.clone().start_scheduled_dispatcher(); + tracing::info!("Registered Socket.IO event handlers"); + } - tracing::info!("Registered Socket.IO event handlers with observability instrumentation"); - } + // Start scheduled message dispatcher (once) + if let Some(ref svc) = *service.read().await { + svc.clone().start_scheduled_dispatcher(); + } - // Start servers - if deploy.webtransport_enabled && !deploy.cert_path.is_empty() { - let engine = socket_server.engine.clone(); - let wt_port = deploy.webtransport_port; - let cert_path = deploy.cert_path.clone(); - let key_path = deploy.key_path.clone(); - let server = socket_server.clone(); + // Start servers + if deploy.webtransport_enabled && !deploy.cert_path.is_empty() { + let engine = socket_server.engine.clone(); + let wt_port = deploy.webtransport_port; + let cert_path = deploy.cert_path.clone(); + let key_path = deploy.key_path.clone(); + let server = socket_server.clone(); - tracing::info!("Starting HTTP on {} + WebTransport on {}", addr, wt_port); + tracing::info!("Starting HTTP on {} + WebTransport on {}", addr, wt_port); - tokio::select! { - result = server.run_http(addr) => { - result?; - } - result = engine.run_webtransport(wt_port, &cert_path, &key_path) => { - result?; - } + tokio::select! { + result = server.run_http(addr) => { + result?; + } + result = engine.run_webtransport(wt_port, &cert_path, &key_path) => { + result?; } - } else { - tracing::info!("Socket.IO HTTP server listening on {}", addr); - socket_server.run_http(addr).await?; } - - Ok::<(), Box>(()) - })?; + } else { + tracing::info!("Socket.IO HTTP server listening on {}", addr); + socket_server.run_http(addr).await?; + } // Graceful telemetry shutdown telemetry_guard.shutdown(); diff --git a/migrate/000_message_thread_base.sql b/migrate/000_message_thread_base.sql deleted file mode 100644 index 4a482ed..0000000 --- a/migrate/000_message_thread_base.sql +++ /dev/null @@ -1,23 +0,0 @@ --- Create message_thread before migrations that reference it. --- Safe for existing databases because the table may already exist from 004. - -BEGIN; - -CREATE TABLE IF NOT EXISTS message_thread ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - channel_id UUID NOT NULL, - root_message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - created_by UUID NOT NULL, - replies_count BIGINT NOT NULL DEFAULT 0, - participants_count BIGINT NOT NULL DEFAULT 0, - last_reply_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, - last_reply_at TIMESTAMPTZ NULL, - resolved BOOLEAN NOT NULL DEFAULT FALSE, - resolved_by UUID NULL, - resolved_at TIMESTAMPTZ NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT uq_message_thread_root UNIQUE (root_message_id) -); - -COMMIT; diff --git a/migrate/001_message_rich_content.sql b/migrate/001_message_rich_content.sql deleted file mode 100644 index 8cebf57..0000000 --- a/migrate/001_message_rich_content.sql +++ /dev/null @@ -1,115 +0,0 @@ --- ============================================================ --- Migration: 001_message_rich_content.sql --- Tables: message_attachment, message_embed, message_embed_field, --- message_poll, message_poll_option, message_poll_vote --- ============================================================ --- These tables extend the existing `message` table (from appks 001_init.sql) --- with Discord-style rich content: file attachments, link preview embeds, --- and interactive polls. - -BEGIN; - --- models/message_attachment.rs → message_attachment -CREATE TABLE IF NOT EXISTS message_attachment ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - filename TEXT NOT NULL, - content_type TEXT NULL, - size BIGINT NOT NULL, - url TEXT NOT NULL, - storage_key TEXT NULL, - width INTEGER NULL, - height INTEGER NULL, - duration_secs DOUBLE PRECISION NULL, - blurhash TEXT NULL, - spoiler BOOLEAN NOT NULL DEFAULT FALSE, - created_at TIMESTAMPTZ NOT NULL DEFAULT now() -); -CREATE INDEX IF NOT EXISTS idx_message_attachment_message_id - ON message_attachment (message_id); - --- models/message_embed.rs → message_embed -CREATE TABLE IF NOT EXISTS message_embed ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - embed_type TEXT NOT NULL, - title TEXT NULL, - description TEXT NULL, - url TEXT NULL, - color INTEGER NULL, - image_url TEXT NULL, - image_width INTEGER NULL, - image_height INTEGER NULL, - thumbnail_url TEXT NULL, - thumbnail_width INTEGER NULL, - thumbnail_height INTEGER NULL, - video_url TEXT NULL, - video_width INTEGER NULL, - video_height INTEGER NULL, - author_name TEXT NULL, - author_url TEXT NULL, - author_icon_url TEXT NULL, - footer_text TEXT NULL, - footer_icon_url TEXT NULL, - provider_name TEXT NULL, - provider_url TEXT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now() -); -CREATE INDEX IF NOT EXISTS idx_message_embed_message_id - ON message_embed (message_id); - --- models/message_embed.rs → message_embed_field -CREATE TABLE IF NOT EXISTS message_embed_field ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - embed_id UUID NOT NULL REFERENCES message_embed(id) ON DELETE CASCADE, - name TEXT NOT NULL, - value TEXT NOT NULL, - inline BOOLEAN NOT NULL DEFAULT FALSE, - position INTEGER NOT NULL DEFAULT 0 -); -CREATE INDEX IF NOT EXISTS idx_message_embed_field_embed_id - ON message_embed_field (embed_id); - --- models/message_poll.rs → message_poll -CREATE TABLE IF NOT EXISTS message_poll ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - question TEXT NOT NULL, - allow_multiselect BOOLEAN NOT NULL DEFAULT FALSE, - max_selections INTEGER NULL, - expires_at TIMESTAMPTZ NULL, - total_votes BIGINT NOT NULL DEFAULT 0, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT uq_message_poll_message UNIQUE (message_id) -); -CREATE INDEX IF NOT EXISTS idx_message_poll_message_id - ON message_poll (message_id); - --- models/message_poll.rs → message_poll_option -CREATE TABLE IF NOT EXISTS message_poll_option ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - poll_id UUID NOT NULL REFERENCES message_poll(id) ON DELETE CASCADE, - text TEXT NOT NULL, - emoji TEXT NULL, - vote_count BIGINT NOT NULL DEFAULT 0, - position INTEGER NOT NULL DEFAULT 0 -); -CREATE INDEX IF NOT EXISTS idx_message_poll_option_poll_id - ON message_poll_option (poll_id); - --- models/message_poll.rs → message_poll_vote -CREATE TABLE IF NOT EXISTS message_poll_vote ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - poll_id UUID NOT NULL REFERENCES message_poll(id) ON DELETE CASCADE, - option_id UUID NOT NULL REFERENCES message_poll_option(id) ON DELETE CASCADE, - user_id UUID NOT NULL REFERENCES "user"(id) ON DELETE CASCADE, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT uq_message_poll_vote UNIQUE (poll_id, user_id, option_id) -); -CREATE INDEX IF NOT EXISTS idx_message_poll_vote_poll_id - ON message_poll_vote (poll_id); -CREATE INDEX IF NOT EXISTS idx_message_poll_vote_user_id - ON message_poll_vote (user_id); - -COMMIT; diff --git a/migrate/002_message_social.sql b/migrate/002_message_social.sql deleted file mode 100644 index edf209f..0000000 --- a/migrate/002_message_social.sql +++ /dev/null @@ -1,76 +0,0 @@ --- ============================================================ --- Migration: 002_message_social.sql --- Tables: message_pin, message_read_state, message_draft, message_edit --- ============================================================ --- Extends the message subsystem with pinned messages, read receipts, --- drafts, and edit history. - -BEGIN; - --- models/message_pin.rs → message_pin -CREATE TABLE IF NOT EXISTS message_pin ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - channel_id UUID NOT NULL, - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - pinned_by UUID NOT NULL, - position INTEGER NOT NULL DEFAULT 0, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT uq_message_pin_channel_message UNIQUE (channel_id, message_id) -); - -CREATE INDEX IF NOT EXISTS idx_message_pin_channel_id - ON message_pin (channel_id); - --- models/message_read_state.rs → message_read_state -CREATE TABLE IF NOT EXISTS message_read_state ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - channel_id UUID NOT NULL, - user_id UUID NOT NULL, - last_read_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, - last_read_at TIMESTAMPTZ NULL, - unread_count BIGINT NOT NULL DEFAULT 0, - unread_mentions BIGINT NOT NULL DEFAULT 0, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT uq_message_read_state_channel_user UNIQUE (channel_id, user_id) -); - -CREATE INDEX IF NOT EXISTS idx_message_read_state_user_id - ON message_read_state (user_id); -CREATE INDEX IF NOT EXISTS idx_message_read_state_channel_id - ON message_read_state (channel_id); - --- models/message_draft.rs → message_draft -CREATE TABLE IF NOT EXISTS message_draft ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - channel_id UUID NOT NULL, - user_id UUID NOT NULL, - thread_id UUID NULL REFERENCES message_thread(id) ON DELETE CASCADE, - reply_to_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, - body TEXT NOT NULL DEFAULT '', - metadata JSONB NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT uq_message_draft_channel_user_thread - UNIQUE (channel_id, user_id, thread_id) -); - -CREATE INDEX IF NOT EXISTS idx_message_draft_user_id - ON message_draft (user_id); - --- models/message_edit.rs → message_edit -CREATE TABLE IF NOT EXISTS message_edit ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - edited_by UUID NOT NULL, - old_body TEXT NOT NULL, - new_body TEXT NOT NULL, - edited_at TIMESTAMPTZ NOT NULL DEFAULT now() -); - -CREATE INDEX IF NOT EXISTS idx_message_edit_message_id - ON message_edit (message_id); -CREATE INDEX IF NOT EXISTS idx_message_edit_edited_at - ON message_edit (edited_at); - -COMMIT; diff --git a/migrate/003_message_article.sql b/migrate/003_message_article.sql deleted file mode 100644 index d7452bb..0000000 --- a/migrate/003_message_article.sql +++ /dev/null @@ -1,45 +0,0 @@ --- ============================================================ --- Migration: 003_message_article.sql --- Tables: message_article --- ============================================================ --- Extends the message subsystem with forum-style article posts. --- Articles extend regular messages with title, cover image, tags, --- and view/like stats. Rendered as waterfall cards in forum channels. - -BEGIN; - --- models/message_article.rs → message_article -CREATE TABLE IF NOT EXISTS message_article ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - title TEXT NOT NULL, - summary TEXT NULL, - cover_url TEXT NULL, - cover_width INTEGER NULL, - cover_height INTEGER NULL, - cover_color TEXT NULL, - tags JSONB NULL, - view_count BIGINT NOT NULL DEFAULT 0, - like_count BIGINT NOT NULL DEFAULT 0, - bookmark_count BIGINT NOT NULL DEFAULT 0, - reply_count BIGINT NOT NULL DEFAULT 0, - last_reply_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, - last_reply_at TIMESTAMPTZ NULL, - last_reply_user_id UUID NULL, - is_pinned_to_top BOOLEAN NOT NULL DEFAULT FALSE, - is_answered BOOLEAN NOT NULL DEFAULT FALSE, - answered_by UUID NULL, - answered_at TIMESTAMPTZ NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT uq_message_article_message UNIQUE (message_id) -); - -CREATE INDEX IF NOT EXISTS idx_message_article_last_reply_at - ON message_article (last_reply_at DESC NULLS LAST); -CREATE INDEX IF NOT EXISTS idx_message_article_is_pinned_to_top - ON message_article (is_pinned_to_top DESC, last_reply_at DESC NULLS LAST); -CREATE INDEX IF NOT EXISTS idx_message_article_view_count - ON message_article (view_count DESC); - -COMMIT; diff --git a/migrate/004_message_social_part2.sql b/migrate/004_message_social_part2.sql deleted file mode 100644 index 46d078b..0000000 --- a/migrate/004_message_social_part2.sql +++ /dev/null @@ -1,98 +0,0 @@ --- ============================================================ --- Migration: 004_message_social_part2.sql --- Tables: message_reaction, message_bookmark, message_mention, --- message_thread, message_thread_participant --- ============================================================ - -BEGIN; - --- models/message_reaction.rs → message_reaction -CREATE TABLE IF NOT EXISTS message_reaction ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - channel_id UUID NOT NULL, - user_id UUID NOT NULL, - content TEXT NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT uq_message_reaction_user_content UNIQUE (message_id, user_id, content) -); - -CREATE INDEX IF NOT EXISTS idx_message_reaction_message_id - ON message_reaction (message_id); -CREATE INDEX IF NOT EXISTS idx_message_reaction_user_id - ON message_reaction (user_id); - --- models/message_bookmark.rs → message_bookmark -CREATE TABLE IF NOT EXISTS message_bookmark ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - channel_id UUID NOT NULL, - user_id UUID NOT NULL, - note TEXT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT uq_message_bookmark_user_message UNIQUE (user_id, message_id) -); - -CREATE INDEX IF NOT EXISTS idx_message_bookmark_user_id - ON message_bookmark (user_id); -CREATE INDEX IF NOT EXISTS idx_message_bookmark_message_id - ON message_bookmark (message_id); - --- models/message_mention.rs → message_mention -CREATE TABLE IF NOT EXISTS message_mention ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - channel_id UUID NOT NULL, - mentioned_user_id UUID NOT NULL, - mentioned_by UUID NOT NULL, - read_at TIMESTAMPTZ NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now() -); - -CREATE INDEX IF NOT EXISTS idx_message_mention_message_id - ON message_mention (message_id); -CREATE INDEX IF NOT EXISTS idx_message_mention_mentioned_user - ON message_mention (mentioned_user_id); - --- models/message_thread.rs → message_thread -CREATE TABLE IF NOT EXISTS message_thread ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - channel_id UUID NOT NULL, - root_message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - created_by UUID NOT NULL, - replies_count BIGINT NOT NULL DEFAULT 0, - participants_count BIGINT NOT NULL DEFAULT 0, - last_reply_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, - last_reply_at TIMESTAMPTZ NULL, - resolved BOOLEAN NOT NULL DEFAULT FALSE, - resolved_by UUID NULL, - resolved_at TIMESTAMPTZ NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT uq_message_thread_root UNIQUE (root_message_id) -); - -CREATE INDEX IF NOT EXISTS idx_message_thread_channel_id - ON message_thread (channel_id); -CREATE INDEX IF NOT EXISTS idx_message_thread_last_reply_at - ON message_thread (last_reply_at DESC NULLS LAST); - --- models/message_thread_participant.rs → message_thread_participant -CREATE TABLE IF NOT EXISTS message_thread_participant ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - thread_id UUID NOT NULL REFERENCES message_thread(id) ON DELETE CASCADE, - user_id UUID NOT NULL, - joined_reason TEXT NULL, - last_read_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, - last_read_at TIMESTAMPTZ NULL, - joined_at TIMESTAMPTZ NOT NULL DEFAULT now(), - CONSTRAINT uq_thread_participant UNIQUE (thread_id, user_id) -); - -CREATE INDEX IF NOT EXISTS idx_thread_participant_thread_id - ON message_thread_participant (thread_id); -CREATE INDEX IF NOT EXISTS idx_thread_participant_user_id - ON message_thread_participant (user_id); - -COMMIT; diff --git a/migrate/005_message_misc.sql b/migrate/005_message_misc.sql deleted file mode 100644 index 5966a2b..0000000 --- a/migrate/005_message_misc.sql +++ /dev/null @@ -1,102 +0,0 @@ --- ============================================================ --- Migration: 005_message_misc.sql --- Tables: message_notification, message_scheduled, message_sticker, --- message_forward, message_component --- ============================================================ - -BEGIN; - --- models/message_notification.rs → message_notification -CREATE TABLE IF NOT EXISTS message_notification ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - channel_id UUID NOT NULL, - user_id UUID NOT NULL, - reason TEXT NOT NULL, - status TEXT NOT NULL DEFAULT 'pending', - delivery_channel TEXT NULL, - delivered_at TIMESTAMPTZ NULL, - read_at TIMESTAMPTZ NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now() -); - -CREATE INDEX IF NOT EXISTS idx_message_notification_user_id - ON message_notification (user_id, created_at DESC); -CREATE INDEX IF NOT EXISTS idx_message_notification_status - ON message_notification (status); - --- models/message_scheduled.rs → message_scheduled -CREATE TABLE IF NOT EXISTS message_scheduled ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - channel_id UUID NOT NULL, - author_id UUID NOT NULL, - thread_id UUID NULL REFERENCES message_thread(id) ON DELETE SET NULL, - reply_to_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, - body TEXT NOT NULL, - metadata JSONB NULL, - scheduled_at TIMESTAMPTZ NOT NULL, - status TEXT NOT NULL DEFAULT 'pending', - sent_message_id UUID NULL REFERENCES message(id) ON DELETE SET NULL, - error TEXT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now(), - updated_at TIMESTAMPTZ NOT NULL DEFAULT now() -); - -CREATE INDEX IF NOT EXISTS idx_message_scheduled_status_at - ON message_scheduled (status, scheduled_at); - --- models/message_sticker.rs → message_sticker -CREATE TABLE IF NOT EXISTS message_sticker ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - sticker_id UUID NOT NULL, - name TEXT NOT NULL, - image_url TEXT NOT NULL, - format_type TEXT NOT NULL DEFAULT 'png', - pack_name TEXT NULL, - tags TEXT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now() -); - -CREATE INDEX IF NOT EXISTS idx_message_sticker_message_id - ON message_sticker (message_id); - --- models/message_forward.rs → message_forward -CREATE TABLE IF NOT EXISTS message_forward ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - source_message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - source_channel_id UUID NOT NULL, - forwarded_by UUID NOT NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now() -); - -CREATE INDEX IF NOT EXISTS idx_message_forward_message_id - ON message_forward (message_id); -CREATE INDEX IF NOT EXISTS idx_message_forward_source_message_id - ON message_forward (source_message_id); - --- models/message_component.rs → message_component -CREATE TABLE IF NOT EXISTS message_component ( - id UUID PRIMARY KEY DEFAULT gen_random_uuid(), - message_id UUID NOT NULL REFERENCES message(id) ON DELETE CASCADE, - row INTEGER NOT NULL DEFAULT 0, - position INTEGER NOT NULL DEFAULT 0, - component_type TEXT NOT NULL, - custom_id TEXT NOT NULL, - label TEXT NULL, - emoji TEXT NULL, - style TEXT NULL, - url TEXT NULL, - disabled BOOLEAN NOT NULL DEFAULT FALSE, - placeholder TEXT NULL, - min_values INTEGER NULL, - max_values INTEGER NULL, - options JSONB NULL, - created_at TIMESTAMPTZ NOT NULL DEFAULT now() -); - -CREATE INDEX IF NOT EXISTS idx_message_component_message_id - ON message_component (message_id); - -COMMIT; diff --git a/migrate/006_fix_uuid_defaults_and_draft_null.sql b/migrate/006_fix_uuid_defaults_and_draft_null.sql deleted file mode 100644 index d102372..0000000 --- a/migrate/006_fix_uuid_defaults_and_draft_null.sql +++ /dev/null @@ -1,32 +0,0 @@ --- Align imks-managed IDs with application-generated UUID v7 values and --- make top-level drafts unique when thread_id is NULL. - -BEGIN; - -CREATE UNIQUE INDEX IF NOT EXISTS uq_message_draft_channel_user_no_thread - ON message_draft (channel_id, user_id) - WHERE thread_id IS NULL; - -ALTER TABLE message_attachment ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_embed ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_embed_field ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_poll ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_poll_option ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_poll_vote ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_pin ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_read_state ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_draft ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_edit ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_article ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_reaction ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_bookmark ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_mention ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_thread ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_thread_participant ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_notification ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_scheduled ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_sticker ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_forward ALTER COLUMN id DROP DEFAULT; -ALTER TABLE message_component ALTER COLUMN id DROP DEFAULT; - -COMMIT; diff --git a/rpc/mod.rs b/rpc/mod.rs index 8c8e8a4..077d3eb 100644 --- a/rpc/mod.rs +++ b/rpc/mod.rs @@ -1,5 +1,6 @@ pub mod clients; pub mod config; +pub mod health; pub use clients::AppksClients; pub use config::RpcConfig; diff --git a/socket/adapter/redis.rs b/socket/adapter/redis.rs index 0015334..a9cd403 100644 --- a/socket/adapter/redis.rs +++ b/socket/adapter/redis.rs @@ -3,13 +3,12 @@ use std::sync::Arc; use async_trait::async_trait; use dashmap::DashMap; -use fred::clients::Client; -use fred::interfaces::{KeysInterface, SetsInterface}; use tokio::sync::mpsc; use crate::socket::adapter::{ Adapter, AdapterError, BroadcastOptions, BusMessage, LocalBroadcastFn, SocketInfo, }; +use crate::socket::message_bus::redis::RedisCommandClient; use crate::socket::message_bus::MessageBus; use crate::socket::packet::Packet; use crate::socket::parser; @@ -68,7 +67,7 @@ async fn handle_bus_message( pub struct RedisAdapter { message_bus: Arc, - redis_client: Client, + redis_client: RedisCommandClient, room_subscribers: DashMap>>, socket_rooms: DashMap>, rooms: DashMap>, @@ -83,7 +82,7 @@ pub struct RedisAdapter { impl RedisAdapter { pub fn new( message_bus: Arc, - redis_client: Client, + redis_client: RedisCommandClient, server_id: String, namespace: String, on_local_broadcast: LocalBroadcastFn, @@ -195,12 +194,12 @@ impl Adapter for RedisAdapter { let srk = socket_rooms_key(ns, sid); self.redis_client - .sadd::<(), _, _>(&rk, sid) + .query::<()>(redis::cmd("SADD").arg(&rk).arg(sid)) .await .map_err(|e| AdapterError::Redis(e.to_string()))?; self.redis_client - .sadd::<(), _, _>(&srk, room) + .query::<()>(redis::cmd("SADD").arg(&srk).arg(room)) .await .map_err(|e| AdapterError::Redis(e.to_string()))?; @@ -241,12 +240,12 @@ impl Adapter for RedisAdapter { let srk = socket_rooms_key(ns, sid); self.redis_client - .srem::<(), _, _>(&rk, sid) + .query::<()>(redis::cmd("SREM").arg(&rk).arg(sid)) .await .map_err(|e| AdapterError::Redis(e.to_string()))?; self.redis_client - .srem::<(), _, _>(&srk, room) + .query::<()>(redis::cmd("SREM").arg(&srk).arg(room)) .await .map_err(|e| AdapterError::Redis(e.to_string()))?; @@ -308,7 +307,11 @@ impl Adapter for RedisAdapter { } let rk = room_key(ns, room); - if let Err(e) = self.redis_client.srem::<(), _, _>(&rk, sid).await { + if let Err(e) = self + .redis_client + .query::<()>(redis::cmd("SREM").arg(&rk).arg(sid)) + .await + { tracing::warn!("Redis SREM room error: {}", e); } } @@ -316,7 +319,7 @@ impl Adapter for RedisAdapter { let srk = socket_rooms_key(ns, sid); self.redis_client - .del::<(), _>(&srk) + .query::<()>(redis::cmd("DEL").arg(&srk)) .await .map_err(|e| AdapterError::Redis(e.to_string()))?; diff --git a/socket/message_bus/redis.rs b/socket/message_bus/redis.rs index 1027611..0c55c41 100644 --- a/socket/message_bus/redis.rs +++ b/socket/message_bus/redis.rs @@ -1,54 +1,106 @@ use async_trait::async_trait; -use fred::clients::{Client, SubscriberClient}; -use fred::interfaces::{ClientLike, EventInterface, PubsubInterface}; -use fred::prelude::*; +use futures_util::StreamExt; +use redis::aio::ConnectionManager; +use redis::cluster::ClusterClient; +use redis::cluster_async::ClusterConnection; +use redis::{Client, FromRedisValue}; use tokio::sync::mpsc; +use tokio::time::{Duration, timeout}; use crate::socket::message_bus::{MessageBus, MessageBusError}; +const REDIS_CONNECT_TIMEOUT_SECS: u64 = 5; + +#[derive(Clone)] +pub enum RedisCommandClient { + Single(ConnectionManager), + Cluster(ClusterConnection), +} + +impl RedisCommandClient { + pub async fn query(&self, cmd: &mut redis::Cmd) -> redis::RedisResult { + match self { + Self::Single(conn) => { + let mut conn = conn.clone(); + cmd.query_async(&mut conn).await + } + Self::Cluster(conn) => { + let mut conn = conn.clone(); + cmd.query_async(&mut conn).await + } + } + } +} + pub struct RedisMessageBus { - client: Client, - subscriber: SubscriberClient, + command_client: RedisCommandClient, + pubsub_client: Client, } impl RedisMessageBus { - /// Connect to a Redis cluster. + /// Connect to Redis using the same `redis` crate as appks. /// - /// `cluster_url` should be in `redis-cluster://` format, e.g.: - /// `redis-cluster://host1:6379,host2:6379,host3:6379` - pub async fn new(cluster_url: &str) -> Result { - let config = - Config::from_url(cluster_url).map_err(|e| MessageBusError::Redis(e.to_string()))?; + /// Supports both single-node `redis://host:port` and cluster + /// `redis-cluster://host1:6379?node=host2:6379` URLs. + pub async fn new(redis_url: &str) -> Result { + tracing::info!("Connecting to Redis"); - let client = Client::new(config.clone(), None, None, None); - let subscriber = SubscriberClient::new(config, None, None, None); + let connect_timeout = Duration::from_secs(REDIS_CONNECT_TIMEOUT_SECS); + let parsed = parse_redis_url(redis_url)?; - let _ = client.connect().await; - let _ = subscriber.connect().await; + let (command_client, pubsub_url) = match parsed { + ParsedRedisConfig::Single(url) => { + let client = Client::open(url.as_str()) + .map_err(|e| MessageBusError::Redis(e.to_string()))?; + let conn = timeout(connect_timeout, client.get_connection_manager()) + .await + .map_err(|_| { + MessageBusError::Redis(format!( + "Redis connection timeout after {REDIS_CONNECT_TIMEOUT_SECS}s" + )) + })? + .map_err(|e| MessageBusError::Redis(e.to_string()))?; + (RedisCommandClient::Single(conn), url) + } + ParsedRedisConfig::Cluster(nodes) => { + let cluster_client = ClusterClient::new(nodes.iter().map(String::as_str).collect::>()) + .map_err(|e| MessageBusError::Redis(e.to_string()))?; + let conn = timeout(connect_timeout, cluster_client.get_async_connection()) + .await + .map_err(|_| { + MessageBusError::Redis(format!( + "Redis cluster connection timeout after {REDIS_CONNECT_TIMEOUT_SECS}s" + )) + })? + .map_err(|e| MessageBusError::Redis(e.to_string()))?; + let pubsub_url = nodes + .first() + .cloned() + .ok_or_else(|| MessageBusError::Redis("Redis cluster nodes are empty".into()))?; + (RedisCommandClient::Cluster(conn), pubsub_url) + } + }; - client - .wait_for_connect() - .await - .map_err(|e| MessageBusError::Redis(e.to_string()))?; - subscriber - .wait_for_connect() - .await + let pubsub_client = Client::open(pubsub_url.as_str()) .map_err(|e| MessageBusError::Redis(e.to_string()))?; - tracing::info!(cluster_url, "Redis cluster connected"); - Ok(Self { client, subscriber }) + tracing::info!("Redis connected"); + Ok(Self { + command_client, + pubsub_client, + }) } - pub fn client(&self) -> &Client { - &self.client + pub fn client(&self) -> RedisCommandClient { + self.command_client.clone() } } #[async_trait] impl MessageBus for RedisMessageBus { async fn publish(&self, channel: &str, message: &[u8]) -> Result<(), MessageBusError> { - self.client - .publish::<(), _, Vec>(channel, message.to_vec()) + self.command_client + .query::<()>(redis::cmd("PUBLISH").arg(channel).arg(message)) .await .map_err(|e| MessageBusError::Redis(e.to_string()))?; Ok(()) @@ -56,23 +108,27 @@ impl MessageBus for RedisMessageBus { async fn subscribe(&self, channel: &str) -> Result>, MessageBusError> { let (tx, rx) = mpsc::channel::>(256); - - self.subscriber - .subscribe(channel.to_string()) + let mut pubsub = self + .pubsub_client + .get_async_pubsub() .await .map_err(|e| MessageBusError::Redis(e.to_string()))?; - let subscriber = self.subscriber.clone(); - let channel_owned = channel.to_string(); - let mut message_rx = subscriber.message_rx(); + pubsub + .subscribe(channel) + .await + .map_err(|e| MessageBusError::Redis(e.to_string()))?; + let channel_owned = channel.to_string(); tokio::spawn(async move { - while let Ok(message) = message_rx.recv().await { - if message.channel == channel_owned { - let data: Vec = FromValue::from_value(message.value).unwrap_or_default(); - if tx.send(data).await.is_err() { - break; - } + let mut stream = pubsub.on_message(); + while let Some(message) = stream.next().await { + if message.get_channel_name() != channel_owned { + continue; + } + let payload = message.get_payload::>().unwrap_or_default(); + if tx.send(payload).await.is_err() { + break; } } }); @@ -80,23 +136,92 @@ impl MessageBus for RedisMessageBus { Ok(rx) } - async fn unsubscribe(&self, channel: &str) -> Result<(), MessageBusError> { - self.subscriber - .unsubscribe(channel.to_string()) - .await - .map_err(|e| MessageBusError::Redis(e.to_string()))?; + async fn unsubscribe(&self, _channel: &str) -> Result<(), MessageBusError> { + // Each subscription owns its dedicated async PubSub connection inside + // the spawned listener task. Dropping the receiver stops local delivery. Ok(()) } async fn close(&self) -> Result<(), MessageBusError> { - self.client - .quit() - .await - .map_err(|e| MessageBusError::Redis(e.to_string()))?; - self.subscriber - .quit() - .await - .map_err(|e| MessageBusError::Redis(e.to_string()))?; Ok(()) } } + +enum ParsedRedisConfig { + Single(String), + Cluster(Vec), +} + +fn parse_redis_url(redis_url: &str) -> Result { + let Some(rest) = redis_url.strip_prefix("redis-cluster://") else { + return Ok(ParsedRedisConfig::Single(redis_url.to_string())); + }; + + let (first, query) = rest.split_once('?').unwrap_or((rest, "")); + let (auth, first_node) = split_auth(first); + let mut nodes = Vec::new(); + + if !first_node.is_empty() { + nodes.push(to_redis_node_url(auth, first_node)); + } + + for part in query.split('&') { + let Some(node) = part.strip_prefix("node=") else { + continue; + }; + if !node.is_empty() { + nodes.push(to_redis_node_url(auth, node)); + } + } + + if nodes.is_empty() { + return Err(MessageBusError::Redis("Redis cluster URL has no nodes".into())); + } + + Ok(ParsedRedisConfig::Cluster(nodes)) +} + +fn split_auth(value: &str) -> (&str, &str) { + value + .rsplit_once('@') + .map(|(auth, host)| (auth, host)) + .unwrap_or(("", value)) +} + +fn to_redis_node_url(auth: &str, node: &str) -> String { + if node.starts_with("redis://") || node.starts_with("rediss://") { + return node.to_string(); + } + if auth.is_empty() { + format!("redis://{node}") + } else { + format!("redis://{auth}@{node}") + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn parses_single_redis_url() { + match parse_redis_url("redis://127.0.0.1:6379").unwrap() { + ParsedRedisConfig::Single(url) => assert_eq!(url, "redis://127.0.0.1:6379"), + ParsedRedisConfig::Cluster(_) => panic!("expected single redis config"), + } + } + + #[test] + fn parses_cluster_url_for_redis_rs() { + match parse_redis_url("redis-cluster://:pass@127.0.0.1:6380?node=127.0.0.1:6381").unwrap() { + ParsedRedisConfig::Cluster(nodes) => assert_eq!( + nodes, + vec![ + "redis://:pass@127.0.0.1:6380".to_string(), + "redis://:pass@127.0.0.1:6381".to_string() + ] + ), + ParsedRedisConfig::Single(_) => panic!("expected cluster redis config"), + } + } +} diff --git a/socket/session_store/redis.rs b/socket/session_store/redis.rs index 84f5c9b..034efc6 100644 --- a/socket/session_store/redis.rs +++ b/socket/session_store/redis.rs @@ -1,8 +1,8 @@ use std::time::{SystemTime, UNIX_EPOCH}; use async_trait::async_trait; -use fred::prelude::*; +use crate::socket::message_bus::redis::RedisCommandClient; use crate::socket::message_bus::redis::RedisMessageBus; use crate::socket::session_store::{SessionError, SessionInfo, SessionStoreTrait}; @@ -17,14 +17,14 @@ const DEFAULT_TTL_SECS: u64 = 60; const KEY_PREFIX: &str = "socket.io:session"; pub struct RedisSessionStore { - client: Client, + client: RedisCommandClient, ttl_secs: u64, } impl RedisSessionStore { pub fn new(bus: &RedisMessageBus, ttl_secs: Option) -> Self { Self { - client: bus.client().clone(), + client: bus.client(), ttl_secs: ttl_secs.unwrap_or(DEFAULT_TTL_SECS), } } @@ -45,22 +45,29 @@ impl SessionStoreTrait for RedisSessionStore { let key = self.key(sid); let now = now_millis(); - // Batch all fields in a single HSET call for efficiency - let fields: Vec<(&str, String)> = vec![ - ("sid", sid.to_string()), - ("transport", transport.to_string()), - ("state", "connecting".to_string()), - ("server_id", server_id.to_string()), - ("created_at", now.to_string()), - ("last_ping", now.to_string()), - ]; + // Batch all fields in a single HMSET-style call self.client - .hset::<(), _, _>(&key, fields) + .query::<()>( + redis::cmd("HSET") + .arg(&key) + .arg("sid") + .arg(sid) + .arg("transport") + .arg(transport) + .arg("state") + .arg("connecting") + .arg("server_id") + .arg(server_id) + .arg("created_at") + .arg(now.to_string()) + .arg("last_ping") + .arg(now.to_string()), + ) .await .map_err(|e| SessionError::Redis(e.to_string()))?; self.client - .expire::<(), _>(&key, self.ttl_secs as i64, None) + .query::<()>(redis::cmd("EXPIRE").arg(&key).arg(self.ttl_secs as i64)) .await .map_err(|e| SessionError::Redis(e.to_string()))?; @@ -70,11 +77,9 @@ impl SessionStoreTrait for RedisSessionStore { async fn get(&self, sid: &str) -> Result, SessionError> { let key = self.key(sid); - // Use hgetall directly — if the key doesn't exist Redis returns an empty map. - // This avoids the TOCTOU race between EXISTS and HGETALL. let values: std::collections::HashMap = self .client - .hgetall::, _>(&key) + .query(redis::cmd("HGETALL").arg(&key)) .await .map_err(|e| SessionError::Redis(e.to_string()))?; @@ -103,14 +108,13 @@ impl SessionStoreTrait for RedisSessionStore { async fn set_state(&self, sid: &str, state: &str) -> Result<(), SessionError> { let key = self.key(sid); - // Use HSET (not HSETNX) to overwrite existing fields self.client - .hset::<(), _, _>(&key, ("state", state)) + .query::<()>(redis::cmd("HSET").arg(&key).arg("state").arg(state)) .await .map_err(|e| SessionError::Redis(e.to_string()))?; self.client - .expire::<(), _>(&key, self.ttl_secs as i64, None) + .query::<()>(redis::cmd("EXPIRE").arg(&key).arg(self.ttl_secs as i64)) .await .map_err(|e| SessionError::Redis(e.to_string()))?; @@ -120,14 +124,18 @@ impl SessionStoreTrait for RedisSessionStore { async fn set_transport(&self, sid: &str, transport: &str) -> Result<(), SessionError> { let key = self.key(sid); - // Use HSET (not HSETNX) to overwrite existing fields self.client - .hset::<(), _, _>(&key, ("transport", transport)) + .query::<()>( + redis::cmd("HSET") + .arg(&key) + .arg("transport") + .arg(transport), + ) .await .map_err(|e| SessionError::Redis(e.to_string()))?; self.client - .expire::<(), _>(&key, self.ttl_secs as i64, None) + .query::<()>(redis::cmd("EXPIRE").arg(&key).arg(self.ttl_secs as i64)) .await .map_err(|e| SessionError::Redis(e.to_string()))?; @@ -138,14 +146,18 @@ impl SessionStoreTrait for RedisSessionStore { let key = self.key(sid); let now = now_millis(); - // Use HSET (not HSETNX) to overwrite existing fields self.client - .hset::<(), _, _>(&key, ("last_ping", now.to_string())) + .query::<()>( + redis::cmd("HSET") + .arg(&key) + .arg("last_ping") + .arg(now.to_string()), + ) .await .map_err(|e| SessionError::Redis(e.to_string()))?; self.client - .expire::<(), _>(&key, self.ttl_secs as i64, None) + .query::<()>(redis::cmd("EXPIRE").arg(&key).arg(self.ttl_secs as i64)) .await .map_err(|e| SessionError::Redis(e.to_string()))?; @@ -156,7 +168,7 @@ impl SessionStoreTrait for RedisSessionStore { let key = self.key(sid); self.client - .del::<(), _>(&key) + .query::<()>(redis::cmd("DEL").arg(&key)) .await .map_err(|e| SessionError::Redis(e.to_string()))?; @@ -168,7 +180,7 @@ impl SessionStoreTrait for RedisSessionStore { let exists: bool = self .client - .exists::(&key) + .query(redis::cmd("EXISTS").arg(&key)) .await .map_err(|e| SessionError::Redis(e.to_string()))?; diff --git a/svc/deploy.rs b/svc/deploy.rs index ad0e29d..3b50c7b 100644 --- a/svc/deploy.rs +++ b/svc/deploy.rs @@ -10,6 +10,10 @@ use std::env; pub struct DeployConfig { /// "local" | "redis" | "nats" pub adapter_mode: String, + /// Redis URL for single-node Redis, e.g. `redis://localhost:6379`. + pub redis_url: Option, + /// Whether Redis cluster mode is enabled. + pub redis_cluster_enabled: bool, /// Redis cluster nodes, comma-separated host:port pairs. /// Example: "redis1:6379,redis2:6379,redis3:6379" pub redis_cluster_nodes: String, @@ -35,9 +39,19 @@ impl DeployConfig { Self { adapter_mode: env::var("IMKS_ADAPTER").unwrap_or_else(|_| "local".into()), + redis_url: env::var("IMKS_REDIS_URL") + .ok() + .or_else(|| env::var("APP_REDIS_URL").ok()) + .filter(|v| !v.trim().is_empty()), + redis_cluster_enabled: env_bool("IMKS_REDIS_CLUSTER_ENABLED") + .or_else(|| env_bool("APP_REDIS_CLUSTER_ENABLED")) + .unwrap_or(false), redis_cluster_nodes: env::var("IMKS_REDIS_CLUSTER_NODES") - .unwrap_or_else(|_| "localhost:6379,localhost:6380,localhost:6381".into()), - redis_password: env::var("IMKS_REDIS_PASSWORD").unwrap_or_default(), + .or_else(|_| env::var("APP_REDIS_CLUSTER_NODES")) + .unwrap_or_default(), + redis_password: env::var("IMKS_REDIS_PASSWORD") + .or_else(|_| env::var("APP_REDIS_PASSWORD")) + .unwrap_or_default(), nats_url: env::var("IMKS_NATS_URL").unwrap_or_else(|_| "nats://localhost:4222".into()), server_id, webtransport_enabled: env::var("IMKS_WT_ENABLED") @@ -52,15 +66,56 @@ impl DeployConfig { } } + /// Build a redis-rs compatible Redis URL. + /// + /// This mirrors appks: + /// - cluster disabled: require `IMKS_REDIS_URL` or `APP_REDIS_URL` + /// - cluster enabled: require `IMKS_REDIS_CLUSTER_NODES` or `APP_REDIS_CLUSTER_NODES` + pub fn redis_url(&self) -> Result { + if self.redis_cluster_enabled { + return self.redis_cluster_url(); + } + + self.redis_url.clone().ok_or_else(|| { + "Redis cluster disabled but IMKS_REDIS_URL/APP_REDIS_URL is not set".into() + }) + } + /// Build a redis-cluster URL from cluster_nodes and optional password. - /// Format: redis-cluster://[:password@]host1:port1,host2:port2,... - pub fn redis_cluster_url(&self) -> String { + /// + /// Produces a redis-rs compatible URL in the format: + /// `redis-cluster://[password@]host1:port1?node=host2:port2&node=host3:port3` + /// + /// The first node becomes the URL authority host:port; additional nodes + /// are appended as `node` query parameters. + pub fn redis_cluster_url(&self) -> Result { let auth = if self.redis_password.is_empty() { String::new() } else { format!(":{}@", self.redis_password) }; - format!("redis-cluster://{}{}", auth, self.redis_cluster_nodes) + + let nodes: Vec = self + .redis_cluster_nodes + .split(',') + .map(normalize_redis_cluster_node) + .filter(|s| !s.is_empty()) + .collect(); + + if nodes.is_empty() { + return Err("Redis cluster enabled but IMKS_REDIS_CLUSTER_NODES/APP_REDIS_CLUSTER_NODES is empty".into()); + } + + if nodes.len() == 1 { + return Ok(format!("redis-cluster://{}{}", auth, nodes[0])); + } + + let mut url = format!("redis-cluster://{}{}", auth, nodes[0]); + for (i, node) in nodes.iter().skip(1).enumerate() { + url.push(if i == 0 { '?' } else { '&' }); + url.push_str(&format!("node={}", node)); + } + Ok(url) } } @@ -70,8 +125,76 @@ impl Default for DeployConfig { } } +fn env_bool(key: &str) -> Option { + env::var(key).ok().map(|v| v == "true" || v == "1") +} + +fn normalize_redis_cluster_node(node: &str) -> String { + let trimmed = node.trim(); + let without_scheme = trimmed + .strip_prefix("redis://") + .or_else(|| trimmed.strip_prefix("rediss://")) + .unwrap_or(trimmed); + let without_auth = without_scheme + .rsplit_once('@') + .map(|(_, host)| host) + .unwrap_or(without_scheme); + without_auth + .split('/') + .next() + .unwrap_or_default() + .to_string() +} + fn hostname() -> String { env::var("HOSTNAME") .or_else(|_| env::var("HOST")) .unwrap_or_else(|_| "imks-node-1".into()) } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn normalize_cluster_nodes_accepts_appks_style_urls() { + assert_eq!( + normalize_redis_cluster_node("redis://127.0.0.1:6380"), + "127.0.0.1:6380" + ); + assert_eq!( + normalize_redis_cluster_node("rediss://127.0.0.1:6381/0"), + "127.0.0.1:6381" + ); + assert_eq!( + normalize_redis_cluster_node("redis://:secret@127.0.0.1:6382"), + "127.0.0.1:6382" + ); + assert_eq!( + normalize_redis_cluster_node("127.0.0.1:6383"), + "127.0.0.1:6383" + ); + } + + #[test] + fn redis_cluster_url_normalizes_nodes() { + let config = DeployConfig { + adapter_mode: "redis".into(), + redis_url: None, + redis_cluster_enabled: true, + redis_cluster_nodes: "redis://127.0.0.1:6380,redis://127.0.0.1:6381".into(), + redis_password: String::new(), + nats_url: "nats://localhost:4222".into(), + server_id: "test".into(), + webtransport_enabled: false, + webtransport_port: 3001, + cert_path: String::new(), + key_path: String::new(), + }; + + assert_eq!( + config.redis_cluster_url().unwrap(), + "redis-cluster://127.0.0.1:6380?node=127.0.0.1:6381" + ); + } +} diff --git a/svc/scheduled.rs b/svc/scheduled.rs index f75266a..92298fb 100644 --- a/svc/scheduled.rs +++ b/svc/scheduled.rs @@ -34,8 +34,7 @@ impl MessageService { let thread_id: Option = Self::parse_optional(payload, "thread_id")?; let reply_to_message_id: Option = Self::parse_optional(payload, "reply_to_message_id")?; - let metadata: Option = - Self::parse_optional(payload, "metadata")?; + let metadata: Option = Self::parse_optional(payload, "metadata")?; let scheduled_at_str: String = Self::parse_field(payload, "scheduled_at")?; let scheduled_at: DateTime = chrono::DateTime::parse_from_rfc3339(&scheduled_at_str) diff --git a/telemetry/config.rs b/telemetry/config.rs index a3c6589..68c7257 100644 --- a/telemetry/config.rs +++ b/telemetry/config.rs @@ -32,10 +32,7 @@ impl Default for TelemetryConfig { Self { service_name: env_or("OTEL_SERVICE_NAME", "imks"), service_version: env_or("OTEL_SERVICE_VERSION", env!("CARGO_PKG_VERSION")), - otlp_endpoint: env_or( - "OTEL_EXPORTER_OTLP_ENDPOINT", - "http://localhost:4317", - ), + otlp_endpoint: env_or("OTEL_EXPORTER_OTLP_ENDPOINT", "http://localhost:4317"), otlp_protocol: detect_otlp_protocol(), traces_enabled: env_bool("OTEL_TRACES_ENABLED", true), metrics_enabled: env_bool("OTEL_METRICS_ENABLED", true), diff --git a/telemetry/health.rs b/telemetry/health.rs index af236cb..cf570d1 100644 --- a/telemetry/health.rs +++ b/telemetry/health.rs @@ -52,10 +52,7 @@ pub fn connections_active_count() -> u64 { /// Returns the server uptime in seconds. pub fn uptime_secs() -> u64 { - START_TIME - .get() - .map(|t| t.elapsed().as_secs()) - .unwrap_or(0) + START_TIME.get().map(|t| t.elapsed().as_secs()).unwrap_or(0) } #[derive(Debug, Clone, Serialize)] @@ -147,11 +144,7 @@ pub async fn health_check(checks: actix_web::web::Data>) -> .iter() .filter_map(|c| c.as_ref()) .all(|c| c.status == "up"); - if all_up { - "healthy" - } else { - "degraded" - } + if all_up { "healthy" } else { "degraded" } } else { "healthy" }; diff --git a/telemetry/logs.rs b/telemetry/logs.rs index 931e7bd..1f284fa 100644 --- a/telemetry/logs.rs +++ b/telemetry/logs.rs @@ -2,12 +2,12 @@ use opentelemetry_appender_tracing::layer::OpenTelemetryTracingBridge; use opentelemetry_otlp::{LogExporter, Protocol, WithExportConfig}; -use opentelemetry_sdk::logs::SdkLoggerProvider; use opentelemetry_sdk::Resource; -use tracing_subscriber::fmt::format::FmtSpan; -use tracing_subscriber::layer::SubscriberExt; +use opentelemetry_sdk::logs::SdkLoggerProvider; use tracing_subscriber::EnvFilter; use tracing_subscriber::Registry; +use tracing_subscriber::fmt::format::FmtSpan; +use tracing_subscriber::layer::SubscriberExt; use super::config::{OtlpProtocol, TelemetryConfig}; use crate::ImksResult; @@ -35,7 +35,9 @@ pub fn init_subscriber( let (logger_provider, log_bridge_layer) = if config.logs_enabled { let exporter = build_log_exporter(config)?; - let resource = resource.cloned().unwrap_or_else(|| Resource::builder().build()); + let resource = resource + .cloned() + .unwrap_or_else(|| Resource::builder().build()); let provider = SdkLoggerProvider::builder() .with_resource(resource) @@ -72,9 +74,7 @@ pub fn init_subscriber( set_subscriber(subscriber); } (None, None) => { - let subscriber = Registry::default() - .with(env_filter) - .with(make_json_fmt()); + let subscriber = Registry::default().with(env_filter).with(make_json_fmt()); set_subscriber(subscriber); } } diff --git a/telemetry/metrics.rs b/telemetry/metrics.rs index 4be5147..271d5bd 100644 --- a/telemetry/metrics.rs +++ b/telemetry/metrics.rs @@ -2,11 +2,11 @@ use std::sync::OnceLock; +use opentelemetry::KeyValue; use opentelemetry::global; use opentelemetry::metrics::{Counter, Histogram, Meter, UpDownCounter}; -use opentelemetry::KeyValue; -use opentelemetry_sdk::metrics::SdkMeterProvider; use opentelemetry_sdk::Resource; +use opentelemetry_sdk::metrics::SdkMeterProvider; use prometheus::{Encoder, Registry, TextEncoder}; use crate::ImksResult; @@ -47,7 +47,9 @@ pub fn init_metrics( let exporter = opentelemetry_prometheus::exporter() .with_registry(registry) .build() - .map_err(|e| crate::ImksError::Internal(format!("failed to build Prometheus exporter: {e}")))?; + .map_err(|e| { + crate::ImksError::Internal(format!("failed to build Prometheus exporter: {e}")) + })?; let provider = SdkMeterProvider::builder() .with_resource(resource.clone()) @@ -153,7 +155,9 @@ impl MetricsInstruments { /// /// Encodes the Prometheus text format from the shared registry. pub async fn metrics_handler() -> actix_web::HttpResponse { - let registry = PROMETHEUS_REGISTRY.get().expect("Prometheus registry not initialized"); + let registry = PROMETHEUS_REGISTRY + .get() + .expect("Prometheus registry not initialized"); let metric_families = registry.gather(); let encoder = TextEncoder::new(); diff --git a/telemetry/traces.rs b/telemetry/traces.rs index 78ea0e6..77bc6a9 100644 --- a/telemetry/traces.rs +++ b/telemetry/traces.rs @@ -2,9 +2,9 @@ use opentelemetry::trace::TracerProvider as _; use opentelemetry_otlp::{Protocol, SpanExporter, WithExportConfig}; +use opentelemetry_sdk::Resource; use opentelemetry_sdk::propagation::TraceContextPropagator; use opentelemetry_sdk::trace::{SdkTracerProvider, Tracer}; -use opentelemetry_sdk::Resource; use tracing_opentelemetry::OpenTelemetryLayer; use tracing_subscriber::Registry; @@ -24,9 +24,7 @@ fn build_span_exporter(config: &TelemetryConfig) -> ImksResult { .with_protocol(Protocol::HttpBinary) .with_endpoint(&config.otlp_endpoint) .build() - .map_err(|e| { - crate::ImksError::Internal(format!("OTLP HTTP span exporter: {e}")) - }), + .map_err(|e| crate::ImksError::Internal(format!("OTLP HTTP span exporter: {e}"))), } }