use std::sync::{Arc, OnceLock}; use std::time::Duration; use tokio::sync::RwLock; use imks::database::{Database, DatabaseConfig}; use imks::engine::server::EngineConfig; use imks::etcd::{EtcdConfig, ServiceRegistry}; use imks::repo::MessageRepo; use imks::rpc::{AppksClients, RpcConfig}; use imks::socket::adapter::{LocalBroadcastFn, NatsAdapter, RedisAdapter}; use imks::socket::message_bus::{NatsMessageBus, RedisMessageBus}; use imks::socket::server::SocketServerBuilder; use imks::svc::{DeployConfig, MessageService}; use imks::telemetry; #[tokio::main] async fn main() -> Result<(), Box> { dotenvy::dotenv().ok(); // Initialize observability stack (traces, metrics, logs, health) let telemetry_guard = telemetry::init(); telemetry::health::init_counters(); let deploy = DeployConfig::from_env(); // Read etcd bootstrap config from env (these MUST come from env, not etcd) let etcd_endpoints: Vec = std::env::var("ETCD_ENDPOINTS") .unwrap_or_else(|_| "http://localhost:2379".to_string()) .split(',') .map(|s| s.trim().to_string()) .filter(|s| !s.is_empty()) .collect(); let etcd_prefix = std::env::var("ETCD_KEY_PREFIX").unwrap_or_else(|_| "/appks/".to_string()); tracing::info!( adapter = %deploy.adapter_mode, server_id = %deploy.server_id, wt_enabled = deploy.webtransport_enabled, "Starting imks server" ); let addr = "0.0.0.0:50048"; // --- etcd: connect, register, discover appks --- let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix) .await .unwrap_or_else(|e| { tracing::error!(error = %e, "etcd required but unavailable"); panic!("etcd required: {e}") }); // Register this service so others can discover us let registry = ServiceRegistry::new(etcd.client(), &etcd_prefix); let imks_addr = etcd.get("IMKS_ADDR", "0.0.0.0:3000").await; registry.register("imks", &imks_addr).await.ok(); // Discover appks from etcd (priority > env). // etcd-registered addresses are bare "host:port" — prepend http:// for gRPC. let appks_addr = etcd .discover_service("appks") .await .ok() .and_then(|addrs| addrs.into_iter().next()) .map(|addr| format!("http://{}", addr)) .unwrap_or_else(|| { std::env::var("APPKS_GRPC_ADDR") .unwrap_or_else(|_| "http://localhost:50051".to_string()) }); tracing::info!(appks_addr = %appks_addr, "appks discovered via etcd"); // Track the currently active appks address for health checks. // Updated on successful connect, cleared on health failure. let current_appks_addr: Arc>> = Arc::new(RwLock::new(None)); // Start imks's own gRPC health server early so Redis/NATS stalls do not hide it. // Default: imks RPC health 50047, separate from HTTP (50048) and appks gRPC (50049). { let grpc_health_addr: std::net::SocketAddr = std::env::var("IMKS_GRPC_HEALTH_ADDR") .unwrap_or_else(|_| "0.0.0.0:50047".to_string()) .parse() .expect("Invalid IMKS_GRPC_HEALTH_ADDR"); tokio::spawn(async move { if let Err(e) = imks::rpc::health::start_health_server(grpc_health_addr).await { tracing::error!(error = %e, "imks gRPC health server failed"); } }); } let engine_config = EngineConfig::default(); let mut builder = SocketServerBuilder::new(engine_config); let namespace_holder: Arc>> = Arc::new(OnceLock::new()); // Pre-configure adapter for Redis/NATS mode. match deploy.adapter_mode.as_str() { "redis" => { let redis_url = deploy .redis_url() .map_err(|e| format!("Invalid Redis configuration: {e}"))?; tracing::info!( cluster_enabled = deploy.redis_cluster_enabled, "Configuring Redis adapter" ); let message_bus = Arc::new( RedisMessageBus::new(&redis_url) .await .map_err(|e| format!("Failed to connect to Redis: {e}"))?, ); let redis_client = message_bus.client().clone(); let server_id = deploy.server_id.clone(); let adapter = Arc::new(RedisAdapter::new( message_bus.clone() as Arc<_>, redis_client, server_id, "/".into(), make_local_broadcast_fn(namespace_holder.clone()), )); adapter .init() .await .map_err(|e| format!("Failed to initialize Redis adapter: {e}"))?; builder = builder.adapter(adapter); tracing::info!("Redis adapter configured for multi-node"); } "nats" => { let message_bus = Arc::new( NatsMessageBus::new(&deploy.nats_url) .await .map_err(|e| format!("Failed to connect to NATS: {e}"))?, ); let server_id = deploy.server_id.clone(); let adapter = Arc::new(NatsAdapter::new( message_bus.clone() as Arc<_>, server_id, "/".into(), make_local_broadcast_fn(namespace_holder.clone()), )); adapter .init() .await .map_err(|e| format!("Failed to initialize NATS adapter: {e}"))?; builder = builder.adapter(adapter); tracing::info!("NATS adapter configured for multi-node"); } _ => { tracing::info!("Local adapter (single-node mode)"); } }; let socket_server = Arc::new(builder.build()); let _ = namespace_holder.set(socket_server.namespaces.clone()); // Connect to database (independent of appks gRPC availability) let db_config = DatabaseConfig::from_env(); let db = Database::connect(&db_config) .await .map_err(|e| format!("Database connection failed: {e}"))?; imks::database::run_migrations(db.pool()) .await .map_err(|e| format!("Database migration failed: {e}"))?; let repo = Arc::new(MessageRepo::new(db.pool().clone())); tracing::info!("Database connected and migrated"); // Shared service handle — swapped atomically when appks comes/goes let service: Arc>>> = Arc::new(RwLock::new(None)); // Helper to try creating a MessageService given an appks address. // Returns true on success. let try_connect_service = { let service = service.clone(); let repo = repo.clone(); let namespaces = socket_server.namespaces.clone(); let current_addr = current_appks_addr.clone(); let rpc_config = RpcConfig { appks_addr: appks_addr.clone(), ..RpcConfig::from_env() }; move |addr: String| { let service = service.clone(); let repo = repo.clone(); let namespaces = namespaces.clone(); let current_addr = current_addr.clone(); let mut rpc = rpc_config.clone(); // etcd-registered address is bare "host:port" — prepend scheme for gRPC rpc.appks_addr = if addr.starts_with("http") { addr } else { format!("http://{}", addr) }; async move { match AppksClients::connect(&rpc).await { Ok(clients) => { match MessageService::new((*repo).clone(), clients, namespaces.clone()) .await { Ok(svc) => { let svc = Arc::new(svc); let mut guard = service.write().await; *guard = Some(svc); // Update the active appks address for health checker let mut addr_guard = current_addr.write().await; *addr_guard = Some(rpc.appks_addr.clone()); tracing::info!( addr = %rpc.appks_addr, "Message service initialized" ); true } Err(e) => { tracing::warn!( addr = %rpc.appks_addr, error = %e, "Failed to init message service" ); false } } } Err(e) => { tracing::warn!( addr = %rpc.appks_addr, error = %e, "gRPC connect failed" ); false } } } } }; // Try initial connection: etcd-discovered addr first, then env fallback let env_fallback = std::env::var("APPKS_GRPC_ADDR").unwrap_or_else(|_| "http://localhost:50051".to_string()); if !try_connect_service(appks_addr.clone()).await { if appks_addr != env_fallback { tracing::info!( etcd_addr = %appks_addr, fallback = %env_fallback, "etcd-discovered appks unreachable, trying env fallback" ); try_connect_service(env_fallback).await; } else { tracing::warn!( %appks_addr, "Initial connection to appks failed — will retry when etcd discovers new instances" ); } } // Watch etcd for appks join/leave — reconnect dynamically { let try_connect = try_connect_service.clone(); let svc_clear = service.clone(); let addr_clear = current_appks_addr.clone(); etcd.start_service_watcher( "appks", move |addr| { tracing::info!(%addr, "etcd watcher: appks instance UP, attempting reconnection"); let try_connect = try_connect.clone(); tokio::spawn(async move { let ok = try_connect(addr).await; tracing::info!(ok, "etcd watcher: reconnection result"); }); }, move |_addr| { let svc_clear = svc_clear.clone(); let addr_clear = addr_clear.clone(); tokio::spawn(async move { let mut guard = svc_clear.write().await; *guard = None; // Also clear the tracked address so health checker stops probing let mut addr_guard = addr_clear.write().await; *addr_guard = None; tracing::warn!( "etcd watcher: appks instance DOWN — running without permission checks" ); }); }, ); } // Health check loop: probe appks gRPC health every 5 seconds. // If appks becomes unhealthy, clear the MessageService (degraded mode) // and trigger an immediate reconnection attempt. { let health_service = service.clone(); let health_addr = current_appks_addr.clone(); let health_reconnect = try_connect_service.clone(); tokio::spawn(async move { tracing::info!( interval_secs = 5, "Health check loop started — probing appks gRPC health every 5s" ); let mut last_logged_ok = false; let mut waiting_ticks = 0u64; loop { tokio::time::sleep(Duration::from_secs(5)).await; // Snapshot the current appks address let addr = health_addr.read().await.clone(); let Some(ref addr) = addr else { // No appks connected yet waiting_ticks += 1; if !last_logged_ok && waiting_ticks % 12 == 1 { // Log every ~60s while waiting (12 × 5s) tracing::info!( waiting_secs = waiting_ticks * 5, "No appks connection — health check paused, waiting for etcd discovery" ); } continue; }; waiting_ticks = 0; match imks::rpc::health::check_appks_health(addr).await { Ok(true) => { if !last_logged_ok { tracing::info!(%addr, "appks health check OK"); last_logged_ok = true; } } Ok(false) | Err(_) => { tracing::warn!(%addr, "appks health check FAILED — entering degraded mode"); // Clear the MessageService so event handlers drop requests let mut guard = health_service.write().await; *guard = None; // Clear the tracked address so the next cycle skips // health checks until a new connection is established let mut addr_guard = health_addr.write().await; *addr_guard = None; last_logged_ok = false; // Trigger an immediate reconnection attempt. // If the same address recovers, try_connect will restore // the service. If not, we wait for etcd discovery. let reconnect = health_reconnect.clone(); let reconnect_addr = addr.clone(); tokio::spawn(async move { if !reconnect(reconnect_addr).await { tracing::warn!( "Immediate reconnection after health failure failed, waiting for etcd discovery" ); } }); } } } }); } // Register connect handler let namespace = socket_server.of("/"); let svc_connect = service.clone(); namespace .on_connect(move |socket, auth_data| { let svc = svc_connect.blocking_read(); if let Some(ref svc) = *svc { svc.authenticate_socket(socket, auth_data) .map_err(|e| e.to_string())?; } // Increment connection metrics let m = telemetry::metrics::get(); m.connections_active.add( 1, &telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), ); m.connections_total.add( 1, &telemetry::MetricsInstruments::namespace_attrs(&socket.namespace), ); telemetry::health::connection_connected(); tracing::info!( socket_sid = %socket.sid, engine_sid = %socket.engine_sid, namespace = %socket.namespace, "Socket connected" ); Ok(()) }) .await; // Register Socket.IO event handlers (always register — each handler reads the latest service) { let svc = service.clone(); macro_rules! register_event { ($svc:expr, $ns:expr, $event:expr, $method:ident) => { let s = $svc.clone(); let event_name = $event.to_string(); $ns.on_event($event, Arc::new(move |socket, data| { let s = s.clone(); let data = data.clone(); let event = event_name.clone(); tokio::spawn(async move { let _span = tracing::info_span!( "socket_event", otel.name = format!("handle {event}"), event = %event, socket_sid = %socket.sid, ); let _enter = _span.enter(); let svc_guard = s.read().await; let Some(ref svc) = *svc_guard else { tracing::warn!(event = %event, "No message service available, dropping event"); return; }; let start = std::time::Instant::now(); if let Err(e) = svc.$method(socket, &data).await { tracing::error!(event = %event, error = %e, "Event handler failed"); } let elapsed = start.elapsed().as_secs_f64(); telemetry::metrics::get().event_handling_duration.record( elapsed, &telemetry::MetricsInstruments::event_attrs(&event), ); }); })).await; }; } register_event!(svc, namespace, "channel:join", join_channel); register_event!(svc, namespace, "channel:leave", leave_channel); register_event!(svc, namespace, "message:send", send_message); register_event!(svc, namespace, "message:edit", edit_message); register_event!(svc, namespace, "message:delete", delete_message); register_event!(svc, namespace, "reaction:add", toggle_reaction); register_event!(svc, namespace, "pin:add", pin_message); register_event!(svc, namespace, "pin:remove", unpin_message); register_event!(svc, namespace, "poll:vote", poll_vote); register_event!(svc, namespace, "poll:vote:remove", poll_remove_vote); register_event!(svc, namespace, "typing:start", typing_start); register_event!(svc, namespace, "typing:stop", typing_stop); register_event!(svc, namespace, "presence:update", presence_update); register_event!(svc, namespace, "draft:save", save_draft); register_event!(svc, namespace, "draft:get", get_draft); register_event!(svc, namespace, "draft:delete", delete_draft); register_event!(svc, namespace, "read_state:mark", mark_read); register_event!(svc, namespace, "read_state:get", get_read_state); register_event!(svc, namespace, "notification:list", list_notifications); register_event!( svc, namespace, "notification:mark_read", mark_notification_read ); register_event!( svc, namespace, "notification:mark_all_read", mark_all_notifications_read ); register_event!(svc, namespace, "bookmark:add", add_bookmark); register_event!(svc, namespace, "bookmark:remove", remove_bookmark); register_event!(svc, namespace, "bookmark:list", list_bookmarks); register_event!(svc, namespace, "thread:create", create_thread); register_event!(svc, namespace, "thread:resolve", resolve_thread); register_event!(svc, namespace, "thread:join", join_thread); register_event!(svc, namespace, "thread:leave", leave_thread); register_event!(svc, namespace, "thread:list", list_threads); register_event!(svc, namespace, "article:create", create_article); register_event!(svc, namespace, "article:update", update_article); register_event!(svc, namespace, "article:list", list_articles); register_event!(svc, namespace, "article:delete", delete_article); register_event!(svc, namespace, "component:interact", interact_component); register_event!(svc, namespace, "component:update", update_component); tracing::info!("Registered Socket.IO event handlers"); } // Start scheduled message dispatcher (once) if let Some(ref svc) = *service.read().await { svc.clone().start_scheduled_dispatcher(); } // Start servers if deploy.webtransport_enabled && !deploy.cert_path.is_empty() { let engine = socket_server.engine.clone(); let wt_port = deploy.webtransport_port; let cert_path = deploy.cert_path.clone(); let key_path = deploy.key_path.clone(); let server = socket_server.clone(); tracing::info!("Starting HTTP on {} + WebTransport on {}", addr, wt_port); tokio::select! { result = server.run_http(addr) => { result?; } result = engine.run_webtransport(wt_port, &cert_path, &key_path) => { result?; } } } else { tracing::info!("Socket.IO HTTP server listening on {}", addr); socket_server.run_http(addr).await?; } // Graceful telemetry shutdown telemetry_guard.shutdown(); Ok(()) } /// Create a local broadcast function for Redis/NATS adapters. /// /// The callback is used both for same-node delivery and for cross-node messages /// received from the message bus. fn make_local_broadcast_fn( namespaces: Arc>>, ) -> LocalBroadcastFn { Arc::new(move |packet, opts| { let Some(manager) = namespaces.get() else { tracing::warn!(namespace = %packet.namespace, "Namespace manager not initialized"); return; }; let Some(namespace) = manager.get_namespace(&packet.namespace) else { tracing::warn!(namespace = %packet.namespace, "Namespace not found for local broadcast"); return; }; namespace.emit_local_filtered(packet, opts); }) }