0dbac480ae
- Add OpenTelemetry SDK, OTLP exporter, Prometheus integration
- Implement connection tracking with active/total/disconnection metrics
- Add health endpoint with uptime and connection counts
- Integrate tracing spans for socket events and engine messages
- Add metrics collection for event handling duration
- Update health endpoint to include live runtime state
- Add graceful telemetry shutdown in main function
- Implement engine session active metrics tracking
- Add namespace-specific attributes to connection metrics
- Introduce message edit history retrieval endpoint
- Add scheduled message CRUD operations and dispatcher
- Update Socket.IO event registration with observability
- Refactor component update to remove dead code allowance
- Add comprehensive environment variables documentation
- Implement detailed development guidelines in AGENTS.md
169 lines
4.6 KiB
Rust
169 lines
4.6 KiB
Rust
use std::sync::Arc;
|
|
|
|
use actix_web::{App, HttpRequest, HttpResponse, HttpServer, web};
|
|
|
|
use crate::engine::heartbeat::HeartbeatManager;
|
|
use crate::engine::packet::Packet;
|
|
use crate::engine::session::SessionStore;
|
|
|
|
/// Tunable settings for the Engine.IO server.
///
/// `EngineServer::run_http` registers a clone of this value as actix
/// application data and feeds the two ping fields to the heartbeat manager.
#[derive(Debug, Clone)]
pub struct EngineConfig {
    /// Ping interval handed to `HeartbeatManager::new`.
    /// NOTE(review): presumably milliseconds (default is 25000) — confirm.
    pub ping_interval: u64,

    /// Ping timeout handed to `HeartbeatManager::new`.
    /// NOTE(review): presumably milliseconds (default is 20000) — confirm.
    pub ping_timeout: u64,

    /// Upper bound on payload size.
    /// NOTE(review): presumably bytes (default is 1_000_000) — confirm
    /// against the transport handlers that enforce it.
    pub max_payload: usize,

    /// URL path of the Engine.IO endpoint (default `"/engine.io/"`).
    pub path: String,
}
|
|
|
|
impl Default for EngineConfig {
|
|
fn default() -> Self {
|
|
Self {
|
|
ping_interval: 25000,
|
|
ping_timeout: 20000,
|
|
max_payload: 1_000_000,
|
|
path: "/engine.io/".to_string(),
|
|
}
|
|
}
|
|
}
|
|
|
|
pub struct EngineServer {
|
|
pub config: EngineConfig,
|
|
pub store: SessionStore,
|
|
on_message: Arc<dyn Fn(String, Packet) + Send + Sync>,
|
|
}
|
|
|
|
#[derive(Debug, serde::Deserialize)]
|
|
pub struct EngineQuery {
|
|
#[serde(rename = "EIO")]
|
|
pub eio: Option<String>,
|
|
pub transport: Option<String>,
|
|
pub sid: Option<String>,
|
|
}
|
|
|
|
pub async fn engine_get(
|
|
req: HttpRequest,
|
|
body: web::Payload,
|
|
query: web::Query<EngineQuery>,
|
|
store: web::Data<SessionStore>,
|
|
config: web::Data<EngineConfig>,
|
|
on_message: web::Data<Arc<dyn Fn(String, Packet) + Send + Sync>>,
|
|
) -> Result<HttpResponse, actix_web::Error> {
|
|
match query.transport.as_deref() {
|
|
Some("websocket") => {
|
|
crate::engine::websocket::websocket_handler(
|
|
req,
|
|
body,
|
|
web::Query(crate::engine::websocket::WsQuery {
|
|
eio: query.eio.clone(),
|
|
transport: query.transport.clone(),
|
|
sid: query.sid.clone(),
|
|
}),
|
|
store,
|
|
config,
|
|
on_message,
|
|
)
|
|
.await
|
|
}
|
|
_ => Ok(crate::engine::polling::polling_get(
|
|
req,
|
|
web::Query(crate::engine::polling::PollingQuery {
|
|
eio: query.eio.clone(),
|
|
transport: query.transport.clone(),
|
|
sid: query.sid.clone(),
|
|
}),
|
|
store,
|
|
config,
|
|
on_message,
|
|
)
|
|
.await),
|
|
}
|
|
}
|
|
|
|
impl EngineServer {
|
|
pub fn new(
|
|
config: EngineConfig,
|
|
on_message: impl Fn(String, Packet) + Send + Sync + 'static,
|
|
) -> Self {
|
|
Self {
|
|
config,
|
|
store: SessionStore::new(),
|
|
on_message: Arc::new(on_message),
|
|
}
|
|
}
|
|
|
|
pub fn with_store(
|
|
config: EngineConfig,
|
|
store: SessionStore,
|
|
on_message: impl Fn(String, Packet) + Send + Sync + 'static,
|
|
) -> Self {
|
|
Self {
|
|
config,
|
|
store,
|
|
on_message: Arc::new(on_message),
|
|
}
|
|
}
|
|
|
|
pub async fn run_http(self: Arc<Self>, addr: &str) -> std::io::Result<()> {
|
|
let store = self.store.clone();
|
|
let config = self.config.clone();
|
|
let on_message = self.on_message.clone();
|
|
|
|
// Start heartbeat manager to clean up stale sessions
|
|
let heartbeat = Arc::new(HeartbeatManager::new(
|
|
store.clone(),
|
|
config.ping_interval,
|
|
config.ping_timeout,
|
|
));
|
|
let heartbeat_handle = heartbeat.start();
|
|
|
|
tracing::info!(
|
|
endpoint = %addr,
|
|
"Engine.IO HTTP server listening, /health and /metrics available"
|
|
);
|
|
|
|
let result = HttpServer::new(move || {
|
|
App::new()
|
|
.app_data(web::Data::new(store.clone()))
|
|
.app_data(web::Data::new(config.clone()))
|
|
.app_data(web::Data::new(on_message.clone()))
|
|
// Health check with connection metrics
|
|
.route(
|
|
"/health",
|
|
web::get().to(crate::engine::health::health_check),
|
|
)
|
|
// Prometheus metrics endpoint
|
|
.route(
|
|
"/metrics",
|
|
web::get().to(crate::telemetry::metrics::metrics_handler),
|
|
)
|
|
.route("/engine.io/", web::get().to(engine_get))
|
|
.route(
|
|
"/engine.io/",
|
|
web::post().to(crate::engine::polling::polling_post),
|
|
)
|
|
})
|
|
.bind(addr)?
|
|
.run()
|
|
.await;
|
|
|
|
heartbeat_handle.abort();
|
|
result
|
|
}
|
|
|
|
pub async fn run_webtransport(
|
|
&self,
|
|
port: u16,
|
|
cert_path: &str,
|
|
key_path: &str,
|
|
) -> crate::ImksResult<()> {
|
|
crate::engine::webtransport::run_webtransport_server(
|
|
port,
|
|
cert_path,
|
|
key_path,
|
|
self.store.clone(),
|
|
self.config.clone(),
|
|
self.on_message.clone(),
|
|
)
|
|
.await
|
|
}
|
|
}
|