diff --git a/lib.rs b/lib.rs index 9c71af1..6ccaf17 100644 --- a/lib.rs +++ b/lib.rs @@ -12,3 +12,627 @@ pub mod svc; pub mod telemetry; pub use error::{ImksError, ImksResult}; + +use std::sync::{Arc, OnceLock}; +use std::time::Duration; + +use tokio::sync::RwLock; + +use crate::database::{Database, DatabaseConfig}; +use crate::engine::server::EngineConfig; +use crate::repo::MessageRepo; +use crate::rpc::{AppksClients, RpcConfig}; +use crate::socket::adapter::{LocalBroadcastFn, NatsAdapter, RedisAdapter}; +use crate::socket::message_bus::{NatsMessageBus, RedisMessageBus}; +use crate::socket::server::SocketServerBuilder; +use crate::svc::{DeployConfig, MessageService}; + +/// Combined configuration for an imks server. +/// +/// Use [`ImksConfig::from_env`] for microservice deployments, or construct +/// manually when embedding as a library. +#[derive(Debug, Clone)] +pub struct ImksConfig { + pub deploy: DeployConfig, + pub rpc: RpcConfig, + pub db: DatabaseConfig, + pub etcd_endpoints: Vec, + pub etcd_prefix: String, + pub http_addr: String, + pub grpc_health_addr: String, + pub engine: EngineConfig, +} + +impl ImksConfig { + /// Build config from environment variables with defaults. + pub fn from_env() -> Self { + Self { + deploy: DeployConfig::from_env(), + rpc: RpcConfig::from_env(), + db: DatabaseConfig::from_env(), + etcd_endpoints: std::env::var("ETCD_ENDPOINTS") + .unwrap_or_else(|_| "http://localhost:2379".to_string()) + .split(',') + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty()) + .collect(), + etcd_prefix: std::env::var("ETCD_KEY_PREFIX") + .unwrap_or_else(|_| "/appks/".to_string()), + http_addr: std::env::var("IMKS_HTTP_ADDR") + .unwrap_or_else(|_| "0.0.0.0:50048".to_string()), + grpc_health_addr: std::env::var("IMKS_GRPC_HEALTH_ADDR") + .unwrap_or_else(|_| "0.0.0.0:50047".to_string()), + engine: EngineConfig::default(), + } + } +} + +/// A ready-to-run imks Socket.IO + gRPC server. +pub struct ImksServer { + socket_server: Arc, + http_addr: String, + webtransport_enabled: bool, + webtransport_port: u16, + cert_path: String, + key_path: String, + telemetry_guard: Option, +} + +/// Builder for [`ImksServer`]. +/// +/// # Examples +/// +/// ```no_run +/// use imks::{ImksServer, ImksConfig}; +/// +/// # async fn example() -> Result<(), Box> { +/// let config = ImksConfig::from_env(); +/// let server = ImksServer::builder() +/// .config(config) +/// .build() +/// .await?; +/// server.serve().await?; +/// # Ok(()) +/// # } +/// ``` +pub struct ImksServerBuilder { + config: Option, +} + +impl ImksServer { + /// Create a new builder. + pub fn builder() -> ImksServerBuilder { + ImksServerBuilder::default() + } + + /// Start the HTTP/WebTransport server and block until shutdown. + pub async fn serve(self) -> Result<(), Box> { + let addr = &self.http_addr; + let engine = self.socket_server.engine.clone(); + let wt_port = self.webtransport_port; + let cert_path = self.cert_path.clone(); + let key_path = self.key_path.clone(); + let server = self.socket_server.clone(); + + if self.webtransport_enabled && !cert_path.is_empty() { + tracing::info!("Starting HTTP on {} + WebTransport on {}", addr, wt_port); + + tokio::select! { + result = server.run_http(addr) => { + result?; + } + result = engine.run_webtransport(wt_port, &cert_path, &key_path) => { + result?; + } + } + } else { + tracing::info!("Socket.IO HTTP server listening on {}", addr); + server.run_http(addr).await?; + } + + // Graceful telemetry shutdown + if let Some(guard) = self.telemetry_guard { + guard.shutdown(); + } + + Ok(()) + } +} + +impl ImksServerBuilder { + /// Set the server configuration. + pub fn config(mut self, config: ImksConfig) -> Self { + self.config = Some(config); + self + } + + /// Build the server: connect to etcd, database, set up adapters, register event handlers. + pub async fn build(self) -> Result> { + dotenvy::dotenv().ok(); + + let config = self.config.unwrap_or_else(ImksConfig::from_env); + + // Initialize observability + let telemetry_guard = telemetry::init(); + telemetry::health::init_counters(); + + let deploy = &config.deploy; + + tracing::info!( + adapter = %deploy.adapter_mode, + server_id = %deploy.server_id, + wt_enabled = deploy.webtransport_enabled, + "Starting imks server" + ); + + // etcd: connect, register, discover appks + let etcd = crate::etcd::EtcdConfig::connect( + config.etcd_endpoints.clone(), + &config.etcd_prefix, + ) + .await + .unwrap_or_else(|e| { + tracing::error!(error = %e, "etcd required but unavailable"); + panic!("etcd required: {e}") + }); + + let registry = crate::etcd::ServiceRegistry::new(etcd.client(), &config.etcd_prefix); + let imks_addr = etcd.get("IMKS_ADDR", "0.0.0.0:3000").await; + registry.register("imks", &imks_addr).await.ok(); + + let appks_addr = etcd + .discover_service("appks") + .await + .ok() + .and_then(|addrs| addrs.into_iter().next()) + .map(|addr| { + let normalized = addr.replace("0.0.0.0", "127.0.0.1"); + format!("http://{}", normalized) + }) + .unwrap_or_else(|| config.rpc.appks_addr.clone()); + + tracing::info!(appks_addr = %appks_addr, "appks discovered via etcd"); + + let current_appks_addr: Arc>> = Arc::new(RwLock::new(None)); + + // Start gRPC health server + { + let grpc_health_addr: std::net::SocketAddr = config + .grpc_health_addr + .parse() + .expect("Invalid IMKS_GRPC_HEALTH_ADDR"); + tokio::spawn(async move { + if let Err(e) = crate::rpc::health::start_health_server(grpc_health_addr).await { + tracing::error!(error = %e, "imks gRPC health server failed"); + } + }); + } + + let engine_config = config.engine.clone(); + let mut builder = SocketServerBuilder::new(engine_config); + let namespace_holder: Arc>> = + Arc::new(OnceLock::new()); + + // Configure adapter + match deploy.adapter_mode.as_str() { + "redis" => { + let redis_url = deploy + .redis_url() + .map_err(|e| format!("Invalid Redis configuration: {e}"))?; + tracing::info!( + cluster_enabled = deploy.redis_cluster_enabled, + "Configuring Redis adapter" + ); + let message_bus = Arc::new( + RedisMessageBus::new(&redis_url) + .await + .map_err(|e| format!("Failed to connect to Redis: {e}"))?, + ); + let redis_client = message_bus.client().clone(); + let server_id = deploy.server_id.clone(); + let adapter = Arc::new(RedisAdapter::new( + message_bus.clone() as Arc<_>, + redis_client, + server_id, + "/".into(), + make_local_broadcast_fn(namespace_holder.clone()), + )); + adapter + .init() + .await + .map_err(|e| format!("Failed to initialize Redis adapter: {e}"))?; + builder = builder.adapter(adapter); + tracing::info!("Redis adapter configured for multi-node"); + } + "nats" => { + let message_bus = Arc::new( + NatsMessageBus::new(&deploy.nats_url) + .await + .map_err(|e| format!("Failed to connect to NATS: {e}"))?, + ); + let server_id = deploy.server_id.clone(); + let adapter = Arc::new(NatsAdapter::new( + message_bus.clone() as Arc<_>, + server_id, + "/".into(), + make_local_broadcast_fn(namespace_holder.clone()), + )); + adapter + .init() + .await + .map_err(|e| format!("Failed to initialize NATS adapter: {e}"))?; + builder = builder.adapter(adapter); + tracing::info!("NATS adapter configured for multi-node"); + } + _ => { + tracing::info!("Local adapter (single-node mode)"); + } + } + + let socket_server = Arc::new(builder.build()); + let _ = namespace_holder.set(socket_server.namespaces.clone()); + + // Connect to database + let db = Database::connect(&config.db) + .await + .map_err(|e| format!("Database connection failed: {e}"))?; + crate::database::run_migrations(db.pool()) + .await + .map_err(|e| format!("Database migration failed: {e}"))?; + let repo = Arc::new(MessageRepo::new(db.pool().clone())); + tracing::info!("Database connected and migrated"); + + // Shared service handle + let service: Arc>>> = Arc::new(RwLock::new(None)); + + // Helper to try creating a MessageService + let try_connect_service = { + let service = service.clone(); + let repo = repo.clone(); + let namespaces = socket_server.namespaces.clone(); + let current_addr = current_appks_addr.clone(); + let rpc_config = config.rpc.clone(); + move |addr: String| { + let service = service.clone(); + let repo = repo.clone(); + let namespaces = namespaces.clone(); + let current_addr = current_addr.clone(); + let mut rpc = rpc_config.clone(); + let normalized = addr.replace("0.0.0.0", "127.0.0.1"); + rpc.appks_addr = if normalized.starts_with("http") { + normalized + } else { + format!("http://{}", normalized) + }; + async move { + let max_retries = 5; + let mut delay = Duration::from_millis(500); + + for attempt in 1..=max_retries { + match AppksClients::connect(&rpc).await { + Ok(clients) => match MessageService::new( + (*repo).clone(), + clients, + namespaces.clone(), + ) + .await + { + Ok(svc) => { + let svc = Arc::new(svc); + let mut guard = service.write().await; + *guard = Some(svc); + + let mut addr_guard = current_addr.write().await; + *addr_guard = Some(rpc.appks_addr.clone()); + + tracing::info!( + addr = %rpc.appks_addr, + attempt, + "Message service initialized" + ); + return true; + } + Err(e) => { + tracing::warn!( + addr = %rpc.appks_addr, + attempt, + error = %e, + "Failed to init message service" + ); + if attempt < max_retries { + tokio::time::sleep(delay).await; + delay *= 2; + } + } + }, + Err(e) => { + tracing::warn!( + addr = %rpc.appks_addr, + attempt, + error = %e, + "gRPC connect failed" + ); + if attempt < max_retries { + tokio::time::sleep(delay).await; + delay *= 2; + } + } + } + } + + tracing::error!( + addr = %rpc.appks_addr, + max_retries, + "All connection attempts to appks exhausted" + ); + false + } + } + }; + + // Try initial connection + let env_fallback = std::env::var("APPKS_GRPC_ADDR") + .unwrap_or_else(|_| "http://localhost:50051".to_string()); + if !try_connect_service(appks_addr.clone()).await { + if appks_addr != env_fallback { + tracing::info!( + etcd_addr = %appks_addr, + fallback = %env_fallback, + "etcd-discovered appks unreachable, trying env fallback" + ); + try_connect_service(env_fallback).await; + } else { + tracing::warn!( + %appks_addr, + "Initial connection to appks failed — will retry when etcd discovers new instances" + ); + } + } + + // Watch etcd for appks join/leave + { + let try_connect = try_connect_service.clone(); + let svc_clear = service.clone(); + let addr_clear = current_appks_addr.clone(); + etcd.start_service_watcher( + "appks", + move |addr| { + tracing::info!(%addr, "etcd watcher: appks instance UP, attempting reconnection"); + let try_connect = try_connect.clone(); + tokio::spawn(async move { + let ok = try_connect(addr).await; + tracing::info!(ok, "etcd watcher: reconnection result"); + }); + }, + move |_addr| { + let svc_clear = svc_clear.clone(); + let addr_clear = addr_clear.clone(); + tokio::spawn(async move { + let mut guard = svc_clear.write().await; + *guard = None; + let mut addr_guard = addr_clear.write().await; + *addr_guard = None; + tracing::warn!( + "etcd watcher: appks instance DOWN — running without permission checks" + ); + }); + }, + ); + } + + // Health check loop + { + let health_service = service.clone(); + let health_addr = current_appks_addr.clone(); + let health_reconnect = try_connect_service.clone(); + tokio::spawn(async move { + tracing::info!( + interval_secs = 5, + "Health check loop started — probing appks gRPC health every 5s" + ); + + let mut last_logged_ok = false; + let mut waiting_ticks = 0u64; + + loop { + tokio::time::sleep(Duration::from_secs(5)).await; + + let addr = health_addr.read().await.clone(); + let Some(ref addr) = addr else { + waiting_ticks += 1; + if !last_logged_ok && waiting_ticks % 12 == 1 { + tracing::info!( + waiting_secs = waiting_ticks * 5, + "No appks connection — health check paused, waiting for etcd discovery" + ); + } + continue; + }; + + waiting_ticks = 0; + + match crate::rpc::health::check_appks_health(addr).await { + Ok(true) => { + if !last_logged_ok { + tracing::info!(%addr, "appks health check OK"); + last_logged_ok = true; + } + } + Ok(false) | Err(_) => { + tracing::warn!(%addr, "appks health check FAILED — entering degraded mode"); + + let mut guard = health_service.write().await; + *guard = None; + + let mut addr_guard = health_addr.write().await; + *addr_guard = None; + + last_logged_ok = false; + + let reconnect = health_reconnect.clone(); + let reconnect_addr = addr.clone(); + tokio::spawn(async move { + if !reconnect(reconnect_addr).await { + tracing::warn!( + "Immediate reconnection after health failure failed, waiting for etcd discovery" + ); + } + }); + } + } + } + }); + } + + // Register connect handler + let namespace = socket_server.of("/"); + let svc_connect = service.clone(); + namespace + .on_connect(move |socket, auth_data| { + let svc = svc_connect.blocking_read(); + if let Some(ref svc) = *svc { + svc.authenticate_socket(socket, auth_data) + .map_err(|e| e.to_string())?; + } + + let m = telemetry::metrics::get(); + m.connections_active.add( + 1, + &telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), + ); + m.connections_total.add( + 1, + &telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), + ); + telemetry::health::connection_connected(); + + tracing::info!( + socket_sid = %socket.sid, + engine_sid = %socket.engine_sid, + namespace = %socket.namespace, + "Socket connected" + ); + Ok(()) + }) + .await; + + // Register Socket.IO event handlers + { + let svc = service.clone(); + macro_rules! register_event { + ($svc:expr, $ns:expr, $event:expr, $method:ident) => { + let s = $svc.clone(); + let event_name = $event.to_string(); + $ns.on_event($event, Arc::new(move |socket, data| { + let s = s.clone(); + let data = data.clone(); + let event = event_name.clone(); + tokio::spawn(async move { + let _span = tracing::info_span!( + "socket_event", + otel.name = format!("handle {event}"), + event = %event, + socket_sid = %socket.sid, + ); + let _ = _span.enter(); + + let svc_guard = s.read().await; + let Some(ref svc) = *svc_guard else { + tracing::warn!(event = %event, "No message service available, dropping event"); + return; + }; + let start = std::time::Instant::now(); + if let Err(e) = svc.$method(socket, &data).await { + tracing::error!(event = %event, error = %e, "Event handler failed"); + } + let elapsed = start.elapsed().as_secs_f64(); + telemetry::metrics::get().event_handling_duration.record( + elapsed, + &telemetry::MetricsInstruments::event_attrs(&event), + ); + }); + })).await; + }; + } + + register_event!(svc, namespace, "channel:join", join_channel); + register_event!(svc, namespace, "channel:leave", leave_channel); + register_event!(svc, namespace, "message:send", send_message); + register_event!(svc, namespace, "message:edit", edit_message); + register_event!(svc, namespace, "message:delete", delete_message); + register_event!(svc, namespace, "reaction:add", toggle_reaction); + register_event!(svc, namespace, "pin:add", pin_message); + register_event!(svc, namespace, "pin:remove", unpin_message); + register_event!(svc, namespace, "poll:vote", poll_vote); + register_event!(svc, namespace, "poll:vote:remove", poll_remove_vote); + register_event!(svc, namespace, "typing:start", typing_start); + register_event!(svc, namespace, "typing:stop", typing_stop); + register_event!(svc, namespace, "presence:update", presence_update); + register_event!(svc, namespace, "draft:save", save_draft); + register_event!(svc, namespace, "draft:get", get_draft); + register_event!(svc, namespace, "draft:delete", delete_draft); + register_event!(svc, namespace, "read_state:mark", mark_read); + register_event!(svc, namespace, "read_state:get", get_read_state); + register_event!(svc, namespace, "notification:list", list_notifications); + register_event!(svc, namespace, "notification:mark_read", mark_notification_read); + register_event!(svc, namespace, "notification:mark_all_read", mark_all_notifications_read); + register_event!(svc, namespace, "bookmark:add", add_bookmark); + register_event!(svc, namespace, "bookmark:remove", remove_bookmark); + register_event!(svc, namespace, "bookmark:list", list_bookmarks); + register_event!(svc, namespace, "thread:create", create_thread); + register_event!(svc, namespace, "thread:resolve", resolve_thread); + register_event!(svc, namespace, "thread:join", join_thread); + register_event!(svc, namespace, "thread:leave", leave_thread); + register_event!(svc, namespace, "thread:list", list_threads); + register_event!(svc, namespace, "article:create", create_article); + register_event!(svc, namespace, "article:update", update_article); + register_event!(svc, namespace, "article:list", list_articles); + register_event!(svc, namespace, "article:delete", delete_article); + register_event!(svc, namespace, "component:interact", interact_component); + register_event!(svc, namespace, "component:update", update_component); + + tracing::info!("Registered Socket.IO event handlers"); + } + + // Start scheduled message dispatcher + if let Some(ref svc) = *service.read().await { + svc.clone().start_scheduled_dispatcher(); + } + + Ok(ImksServer { + socket_server, + http_addr: config.http_addr, + webtransport_enabled: deploy.webtransport_enabled, + webtransport_port: deploy.webtransport_port, + cert_path: deploy.cert_path.clone(), + key_path: deploy.key_path.clone(), + telemetry_guard: Some(telemetry_guard), + }) + } +} + +impl Default for ImksServerBuilder { + fn default() -> Self { + Self { config: None } + } +} + +// Re-export telemetry guard for the public API +pub use telemetry::TelemetryGuard; + +/// Create a local broadcast function for Redis/NATS adapters. +fn make_local_broadcast_fn( + namespaces: Arc>>, +) -> LocalBroadcastFn { + Arc::new(move |packet, opts| { + let Some(manager) = namespaces.get() else { + tracing::warn!(namespace = %packet.namespace, "Namespace manager not initialized"); + return; + }; + let Some(namespace) = manager.get_namespace(&packet.namespace) else { + tracing::warn!(namespace = %packet.namespace, "Namespace not found for local broadcast"); + return; + }; + namespace.emit_local_filtered(packet, opts); + }) +} diff --git a/main.rs b/main.rs index 22e2871..78c0b94 100644 --- a/main.rs +++ b/main.rs @@ -1,564 +1,14 @@ -use std::sync::{Arc, OnceLock}; -use std::time::Duration; - -use tokio::sync::RwLock; - -use imks::database::{Database, DatabaseConfig}; -use imks::engine::server::EngineConfig; -use imks::etcd::{EtcdConfig, ServiceRegistry}; -use imks::repo::MessageRepo; -use imks::rpc::{AppksClients, RpcConfig}; -use imks::socket::adapter::{LocalBroadcastFn, NatsAdapter, RedisAdapter}; -use imks::socket::message_bus::{NatsMessageBus, RedisMessageBus}; - -use imks::socket::server::SocketServerBuilder; -use imks::svc::{DeployConfig, MessageService}; -use imks::telemetry; +use imks::ImksServer; #[tokio::main] async fn main() -> Result<(), Box> { - dotenvy::dotenv().ok(); + let config = imks::ImksConfig::from_env(); - // Initialize observability stack (traces, metrics, logs, health) - let telemetry_guard = telemetry::init(); - telemetry::health::init_counters(); - - let deploy = DeployConfig::from_env(); - - // Read etcd bootstrap config from env (these MUST come from env, not etcd) - let etcd_endpoints: Vec = std::env::var("ETCD_ENDPOINTS") - .unwrap_or_else(|_| "http://localhost:2379".to_string()) - .split(',') - .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty()) - .collect(); - let etcd_prefix = std::env::var("ETCD_KEY_PREFIX").unwrap_or_else(|_| "/appks/".to_string()); - - tracing::info!( - adapter = %deploy.adapter_mode, - server_id = %deploy.server_id, - wt_enabled = deploy.webtransport_enabled, - "Starting imks server" - ); - - let addr = "0.0.0.0:50048"; - - // --- etcd: connect, register, discover appks --- - let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix) - .await - .unwrap_or_else(|e| { - tracing::error!(error = %e, "etcd required but unavailable"); - panic!("etcd required: {e}") - }); - - // Register this service so others can discover us - let registry = ServiceRegistry::new(etcd.client(), &etcd_prefix); - let imks_addr = etcd.get("IMKS_ADDR", "0.0.0.0:3000").await; - registry.register("imks", &imks_addr).await.ok(); - - // Discover appks from etcd (priority > env). - // etcd-registered addresses are bare "host:port" — prepend http:// for gRPC. - // Bind address 0.0.0.0 is not connectable; replace with localhost. - let appks_addr = etcd - .discover_service("appks") - .await - .ok() - .and_then(|addrs| addrs.into_iter().next()) - .map(|addr| { - let normalized = addr.replace("0.0.0.0", "127.0.0.1"); - format!("http://{}", normalized) - }) - .unwrap_or_else(|| { - std::env::var("APPKS_GRPC_ADDR") - .unwrap_or_else(|_| "http://localhost:50051".to_string()) - }); - tracing::info!(appks_addr = %appks_addr, "appks discovered via etcd"); - - // Track the currently active appks address for health checks. - // Updated on successful connect, cleared on health failure. - let current_appks_addr: Arc>> = Arc::new(RwLock::new(None)); - - // Start imks's own gRPC health server early so Redis/NATS stalls do not hide it. - // Default: imks RPC health 50047, separate from HTTP (50048) and appks gRPC (50049). - { - let grpc_health_addr: std::net::SocketAddr = std::env::var("IMKS_GRPC_HEALTH_ADDR") - .unwrap_or_else(|_| "0.0.0.0:50047".to_string()) - .parse() - .expect("Invalid IMKS_GRPC_HEALTH_ADDR"); - tokio::spawn(async move { - if let Err(e) = imks::rpc::health::start_health_server(grpc_health_addr).await { - tracing::error!(error = %e, "imks gRPC health server failed"); - } - }); - } - - let engine_config = EngineConfig::default(); - let mut builder = SocketServerBuilder::new(engine_config); - let namespace_holder: Arc>> = - Arc::new(OnceLock::new()); - - // Pre-configure adapter for Redis/NATS mode. - match deploy.adapter_mode.as_str() { - "redis" => { - let redis_url = deploy - .redis_url() - .map_err(|e| format!("Invalid Redis configuration: {e}"))?; - tracing::info!( - cluster_enabled = deploy.redis_cluster_enabled, - "Configuring Redis adapter" - ); - let message_bus = Arc::new( - RedisMessageBus::new(&redis_url) - .await - .map_err(|e| format!("Failed to connect to Redis: {e}"))?, - ); - let redis_client = message_bus.client().clone(); - let server_id = deploy.server_id.clone(); - let adapter = Arc::new(RedisAdapter::new( - message_bus.clone() as Arc<_>, - redis_client, - server_id, - "/".into(), - make_local_broadcast_fn(namespace_holder.clone()), - )); - adapter - .init() - .await - .map_err(|e| format!("Failed to initialize Redis adapter: {e}"))?; - builder = builder.adapter(adapter); - tracing::info!("Redis adapter configured for multi-node"); - } - "nats" => { - let message_bus = Arc::new( - NatsMessageBus::new(&deploy.nats_url) - .await - .map_err(|e| format!("Failed to connect to NATS: {e}"))?, - ); - let server_id = deploy.server_id.clone(); - let adapter = Arc::new(NatsAdapter::new( - message_bus.clone() as Arc<_>, - server_id, - "/".into(), - make_local_broadcast_fn(namespace_holder.clone()), - )); - adapter - .init() - .await - .map_err(|e| format!("Failed to initialize NATS adapter: {e}"))?; - builder = builder.adapter(adapter); - tracing::info!("NATS adapter configured for multi-node"); - } - _ => { - tracing::info!("Local adapter (single-node mode)"); - } - }; - - let socket_server = Arc::new(builder.build()); - let _ = namespace_holder.set(socket_server.namespaces.clone()); - - // Connect to database (independent of appks gRPC availability) - let db_config = DatabaseConfig::from_env(); - let db = Database::connect(&db_config) - .await - .map_err(|e| format!("Database connection failed: {e}"))?; - imks::database::run_migrations(db.pool()) - .await - .map_err(|e| format!("Database migration failed: {e}"))?; - let repo = Arc::new(MessageRepo::new(db.pool().clone())); - tracing::info!("Database connected and migrated"); - - // Shared service handle — swapped atomically when appks comes/goes - let service: Arc>>> = Arc::new(RwLock::new(None)); - - // Helper to try creating a MessageService given an appks address. - // Returns true on success. - let try_connect_service = { - let service = service.clone(); - let repo = repo.clone(); - let namespaces = socket_server.namespaces.clone(); - let current_addr = current_appks_addr.clone(); - let rpc_config = RpcConfig { - appks_addr: appks_addr.clone(), - ..RpcConfig::from_env() - }; - move |addr: String| { - let service = service.clone(); - let repo = repo.clone(); - let namespaces = namespaces.clone(); - let current_addr = current_addr.clone(); - let mut rpc = rpc_config.clone(); - // etcd-registered address is bare "host:port" — prepend scheme for gRPC. - // Bind address 0.0.0.0 is not connectable; replace with localhost. - let normalized = addr.replace("0.0.0.0", "127.0.0.1"); - rpc.appks_addr = if normalized.starts_with("http") { - normalized - } else { - format!("http://{}", normalized) - }; - async move { - // Retry with backoff — appks may have registered in etcd - // before its gRPC server finished binding. - let max_retries = 5; - let mut delay = std::time::Duration::from_millis(500); - - for attempt in 1..=max_retries { - match AppksClients::connect(&rpc).await { - Ok(clients) => { - match MessageService::new((*repo).clone(), clients, namespaces.clone()) - .await - { - Ok(svc) => { - let svc = Arc::new(svc); - let mut guard = service.write().await; - *guard = Some(svc); - - // Update the active appks address for health checker - let mut addr_guard = current_addr.write().await; - *addr_guard = Some(rpc.appks_addr.clone()); - - tracing::info!( - addr = %rpc.appks_addr, - attempt, - "Message service initialized" - ); - return true; - } - Err(e) => { - tracing::warn!( - addr = %rpc.appks_addr, - attempt, - error = %e, - "Failed to init message service" - ); - if attempt < max_retries { - tokio::time::sleep(delay).await; - delay *= 2; - } - } - } - } - Err(e) => { - tracing::warn!( - addr = %rpc.appks_addr, - attempt, - error = %e, - "gRPC connect failed" - ); - if attempt < max_retries { - tokio::time::sleep(delay).await; - delay *= 2; - } - } - } - } - - tracing::error!( - addr = %rpc.appks_addr, - max_retries, - "All connection attempts to appks exhausted" - ); - false - } - } - }; - - // Try initial connection: etcd-discovered addr first, then env fallback - let env_fallback = - std::env::var("APPKS_GRPC_ADDR").unwrap_or_else(|_| "http://localhost:50051".to_string()); - if !try_connect_service(appks_addr.clone()).await { - if appks_addr != env_fallback { - tracing::info!( - etcd_addr = %appks_addr, - fallback = %env_fallback, - "etcd-discovered appks unreachable, trying env fallback" - ); - try_connect_service(env_fallback).await; - } else { - tracing::warn!( - %appks_addr, - "Initial connection to appks failed — will retry when etcd discovers new instances" - ); - } - } - - // Watch etcd for appks join/leave — reconnect dynamically - { - let try_connect = try_connect_service.clone(); - let svc_clear = service.clone(); - let addr_clear = current_appks_addr.clone(); - etcd.start_service_watcher( - "appks", - move |addr| { - tracing::info!(%addr, "etcd watcher: appks instance UP, attempting reconnection"); - let try_connect = try_connect.clone(); - tokio::spawn(async move { - let ok = try_connect(addr).await; - tracing::info!(ok, "etcd watcher: reconnection result"); - }); - }, - move |_addr| { - let svc_clear = svc_clear.clone(); - let addr_clear = addr_clear.clone(); - tokio::spawn(async move { - let mut guard = svc_clear.write().await; - *guard = None; - // Also clear the tracked address so health checker stops probing - let mut addr_guard = addr_clear.write().await; - *addr_guard = None; - tracing::warn!( - "etcd watcher: appks instance DOWN — running without permission checks" - ); - }); - }, - ); - } - - // Health check loop: probe appks gRPC health every 5 seconds. - // If appks becomes unhealthy, clear the MessageService (degraded mode) - // and trigger an immediate reconnection attempt. - { - let health_service = service.clone(); - let health_addr = current_appks_addr.clone(); - let health_reconnect = try_connect_service.clone(); - tokio::spawn(async move { - tracing::info!( - interval_secs = 5, - "Health check loop started — probing appks gRPC health every 5s" - ); - - let mut last_logged_ok = false; - let mut waiting_ticks = 0u64; - - loop { - tokio::time::sleep(Duration::from_secs(5)).await; - - // Snapshot the current appks address - let addr = health_addr.read().await.clone(); - let Some(ref addr) = addr else { - // No appks connected yet - waiting_ticks += 1; - if !last_logged_ok && waiting_ticks % 12 == 1 { - // Log every ~60s while waiting (12 × 5s) - tracing::info!( - waiting_secs = waiting_ticks * 5, - "No appks connection — health check paused, waiting for etcd discovery" - ); - } - continue; - }; - - waiting_ticks = 0; - - match imks::rpc::health::check_appks_health(addr).await { - Ok(true) => { - if !last_logged_ok { - tracing::info!(%addr, "appks health check OK"); - last_logged_ok = true; - } - } - Ok(false) | Err(_) => { - tracing::warn!(%addr, "appks health check FAILED — entering degraded mode"); - - // Clear the MessageService so event handlers drop requests - let mut guard = health_service.write().await; - *guard = None; - - // Clear the tracked address so the next cycle skips - // health checks until a new connection is established - let mut addr_guard = health_addr.write().await; - *addr_guard = None; - - last_logged_ok = false; - - // Trigger an immediate reconnection attempt. - // If the same address recovers, try_connect will restore - // the service. If not, we wait for etcd discovery. - let reconnect = health_reconnect.clone(); - let reconnect_addr = addr.clone(); - tokio::spawn(async move { - if !reconnect(reconnect_addr).await { - tracing::warn!( - "Immediate reconnection after health failure failed, waiting for etcd discovery" - ); - } - }); - } - } - } - }); - } - - // Register connect handler - let namespace = socket_server.of("/"); - let svc_connect = service.clone(); - namespace - .on_connect(move |socket, auth_data| { - let svc = svc_connect.blocking_read(); - if let Some(ref svc) = *svc { - svc.authenticate_socket(socket, auth_data) - .map_err(|e| e.to_string())?; - } - - // Increment connection metrics - let m = telemetry::metrics::get(); - m.connections_active.add( - 1, - &telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), - ); - m.connections_total.add( - 1, - &telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), - ); - telemetry::health::connection_connected(); - - tracing::info!( - socket_sid = %socket.sid, - engine_sid = %socket.engine_sid, - namespace = %socket.namespace, - "Socket connected" - ); - Ok(()) - }) - .await; - - // Register Socket.IO event handlers (always register — each handler reads the latest service) - { - let svc = service.clone(); - macro_rules! register_event { - ($svc:expr, $ns:expr, $event:expr, $method:ident) => { - let s = $svc.clone(); - let event_name = $event.to_string(); - $ns.on_event($event, Arc::new(move |socket, data| { - let s = s.clone(); - let data = data.clone(); - let event = event_name.clone(); - tokio::spawn(async move { - let _span = tracing::info_span!( - "socket_event", - otel.name = format!("handle {event}"), - event = %event, - socket_sid = %socket.sid, - ); - let _enter = _span.enter(); - - let svc_guard = s.read().await; - let Some(ref svc) = *svc_guard else { - tracing::warn!(event = %event, "No message service available, dropping event"); - return; - }; - let start = std::time::Instant::now(); - if let Err(e) = svc.$method(socket, &data).await { - tracing::error!(event = %event, error = %e, "Event handler failed"); - } - let elapsed = start.elapsed().as_secs_f64(); - telemetry::metrics::get().event_handling_duration.record( - elapsed, - &telemetry::MetricsInstruments::event_attrs(&event), - ); - }); - })).await; - }; - } - - register_event!(svc, namespace, "channel:join", join_channel); - register_event!(svc, namespace, "channel:leave", leave_channel); - register_event!(svc, namespace, "message:send", send_message); - register_event!(svc, namespace, "message:edit", edit_message); - register_event!(svc, namespace, "message:delete", delete_message); - register_event!(svc, namespace, "reaction:add", toggle_reaction); - register_event!(svc, namespace, "pin:add", pin_message); - register_event!(svc, namespace, "pin:remove", unpin_message); - register_event!(svc, namespace, "poll:vote", poll_vote); - register_event!(svc, namespace, "poll:vote:remove", poll_remove_vote); - register_event!(svc, namespace, "typing:start", typing_start); - register_event!(svc, namespace, "typing:stop", typing_stop); - register_event!(svc, namespace, "presence:update", presence_update); - register_event!(svc, namespace, "draft:save", save_draft); - register_event!(svc, namespace, "draft:get", get_draft); - register_event!(svc, namespace, "draft:delete", delete_draft); - register_event!(svc, namespace, "read_state:mark", mark_read); - register_event!(svc, namespace, "read_state:get", get_read_state); - register_event!(svc, namespace, "notification:list", list_notifications); - register_event!( - svc, - namespace, - "notification:mark_read", - mark_notification_read - ); - register_event!( - svc, - namespace, - "notification:mark_all_read", - mark_all_notifications_read - ); - register_event!(svc, namespace, "bookmark:add", add_bookmark); - register_event!(svc, namespace, "bookmark:remove", remove_bookmark); - register_event!(svc, namespace, "bookmark:list", list_bookmarks); - register_event!(svc, namespace, "thread:create", create_thread); - register_event!(svc, namespace, "thread:resolve", resolve_thread); - register_event!(svc, namespace, "thread:join", join_thread); - register_event!(svc, namespace, "thread:leave", leave_thread); - register_event!(svc, namespace, "thread:list", list_threads); - register_event!(svc, namespace, "article:create", create_article); - register_event!(svc, namespace, "article:update", update_article); - register_event!(svc, namespace, "article:list", list_articles); - register_event!(svc, namespace, "article:delete", delete_article); - register_event!(svc, namespace, "component:interact", interact_component); - register_event!(svc, namespace, "component:update", update_component); - - tracing::info!("Registered Socket.IO event handlers"); - } - - // Start scheduled message dispatcher (once) - if let Some(ref svc) = *service.read().await { - svc.clone().start_scheduled_dispatcher(); - } - - // Start servers - if deploy.webtransport_enabled && !deploy.cert_path.is_empty() { - let engine = socket_server.engine.clone(); - let wt_port = deploy.webtransport_port; - let cert_path = deploy.cert_path.clone(); - let key_path = deploy.key_path.clone(); - let server = socket_server.clone(); - - tracing::info!("Starting HTTP on {} + WebTransport on {}", addr, wt_port); - - tokio::select! { - result = server.run_http(addr) => { - result?; - } - result = engine.run_webtransport(wt_port, &cert_path, &key_path) => { - result?; - } - } - } else { - tracing::info!("Socket.IO HTTP server listening on {}", addr); - socket_server.run_http(addr).await?; - } - - // Graceful telemetry shutdown - telemetry_guard.shutdown(); + let server = ImksServer::builder() + .config(config) + .build() + .await?; + server.serve().await?; Ok(()) } - -/// Create a local broadcast function for Redis/NATS adapters. -/// -/// The callback is used both for same-node delivery and for cross-node messages -/// received from the message bus. -fn make_local_broadcast_fn( - namespaces: Arc>>, -) -> LocalBroadcastFn { - Arc::new(move |packet, opts| { - let Some(manager) = namespaces.get() else { - tracing::warn!(namespace = %packet.namespace, "Namespace manager not initialized"); - return; - }; - let Some(namespace) = manager.get_namespace(&packet.namespace) else { - tracing::warn!(namespace = %packet.namespace, "Namespace not found for local broadcast"); - return; - }; - namespace.emit_local_filtered(packet, opts); - }) -}