716f952bb6
- Add database migrations for message base table with indexes for efficient querying - Implement rich content support with attachment, embed, and poll tables - Create social features including reactions, bookmarks, mentions, and read states - Add thread management with participant tracking and resolution capabilities - Include article posts with title, cover image, tags, and engagement metrics - Support scheduled messages, stickers, forwards, and interactive components - Fix UUID defaults and ensure proper uniqueness constraints for drafts - Add gRPC health server for imks and health check client for appks connectivity - Replace non-connectable 0.0.0.0 addresses with localhost in service discovery - Normalize addresses during RPC configuration to handle bind address issues
565 lines
23 KiB
Rust
565 lines
23 KiB
Rust
use std::sync::{Arc, OnceLock};
|
||
use std::time::Duration;
|
||
|
||
use tokio::sync::RwLock;
|
||
|
||
use imks::database::{Database, DatabaseConfig};
|
||
use imks::engine::server::EngineConfig;
|
||
use imks::etcd::{EtcdConfig, ServiceRegistry};
|
||
use imks::repo::MessageRepo;
|
||
use imks::rpc::{AppksClients, RpcConfig};
|
||
use imks::socket::adapter::{LocalBroadcastFn, NatsAdapter, RedisAdapter};
|
||
use imks::socket::message_bus::{NatsMessageBus, RedisMessageBus};
|
||
|
||
use imks::socket::server::SocketServerBuilder;
|
||
use imks::svc::{DeployConfig, MessageService};
|
||
use imks::telemetry;
|
||
|
||
#[tokio::main]
|
||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||
dotenvy::dotenv().ok();
|
||
|
||
// Initialize observability stack (traces, metrics, logs, health)
|
||
let telemetry_guard = telemetry::init();
|
||
telemetry::health::init_counters();
|
||
|
||
let deploy = DeployConfig::from_env();
|
||
|
||
// Read etcd bootstrap config from env (these MUST come from env, not etcd)
|
||
let etcd_endpoints: Vec<String> = std::env::var("ETCD_ENDPOINTS")
|
||
.unwrap_or_else(|_| "http://localhost:2379".to_string())
|
||
.split(',')
|
||
.map(|s| s.trim().to_string())
|
||
.filter(|s| !s.is_empty())
|
||
.collect();
|
||
let etcd_prefix = std::env::var("ETCD_KEY_PREFIX").unwrap_or_else(|_| "/appks/".to_string());
|
||
|
||
tracing::info!(
|
||
adapter = %deploy.adapter_mode,
|
||
server_id = %deploy.server_id,
|
||
wt_enabled = deploy.webtransport_enabled,
|
||
"Starting imks server"
|
||
);
|
||
|
||
let addr = "0.0.0.0:50048";
|
||
|
||
// --- etcd: connect, register, discover appks ---
|
||
let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix)
|
||
.await
|
||
.unwrap_or_else(|e| {
|
||
tracing::error!(error = %e, "etcd required but unavailable");
|
||
panic!("etcd required: {e}")
|
||
});
|
||
|
||
// Register this service so others can discover us
|
||
let registry = ServiceRegistry::new(etcd.client(), &etcd_prefix);
|
||
let imks_addr = etcd.get("IMKS_ADDR", "0.0.0.0:3000").await;
|
||
registry.register("imks", &imks_addr).await.ok();
|
||
|
||
// Discover appks from etcd (priority > env).
|
||
// etcd-registered addresses are bare "host:port" — prepend http:// for gRPC.
|
||
// Bind address 0.0.0.0 is not connectable; replace with localhost.
|
||
let appks_addr = etcd
|
||
.discover_service("appks")
|
||
.await
|
||
.ok()
|
||
.and_then(|addrs| addrs.into_iter().next())
|
||
.map(|addr| {
|
||
let normalized = addr.replace("0.0.0.0", "127.0.0.1");
|
||
format!("http://{}", normalized)
|
||
})
|
||
.unwrap_or_else(|| {
|
||
std::env::var("APPKS_GRPC_ADDR")
|
||
.unwrap_or_else(|_| "http://localhost:50051".to_string())
|
||
});
|
||
tracing::info!(appks_addr = %appks_addr, "appks discovered via etcd");
|
||
|
||
// Track the currently active appks address for health checks.
|
||
// Updated on successful connect, cleared on health failure.
|
||
let current_appks_addr: Arc<RwLock<Option<String>>> = Arc::new(RwLock::new(None));
|
||
|
||
// Start imks's own gRPC health server early so Redis/NATS stalls do not hide it.
|
||
// Default: imks RPC health 50047, separate from HTTP (50048) and appks gRPC (50049).
|
||
{
|
||
let grpc_health_addr: std::net::SocketAddr = std::env::var("IMKS_GRPC_HEALTH_ADDR")
|
||
.unwrap_or_else(|_| "0.0.0.0:50047".to_string())
|
||
.parse()
|
||
.expect("Invalid IMKS_GRPC_HEALTH_ADDR");
|
||
tokio::spawn(async move {
|
||
if let Err(e) = imks::rpc::health::start_health_server(grpc_health_addr).await {
|
||
tracing::error!(error = %e, "imks gRPC health server failed");
|
||
}
|
||
});
|
||
}
|
||
|
||
let engine_config = EngineConfig::default();
|
||
let mut builder = SocketServerBuilder::new(engine_config);
|
||
let namespace_holder: Arc<OnceLock<Arc<imks::socket::namespace::NamespaceManager>>> =
|
||
Arc::new(OnceLock::new());
|
||
|
||
// Pre-configure adapter for Redis/NATS mode.
|
||
match deploy.adapter_mode.as_str() {
|
||
"redis" => {
|
||
let redis_url = deploy
|
||
.redis_url()
|
||
.map_err(|e| format!("Invalid Redis configuration: {e}"))?;
|
||
tracing::info!(
|
||
cluster_enabled = deploy.redis_cluster_enabled,
|
||
"Configuring Redis adapter"
|
||
);
|
||
let message_bus = Arc::new(
|
||
RedisMessageBus::new(&redis_url)
|
||
.await
|
||
.map_err(|e| format!("Failed to connect to Redis: {e}"))?,
|
||
);
|
||
let redis_client = message_bus.client().clone();
|
||
let server_id = deploy.server_id.clone();
|
||
let adapter = Arc::new(RedisAdapter::new(
|
||
message_bus.clone() as Arc<_>,
|
||
redis_client,
|
||
server_id,
|
||
"/".into(),
|
||
make_local_broadcast_fn(namespace_holder.clone()),
|
||
));
|
||
adapter
|
||
.init()
|
||
.await
|
||
.map_err(|e| format!("Failed to initialize Redis adapter: {e}"))?;
|
||
builder = builder.adapter(adapter);
|
||
tracing::info!("Redis adapter configured for multi-node");
|
||
}
|
||
"nats" => {
|
||
let message_bus = Arc::new(
|
||
NatsMessageBus::new(&deploy.nats_url)
|
||
.await
|
||
.map_err(|e| format!("Failed to connect to NATS: {e}"))?,
|
||
);
|
||
let server_id = deploy.server_id.clone();
|
||
let adapter = Arc::new(NatsAdapter::new(
|
||
message_bus.clone() as Arc<_>,
|
||
server_id,
|
||
"/".into(),
|
||
make_local_broadcast_fn(namespace_holder.clone()),
|
||
));
|
||
adapter
|
||
.init()
|
||
.await
|
||
.map_err(|e| format!("Failed to initialize NATS adapter: {e}"))?;
|
||
builder = builder.adapter(adapter);
|
||
tracing::info!("NATS adapter configured for multi-node");
|
||
}
|
||
_ => {
|
||
tracing::info!("Local adapter (single-node mode)");
|
||
}
|
||
};
|
||
|
||
let socket_server = Arc::new(builder.build());
|
||
let _ = namespace_holder.set(socket_server.namespaces.clone());
|
||
|
||
// Connect to database (independent of appks gRPC availability)
|
||
let db_config = DatabaseConfig::from_env();
|
||
let db = Database::connect(&db_config)
|
||
.await
|
||
.map_err(|e| format!("Database connection failed: {e}"))?;
|
||
imks::database::run_migrations(db.pool())
|
||
.await
|
||
.map_err(|e| format!("Database migration failed: {e}"))?;
|
||
let repo = Arc::new(MessageRepo::new(db.pool().clone()));
|
||
tracing::info!("Database connected and migrated");
|
||
|
||
// Shared service handle — swapped atomically when appks comes/goes
|
||
let service: Arc<RwLock<Option<Arc<MessageService>>>> = Arc::new(RwLock::new(None));
|
||
|
||
// Helper to try creating a MessageService given an appks address.
|
||
// Returns true on success.
|
||
let try_connect_service = {
|
||
let service = service.clone();
|
||
let repo = repo.clone();
|
||
let namespaces = socket_server.namespaces.clone();
|
||
let current_addr = current_appks_addr.clone();
|
||
let rpc_config = RpcConfig {
|
||
appks_addr: appks_addr.clone(),
|
||
..RpcConfig::from_env()
|
||
};
|
||
move |addr: String| {
|
||
let service = service.clone();
|
||
let repo = repo.clone();
|
||
let namespaces = namespaces.clone();
|
||
let current_addr = current_addr.clone();
|
||
let mut rpc = rpc_config.clone();
|
||
// etcd-registered address is bare "host:port" — prepend scheme for gRPC.
|
||
// Bind address 0.0.0.0 is not connectable; replace with localhost.
|
||
let normalized = addr.replace("0.0.0.0", "127.0.0.1");
|
||
rpc.appks_addr = if normalized.starts_with("http") {
|
||
normalized
|
||
} else {
|
||
format!("http://{}", normalized)
|
||
};
|
||
async move {
|
||
// Retry with backoff — appks may have registered in etcd
|
||
// before its gRPC server finished binding.
|
||
let max_retries = 5;
|
||
let mut delay = std::time::Duration::from_millis(500);
|
||
|
||
for attempt in 1..=max_retries {
|
||
match AppksClients::connect(&rpc).await {
|
||
Ok(clients) => {
|
||
match MessageService::new((*repo).clone(), clients, namespaces.clone())
|
||
.await
|
||
{
|
||
Ok(svc) => {
|
||
let svc = Arc::new(svc);
|
||
let mut guard = service.write().await;
|
||
*guard = Some(svc);
|
||
|
||
// Update the active appks address for health checker
|
||
let mut addr_guard = current_addr.write().await;
|
||
*addr_guard = Some(rpc.appks_addr.clone());
|
||
|
||
tracing::info!(
|
||
addr = %rpc.appks_addr,
|
||
attempt,
|
||
"Message service initialized"
|
||
);
|
||
return true;
|
||
}
|
||
Err(e) => {
|
||
tracing::warn!(
|
||
addr = %rpc.appks_addr,
|
||
attempt,
|
||
error = %e,
|
||
"Failed to init message service"
|
||
);
|
||
if attempt < max_retries {
|
||
tokio::time::sleep(delay).await;
|
||
delay *= 2;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
Err(e) => {
|
||
tracing::warn!(
|
||
addr = %rpc.appks_addr,
|
||
attempt,
|
||
error = %e,
|
||
"gRPC connect failed"
|
||
);
|
||
if attempt < max_retries {
|
||
tokio::time::sleep(delay).await;
|
||
delay *= 2;
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
tracing::error!(
|
||
addr = %rpc.appks_addr,
|
||
max_retries,
|
||
"All connection attempts to appks exhausted"
|
||
);
|
||
false
|
||
}
|
||
}
|
||
};
|
||
|
||
// Try initial connection: etcd-discovered addr first, then env fallback
|
||
let env_fallback =
|
||
std::env::var("APPKS_GRPC_ADDR").unwrap_or_else(|_| "http://localhost:50051".to_string());
|
||
if !try_connect_service(appks_addr.clone()).await {
|
||
if appks_addr != env_fallback {
|
||
tracing::info!(
|
||
etcd_addr = %appks_addr,
|
||
fallback = %env_fallback,
|
||
"etcd-discovered appks unreachable, trying env fallback"
|
||
);
|
||
try_connect_service(env_fallback).await;
|
||
} else {
|
||
tracing::warn!(
|
||
%appks_addr,
|
||
"Initial connection to appks failed — will retry when etcd discovers new instances"
|
||
);
|
||
}
|
||
}
|
||
|
||
// Watch etcd for appks join/leave — reconnect dynamically
|
||
{
|
||
let try_connect = try_connect_service.clone();
|
||
let svc_clear = service.clone();
|
||
let addr_clear = current_appks_addr.clone();
|
||
etcd.start_service_watcher(
|
||
"appks",
|
||
move |addr| {
|
||
tracing::info!(%addr, "etcd watcher: appks instance UP, attempting reconnection");
|
||
let try_connect = try_connect.clone();
|
||
tokio::spawn(async move {
|
||
let ok = try_connect(addr).await;
|
||
tracing::info!(ok, "etcd watcher: reconnection result");
|
||
});
|
||
},
|
||
move |_addr| {
|
||
let svc_clear = svc_clear.clone();
|
||
let addr_clear = addr_clear.clone();
|
||
tokio::spawn(async move {
|
||
let mut guard = svc_clear.write().await;
|
||
*guard = None;
|
||
// Also clear the tracked address so health checker stops probing
|
||
let mut addr_guard = addr_clear.write().await;
|
||
*addr_guard = None;
|
||
tracing::warn!(
|
||
"etcd watcher: appks instance DOWN — running without permission checks"
|
||
);
|
||
});
|
||
},
|
||
);
|
||
}
|
||
|
||
// Health check loop: probe appks gRPC health every 5 seconds.
|
||
// If appks becomes unhealthy, clear the MessageService (degraded mode)
|
||
// and trigger an immediate reconnection attempt.
|
||
{
|
||
let health_service = service.clone();
|
||
let health_addr = current_appks_addr.clone();
|
||
let health_reconnect = try_connect_service.clone();
|
||
tokio::spawn(async move {
|
||
tracing::info!(
|
||
interval_secs = 5,
|
||
"Health check loop started — probing appks gRPC health every 5s"
|
||
);
|
||
|
||
let mut last_logged_ok = false;
|
||
let mut waiting_ticks = 0u64;
|
||
|
||
loop {
|
||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||
|
||
// Snapshot the current appks address
|
||
let addr = health_addr.read().await.clone();
|
||
let Some(ref addr) = addr else {
|
||
// No appks connected yet
|
||
waiting_ticks += 1;
|
||
if !last_logged_ok && waiting_ticks % 12 == 1 {
|
||
// Log every ~60s while waiting (12 × 5s)
|
||
tracing::info!(
|
||
waiting_secs = waiting_ticks * 5,
|
||
"No appks connection — health check paused, waiting for etcd discovery"
|
||
);
|
||
}
|
||
continue;
|
||
};
|
||
|
||
waiting_ticks = 0;
|
||
|
||
match imks::rpc::health::check_appks_health(addr).await {
|
||
Ok(true) => {
|
||
if !last_logged_ok {
|
||
tracing::info!(%addr, "appks health check OK");
|
||
last_logged_ok = true;
|
||
}
|
||
}
|
||
Ok(false) | Err(_) => {
|
||
tracing::warn!(%addr, "appks health check FAILED — entering degraded mode");
|
||
|
||
// Clear the MessageService so event handlers drop requests
|
||
let mut guard = health_service.write().await;
|
||
*guard = None;
|
||
|
||
// Clear the tracked address so the next cycle skips
|
||
// health checks until a new connection is established
|
||
let mut addr_guard = health_addr.write().await;
|
||
*addr_guard = None;
|
||
|
||
last_logged_ok = false;
|
||
|
||
// Trigger an immediate reconnection attempt.
|
||
// If the same address recovers, try_connect will restore
|
||
// the service. If not, we wait for etcd discovery.
|
||
let reconnect = health_reconnect.clone();
|
||
let reconnect_addr = addr.clone();
|
||
tokio::spawn(async move {
|
||
if !reconnect(reconnect_addr).await {
|
||
tracing::warn!(
|
||
"Immediate reconnection after health failure failed, waiting for etcd discovery"
|
||
);
|
||
}
|
||
});
|
||
}
|
||
}
|
||
}
|
||
});
|
||
}
|
||
|
||
// Register connect handler
|
||
let namespace = socket_server.of("/");
|
||
let svc_connect = service.clone();
|
||
namespace
|
||
.on_connect(move |socket, auth_data| {
|
||
let svc = svc_connect.blocking_read();
|
||
if let Some(ref svc) = *svc {
|
||
svc.authenticate_socket(socket, auth_data)
|
||
.map_err(|e| e.to_string())?;
|
||
}
|
||
|
||
// Increment connection metrics
|
||
let m = telemetry::metrics::get();
|
||
m.connections_active.add(
|
||
1,
|
||
&telemetry::MetricsInstruments::namespace_attrs(&socket.namespace),
|
||
);
|
||
m.connections_total.add(
|
||
1,
|
||
&telemetry::MetricsInstruments::namespace_attrs(&socket.namespace),
|
||
);
|
||
telemetry::health::connection_connected();
|
||
|
||
tracing::info!(
|
||
socket_sid = %socket.sid,
|
||
engine_sid = %socket.engine_sid,
|
||
namespace = %socket.namespace,
|
||
"Socket connected"
|
||
);
|
||
Ok(())
|
||
})
|
||
.await;
|
||
|
||
// Register Socket.IO event handlers (always register — each handler reads the latest service)
|
||
{
|
||
let svc = service.clone();
|
||
macro_rules! register_event {
|
||
($svc:expr, $ns:expr, $event:expr, $method:ident) => {
|
||
let s = $svc.clone();
|
||
let event_name = $event.to_string();
|
||
$ns.on_event($event, Arc::new(move |socket, data| {
|
||
let s = s.clone();
|
||
let data = data.clone();
|
||
let event = event_name.clone();
|
||
tokio::spawn(async move {
|
||
let _span = tracing::info_span!(
|
||
"socket_event",
|
||
otel.name = format!("handle {event}"),
|
||
event = %event,
|
||
socket_sid = %socket.sid,
|
||
);
|
||
let _enter = _span.enter();
|
||
|
||
let svc_guard = s.read().await;
|
||
let Some(ref svc) = *svc_guard else {
|
||
tracing::warn!(event = %event, "No message service available, dropping event");
|
||
return;
|
||
};
|
||
let start = std::time::Instant::now();
|
||
if let Err(e) = svc.$method(socket, &data).await {
|
||
tracing::error!(event = %event, error = %e, "Event handler failed");
|
||
}
|
||
let elapsed = start.elapsed().as_secs_f64();
|
||
telemetry::metrics::get().event_handling_duration.record(
|
||
elapsed,
|
||
&telemetry::MetricsInstruments::event_attrs(&event),
|
||
);
|
||
});
|
||
})).await;
|
||
};
|
||
}
|
||
|
||
register_event!(svc, namespace, "channel:join", join_channel);
|
||
register_event!(svc, namespace, "channel:leave", leave_channel);
|
||
register_event!(svc, namespace, "message:send", send_message);
|
||
register_event!(svc, namespace, "message:edit", edit_message);
|
||
register_event!(svc, namespace, "message:delete", delete_message);
|
||
register_event!(svc, namespace, "reaction:add", toggle_reaction);
|
||
register_event!(svc, namespace, "pin:add", pin_message);
|
||
register_event!(svc, namespace, "pin:remove", unpin_message);
|
||
register_event!(svc, namespace, "poll:vote", poll_vote);
|
||
register_event!(svc, namespace, "poll:vote:remove", poll_remove_vote);
|
||
register_event!(svc, namespace, "typing:start", typing_start);
|
||
register_event!(svc, namespace, "typing:stop", typing_stop);
|
||
register_event!(svc, namespace, "presence:update", presence_update);
|
||
register_event!(svc, namespace, "draft:save", save_draft);
|
||
register_event!(svc, namespace, "draft:get", get_draft);
|
||
register_event!(svc, namespace, "draft:delete", delete_draft);
|
||
register_event!(svc, namespace, "read_state:mark", mark_read);
|
||
register_event!(svc, namespace, "read_state:get", get_read_state);
|
||
register_event!(svc, namespace, "notification:list", list_notifications);
|
||
register_event!(
|
||
svc,
|
||
namespace,
|
||
"notification:mark_read",
|
||
mark_notification_read
|
||
);
|
||
register_event!(
|
||
svc,
|
||
namespace,
|
||
"notification:mark_all_read",
|
||
mark_all_notifications_read
|
||
);
|
||
register_event!(svc, namespace, "bookmark:add", add_bookmark);
|
||
register_event!(svc, namespace, "bookmark:remove", remove_bookmark);
|
||
register_event!(svc, namespace, "bookmark:list", list_bookmarks);
|
||
register_event!(svc, namespace, "thread:create", create_thread);
|
||
register_event!(svc, namespace, "thread:resolve", resolve_thread);
|
||
register_event!(svc, namespace, "thread:join", join_thread);
|
||
register_event!(svc, namespace, "thread:leave", leave_thread);
|
||
register_event!(svc, namespace, "thread:list", list_threads);
|
||
register_event!(svc, namespace, "article:create", create_article);
|
||
register_event!(svc, namespace, "article:update", update_article);
|
||
register_event!(svc, namespace, "article:list", list_articles);
|
||
register_event!(svc, namespace, "article:delete", delete_article);
|
||
register_event!(svc, namespace, "component:interact", interact_component);
|
||
register_event!(svc, namespace, "component:update", update_component);
|
||
|
||
tracing::info!("Registered Socket.IO event handlers");
|
||
}
|
||
|
||
// Start scheduled message dispatcher (once)
|
||
if let Some(ref svc) = *service.read().await {
|
||
svc.clone().start_scheduled_dispatcher();
|
||
}
|
||
|
||
// Start servers
|
||
if deploy.webtransport_enabled && !deploy.cert_path.is_empty() {
|
||
let engine = socket_server.engine.clone();
|
||
let wt_port = deploy.webtransport_port;
|
||
let cert_path = deploy.cert_path.clone();
|
||
let key_path = deploy.key_path.clone();
|
||
let server = socket_server.clone();
|
||
|
||
tracing::info!("Starting HTTP on {} + WebTransport on {}", addr, wt_port);
|
||
|
||
tokio::select! {
|
||
result = server.run_http(addr) => {
|
||
result?;
|
||
}
|
||
result = engine.run_webtransport(wt_port, &cert_path, &key_path) => {
|
||
result?;
|
||
}
|
||
}
|
||
} else {
|
||
tracing::info!("Socket.IO HTTP server listening on {}", addr);
|
||
socket_server.run_http(addr).await?;
|
||
}
|
||
|
||
// Graceful telemetry shutdown
|
||
telemetry_guard.shutdown();
|
||
|
||
Ok(())
|
||
}
|
||
|
||
/// Create a local broadcast function for Redis/NATS adapters.
|
||
///
|
||
/// The callback is used both for same-node delivery and for cross-node messages
|
||
/// received from the message bus.
|
||
fn make_local_broadcast_fn(
|
||
namespaces: Arc<OnceLock<Arc<imks::socket::namespace::NamespaceManager>>>,
|
||
) -> LocalBroadcastFn {
|
||
Arc::new(move |packet, opts| {
|
||
let Some(manager) = namespaces.get() else {
|
||
tracing::warn!(namespace = %packet.namespace, "Namespace manager not initialized");
|
||
return;
|
||
};
|
||
let Some(namespace) = manager.get_namespace(&packet.namespace) else {
|
||
tracing::warn!(namespace = %packet.namespace, "Namespace not found for local broadcast");
|
||
return;
|
||
};
|
||
namespace.emit_local_filtered(packet, opts);
|
||
})
|
||
}
|