Files
imks/main.rs
T
zhenyi e72866db8d feat(config): integrate etcd for service discovery and config management
- Add etcd-client dependency for distributed configuration storage
- Implement EtcdConfig with priority: etcd > environment variables > defaults
- Add ServiceRegistry for service registration with lease keep-alive
- Integrate etcd-based service discovery for appks gRPC connections
- Add service watcher for real-time service instance updates
- Migrate Redis configuration from single URL to cluster node list
- Update Dockerfile with default IMKS_HOST and IMKS_PORT environment variables
- Add etcd bootstrap configuration through environment variables
- Implement Redis cluster URL building with optional authentication
2026-06-11 16:22:23 +08:00

328 lines
14 KiB
Rust

use std::sync::{Arc, OnceLock};
use imks::database::{Database, DatabaseConfig};
use imks::engine::server::EngineConfig;
use imks::etcd::{EtcdConfig, ServiceRegistry};
use imks::repo::MessageRepo;
use imks::rpc::{AppksClients, RpcConfig};
use imks::socket::adapter::{LocalBroadcastFn, NatsAdapter, RedisAdapter};
use imks::socket::message_bus::{NatsMessageBus, RedisMessageBus};
use imks::socket::server::SocketServerBuilder;
use imks::svc::{DeployConfig, MessageService};
use imks::telemetry;
fn main() -> Result<(), Box<dyn std::error::Error>> {
// Initialize observability stack (traces, metrics, logs, health)
let telemetry_guard = telemetry::init();
telemetry::health::init_counters();
let deploy = DeployConfig::from_env();
// Read etcd bootstrap config from env (these MUST come from env, not etcd)
let etcd_endpoints: Vec<String> = std::env::var("ETCD_ENDPOINTS")
.unwrap_or_else(|_| "http://localhost:2379".to_string())
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
let etcd_prefix = std::env::var("ETCD_KEY_PREFIX")
.unwrap_or_else(|_| "/appks/".to_string());
tracing::info!(
adapter = %deploy.adapter_mode,
server_id = %deploy.server_id,
wt_enabled = deploy.webtransport_enabled,
"Starting imks server"
);
let addr = "0.0.0.0:50048";
let rt = tokio::runtime::Runtime::new()?;
rt.block_on(async {
// --- etcd: connect, register, discover appks ---
let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix).await
.unwrap_or_else(|e| {
tracing::error!(error = %e, "etcd required but unavailable");
panic!("etcd required: {e}")
});
// Register this service so others can discover us
let registry = ServiceRegistry::new(etcd.client(), &etcd_prefix);
let imks_addr = etcd.get("IMKS_ADDR", "0.0.0.0:3000").await;
registry.register("imks", &imks_addr).await.ok();
// Discover appks from etcd (priority > env)
let appks_addr = etcd.discover_service("appks").await
.ok()
.and_then(|addrs| addrs.into_iter().next())
.unwrap_or_else(|| {
std::env::var("APPKS_GRPC_ADDR").unwrap_or_else(|_| "http://localhost:50051".to_string())
});
tracing::info!(appks_addr = %appks_addr, "appks discovered via etcd");
etcd.start_service_watcher("appks");
let engine_config = EngineConfig::default();
let mut builder = SocketServerBuilder::new(engine_config);
let namespace_holder: Arc<OnceLock<Arc<imks::socket::namespace::NamespaceManager>>> =
Arc::new(OnceLock::new());
// Pre-configure adapter for Redis/NATS mode.
match deploy.adapter_mode.as_str() {
"redis" => {
let cluster_url = deploy.redis_cluster_url();
let message_bus = Arc::new(
RedisMessageBus::new(&cluster_url)
.await
.map_err(|e| format!("Failed to connect to Redis cluster: {e}"))?,
);
let redis_client = message_bus.client().clone();
let server_id = deploy.server_id.clone();
let adapter = Arc::new(RedisAdapter::new(
message_bus.clone() as Arc<_>,
redis_client,
server_id,
"/".into(),
make_local_broadcast_fn(namespace_holder.clone()),
));
adapter
.init()
.await
.map_err(|e| format!("Failed to initialize Redis adapter: {e}"))?;
builder = builder.adapter(adapter);
tracing::info!("Redis adapter configured for multi-node");
}
"nats" => {
let message_bus = Arc::new(
NatsMessageBus::new(&deploy.nats_url)
.await
.map_err(|e| format!("Failed to connect to NATS: {e}"))?,
);
let server_id = deploy.server_id.clone();
let adapter = Arc::new(NatsAdapter::new(
message_bus.clone() as Arc<_>,
server_id,
"/".into(),
make_local_broadcast_fn(namespace_holder.clone()),
));
adapter
.init()
.await
.map_err(|e| format!("Failed to initialize NATS adapter: {e}"))?;
builder = builder.adapter(adapter);
tracing::info!("NATS adapter configured for multi-node");
}
_ => {
tracing::info!("Local adapter (single-node mode)");
}
};
let socket_server = Arc::new(builder.build());
let _ = namespace_holder.set(socket_server.namespaces.clone());
// Initialize database + gRPC + service
let service: Option<Arc<MessageService>> = {
let rpc_config = RpcConfig {
appks_addr: appks_addr.clone(),
..RpcConfig::from_env()
};
let db_config = DatabaseConfig::from_env();
match AppksClients::connect(&rpc_config).await {
Ok(clients) => {
let db = Database::connect(&db_config)
.await
.map_err(|e| format!("Database connection failed: {e}"))?;
imks::database::run_migrations(db.pool())
.await
.map_err(|e| format!("Database migration failed: {e}"))?;
let repo = MessageRepo::new(db.pool().clone());
let svc = MessageService::new(repo, clients, socket_server.namespaces.clone())
.await
.map_err(|e| format!("Failed to initialize message service: {e}"))?;
tracing::info!("Message service initialized with gRPC permission checks");
Some(Arc::new(svc))
}
Err(e) => {
tracing::warn!("gRPC unavailable: {e}. Running without permission checks.");
None
}
}
};
// Register connect handler
let namespace = socket_server.of("/");
let svc_connect = service.clone();
namespace
.on_connect(move |socket, auth_data| {
if let Some(ref svc) = svc_connect {
svc.authenticate_socket(socket, auth_data)
.map_err(|e| e.to_string())?;
}
// Increment connection metrics
let m = telemetry::metrics::get();
m.connections_active.add(
1,
&telemetry::MetricsInstruments::namespace_attrs(&socket.namespace),
);
m.connections_total.add(
1,
&telemetry::MetricsInstruments::namespace_attrs(&socket.namespace),
);
telemetry::health::connection_connected();
tracing::info!(
socket_sid = %socket.sid,
engine_sid = %socket.engine_sid,
namespace = %socket.namespace,
"Socket connected"
);
Ok(())
})
.await;
// Register Socket.IO event handlers
if let Some(ref svc) = service {
macro_rules! register_event {
($svc:expr, $ns:expr, $event:expr, $method:ident) => {
let s = $svc.clone();
let event_name = $event.to_string();
$ns.on_event($event, Arc::new(move |socket, data| {
let s = s.clone();
let data = data.clone();
let event = event_name.clone();
tokio::spawn(async move {
let _span = tracing::info_span!(
"socket_event",
otel.name = format!("handle {event}"),
event = %event,
socket_sid = %socket.sid,
);
let _enter = _span.enter();
let start = std::time::Instant::now();
if let Err(e) = s.$method(socket, &data).await {
tracing::error!(event = %event, error = %e, "Event handler failed");
}
let elapsed = start.elapsed().as_secs_f64();
telemetry::metrics::get().event_handling_duration.record(
elapsed,
&telemetry::MetricsInstruments::event_attrs(&event),
);
});
})).await;
};
}
register_event!(svc, namespace, "channel:join", join_channel);
register_event!(svc, namespace, "channel:leave", leave_channel);
register_event!(svc, namespace, "message:send", send_message);
register_event!(svc, namespace, "message:edit", edit_message);
register_event!(svc, namespace, "message:delete", delete_message);
register_event!(svc, namespace, "reaction:add", toggle_reaction);
register_event!(svc, namespace, "pin:add", pin_message);
register_event!(svc, namespace, "pin:remove", unpin_message);
register_event!(svc, namespace, "poll:vote", poll_vote);
register_event!(svc, namespace, "poll:vote:remove", poll_remove_vote);
register_event!(svc, namespace, "typing:start", typing_start);
register_event!(svc, namespace, "typing:stop", typing_stop);
register_event!(svc, namespace, "presence:update", presence_update);
register_event!(svc, namespace, "draft:save", save_draft);
register_event!(svc, namespace, "draft:get", get_draft);
register_event!(svc, namespace, "draft:delete", delete_draft);
register_event!(svc, namespace, "read_state:mark", mark_read);
register_event!(svc, namespace, "read_state:get", get_read_state);
register_event!(svc, namespace, "notification:list", list_notifications);
register_event!(
svc,
namespace,
"notification:mark_read",
mark_notification_read
);
register_event!(
svc,
namespace,
"notification:mark_all_read",
mark_all_notifications_read
);
register_event!(svc, namespace, "bookmark:add", add_bookmark);
register_event!(svc, namespace, "bookmark:remove", remove_bookmark);
register_event!(svc, namespace, "bookmark:list", list_bookmarks);
register_event!(svc, namespace, "thread:create", create_thread);
register_event!(svc, namespace, "thread:resolve", resolve_thread);
register_event!(svc, namespace, "thread:join", join_thread);
register_event!(svc, namespace, "thread:leave", leave_thread);
register_event!(svc, namespace, "thread:list", list_threads);
register_event!(svc, namespace, "article:create", create_article);
register_event!(svc, namespace, "article:update", update_article);
register_event!(svc, namespace, "article:list", list_articles);
register_event!(svc, namespace, "article:delete", delete_article);
register_event!(svc, namespace, "component:interact", interact_component);
register_event!(svc, namespace, "component:update", update_component);
// Start scheduled message dispatcher (background task)
svc.clone().start_scheduled_dispatcher();
tracing::info!("Registered Socket.IO event handlers with observability instrumentation");
}
// Start servers
if deploy.webtransport_enabled && !deploy.cert_path.is_empty() {
let engine = socket_server.engine.clone();
let wt_port = deploy.webtransport_port;
let cert_path = deploy.cert_path.clone();
let key_path = deploy.key_path.clone();
let server = socket_server.clone();
tracing::info!("Starting HTTP on {} + WebTransport on {}", addr, wt_port);
tokio::select! {
result = server.run_http(addr) => {
result?;
}
result = engine.run_webtransport(wt_port, &cert_path, &key_path) => {
result?;
}
}
} else {
tracing::info!("Socket.IO HTTP server listening on {}", addr);
socket_server.run_http(addr).await?;
}
Ok::<(), Box<dyn std::error::Error>>(())
})?;
// Graceful telemetry shutdown
telemetry_guard.shutdown();
Ok(())
}
/// Create a local broadcast function for Redis/NATS adapters.
///
/// The callback is used both for same-node delivery and for cross-node messages
/// received from the message bus.
fn make_local_broadcast_fn(
namespaces: Arc<OnceLock<Arc<imks::socket::namespace::NamespaceManager>>>,
) -> LocalBroadcastFn {
Arc::new(move |packet, opts| {
let Some(manager) = namespaces.get() else {
tracing::warn!(namespace = %packet.namespace, "Namespace manager not initialized");
return;
};
let Some(namespace) = manager.get_namespace(&packet.namespace) else {
tracing::warn!(namespace = %packet.namespace, "Namespace not found for local broadcast");
return;
};
namespace.emit_local_filtered(packet, opts);
})
}