Files
zhenyi 0dbac480ae feat(telemetry): integrate OpenTelemetry observability stack with health metrics
- Add OpenTelemetry SDK, OTLP exporter, Prometheus integration
- Implement connection tracking with active/total/disconnection metrics
- Add health endpoint with uptime and connection counts
- Integrate tracing spans for socket events and engine messages
- Add metrics collection for event handling duration
- Update health endpoint to include live runtime state
- Add graceful telemetry shutdown in main function
- Implement engine session active metrics tracking
- Add namespace-specific attributes to connection metrics
- Introduce message edit history retrieval endpoint
- Add scheduled message CRUD operations and dispatcher
- Update Socket.IO event registration with observability
- Refactor component update to remove dead code allowance
- Add comprehensive environment variables documentation
- Implement detailed development guidelines in AGENTS.md
2026-06-11 13:53:29 +08:00

169 lines
4.6 KiB
Rust

use std::sync::Arc;
use actix_web::{App, HttpRequest, HttpResponse, HttpServer, web};
use crate::engine::heartbeat::HeartbeatManager;
use crate::engine::packet::Packet;
use crate::engine::session::SessionStore;
/// Tunable settings for the Engine.IO server.
///
/// The two ping values are handed to the heartbeat manager when the HTTP server starts.
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Heartbeat ping interval (presumably milliseconds — TODO confirm against
    /// `HeartbeatManager`).
    pub ping_interval: u64,
    /// Heartbeat ping timeout (presumably milliseconds — TODO confirm against
    /// `HeartbeatManager`).
    pub ping_timeout: u64,
    /// Upper bound on payload size (presumably bytes; enforcement is not visible in this file).
    pub max_payload: usize,
    /// Request path of the Engine.IO endpoint.
    ///
    /// NOTE(review): `EngineServer::run_http` registers its routes with the literal
    /// `/engine.io/` and does not read this field — confirm where it is consumed.
    pub path: String,
}
impl Default for EngineConfig {
    /// Returns the stock configuration: a ping interval of 25 000, a ping timeout of
    /// 20 000, a maximum payload of 1 000 000 and the path `/engine.io/`.
    fn default() -> Self {
        let path = String::from("/engine.io/");
        Self {
            ping_interval: 25_000,
            ping_timeout: 20_000,
            max_payload: 1_000_000,
            path,
        }
    }
}
/// Engine.IO server state: its configuration, its session store and the callback that is
/// handed to the transport handlers.
pub struct EngineServer {
    /// Settings this server was constructed with.
    pub config: EngineConfig,
    /// Session store shared with the heartbeat manager and the transport handlers.
    pub store: SessionStore,
    /// Callback taking a `String` and a [`Packet`]; presumably the session id and the
    /// received packet — confirm against the transport handlers that invoke it.
    on_message: Arc<dyn Fn(String, Packet) + Send + Sync>,
}
/// Query-string parameters read by [`engine_get`].
///
/// Every field is optional; a missing key deserializes to `None`.
#[derive(Debug, serde::Deserialize)]
pub struct EngineQuery {
    /// Value of the `EIO` query key (presumably the Engine.IO protocol revision — TODO confirm).
    #[serde(rename = "EIO")]
    pub eio: Option<String>,
    /// Requested transport; `engine_get` routes `"websocket"` to the WebSocket handler and
    /// everything else to the polling handler.
    pub transport: Option<String>,
    /// Value of the `sid` query key (presumably an existing session id — TODO confirm).
    pub sid: Option<String>,
}
pub async fn engine_get(
req: HttpRequest,
body: web::Payload,
query: web::Query<EngineQuery>,
store: web::Data<SessionStore>,
config: web::Data<EngineConfig>,
on_message: web::Data<Arc<dyn Fn(String, Packet) + Send + Sync>>,
) -> Result<HttpResponse, actix_web::Error> {
match query.transport.as_deref() {
Some("websocket") => {
crate::engine::websocket::websocket_handler(
req,
body,
web::Query(crate::engine::websocket::WsQuery {
eio: query.eio.clone(),
transport: query.transport.clone(),
sid: query.sid.clone(),
}),
store,
config,
on_message,
)
.await
}
_ => Ok(crate::engine::polling::polling_get(
req,
web::Query(crate::engine::polling::PollingQuery {
eio: query.eio.clone(),
transport: query.transport.clone(),
sid: query.sid.clone(),
}),
store,
config,
on_message,
)
.await),
}
}
impl EngineServer {
    /// Creates a server backed by a freshly constructed [`SessionStore`].
    ///
    /// `on_message` is stored behind an `Arc` and later handed to the transport handlers.
    pub fn new(
        config: EngineConfig,
        on_message: impl Fn(String, Packet) + Send + Sync + 'static,
    ) -> Self {
        Self::with_store(config, SessionStore::new(), on_message)
    }

    /// Creates a server that uses the caller-supplied `store` instead of building a new one.
    ///
    /// `on_message` is stored behind an `Arc` and later handed to the transport handlers.
    pub fn with_store(
        config: EngineConfig,
        store: SessionStore,
        on_message: impl Fn(String, Packet) + Send + Sync + 'static,
    ) -> Self {
        Self {
            config,
            store,
            on_message: Arc::new(on_message),
        }
    }

    /// Serves the HTTP endpoints on `addr` until the server stops.
    ///
    /// Registers `GET /health`, `GET /metrics`, and `GET` / `POST /engine.io/`, and keeps a
    /// [`HeartbeatManager`] running for as long as the server runs. The heartbeat handle is
    /// aborted once the server future completes.
    ///
    /// # Errors
    ///
    /// Returns the I/O error from binding `addr`, or the error the running server finishes
    /// with.
    pub async fn run_http(self: Arc<Self>, addr: &str) -> std::io::Result<()> {
        let store = self.store.clone();
        let config = self.config.clone();
        let on_message = self.on_message.clone();

        // Captured up front because `store` and `config` are moved into the app factory.
        let heartbeat_store = store.clone();
        let ping_interval = config.ping_interval;
        let ping_timeout = config.ping_timeout;

        // Bind before starting the heartbeat: a failed bind returns early through `?`, and
        // on that path the heartbeat must not have been started, since nothing would abort it.
        //
        // NOTE(review): the Engine.IO routes use the literal `/engine.io/`; `config.path`
        // is not consulted here — confirm whether that is intended.
        let server = HttpServer::new(move || {
            App::new()
                .app_data(web::Data::new(store.clone()))
                .app_data(web::Data::new(config.clone()))
                .app_data(web::Data::new(on_message.clone()))
                // Health check with connection metrics
                .route(
                    "/health",
                    web::get().to(crate::engine::health::health_check),
                )
                // Prometheus metrics endpoint
                .route(
                    "/metrics",
                    web::get().to(crate::telemetry::metrics::metrics_handler),
                )
                .route("/engine.io/", web::get().to(engine_get))
                .route(
                    "/engine.io/",
                    web::post().to(crate::engine::polling::polling_post),
                )
        })
        .bind(addr)?;

        // Start heartbeat manager to clean up stale sessions
        let heartbeat = Arc::new(HeartbeatManager::new(
            heartbeat_store,
            ping_interval,
            ping_timeout,
        ));
        let heartbeat_handle = heartbeat.start();

        // Logged only after the bind succeeded, so the message is accurate.
        tracing::info!(
            endpoint = %addr,
            "Engine.IO HTTP server listening, /health and /metrics available"
        );

        let result = server.run().await;

        // The server has stopped; stop the heartbeat as well before reporting the outcome.
        heartbeat_handle.abort();
        result
    }

    /// Runs the WebTransport server on `port`, sharing this server's session store,
    /// configuration and message callback.
    ///
    /// `cert_path` and `key_path` are forwarded unchanged to
    /// `engine::webtransport::run_webtransport_server` (presumably the TLS certificate and
    /// private-key files — confirm against that function).
    ///
    /// # Errors
    ///
    /// Returns whatever error `run_webtransport_server` reports.
    pub async fn run_webtransport(
        &self,
        port: u16,
        cert_path: &str,
        key_path: &str,
    ) -> crate::ImksResult<()> {
        crate::engine::webtransport::run_webtransport_server(
            port,
            cert_path,
            key_path,
            self.store.clone(),
            self.config.clone(),
            self.on_message.clone(),
        )
        .await
    }
}