Files
imks/socket/server.rs
T
zhenyi 0dbac480ae feat(telemetry): integrate OpenTelemetry observability stack with health metrics
- Add OpenTelemetry SDK, OTLP exporter, Prometheus integration
- Implement connection tracking with active/total/disconnection metrics
- Add health endpoint with uptime and connection counts
- Integrate tracing spans for socket events and engine messages
- Add metrics collection for event handling duration
- Update health endpoint to include live runtime state
- Add graceful telemetry shutdown in main function
- Implement engine session active metrics tracking
- Add namespace-specific attributes to connection metrics
- Introduce message edit history retrieval endpoint
- Add scheduled message CRUD operations and dispatcher
- Update Socket.IO event registration with observability
- Refactor component update to remove dead code allowance
- Add comprehensive environment variables documentation
- Implement detailed development guidelines in AGENTS.md
2026-06-11 13:53:29 +08:00

349 lines
12 KiB
Rust

use std::sync::Arc;
use dashmap::DashMap;
use tokio::sync::mpsc;
use crate::engine::packet::Packet as EnginePacket;
use crate::engine::packet::PacketData as EnginePacketData;
use crate::engine::server::{EngineConfig, EngineServer};
use crate::engine::session::SessionStore;
use crate::socket::adapter::{Adapter, LocalAdapter};
use crate::socket::namespace::NamespaceManager;
use crate::socket::packet::{Packet, PacketType};
use crate::socket::parser;
use crate::socket::socket::Socket;
pub struct SocketServer {
pub engine: Arc<EngineServer>,
pub namespaces: Arc<NamespaceManager>,
pub adapter: Arc<dyn Adapter>,
socket_txs: Arc<DashMap<String, mpsc::Sender<Packet>>>,
}
impl SocketServer {
pub fn new(config: EngineConfig) -> Self {
SocketServerBuilder::new(config).build()
}
pub fn builder(config: EngineConfig) -> SocketServerBuilder {
SocketServerBuilder::new(config)
}
pub fn of(&self, path: impl Into<String>) -> Arc<crate::socket::namespace::Namespace> {
self.namespaces.get_or_create_namespace(&path.into())
}
pub async fn run_http(self: Arc<Self>, addr: &str) -> std::io::Result<()> {
self.engine.clone().run_http(addr).await
}
pub fn register_socket(&self, sid: String, tx: mpsc::Sender<Packet>) {
self.socket_txs.insert(sid, tx);
}
pub fn unregister_socket(&self, sid: &str) {
self.socket_txs.remove(sid);
}
}
pub struct SocketServerBuilder {
config: EngineConfig,
adapter: Option<Arc<dyn Adapter>>,
}
impl SocketServerBuilder {
pub fn new(config: EngineConfig) -> Self {
Self {
config,
adapter: None,
}
}
pub fn adapter(mut self, adapter: Arc<dyn Adapter>) -> Self {
self.adapter = Some(adapter);
self
}
pub fn build(self) -> SocketServer {
let namespaces = Arc::new(NamespaceManager::new());
let socket_txs: Arc<DashMap<String, mpsc::Sender<Packet>>> = Arc::new(DashMap::new());
let engine_store = SessionStore::new();
let namespaces_clone = namespaces.clone();
let socket_txs_clone = socket_txs.clone();
let engine_store_clone = engine_store.clone();
let adapter: Arc<dyn Adapter> = self.adapter.unwrap_or_else(|| {
let ns_clone = namespaces.clone();
let send_fn = move |engine_sid: &str, packet: &Packet| {
if let Some(ns) = ns_clone.get_namespace(&packet.namespace) {
if let Some(socket) = ns.get_socket_by_engine_sid(engine_sid) {
socket.send_packet(packet).map_err(|e| e.to_string())
} else {
Err(format!(
"Socket with engine_sid {} not found in namespace {}",
engine_sid, packet.namespace
))
}
} else {
Err(format!("Namespace {} not found", packet.namespace))
}
};
Arc::new(LocalAdapter::new(send_fn))
});
let adapter_clone = adapter.clone();
let engine = Arc::new(EngineServer::with_store(
self.config,
engine_store,
move |sid, engine_packet| {
let namespaces = namespaces_clone.clone();
let socket_txs = socket_txs_clone.clone();
let engine_store = engine_store_clone.clone();
let adapter = adapter_clone.clone();
tokio::spawn(async move {
handle_engine_message(
sid,
engine_packet,
&namespaces,
&socket_txs,
&engine_store,
&adapter,
)
.await;
});
},
));
let server = SocketServer {
engine,
namespaces,
adapter,
socket_txs,
};
for ns in server.namespaces.all_namespaces() {
let adapter_ref = server.adapter.clone();
tokio::spawn(async move {
ns.set_adapter(adapter_ref).await;
});
}
server
}
}
async fn handle_engine_message(
engine_sid: String,
engine_packet: EnginePacket,
namespaces: &Arc<NamespaceManager>,
socket_txs: &Arc<DashMap<String, mpsc::Sender<Packet>>>,
engine_store: &SessionStore,
adapter: &Arc<dyn Adapter>,
) {
if let EnginePacketData::Text(ref text) = engine_packet.data {
match parser::decode(text) {
Ok(socket_packet) => {
let packet_type = format!("{:?}", socket_packet.packet_type);
let _span = tracing::debug_span!(
"engine_message",
engine_sid = %engine_sid,
packet_type = %packet_type,
namespace = %socket_packet.namespace,
);
let _enter = _span.enter();
match socket_packet.packet_type {
PacketType::Connect => {
handle_connect(
&engine_sid,
&socket_packet,
namespaces,
socket_txs,
engine_store,
adapter,
)
.await;
}
PacketType::Disconnect => {
handle_disconnect(&engine_sid, &socket_packet, namespaces, socket_txs);
}
PacketType::Event => {
handle_event(&engine_sid, &socket_packet, namespaces);
}
PacketType::Ack => {
handle_ack(&engine_sid, &socket_packet);
}
_ => {}
}
}
Err(e) => {
tracing::warn!(engine_sid = %engine_sid, error = %e, "Invalid Socket.IO packet");
}
}
}
}
async fn handle_connect(
engine_sid: &str,
packet: &Packet,
namespaces: &Arc<NamespaceManager>,
socket_txs: &Arc<DashMap<String, mpsc::Sender<Packet>>>,
engine_store: &SessionStore,
adapter: &Arc<dyn Adapter>,
) {
let _span = tracing::info_span!(
"socket_connect",
engine_sid = %engine_sid,
namespace = %packet.namespace,
);
let _enter = _span.enter();
// Validate namespace path to prevent DoS via arbitrary namespace creation
if !crate::socket::namespace::is_valid_namespace(&packet.namespace) {
tracing::warn!(
"Rejected connect with invalid namespace: {}",
packet.namespace
);
return;
}
let namespace = namespaces.get_or_create_namespace(&packet.namespace);
// Ensure newly created namespaces get the shared adapter before registration.
{
let ns_adapter = namespace.adapter.read().await;
if ns_adapter.is_none() {
drop(ns_adapter);
namespace.set_adapter(adapter.clone()).await;
}
}
let socket_sid = crate::engine::session::generate_sid();
let (tx, mut rx) = mpsc::channel::<Packet>(256);
socket_txs.insert(socket_sid.clone(), tx.clone());
let socket = Arc::new(Socket::new(
socket_sid.clone(),
packet.namespace.clone(),
engine_sid.to_string(),
tx,
));
// Run connect handler and add to namespace.
// If the handler rejects, clean up and do NOT send a Connect response.
if let Err(msg) = namespace
.add_socket(socket.clone(), packet.data.as_ref())
.await
{
tracing::warn!("Socket {} connection rejected: {}", socket_sid, msg);
socket_txs.remove(&socket_sid);
return;
}
// Connect handler passed — spawn forwarding task
let engine_store_clone = engine_store.clone();
let engine_sid_clone = engine_sid.to_string();
let socket_sid_clone = socket_sid.clone();
let socket_txs_clone = socket_txs.clone();
let namespace_clone = namespace.clone();
tokio::spawn(async move {
while let Some(socket_packet) = rx.recv().await {
let encoded = parser::encode(&socket_packet);
let engine_packet = EnginePacket::message_text(encoded);
if let Some(session) = engine_store_clone.get(&engine_sid_clone) {
let mut s = session.write().await;
if s.state == crate::engine::session::SessionState::Closed {
break;
}
s.push_packet(engine_packet);
} else {
break;
}
}
// Forwarding task ended — ensure socket is cleaned up from namespace.
// If the socket was still registered (session expiry / engine disconnect
// without Socket.IO disconnect packet), also update the connection counter.
socket_txs_clone.remove(&socket_sid_clone);
let was_removed = namespace_clone
.remove_socket_by_sid(&socket_sid_clone)
.await;
if was_removed {
crate::telemetry::health::connection_disconnected();
}
});
// Send Connect response (only after handler passed)
let response = Packet::connect(
&socket.namespace,
Some(serde_json::json!({ "sid": &socket.sid })),
);
if socket.send_packet(&response).is_err() {
tracing::warn!("Failed to send connect response to socket {}", socket.sid);
}
}
fn handle_disconnect(
engine_sid: &str,
packet: &Packet,
namespaces: &Arc<NamespaceManager>,
socket_txs: &Arc<DashMap<String, mpsc::Sender<Packet>>>,
) {
if let Some(namespace) = namespaces.get_namespace(&packet.namespace)
&& let Some(socket) = namespace.get_socket_by_engine_sid(engine_sid)
{
let m = crate::telemetry::metrics::get();
m.connections_active.add(
-1,
&crate::telemetry::MetricsInstruments::namespace_attrs(&socket.namespace),
);
m.disconnections_total.add(
1,
&crate::telemetry::MetricsInstruments::namespace_attrs(&socket.namespace),
);
crate::telemetry::health::connection_disconnected();
socket_txs.remove(&socket.sid);
let socket_sid = socket.sid.clone();
let ns_clone = namespace.clone();
tokio::spawn(async move {
ns_clone.remove_socket_by_sid(&socket_sid).await;
});
}
}
fn handle_event(engine_sid: &str, packet: &Packet, namespaces: &Arc<NamespaceManager>) {
if let Some(namespace) = namespaces.get_namespace(&packet.namespace)
&& let Some(socket) = namespace.get_socket_by_engine_sid(engine_sid)
&& let Some(ref data) = packet.data
&& let Some(arr) = data.as_array()
&& let Some(event) = arr.first().and_then(|v| v.as_str())
{
let event_data = if arr.len() > 1 {
serde_json::Value::Array(arr[1..].to_vec())
} else {
serde_json::Value::Null
};
let namespace_clone = namespace.clone();
let event = event.to_string();
let socket_clone = socket.clone();
tokio::spawn(async move {
namespace_clone
.handle_event(socket_clone, &event, &event_data)
.await;
});
}
}
fn handle_ack(engine_sid: &str, packet: &Packet) {
tracing::debug!(
"Received ACK from {} for namespace {} with id {:?}",
engine_sid,
packet.namespace,
packet.id
);
}