Files
zhenyi 716f952bb6 feat(message): add comprehensive message system with database migrations and health checks
- Add database migrations for message base table with indexes for efficient querying
- Implement rich content support with attachment, embed, and poll tables
- Create social features including reactions, bookmarks, mentions, and read states
- Add thread management with participant tracking and resolution capabilities
- Include article posts with title, cover image, tags, and engagement metrics
- Support scheduled messages, stickers, forwards, and interactive components
- Fix UUID defaults and ensure proper uniqueness constraints for drafts
- Add gRPC health server for imks and health check client for appks connectivity
- Replace non-connectable 0.0.0.0 addresses with localhost in service discovery
- Normalize addresses during RPC configuration to handle bind address issues
2026-06-11 23:07:38 +08:00

565 lines
23 KiB
Rust
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tokio::sync::RwLock;
use imks::database::{Database, DatabaseConfig};
use imks::engine::server::EngineConfig;
use imks::etcd::{EtcdConfig, ServiceRegistry};
use imks::repo::MessageRepo;
use imks::rpc::{AppksClients, RpcConfig};
use imks::socket::adapter::{LocalBroadcastFn, NatsAdapter, RedisAdapter};
use imks::socket::message_bus::{NatsMessageBus, RedisMessageBus};
use imks::socket::server::SocketServerBuilder;
use imks::svc::{DeployConfig, MessageService};
use imks::telemetry;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenvy::dotenv().ok();
// Initialize observability stack (traces, metrics, logs, health)
let telemetry_guard = telemetry::init();
telemetry::health::init_counters();
let deploy = DeployConfig::from_env();
// Read etcd bootstrap config from env (these MUST come from env, not etcd)
let etcd_endpoints: Vec<String> = std::env::var("ETCD_ENDPOINTS")
.unwrap_or_else(|_| "http://localhost:2379".to_string())
.split(',')
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty())
.collect();
let etcd_prefix = std::env::var("ETCD_KEY_PREFIX").unwrap_or_else(|_| "/appks/".to_string());
tracing::info!(
adapter = %deploy.adapter_mode,
server_id = %deploy.server_id,
wt_enabled = deploy.webtransport_enabled,
"Starting imks server"
);
let addr = "0.0.0.0:50048";
// --- etcd: connect, register, discover appks ---
let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix)
.await
.unwrap_or_else(|e| {
tracing::error!(error = %e, "etcd required but unavailable");
panic!("etcd required: {e}")
});
// Register this service so others can discover us
let registry = ServiceRegistry::new(etcd.client(), &etcd_prefix);
let imks_addr = etcd.get("IMKS_ADDR", "0.0.0.0:3000").await;
registry.register("imks", &imks_addr).await.ok();
// Discover appks from etcd (priority > env).
// etcd-registered addresses are bare "host:port" — prepend http:// for gRPC.
// Bind address 0.0.0.0 is not connectable; replace with localhost.
let appks_addr = etcd
.discover_service("appks")
.await
.ok()
.and_then(|addrs| addrs.into_iter().next())
.map(|addr| {
let normalized = addr.replace("0.0.0.0", "127.0.0.1");
format!("http://{}", normalized)
})
.unwrap_or_else(|| {
std::env::var("APPKS_GRPC_ADDR")
.unwrap_or_else(|_| "http://localhost:50051".to_string())
});
tracing::info!(appks_addr = %appks_addr, "appks discovered via etcd");
// Track the currently active appks address for health checks.
// Updated on successful connect, cleared on health failure.
let current_appks_addr: Arc<RwLock<Option<String>>> = Arc::new(RwLock::new(None));
// Start imks's own gRPC health server early so Redis/NATS stalls do not hide it.
// Default: imks RPC health 50047, separate from HTTP (50048) and appks gRPC (50049).
{
let grpc_health_addr: std::net::SocketAddr = std::env::var("IMKS_GRPC_HEALTH_ADDR")
.unwrap_or_else(|_| "0.0.0.0:50047".to_string())
.parse()
.expect("Invalid IMKS_GRPC_HEALTH_ADDR");
tokio::spawn(async move {
if let Err(e) = imks::rpc::health::start_health_server(grpc_health_addr).await {
tracing::error!(error = %e, "imks gRPC health server failed");
}
});
}
let engine_config = EngineConfig::default();
let mut builder = SocketServerBuilder::new(engine_config);
let namespace_holder: Arc<OnceLock<Arc<imks::socket::namespace::NamespaceManager>>> =
Arc::new(OnceLock::new());
// Pre-configure adapter for Redis/NATS mode.
match deploy.adapter_mode.as_str() {
"redis" => {
let redis_url = deploy
.redis_url()
.map_err(|e| format!("Invalid Redis configuration: {e}"))?;
tracing::info!(
cluster_enabled = deploy.redis_cluster_enabled,
"Configuring Redis adapter"
);
let message_bus = Arc::new(
RedisMessageBus::new(&redis_url)
.await
.map_err(|e| format!("Failed to connect to Redis: {e}"))?,
);
let redis_client = message_bus.client().clone();
let server_id = deploy.server_id.clone();
let adapter = Arc::new(RedisAdapter::new(
message_bus.clone() as Arc<_>,
redis_client,
server_id,
"/".into(),
make_local_broadcast_fn(namespace_holder.clone()),
));
adapter
.init()
.await
.map_err(|e| format!("Failed to initialize Redis adapter: {e}"))?;
builder = builder.adapter(adapter);
tracing::info!("Redis adapter configured for multi-node");
}
"nats" => {
let message_bus = Arc::new(
NatsMessageBus::new(&deploy.nats_url)
.await
.map_err(|e| format!("Failed to connect to NATS: {e}"))?,
);
let server_id = deploy.server_id.clone();
let adapter = Arc::new(NatsAdapter::new(
message_bus.clone() as Arc<_>,
server_id,
"/".into(),
make_local_broadcast_fn(namespace_holder.clone()),
));
adapter
.init()
.await
.map_err(|e| format!("Failed to initialize NATS adapter: {e}"))?;
builder = builder.adapter(adapter);
tracing::info!("NATS adapter configured for multi-node");
}
_ => {
tracing::info!("Local adapter (single-node mode)");
}
};
let socket_server = Arc::new(builder.build());
let _ = namespace_holder.set(socket_server.namespaces.clone());
// Connect to database (independent of appks gRPC availability)
let db_config = DatabaseConfig::from_env();
let db = Database::connect(&db_config)
.await
.map_err(|e| format!("Database connection failed: {e}"))?;
imks::database::run_migrations(db.pool())
.await
.map_err(|e| format!("Database migration failed: {e}"))?;
let repo = Arc::new(MessageRepo::new(db.pool().clone()));
tracing::info!("Database connected and migrated");
// Shared service handle — swapped atomically when appks comes/goes
let service: Arc<RwLock<Option<Arc<MessageService>>>> = Arc::new(RwLock::new(None));
// Helper to try creating a MessageService given an appks address.
// Returns true on success.
let try_connect_service = {
let service = service.clone();
let repo = repo.clone();
let namespaces = socket_server.namespaces.clone();
let current_addr = current_appks_addr.clone();
let rpc_config = RpcConfig {
appks_addr: appks_addr.clone(),
..RpcConfig::from_env()
};
move |addr: String| {
let service = service.clone();
let repo = repo.clone();
let namespaces = namespaces.clone();
let current_addr = current_addr.clone();
let mut rpc = rpc_config.clone();
// etcd-registered address is bare "host:port" — prepend scheme for gRPC.
// Bind address 0.0.0.0 is not connectable; replace with localhost.
let normalized = addr.replace("0.0.0.0", "127.0.0.1");
rpc.appks_addr = if normalized.starts_with("http") {
normalized
} else {
format!("http://{}", normalized)
};
async move {
// Retry with backoff — appks may have registered in etcd
// before its gRPC server finished binding.
let max_retries = 5;
let mut delay = std::time::Duration::from_millis(500);
for attempt in 1..=max_retries {
match AppksClients::connect(&rpc).await {
Ok(clients) => {
match MessageService::new((*repo).clone(), clients, namespaces.clone())
.await
{
Ok(svc) => {
let svc = Arc::new(svc);
let mut guard = service.write().await;
*guard = Some(svc);
// Update the active appks address for health checker
let mut addr_guard = current_addr.write().await;
*addr_guard = Some(rpc.appks_addr.clone());
tracing::info!(
addr = %rpc.appks_addr,
attempt,
"Message service initialized"
);
return true;
}
Err(e) => {
tracing::warn!(
addr = %rpc.appks_addr,
attempt,
error = %e,
"Failed to init message service"
);
if attempt < max_retries {
tokio::time::sleep(delay).await;
delay *= 2;
}
}
}
}
Err(e) => {
tracing::warn!(
addr = %rpc.appks_addr,
attempt,
error = %e,
"gRPC connect failed"
);
if attempt < max_retries {
tokio::time::sleep(delay).await;
delay *= 2;
}
}
}
}
tracing::error!(
addr = %rpc.appks_addr,
max_retries,
"All connection attempts to appks exhausted"
);
false
}
}
};
// Try initial connection: etcd-discovered addr first, then env fallback
let env_fallback =
std::env::var("APPKS_GRPC_ADDR").unwrap_or_else(|_| "http://localhost:50051".to_string());
if !try_connect_service(appks_addr.clone()).await {
if appks_addr != env_fallback {
tracing::info!(
etcd_addr = %appks_addr,
fallback = %env_fallback,
"etcd-discovered appks unreachable, trying env fallback"
);
try_connect_service(env_fallback).await;
} else {
tracing::warn!(
%appks_addr,
"Initial connection to appks failed — will retry when etcd discovers new instances"
);
}
}
// Watch etcd for appks join/leave — reconnect dynamically
{
let try_connect = try_connect_service.clone();
let svc_clear = service.clone();
let addr_clear = current_appks_addr.clone();
etcd.start_service_watcher(
"appks",
move |addr| {
tracing::info!(%addr, "etcd watcher: appks instance UP, attempting reconnection");
let try_connect = try_connect.clone();
tokio::spawn(async move {
let ok = try_connect(addr).await;
tracing::info!(ok, "etcd watcher: reconnection result");
});
},
move |_addr| {
let svc_clear = svc_clear.clone();
let addr_clear = addr_clear.clone();
tokio::spawn(async move {
let mut guard = svc_clear.write().await;
*guard = None;
// Also clear the tracked address so health checker stops probing
let mut addr_guard = addr_clear.write().await;
*addr_guard = None;
tracing::warn!(
"etcd watcher: appks instance DOWN — running without permission checks"
);
});
},
);
}
// Health check loop: probe appks gRPC health every 5 seconds.
// If appks becomes unhealthy, clear the MessageService (degraded mode)
// and trigger an immediate reconnection attempt.
{
let health_service = service.clone();
let health_addr = current_appks_addr.clone();
let health_reconnect = try_connect_service.clone();
tokio::spawn(async move {
tracing::info!(
interval_secs = 5,
"Health check loop started — probing appks gRPC health every 5s"
);
let mut last_logged_ok = false;
let mut waiting_ticks = 0u64;
loop {
tokio::time::sleep(Duration::from_secs(5)).await;
// Snapshot the current appks address
let addr = health_addr.read().await.clone();
let Some(ref addr) = addr else {
// No appks connected yet
waiting_ticks += 1;
if !last_logged_ok && waiting_ticks % 12 == 1 {
// Log every ~60s while waiting (12 × 5s)
tracing::info!(
waiting_secs = waiting_ticks * 5,
"No appks connection — health check paused, waiting for etcd discovery"
);
}
continue;
};
waiting_ticks = 0;
match imks::rpc::health::check_appks_health(addr).await {
Ok(true) => {
if !last_logged_ok {
tracing::info!(%addr, "appks health check OK");
last_logged_ok = true;
}
}
Ok(false) | Err(_) => {
tracing::warn!(%addr, "appks health check FAILED — entering degraded mode");
// Clear the MessageService so event handlers drop requests
let mut guard = health_service.write().await;
*guard = None;
// Clear the tracked address so the next cycle skips
// health checks until a new connection is established
let mut addr_guard = health_addr.write().await;
*addr_guard = None;
last_logged_ok = false;
// Trigger an immediate reconnection attempt.
// If the same address recovers, try_connect will restore
// the service. If not, we wait for etcd discovery.
let reconnect = health_reconnect.clone();
let reconnect_addr = addr.clone();
tokio::spawn(async move {
if !reconnect(reconnect_addr).await {
tracing::warn!(
"Immediate reconnection after health failure failed, waiting for etcd discovery"
);
}
});
}
}
}
});
}
// Register connect handler
let namespace = socket_server.of("/");
let svc_connect = service.clone();
namespace
.on_connect(move |socket, auth_data| {
let svc = svc_connect.blocking_read();
if let Some(ref svc) = *svc {
svc.authenticate_socket(socket, auth_data)
.map_err(|e| e.to_string())?;
}
// Increment connection metrics
let m = telemetry::metrics::get();
m.connections_active.add(
1,
&telemetry::MetricsInstruments::namespace_attrs(&socket.namespace),
);
m.connections_total.add(
1,
&telemetry::MetricsInstruments::namespace_attrs(&socket.namespace),
);
telemetry::health::connection_connected();
tracing::info!(
socket_sid = %socket.sid,
engine_sid = %socket.engine_sid,
namespace = %socket.namespace,
"Socket connected"
);
Ok(())
})
.await;
// Register Socket.IO event handlers (always register — each handler reads the latest service)
{
let svc = service.clone();
macro_rules! register_event {
($svc:expr, $ns:expr, $event:expr, $method:ident) => {
let s = $svc.clone();
let event_name = $event.to_string();
$ns.on_event($event, Arc::new(move |socket, data| {
let s = s.clone();
let data = data.clone();
let event = event_name.clone();
tokio::spawn(async move {
let _span = tracing::info_span!(
"socket_event",
otel.name = format!("handle {event}"),
event = %event,
socket_sid = %socket.sid,
);
let _enter = _span.enter();
let svc_guard = s.read().await;
let Some(ref svc) = *svc_guard else {
tracing::warn!(event = %event, "No message service available, dropping event");
return;
};
let start = std::time::Instant::now();
if let Err(e) = svc.$method(socket, &data).await {
tracing::error!(event = %event, error = %e, "Event handler failed");
}
let elapsed = start.elapsed().as_secs_f64();
telemetry::metrics::get().event_handling_duration.record(
elapsed,
&telemetry::MetricsInstruments::event_attrs(&event),
);
});
})).await;
};
}
register_event!(svc, namespace, "channel:join", join_channel);
register_event!(svc, namespace, "channel:leave", leave_channel);
register_event!(svc, namespace, "message:send", send_message);
register_event!(svc, namespace, "message:edit", edit_message);
register_event!(svc, namespace, "message:delete", delete_message);
register_event!(svc, namespace, "reaction:add", toggle_reaction);
register_event!(svc, namespace, "pin:add", pin_message);
register_event!(svc, namespace, "pin:remove", unpin_message);
register_event!(svc, namespace, "poll:vote", poll_vote);
register_event!(svc, namespace, "poll:vote:remove", poll_remove_vote);
register_event!(svc, namespace, "typing:start", typing_start);
register_event!(svc, namespace, "typing:stop", typing_stop);
register_event!(svc, namespace, "presence:update", presence_update);
register_event!(svc, namespace, "draft:save", save_draft);
register_event!(svc, namespace, "draft:get", get_draft);
register_event!(svc, namespace, "draft:delete", delete_draft);
register_event!(svc, namespace, "read_state:mark", mark_read);
register_event!(svc, namespace, "read_state:get", get_read_state);
register_event!(svc, namespace, "notification:list", list_notifications);
register_event!(
svc,
namespace,
"notification:mark_read",
mark_notification_read
);
register_event!(
svc,
namespace,
"notification:mark_all_read",
mark_all_notifications_read
);
register_event!(svc, namespace, "bookmark:add", add_bookmark);
register_event!(svc, namespace, "bookmark:remove", remove_bookmark);
register_event!(svc, namespace, "bookmark:list", list_bookmarks);
register_event!(svc, namespace, "thread:create", create_thread);
register_event!(svc, namespace, "thread:resolve", resolve_thread);
register_event!(svc, namespace, "thread:join", join_thread);
register_event!(svc, namespace, "thread:leave", leave_thread);
register_event!(svc, namespace, "thread:list", list_threads);
register_event!(svc, namespace, "article:create", create_article);
register_event!(svc, namespace, "article:update", update_article);
register_event!(svc, namespace, "article:list", list_articles);
register_event!(svc, namespace, "article:delete", delete_article);
register_event!(svc, namespace, "component:interact", interact_component);
register_event!(svc, namespace, "component:update", update_component);
tracing::info!("Registered Socket.IO event handlers");
}
// Start scheduled message dispatcher (once)
if let Some(ref svc) = *service.read().await {
svc.clone().start_scheduled_dispatcher();
}
// Start servers
if deploy.webtransport_enabled && !deploy.cert_path.is_empty() {
let engine = socket_server.engine.clone();
let wt_port = deploy.webtransport_port;
let cert_path = deploy.cert_path.clone();
let key_path = deploy.key_path.clone();
let server = socket_server.clone();
tracing::info!("Starting HTTP on {} + WebTransport on {}", addr, wt_port);
tokio::select! {
result = server.run_http(addr) => {
result?;
}
result = engine.run_webtransport(wt_port, &cert_path, &key_path) => {
result?;
}
}
} else {
tracing::info!("Socket.IO HTTP server listening on {}", addr);
socket_server.run_http(addr).await?;
}
// Graceful telemetry shutdown
telemetry_guard.shutdown();
Ok(())
}
/// Builds the [`LocalBroadcastFn`] handed to the Redis/NATS adapters.
///
/// The adapters are constructed before the socket server exists, so the
/// namespace manager arrives later through the write-once `namespaces` cell.
/// The returned callback serves both same-node emits and packets received
/// from other nodes over the message bus. It forwards each packet to
/// `emit_local_filtered` on the packet's namespace. The packet is dropped with
/// a warning if the manager has not been set yet or the namespace is unknown.
fn make_local_broadcast_fn(
    namespaces: Arc<OnceLock<Arc<imks::socket::namespace::NamespaceManager>>>,
) -> LocalBroadcastFn {
    Arc::new(move |packet, opts| match namespaces.get() {
        None => {
            tracing::warn!(namespace = %packet.namespace, "Namespace manager not initialized");
        }
        Some(manager) => match manager.get_namespace(&packet.namespace) {
            Some(target) => {
                target.emit_local_filtered(packet, opts);
            }
            None => {
                tracing::warn!(namespace = %packet.namespace, "Namespace not found for local broadcast");
            }
        },
    })
}