Files
zhenyi 0dbac480ae feat(telemetry): integrate OpenTelemetry observability stack with health metrics
- Add OpenTelemetry SDK, OTLP exporter, Prometheus integration
- Implement connection tracking with active/total/disconnection metrics
- Add health endpoint with uptime and connection counts
- Integrate tracing spans for socket events and engine messages
- Add metrics collection for event handling duration
- Update health endpoint to include live runtime state
- Add graceful telemetry shutdown in main function
- Implement engine session active metrics tracking
- Add namespace-specific attributes to connection metrics
- Introduce message edit history retrieval endpoint
- Add scheduled message CRUD operations and dispatcher
- Update Socket.IO event registration with observability
- Refactor component update to remove dead code allowance
- Add comprehensive environment variables documentation
- Implement detailed development guidelines in AGENTS.md
2026-06-11 13:53:29 +08:00

204 lines
6.9 KiB
Rust

//! Telemetry module — OpenTelemetry-compatible observability stack.
//!
//! Provides:
//! - **Traces**: distributed tracing via OTLP (gRPC or HTTP) with W3C TraceContext propagation
//! - **Metrics**: Prometheus-compatible metrics exposed at `/metrics`
//! - **Logs**: JSON + console dual output, plus OTLP log export bridge
//! - **Health**: enhanced `/health` endpoint with upstream dependency checks
//!
//! # Quick start
//!
//! ```ignore
//! let guard = telemetry::init();
//! // ... application runs ...
//! guard.shutdown(); // graceful shutdown, flushes all pending telemetry (a plain `drop` does not)
//! ```
//!
//! # Environment variables
//!
//! | Variable | Default | Description |
//! |---|---|---|
//! | `OTEL_SERVICE_NAME` | `imks` | Service name in traces/metrics/logs |
//! | `OTEL_SERVICE_VERSION` | Cargo version | Service version |
//! | `OTEL_EXPORTER_OTLP_ENDPOINT` | `http://localhost:4317` | OTLP collector endpoint |
//! | `OTEL_EXPORTER_OTLP_PROTOCOL` | `grpc` | `grpc` or `http/protobuf` |
//! | `OTEL_TRACES_ENABLED` | `true` | Enable distributed tracing |
//! | `OTEL_METRICS_ENABLED` | `true` | Enable Prometheus metrics |
//! | `OTEL_LOGS_ENABLED` | `true` | Enable OTLP log export |
//! | `LOG_FORMAT` | `both` | `json`, `pretty`, or `both` |
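//! | `RUST_LOG` | `info` | Log level filter |
//! | `OTEL_RESOURCE_ATTRIBUTES_DEPLOYMENT` | `development` | Value of the `deployment.environment` resource attribute |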
pub mod config;
pub mod health;
pub mod logs;
pub mod metrics;
pub mod traces;
use opentelemetry_sdk::Resource;
pub use config::TelemetryConfig;
pub use health::{HealthCheckFns, health_check};
pub use metrics::{MetricsInstruments, get as metrics, try_get as try_metrics};
/// Holds all telemetry providers for graceful shutdown.
///
/// When `shutdown()` is called, flushes and shuts down all providers in order:
/// tracer → meter → logger.
pub struct TelemetryGuard {
tracer_provider: Option<opentelemetry_sdk::trace::SdkTracerProvider>,
meter_provider: Option<opentelemetry_sdk::metrics::SdkMeterProvider>,
logger_provider: Option<opentelemetry_sdk::logs::SdkLoggerProvider>,
}
impl TelemetryGuard {
/// Flush all pending telemetry and shut down providers.
///
/// Call this before process exit to avoid data loss.
pub fn shutdown(mut self) {
if let Some(tp) = self.tracer_provider.take()
&& let Ok(rt) = tokio::runtime::Runtime::new()
{
rt.block_on(async {
tp.shutdown().unwrap_or_default();
});
}
if let Some(mp) = self.meter_provider.take()
&& let Ok(rt) = tokio::runtime::Runtime::new()
{
rt.block_on(async {
mp.shutdown().unwrap_or_default();
});
}
if let Some(lp) = self.logger_provider.take()
&& let Ok(rt) = tokio::runtime::Runtime::new()
{
rt.block_on(async {
lp.shutdown().unwrap_or_default();
});
}
}
/// Force-flush all pending trace spans (non-blocking best-effort).
pub fn flush_traces(&self) {
if let Some(ref tp) = self.tracer_provider
&& let Ok(rt) = tokio::runtime::Runtime::new()
{
rt.block_on(async {
tp.force_flush().unwrap_or_default();
});
}
}
/// Force-flush all pending metrics.
pub fn flush_metrics(&self) {
if let Some(ref mp) = self.meter_provider
&& let Ok(rt) = tokio::runtime::Runtime::new()
{
rt.block_on(async {
mp.force_flush().unwrap_or_default();
});
}
}
}
impl Drop for TelemetryGuard {
fn drop(&mut self) {
// Best-effort: the caller should call shutdown() explicitly before process exit
}
}
/// Initialize the full telemetry stack.
///
/// 1. Creates the OTel Resource (service name, version, host)
/// 2. Sets up tracing subscriber with console + JSON + OTel layers
/// 3. Initializes Prometheus metrics
/// 4. Records server start time for uptime tracking
///
/// Returns a `TelemetryGuard` that should be held until process exit.
pub fn init() -> TelemetryGuard {
let config = TelemetryConfig::from_env();
let resource = Resource::builder()
.with_service_name(config.service_name.clone())
.with_attribute(opentelemetry::KeyValue::new(
"service.version",
config.service_version.clone(),
))
.with_attribute(opentelemetry::KeyValue::new(
"deployment.environment",
std::env::var("OTEL_RESOURCE_ATTRIBUTES_DEPLOYMENT")
.unwrap_or_else(|_| "development".to_string()),
))
.build();
// 1. Set up tracing (traces + subscriber)
let (tracer_provider, logger_provider) = if config.traces_enabled {
match traces::init_tracing(&config, &resource) {
Ok((provider, otel_layer)) => {
match logs::init_subscriber(&config, Some(&resource), Some(otel_layer)) {
Ok(logger_provider) => {
tracing::info!(
service = %config.service_name,
endpoint = %config.otlp_endpoint,
protocol = ?config.otlp_protocol,
"OpenTelemetry tracing initialized"
);
(Some(provider), Some(logger_provider))
}
Err(e) => {
tracing::warn!(
"Failed to initialize log bridge: {e}. Tracing still active."
);
(Some(provider), None)
}
}
}
Err(e) => {
tracing::warn!(
"Failed to initialize OTLP tracing: {e}. Using console-only logging."
);
match logs::init_subscriber(&config, Some(&resource), None) {
Ok(lp) => (None, Some(lp)),
Err(_) => {
tracing_subscriber::fmt().init();
(None, None)
}
}
}
}
} else {
match logs::init_subscriber(&config, Some(&resource), None) {
Ok(lp) => (None, Some(lp)),
Err(_) => {
tracing_subscriber::fmt().init();
(None, None)
}
}
};
// 2. Metrics
let meter_provider = if config.metrics_enabled {
match metrics::init_metrics(&config, &resource) {
Ok((provider, _instruments)) => {
tracing::info!("Prometheus metrics initialized (available at /metrics)");
Some(provider)
}
Err(e) => {
tracing::warn!("Failed to initialize Prometheus metrics: {e}");
None
}
}
} else {
None
};
// 3. Record start time for uptime
health::record_start_time();
TelemetryGuard {
tracer_provider,
meter_provider,
logger_provider,
}
}