10cb60f4ea
- Add ImksConfig struct combining DeployConfig, RpcConfig, DatabaseConfig - Add ImksServer / ImksServerBuilder to lib.rs - Move all startup logic (etcd, adapters, DB, socket server, event handlers, health checks, etcd watcher, appks connection) into builder - Expose serve() for embedding - main.rs reduced from ~350 to ~10 lines
639 lines
25 KiB
Rust
639 lines
25 KiB
Rust
pub mod auth;
|
|
pub mod database;
|
|
pub mod engine;
|
|
pub mod error;
|
|
pub mod etcd;
|
|
pub mod models;
|
|
pub mod pb;
|
|
pub mod repo;
|
|
pub mod rpc;
|
|
pub mod socket;
|
|
pub mod svc;
|
|
pub mod telemetry;
|
|
|
|
pub use error::{ImksError, ImksResult};
|
|
|
|
use std::sync::{Arc, OnceLock};
|
|
use std::time::Duration;
|
|
|
|
use tokio::sync::RwLock;
|
|
|
|
use crate::database::{Database, DatabaseConfig};
|
|
use crate::engine::server::EngineConfig;
|
|
use crate::repo::MessageRepo;
|
|
use crate::rpc::{AppksClients, RpcConfig};
|
|
use crate::socket::adapter::{LocalBroadcastFn, NatsAdapter, RedisAdapter};
|
|
use crate::socket::message_bus::{NatsMessageBus, RedisMessageBus};
|
|
use crate::socket::server::SocketServerBuilder;
|
|
use crate::svc::{DeployConfig, MessageService};
|
|
|
|
/// Combined configuration for an imks server.
|
|
///
|
|
/// Use [`ImksConfig::from_env`] for microservice deployments, or construct
|
|
/// manually when embedding as a library.
|
|
#[derive(Debug, Clone)]
|
|
pub struct ImksConfig {
|
|
pub deploy: DeployConfig,
|
|
pub rpc: RpcConfig,
|
|
pub db: DatabaseConfig,
|
|
pub etcd_endpoints: Vec<String>,
|
|
pub etcd_prefix: String,
|
|
pub http_addr: String,
|
|
pub grpc_health_addr: String,
|
|
pub engine: EngineConfig,
|
|
}
|
|
|
|
impl ImksConfig {
|
|
/// Build config from environment variables with defaults.
|
|
pub fn from_env() -> Self {
|
|
Self {
|
|
deploy: DeployConfig::from_env(),
|
|
rpc: RpcConfig::from_env(),
|
|
db: DatabaseConfig::from_env(),
|
|
etcd_endpoints: std::env::var("ETCD_ENDPOINTS")
|
|
.unwrap_or_else(|_| "http://localhost:2379".to_string())
|
|
.split(',')
|
|
.map(|s| s.trim().to_string())
|
|
.filter(|s| !s.is_empty())
|
|
.collect(),
|
|
etcd_prefix: std::env::var("ETCD_KEY_PREFIX")
|
|
.unwrap_or_else(|_| "/appks/".to_string()),
|
|
http_addr: std::env::var("IMKS_HTTP_ADDR")
|
|
.unwrap_or_else(|_| "0.0.0.0:50048".to_string()),
|
|
grpc_health_addr: std::env::var("IMKS_GRPC_HEALTH_ADDR")
|
|
.unwrap_or_else(|_| "0.0.0.0:50047".to_string()),
|
|
engine: EngineConfig::default(),
|
|
}
|
|
}
|
|
}
|
|
|
|
/// A ready-to-run imks Socket.IO + gRPC server.
|
|
pub struct ImksServer {
|
|
socket_server: Arc<crate::socket::server::SocketServer>,
|
|
http_addr: String,
|
|
webtransport_enabled: bool,
|
|
webtransport_port: u16,
|
|
cert_path: String,
|
|
key_path: String,
|
|
telemetry_guard: Option<telemetry::TelemetryGuard>,
|
|
}
|
|
|
|
/// Builder for [`ImksServer`].
|
|
///
|
|
/// # Examples
|
|
///
|
|
/// ```no_run
|
|
/// use imks::{ImksServer, ImksConfig};
|
|
///
|
|
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
|
|
/// let config = ImksConfig::from_env();
|
|
/// let server = ImksServer::builder()
|
|
/// .config(config)
|
|
/// .build()
|
|
/// .await?;
|
|
/// server.serve().await?;
|
|
/// # Ok(())
|
|
/// # }
|
|
/// ```
|
|
pub struct ImksServerBuilder {
|
|
config: Option<ImksConfig>,
|
|
}
|
|
|
|
impl ImksServer {
|
|
/// Create a new builder.
|
|
pub fn builder() -> ImksServerBuilder {
|
|
ImksServerBuilder::default()
|
|
}
|
|
|
|
/// Start the HTTP/WebTransport server and block until shutdown.
|
|
pub async fn serve(self) -> Result<(), Box<dyn std::error::Error>> {
|
|
let addr = &self.http_addr;
|
|
let engine = self.socket_server.engine.clone();
|
|
let wt_port = self.webtransport_port;
|
|
let cert_path = self.cert_path.clone();
|
|
let key_path = self.key_path.clone();
|
|
let server = self.socket_server.clone();
|
|
|
|
if self.webtransport_enabled && !cert_path.is_empty() {
|
|
tracing::info!("Starting HTTP on {} + WebTransport on {}", addr, wt_port);
|
|
|
|
tokio::select! {
|
|
result = server.run_http(addr) => {
|
|
result?;
|
|
}
|
|
result = engine.run_webtransport(wt_port, &cert_path, &key_path) => {
|
|
result?;
|
|
}
|
|
}
|
|
} else {
|
|
tracing::info!("Socket.IO HTTP server listening on {}", addr);
|
|
server.run_http(addr).await?;
|
|
}
|
|
|
|
// Graceful telemetry shutdown
|
|
if let Some(guard) = self.telemetry_guard {
|
|
guard.shutdown();
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl ImksServerBuilder {
|
|
/// Set the server configuration.
|
|
pub fn config(mut self, config: ImksConfig) -> Self {
|
|
self.config = Some(config);
|
|
self
|
|
}
|
|
|
|
/// Build the server: connect to etcd, database, set up adapters, register event handlers.
|
|
pub async fn build(self) -> Result<ImksServer, Box<dyn std::error::Error>> {
|
|
dotenvy::dotenv().ok();
|
|
|
|
let config = self.config.unwrap_or_else(ImksConfig::from_env);
|
|
|
|
// Initialize observability
|
|
let telemetry_guard = telemetry::init();
|
|
telemetry::health::init_counters();
|
|
|
|
let deploy = &config.deploy;
|
|
|
|
tracing::info!(
|
|
adapter = %deploy.adapter_mode,
|
|
server_id = %deploy.server_id,
|
|
wt_enabled = deploy.webtransport_enabled,
|
|
"Starting imks server"
|
|
);
|
|
|
|
// etcd: connect, register, discover appks
|
|
let etcd = crate::etcd::EtcdConfig::connect(
|
|
config.etcd_endpoints.clone(),
|
|
&config.etcd_prefix,
|
|
)
|
|
.await
|
|
.unwrap_or_else(|e| {
|
|
tracing::error!(error = %e, "etcd required but unavailable");
|
|
panic!("etcd required: {e}")
|
|
});
|
|
|
|
let registry = crate::etcd::ServiceRegistry::new(etcd.client(), &config.etcd_prefix);
|
|
let imks_addr = etcd.get("IMKS_ADDR", "0.0.0.0:3000").await;
|
|
registry.register("imks", &imks_addr).await.ok();
|
|
|
|
let appks_addr = etcd
|
|
.discover_service("appks")
|
|
.await
|
|
.ok()
|
|
.and_then(|addrs| addrs.into_iter().next())
|
|
.map(|addr| {
|
|
let normalized = addr.replace("0.0.0.0", "127.0.0.1");
|
|
format!("http://{}", normalized)
|
|
})
|
|
.unwrap_or_else(|| config.rpc.appks_addr.clone());
|
|
|
|
tracing::info!(appks_addr = %appks_addr, "appks discovered via etcd");
|
|
|
|
let current_appks_addr: Arc<RwLock<Option<String>>> = Arc::new(RwLock::new(None));
|
|
|
|
// Start gRPC health server
|
|
{
|
|
let grpc_health_addr: std::net::SocketAddr = config
|
|
.grpc_health_addr
|
|
.parse()
|
|
.expect("Invalid IMKS_GRPC_HEALTH_ADDR");
|
|
tokio::spawn(async move {
|
|
if let Err(e) = crate::rpc::health::start_health_server(grpc_health_addr).await {
|
|
tracing::error!(error = %e, "imks gRPC health server failed");
|
|
}
|
|
});
|
|
}
|
|
|
|
let engine_config = config.engine.clone();
|
|
let mut builder = SocketServerBuilder::new(engine_config);
|
|
let namespace_holder: Arc<OnceLock<Arc<crate::socket::namespace::NamespaceManager>>> =
|
|
Arc::new(OnceLock::new());
|
|
|
|
// Configure adapter
|
|
match deploy.adapter_mode.as_str() {
|
|
"redis" => {
|
|
let redis_url = deploy
|
|
.redis_url()
|
|
.map_err(|e| format!("Invalid Redis configuration: {e}"))?;
|
|
tracing::info!(
|
|
cluster_enabled = deploy.redis_cluster_enabled,
|
|
"Configuring Redis adapter"
|
|
);
|
|
let message_bus = Arc::new(
|
|
RedisMessageBus::new(&redis_url)
|
|
.await
|
|
.map_err(|e| format!("Failed to connect to Redis: {e}"))?,
|
|
);
|
|
let redis_client = message_bus.client().clone();
|
|
let server_id = deploy.server_id.clone();
|
|
let adapter = Arc::new(RedisAdapter::new(
|
|
message_bus.clone() as Arc<_>,
|
|
redis_client,
|
|
server_id,
|
|
"/".into(),
|
|
make_local_broadcast_fn(namespace_holder.clone()),
|
|
));
|
|
adapter
|
|
.init()
|
|
.await
|
|
.map_err(|e| format!("Failed to initialize Redis adapter: {e}"))?;
|
|
builder = builder.adapter(adapter);
|
|
tracing::info!("Redis adapter configured for multi-node");
|
|
}
|
|
"nats" => {
|
|
let message_bus = Arc::new(
|
|
NatsMessageBus::new(&deploy.nats_url)
|
|
.await
|
|
.map_err(|e| format!("Failed to connect to NATS: {e}"))?,
|
|
);
|
|
let server_id = deploy.server_id.clone();
|
|
let adapter = Arc::new(NatsAdapter::new(
|
|
message_bus.clone() as Arc<_>,
|
|
server_id,
|
|
"/".into(),
|
|
make_local_broadcast_fn(namespace_holder.clone()),
|
|
));
|
|
adapter
|
|
.init()
|
|
.await
|
|
.map_err(|e| format!("Failed to initialize NATS adapter: {e}"))?;
|
|
builder = builder.adapter(adapter);
|
|
tracing::info!("NATS adapter configured for multi-node");
|
|
}
|
|
_ => {
|
|
tracing::info!("Local adapter (single-node mode)");
|
|
}
|
|
}
|
|
|
|
let socket_server = Arc::new(builder.build());
|
|
let _ = namespace_holder.set(socket_server.namespaces.clone());
|
|
|
|
// Connect to database
|
|
let db = Database::connect(&config.db)
|
|
.await
|
|
.map_err(|e| format!("Database connection failed: {e}"))?;
|
|
crate::database::run_migrations(db.pool())
|
|
.await
|
|
.map_err(|e| format!("Database migration failed: {e}"))?;
|
|
let repo = Arc::new(MessageRepo::new(db.pool().clone()));
|
|
tracing::info!("Database connected and migrated");
|
|
|
|
// Shared service handle
|
|
let service: Arc<RwLock<Option<Arc<MessageService>>>> = Arc::new(RwLock::new(None));
|
|
|
|
// Helper to try creating a MessageService
|
|
let try_connect_service = {
|
|
let service = service.clone();
|
|
let repo = repo.clone();
|
|
let namespaces = socket_server.namespaces.clone();
|
|
let current_addr = current_appks_addr.clone();
|
|
let rpc_config = config.rpc.clone();
|
|
move |addr: String| {
|
|
let service = service.clone();
|
|
let repo = repo.clone();
|
|
let namespaces = namespaces.clone();
|
|
let current_addr = current_addr.clone();
|
|
let mut rpc = rpc_config.clone();
|
|
let normalized = addr.replace("0.0.0.0", "127.0.0.1");
|
|
rpc.appks_addr = if normalized.starts_with("http") {
|
|
normalized
|
|
} else {
|
|
format!("http://{}", normalized)
|
|
};
|
|
async move {
|
|
let max_retries = 5;
|
|
let mut delay = Duration::from_millis(500);
|
|
|
|
for attempt in 1..=max_retries {
|
|
match AppksClients::connect(&rpc).await {
|
|
Ok(clients) => match MessageService::new(
|
|
(*repo).clone(),
|
|
clients,
|
|
namespaces.clone(),
|
|
)
|
|
.await
|
|
{
|
|
Ok(svc) => {
|
|
let svc = Arc::new(svc);
|
|
let mut guard = service.write().await;
|
|
*guard = Some(svc);
|
|
|
|
let mut addr_guard = current_addr.write().await;
|
|
*addr_guard = Some(rpc.appks_addr.clone());
|
|
|
|
tracing::info!(
|
|
addr = %rpc.appks_addr,
|
|
attempt,
|
|
"Message service initialized"
|
|
);
|
|
return true;
|
|
}
|
|
Err(e) => {
|
|
tracing::warn!(
|
|
addr = %rpc.appks_addr,
|
|
attempt,
|
|
error = %e,
|
|
"Failed to init message service"
|
|
);
|
|
if attempt < max_retries {
|
|
tokio::time::sleep(delay).await;
|
|
delay *= 2;
|
|
}
|
|
}
|
|
},
|
|
Err(e) => {
|
|
tracing::warn!(
|
|
addr = %rpc.appks_addr,
|
|
attempt,
|
|
error = %e,
|
|
"gRPC connect failed"
|
|
);
|
|
if attempt < max_retries {
|
|
tokio::time::sleep(delay).await;
|
|
delay *= 2;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
tracing::error!(
|
|
addr = %rpc.appks_addr,
|
|
max_retries,
|
|
"All connection attempts to appks exhausted"
|
|
);
|
|
false
|
|
}
|
|
}
|
|
};
|
|
|
|
// Try initial connection
|
|
let env_fallback = std::env::var("APPKS_GRPC_ADDR")
|
|
.unwrap_or_else(|_| "http://localhost:50051".to_string());
|
|
if !try_connect_service(appks_addr.clone()).await {
|
|
if appks_addr != env_fallback {
|
|
tracing::info!(
|
|
etcd_addr = %appks_addr,
|
|
fallback = %env_fallback,
|
|
"etcd-discovered appks unreachable, trying env fallback"
|
|
);
|
|
try_connect_service(env_fallback).await;
|
|
} else {
|
|
tracing::warn!(
|
|
%appks_addr,
|
|
"Initial connection to appks failed — will retry when etcd discovers new instances"
|
|
);
|
|
}
|
|
}
|
|
|
|
// Watch etcd for appks join/leave
|
|
{
|
|
let try_connect = try_connect_service.clone();
|
|
let svc_clear = service.clone();
|
|
let addr_clear = current_appks_addr.clone();
|
|
etcd.start_service_watcher(
|
|
"appks",
|
|
move |addr| {
|
|
tracing::info!(%addr, "etcd watcher: appks instance UP, attempting reconnection");
|
|
let try_connect = try_connect.clone();
|
|
tokio::spawn(async move {
|
|
let ok = try_connect(addr).await;
|
|
tracing::info!(ok, "etcd watcher: reconnection result");
|
|
});
|
|
},
|
|
move |_addr| {
|
|
let svc_clear = svc_clear.clone();
|
|
let addr_clear = addr_clear.clone();
|
|
tokio::spawn(async move {
|
|
let mut guard = svc_clear.write().await;
|
|
*guard = None;
|
|
let mut addr_guard = addr_clear.write().await;
|
|
*addr_guard = None;
|
|
tracing::warn!(
|
|
"etcd watcher: appks instance DOWN — running without permission checks"
|
|
);
|
|
});
|
|
},
|
|
);
|
|
}
|
|
|
|
// Health check loop
|
|
{
|
|
let health_service = service.clone();
|
|
let health_addr = current_appks_addr.clone();
|
|
let health_reconnect = try_connect_service.clone();
|
|
tokio::spawn(async move {
|
|
tracing::info!(
|
|
interval_secs = 5,
|
|
"Health check loop started — probing appks gRPC health every 5s"
|
|
);
|
|
|
|
let mut last_logged_ok = false;
|
|
let mut waiting_ticks = 0u64;
|
|
|
|
loop {
|
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
|
|
|
let addr = health_addr.read().await.clone();
|
|
let Some(ref addr) = addr else {
|
|
waiting_ticks += 1;
|
|
if !last_logged_ok && waiting_ticks % 12 == 1 {
|
|
tracing::info!(
|
|
waiting_secs = waiting_ticks * 5,
|
|
"No appks connection — health check paused, waiting for etcd discovery"
|
|
);
|
|
}
|
|
continue;
|
|
};
|
|
|
|
waiting_ticks = 0;
|
|
|
|
match crate::rpc::health::check_appks_health(addr).await {
|
|
Ok(true) => {
|
|
if !last_logged_ok {
|
|
tracing::info!(%addr, "appks health check OK");
|
|
last_logged_ok = true;
|
|
}
|
|
}
|
|
Ok(false) | Err(_) => {
|
|
tracing::warn!(%addr, "appks health check FAILED — entering degraded mode");
|
|
|
|
let mut guard = health_service.write().await;
|
|
*guard = None;
|
|
|
|
let mut addr_guard = health_addr.write().await;
|
|
*addr_guard = None;
|
|
|
|
last_logged_ok = false;
|
|
|
|
let reconnect = health_reconnect.clone();
|
|
let reconnect_addr = addr.clone();
|
|
tokio::spawn(async move {
|
|
if !reconnect(reconnect_addr).await {
|
|
tracing::warn!(
|
|
"Immediate reconnection after health failure failed, waiting for etcd discovery"
|
|
);
|
|
}
|
|
});
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
|
|
// Register connect handler
|
|
let namespace = socket_server.of("/");
|
|
let svc_connect = service.clone();
|
|
namespace
|
|
.on_connect(move |socket, auth_data| {
|
|
let svc = svc_connect.blocking_read();
|
|
if let Some(ref svc) = *svc {
|
|
svc.authenticate_socket(socket, auth_data)
|
|
.map_err(|e| e.to_string())?;
|
|
}
|
|
|
|
let m = telemetry::metrics::get();
|
|
m.connections_active.add(
|
|
1,
|
|
&telemetry::MetricsInstruments::namespace_attrs(&socket.namespace),
|
|
);
|
|
m.connections_total.add(
|
|
1,
|
|
&telemetry::MetricsInstruments::namespace_attrs(&socket.namespace),
|
|
);
|
|
telemetry::health::connection_connected();
|
|
|
|
tracing::info!(
|
|
socket_sid = %socket.sid,
|
|
engine_sid = %socket.engine_sid,
|
|
namespace = %socket.namespace,
|
|
"Socket connected"
|
|
);
|
|
Ok(())
|
|
})
|
|
.await;
|
|
|
|
// Register Socket.IO event handlers
|
|
{
|
|
let svc = service.clone();
|
|
macro_rules! register_event {
|
|
($svc:expr, $ns:expr, $event:expr, $method:ident) => {
|
|
let s = $svc.clone();
|
|
let event_name = $event.to_string();
|
|
$ns.on_event($event, Arc::new(move |socket, data| {
|
|
let s = s.clone();
|
|
let data = data.clone();
|
|
let event = event_name.clone();
|
|
tokio::spawn(async move {
|
|
let _span = tracing::info_span!(
|
|
"socket_event",
|
|
otel.name = format!("handle {event}"),
|
|
event = %event,
|
|
socket_sid = %socket.sid,
|
|
);
|
|
let _ = _span.enter();
|
|
|
|
let svc_guard = s.read().await;
|
|
let Some(ref svc) = *svc_guard else {
|
|
tracing::warn!(event = %event, "No message service available, dropping event");
|
|
return;
|
|
};
|
|
let start = std::time::Instant::now();
|
|
if let Err(e) = svc.$method(socket, &data).await {
|
|
tracing::error!(event = %event, error = %e, "Event handler failed");
|
|
}
|
|
let elapsed = start.elapsed().as_secs_f64();
|
|
telemetry::metrics::get().event_handling_duration.record(
|
|
elapsed,
|
|
&telemetry::MetricsInstruments::event_attrs(&event),
|
|
);
|
|
});
|
|
})).await;
|
|
};
|
|
}
|
|
|
|
register_event!(svc, namespace, "channel:join", join_channel);
|
|
register_event!(svc, namespace, "channel:leave", leave_channel);
|
|
register_event!(svc, namespace, "message:send", send_message);
|
|
register_event!(svc, namespace, "message:edit", edit_message);
|
|
register_event!(svc, namespace, "message:delete", delete_message);
|
|
register_event!(svc, namespace, "reaction:add", toggle_reaction);
|
|
register_event!(svc, namespace, "pin:add", pin_message);
|
|
register_event!(svc, namespace, "pin:remove", unpin_message);
|
|
register_event!(svc, namespace, "poll:vote", poll_vote);
|
|
register_event!(svc, namespace, "poll:vote:remove", poll_remove_vote);
|
|
register_event!(svc, namespace, "typing:start", typing_start);
|
|
register_event!(svc, namespace, "typing:stop", typing_stop);
|
|
register_event!(svc, namespace, "presence:update", presence_update);
|
|
register_event!(svc, namespace, "draft:save", save_draft);
|
|
register_event!(svc, namespace, "draft:get", get_draft);
|
|
register_event!(svc, namespace, "draft:delete", delete_draft);
|
|
register_event!(svc, namespace, "read_state:mark", mark_read);
|
|
register_event!(svc, namespace, "read_state:get", get_read_state);
|
|
register_event!(svc, namespace, "notification:list", list_notifications);
|
|
register_event!(svc, namespace, "notification:mark_read", mark_notification_read);
|
|
register_event!(svc, namespace, "notification:mark_all_read", mark_all_notifications_read);
|
|
register_event!(svc, namespace, "bookmark:add", add_bookmark);
|
|
register_event!(svc, namespace, "bookmark:remove", remove_bookmark);
|
|
register_event!(svc, namespace, "bookmark:list", list_bookmarks);
|
|
register_event!(svc, namespace, "thread:create", create_thread);
|
|
register_event!(svc, namespace, "thread:resolve", resolve_thread);
|
|
register_event!(svc, namespace, "thread:join", join_thread);
|
|
register_event!(svc, namespace, "thread:leave", leave_thread);
|
|
register_event!(svc, namespace, "thread:list", list_threads);
|
|
register_event!(svc, namespace, "article:create", create_article);
|
|
register_event!(svc, namespace, "article:update", update_article);
|
|
register_event!(svc, namespace, "article:list", list_articles);
|
|
register_event!(svc, namespace, "article:delete", delete_article);
|
|
register_event!(svc, namespace, "component:interact", interact_component);
|
|
register_event!(svc, namespace, "component:update", update_component);
|
|
|
|
tracing::info!("Registered Socket.IO event handlers");
|
|
}
|
|
|
|
// Start scheduled message dispatcher
|
|
if let Some(ref svc) = *service.read().await {
|
|
svc.clone().start_scheduled_dispatcher();
|
|
}
|
|
|
|
Ok(ImksServer {
|
|
socket_server,
|
|
http_addr: config.http_addr,
|
|
webtransport_enabled: deploy.webtransport_enabled,
|
|
webtransport_port: deploy.webtransport_port,
|
|
cert_path: deploy.cert_path.clone(),
|
|
key_path: deploy.key_path.clone(),
|
|
telemetry_guard: Some(telemetry_guard),
|
|
})
|
|
}
|
|
}
|
|
|
|
impl Default for ImksServerBuilder {
|
|
fn default() -> Self {
|
|
Self { config: None }
|
|
}
|
|
}
|
|
|
|
// Re-export telemetry guard for the public API
|
|
pub use telemetry::TelemetryGuard;
|
|
|
|
/// Create a local broadcast function for Redis/NATS adapters.
|
|
fn make_local_broadcast_fn(
|
|
namespaces: Arc<OnceLock<Arc<crate::socket::namespace::NamespaceManager>>>,
|
|
) -> LocalBroadcastFn {
|
|
Arc::new(move |packet, opts| {
|
|
let Some(manager) = namespaces.get() else {
|
|
tracing::warn!(namespace = %packet.namespace, "Namespace manager not initialized");
|
|
return;
|
|
};
|
|
let Some(namespace) = manager.get_namespace(&packet.namespace) else {
|
|
tracing::warn!(namespace = %packet.namespace, "Namespace not found for local broadcast");
|
|
return;
|
|
};
|
|
namespace.emit_local_filtered(packet, opts);
|
|
})
|
|
}
|