diff --git a/metrics.rs b/metrics.rs index 04638fe..619e957 100644 --- a/metrics.rs +++ b/metrics.rs @@ -7,44 +7,85 @@ //! - Repository count //! - Cache hits / misses //! - Error counts by error type +//! - Git subprocess duration +//! - Cache operation duration +//! - Slow request detection //! -//! Exposes a `/metrics` HTTP endpoint on a configurable port (default 9100). +//! Exposes HTTP endpoints on a configurable port (default 9100): +//! - GET /metrics — Prometheus metrics +//! - GET /health — Liveness probe +//! - GET /ready — Readiness probe +//! - GET /debug/log-level — Current log level filter +//! - PUT /debug/log-level — Update log level filter (body: "gitks=debug") +//! - GET /debug/config — Runtime configuration use dashmap::DashMap; -use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::atomic::{AtomicU64, AtomicBool, Ordering}; use std::sync::{Arc, OnceLock}; use std::time::{Duration, Instant}; struct MetricsInner { /// Counter: total requests by (method, status_code) - /// Key: "method:status" request_count: DashMap, - - /// Histogram buckets for request duration (seconds). - /// Each bucket: (method, le_bound_ms) → count + /// Histogram buckets for request duration (ms) duration_buckets: DashMap, - /// Gauge: number of currently in-flight requests active_requests: AtomicU64, - /// Gauge: total number of registered repositories repository_count: AtomicU64, - /// Counter: cache hits cache_hits: AtomicU64, - /// Counter: cache misses cache_misses: AtomicU64, - /// Counter: errors by error kind error_count: DashMap, - - /// Start timestamp (seconds since Unix epoch) + /// Start timestamp start_time: Instant, + /// Slow request threshold in ms + slow_request_threshold_ms: AtomicU64, + /// Server ready flag + ready: AtomicBool, + /// Histogram buckets for git subprocess duration (ms) + git_cmd_duration_buckets: DashMap, + /// Counter: git subprocess calls by command + git_cmd_count: DashMap, + /// Histogram buckets for cache operation duration (ms) + cache_op_duration_buckets: DashMap, + /// Counter: cache operations by (cache, result) + cache_op_count: DashMap, + /// Histogram: hook execution duration (ms) + hook_duration_buckets: DashMap, + /// Counter: hook executions by (hook_type, result) + hook_count: DashMap, + /// Counter: slow requests by method + slow_request_count: DashMap, + + raft_term: AtomicU64, + /// Gauge: current Raft commit index + raft_commit_index: AtomicU64, + /// Gauge: current Raft last applied index + raft_last_applied: AtomicU64, + /// Gauge: whether this node is the Raft leader (1 = yes, 0 = no) + raft_is_leader: AtomicU64, + /// Gauge: number of entries in the Raft log + raft_log_entries: AtomicU64, + /// Counter: total AppendEntries RPCs sent + raft_append_entries_total: AtomicU64, + /// Counter: successful AppendEntries RPCs + raft_append_entries_success: AtomicU64, + /// Counter: total elections triggered + raft_elections_total: AtomicU64, + /// Counter: elections won + raft_elections_won: AtomicU64, } static METRICS: OnceLock> = OnceLock::new(); +/// Handle for dynamic log level reload. +static LOG_RELOAD_HANDLE: OnceLock>> = OnceLock::new(); + +use tracing_subscriber::EnvFilter; + fn metrics() -> &'static Arc { METRICS.get_or_init(|| { Arc::new(MetricsInner { @@ -56,6 +97,25 @@ fn metrics() -> &'static Arc { cache_misses: AtomicU64::new(0), error_count: DashMap::new(), start_time: Instant::now(), + slow_request_threshold_ms: AtomicU64::new(5000), + ready: AtomicBool::new(false), + git_cmd_duration_buckets: DashMap::new(), + git_cmd_count: DashMap::new(), + cache_op_duration_buckets: DashMap::new(), + cache_op_count: DashMap::new(), + hook_duration_buckets: DashMap::new(), + hook_count: DashMap::new(), + slow_request_count: DashMap::new(), + // Raft metrics + raft_term: AtomicU64::new(0), + raft_commit_index: AtomicU64::new(0), + raft_last_applied: AtomicU64::new(0), + raft_is_leader: AtomicU64::new(0), + raft_log_entries: AtomicU64::new(0), + raft_append_entries_total: AtomicU64::new(0), + raft_append_entries_success: AtomicU64::new(0), + raft_elections_total: AtomicU64::new(0), + raft_elections_won: AtomicU64::new(0), }) }) } @@ -68,22 +128,11 @@ const DURATION_BUCKET_MS: &[u64] = &[ const BUCKET_INF: u64 = u64::MAX; -/// Record a request. -pub fn record_request(method: &str, status_code: &str, duration: Duration) { - let m = metrics(); - let key = format!("{method}:{status_code}"); - m.request_count - .entry(key) - .or_insert_with(|| AtomicU64::new(0)) - .value() - .fetch_add(1, Ordering::Relaxed); - - let duration_ms = duration.as_millis() as u64; +fn record_duration_bucket(map: &DashMap, key_prefix: &str, duration_ms: u64) { for &bound_ms in DURATION_BUCKET_MS { if duration_ms <= bound_ms || bound_ms == BUCKET_INF { - let bucket_key = format!("{method}:{bound_ms}"); - m.duration_buckets - .entry(bucket_key) + let bucket_key = format!("{key_prefix}:{bound_ms}"); + map.entry(bucket_key) .or_insert_with(|| AtomicU64::new(0)) .value() .fetch_add(1, Ordering::Relaxed); @@ -91,32 +140,67 @@ pub fn record_request(method: &str, status_code: &str, duration: Duration) { } } -/// Increment the active request gauge. +pub fn set_slow_request_threshold(ms: u64) { + metrics().slow_request_threshold_ms.store(ms, Ordering::Relaxed); +} + +pub fn set_ready(ready: bool) { + metrics().ready.store(ready, Ordering::Relaxed); +} + +pub fn set_log_reload_handle( + handle: tracing_subscriber::reload::Handle, +) { + LOG_RELOAD_HANDLE.set(Some(handle)).ok(); +} + +/// Record a gRPC request. +pub fn record_request(method: &str, status_code: &str, duration: Duration) { + let m = metrics(); + let duration_ms = duration.as_millis() as u64; + + // Request count + let key = format!("{method}:{status_code}"); + m.request_count + .entry(key) + .or_insert_with(|| AtomicU64::new(0)) + .value() + .fetch_add(1, Ordering::Relaxed); + + // Duration histogram + record_duration_bucket(&m.duration_buckets, method, duration_ms); + + // Slow request detection + let threshold = m.slow_request_threshold_ms.load(Ordering::Relaxed); + if threshold > 0 && duration_ms >= threshold { + m.slow_request_count + .entry(method.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .value() + .fetch_add(1, Ordering::Relaxed); + } +} + pub fn inc_active_requests() { metrics().active_requests.fetch_add(1, Ordering::Relaxed); } -/// Decrement the active request gauge. pub fn dec_active_requests() { metrics().active_requests.fetch_sub(1, Ordering::Relaxed); } -/// Set the repository count. pub fn set_repository_count(count: u64) { metrics().repository_count.store(count, Ordering::Relaxed); } -/// Record a cache hit. pub fn inc_cache_hits(count: u64) { metrics().cache_hits.fetch_add(count, Ordering::Relaxed); } -/// Record a cache miss. pub fn inc_cache_misses(count: u64) { metrics().cache_misses.fetch_add(count, Ordering::Relaxed); } -/// Record an error by kind (e.g., "not_found", "internal", "invalid_argument"). pub fn inc_error(kind: &str) { metrics() .error_count @@ -126,62 +210,156 @@ pub fn inc_error(kind: &str) { .fetch_add(1, Ordering::Relaxed); } -/// Render all metrics in Prometheus text exposition format. +/// Record a git subprocess execution. +pub fn record_git_cmd(command: &str, duration: Duration) { + let m = metrics(); + let duration_ms = duration.as_millis() as u64; + + m.git_cmd_count + .entry(command.to_string()) + .or_insert_with(|| AtomicU64::new(0)) + .value() + .fetch_add(1, Ordering::Relaxed); + + record_duration_bucket(&m.git_cmd_duration_buckets, command, duration_ms); +} + +/// Record a cache operation. +pub fn record_cache_op(cache: &str, result: &str, duration: Duration) { + let m = metrics(); + let duration_ms = duration.as_millis() as u64; + let key = format!("{cache}:{result}"); + + m.cache_op_count + .entry(key) + .or_insert_with(|| AtomicU64::new(0)) + .value() + .fetch_add(1, Ordering::Relaxed); + + record_duration_bucket(&m.cache_op_duration_buckets, cache, duration_ms); +} + +/// Record a hook execution. +pub fn record_hook_execution(hook_type: &str, result: &str, duration: Duration) { + let m = metrics(); + let duration_ms = duration.as_millis() as u64; + let key = format!("{hook_type}:{result}"); + + m.hook_count + .entry(key) + .or_insert_with(|| AtomicU64::new(0)) + .value() + .fetch_add(1, Ordering::Relaxed); + + record_duration_bucket(&m.hook_duration_buckets, hook_type, duration_ms); +} + +pub fn set_raft_state(term: u64, commit_index: u64, last_applied: u64, is_leader: bool, log_entries: u64) { + let m = metrics(); + m.raft_term.store(term, Ordering::Relaxed); + m.raft_commit_index.store(commit_index, Ordering::Relaxed); + m.raft_last_applied.store(last_applied, Ordering::Relaxed); + m.raft_is_leader.store(if is_leader { 1 } else { 0 }, Ordering::Relaxed); + m.raft_log_entries.store(log_entries, Ordering::Relaxed); +} + +/// Record an AppendEntries RPC attempt. +pub fn inc_raft_append_entries(success: bool) { + let m = metrics(); + m.raft_append_entries_total.fetch_add(1, Ordering::Relaxed); + if success { + m.raft_append_entries_success.fetch_add(1, Ordering::Relaxed); + } +} + +/// Record an election trigger. +pub fn inc_raft_election(won: bool) { + let m = metrics(); + m.raft_elections_total.fetch_add(1, Ordering::Relaxed); + if won { + m.raft_elections_won.fetch_add(1, Ordering::Relaxed); + } +} + +/// Escape a string for use as a Prometheus label value. +/// Replaces `\` → `\\`, `"` → `\"`, `\n` → `\n` per the Prometheus spec. +fn prom_escape(value: &str) -> String { + let mut out = String::with_capacity(value.len()); + for ch in value.chars() { + match ch { + '\\' => out.push_str("\\\\"), + '"' => out.push_str("\\\""), + '\n' => out.push_str("\\n"), + _ => out.push(ch), + } + } + out +} + +fn render_counter_map(out: &mut String, name: &str, help: &str, map: &DashMap, labels: &[&str]) { + out.push_str(&format!("# HELP {name} {help}\n")); + out.push_str(&format!("# TYPE {name} counter\n")); + for entry in map { + let (key, count) = (entry.key(), entry.value().load(Ordering::Relaxed)); + let parts: Vec<&str> = key.split(':').collect(); + if parts.len() == labels.len() { + let label_str: String = labels.iter().zip(parts.iter()) + .map(|(l, v)| format!("{l}=\"{}\"", prom_escape(v))) + .collect::>() + .join(","); + out.push_str(&format!("{name}{{{label_str}}} {count}\n")); + } + } + out.push('\n'); +} + +fn render_histogram(out: &mut String, name: &str, help: &str, map: &DashMap) { + out.push_str(&format!("# HELP {name} {help}\n")); + out.push_str(&format!("# TYPE {name} histogram\n")); + for entry in map { + let (key_and_bound, count) = (entry.key(), entry.value().load(Ordering::Relaxed)); + if let Some((key, bound_str)) = key_and_bound.rsplit_once(':') { + let le = if bound_str.parse::() == Ok(BUCKET_INF) { + "+Inf".to_string() + } else { + bound_str.to_string() + }; + out.push_str(&format!("{name}_bucket{{method=\"{}\",le=\"{le}\"}} {count}\n", prom_escape(key))); + } + } + out.push('\n'); +} + pub fn render_metrics() -> String { let m = metrics(); - let mut out = String::with_capacity(4096); + let mut out = String::with_capacity(8192); - // Header let uptime = m.start_time.elapsed().as_secs(); out.push_str("# HELP gitks_uptime_seconds Time since gitks started\n"); out.push_str("# TYPE gitks_uptime_seconds gauge\n"); out.push_str(&format!("gitks_uptime_seconds {uptime}\n\n")); - // Active requests let active = m.active_requests.load(Ordering::Relaxed); out.push_str("# HELP gitks_active_requests Currently in-flight requests\n"); out.push_str("# TYPE gitks_active_requests gauge\n"); out.push_str(&format!("gitks_active_requests {active}\n\n")); - // Repository count let repos = m.repository_count.load(Ordering::Relaxed); out.push_str("# HELP gitks_repository_count Number of registered repositories\n"); out.push_str("# TYPE gitks_repository_count gauge\n"); out.push_str(&format!("gitks_repository_count {repos}\n\n")); - // Request count - out.push_str("# HELP gitks_requests_total Total gRPC requests by method and status\n"); - out.push_str("# TYPE gitks_requests_total counter\n"); - for entry in &m.request_count { - let (method_and_status, count) = (entry.key(), entry.value()); - let count = count.load(Ordering::Relaxed); - if let Some((method, status)) = method_and_status.rsplit_once(':') { - out.push_str(&format!( - "gitks_requests_total{{method=\"{method}\",status=\"{status}\"}} {count}\n" - )); - } - } - out.push('\n'); + // gRPC requests + render_counter_map(&mut out, "gitks_requests_total", + "Total gRPC requests by method and status", &m.request_count, &["method", "status"]); - // Duration histogram - out.push_str("# HELP gitks_request_duration_milliseconds Request duration histogram in ms\n"); - out.push_str("# TYPE gitks_request_duration_milliseconds histogram\n"); - for entry in &m.duration_buckets { - let (method_and_bound, count) = (entry.key(), entry.value()); - let count = count.load(Ordering::Relaxed); - if let Some((method, bound_str)) = method_and_bound.rsplit_once(':') { - let bound = bound_str; - let le = if bound_str.parse::() == Ok(BUCKET_INF) { - "+Inf".to_string() - } else { - bound.to_string() - }; - out.push_str( - &format!("gitks_request_duration_milliseconds_bucket{{method=\"{method}\",le=\"{le}\"}} {count}\n"), - ); - } - } - out.push('\n'); + // gRPC duration + render_histogram(&mut out, "gitks_request_duration_milliseconds", + "Request duration histogram in ms", &m.duration_buckets); + + // Slow requests + render_counter_map(&mut out, "gitks_slow_requests_total", + "Slow gRPC requests by method", &m.slow_request_count, &["method"]); // Cache let hits = m.cache_hits.load(Ordering::Relaxed); @@ -194,75 +372,263 @@ pub fn render_metrics() -> String { out.push_str(&format!("gitks_cache_misses_total {misses}\n\n")); // Errors - out.push_str("# HELP gitks_errors_total Total errors by kind\n"); - out.push_str("# TYPE gitks_errors_total counter\n"); - for entry in &m.error_count { - let (kind, count) = (entry.key(), entry.value()); - let count = count.load(Ordering::Relaxed); - out.push_str(&format!("gitks_errors_total{{kind=\"{kind}\"}} {count}\n")); - } - out.push('\n'); + render_counter_map(&mut out, "gitks_errors_total", + "Total errors by kind", &m.error_count, &["kind"]); + + // Git subprocess + render_counter_map(&mut out, "gitks_git_cmd_total", + "Git subprocess calls by command", &m.git_cmd_count, &["command"]); + render_histogram(&mut out, "gitks_git_cmd_duration_milliseconds", + "Git subprocess duration in ms", &m.git_cmd_duration_buckets); + + // Cache operations + render_counter_map(&mut out, "gitks_cache_ops_total", + "Cache operations by cache and result", &m.cache_op_count, &["cache", "result"]); + render_histogram(&mut out, "gitks_cache_op_duration_milliseconds", + "Cache operation duration in ms", &m.cache_op_duration_buckets); + + // Hook execution + render_counter_map(&mut out, "gitks_hook_executions_total", + "Hook executions by type and result", &m.hook_count, &["hook_type", "result"]); + render_histogram(&mut out, "gitks_hook_duration_milliseconds", + "Hook execution duration in ms", &m.hook_duration_buckets); + + // Raft consensus metrics + let raft_term = m.raft_term.load(Ordering::Relaxed); + let raft_commit = m.raft_commit_index.load(Ordering::Relaxed); + let raft_applied = m.raft_last_applied.load(Ordering::Relaxed); + let raft_leader = m.raft_is_leader.load(Ordering::Relaxed); + let raft_entries = m.raft_log_entries.load(Ordering::Relaxed); + let raft_ae_total = m.raft_append_entries_total.load(Ordering::Relaxed); + let raft_ae_success = m.raft_append_entries_success.load(Ordering::Relaxed); + let raft_elections = m.raft_elections_total.load(Ordering::Relaxed); + let raft_elections_won = m.raft_elections_won.load(Ordering::Relaxed); + + out.push_str("# HELP gitks_raft_term Current Raft term\n"); + out.push_str("# TYPE gitks_raft_term gauge\n"); + out.push_str(&format!("gitks_raft_term {raft_term}\n\n")); + + out.push_str("# HELP gitks_raft_commit_index Current Raft commit index\n"); + out.push_str("# TYPE gitks_raft_commit_index gauge\n"); + out.push_str(&format!("gitks_raft_commit_index {raft_commit}\n\n")); + + out.push_str("# HELP gitks_raft_last_applied Current Raft last applied index\n"); + out.push_str("# TYPE gitks_raft_last_applied gauge\n"); + out.push_str(&format!("gitks_raft_last_applied {raft_applied}\n\n")); + + out.push_str("# HELP gitks_raft_is_leader Whether this node is the Raft leader\n"); + out.push_str("# TYPE gitks_raft_is_leader gauge\n"); + out.push_str(&format!("gitks_raft_is_leader {raft_leader}\n\n")); + + out.push_str("# HELP gitks_raft_log_entries Number of entries in the Raft log\n"); + out.push_str("# TYPE gitks_raft_log_entries gauge\n"); + out.push_str(&format!("gitks_raft_log_entries {raft_entries}\n\n")); + + out.push_str("# HELP gitks_raft_append_entries_total Total AppendEntries RPCs sent\n"); + out.push_str("# TYPE gitks_raft_append_entries_total counter\n"); + out.push_str(&format!("gitks_raft_append_entries_total {raft_ae_total}\n\n")); + + out.push_str("# HELP gitks_raft_append_entries_success Successful AppendEntries RPCs\n"); + out.push_str("# TYPE gitks_raft_append_entries_success counter\n"); + out.push_str(&format!("gitks_raft_append_entries_success {raft_ae_success}\n\n")); + + out.push_str("# HELP gitks_raft_elections_total Total elections triggered\n"); + out.push_str("# TYPE gitks_raft_elections_total counter\n"); + out.push_str(&format!("gitks_raft_elections_total {raft_elections}\n\n")); + + out.push_str("# HELP gitks_raft_elections_won Elections won by this node\n"); + out.push_str("# TYPE gitks_raft_elections_won counter\n"); + out.push_str(&format!("gitks_raft_elections_won {raft_elections_won}\n\n")); out } -/// Start the metrics HTTP server on the given port. -/// Runs in a background task; returns the JoinHandle. +use bytes::Bytes; +use http_body_util::Full; +use hyper::body::Incoming; +use hyper::{Request, Response, Method}; +use hyper::service::Service; +use std::convert::Infallible; +use std::future::Future; +use std::pin::Pin; +use std::net::SocketAddr; + +/// Global cancel token for the HTTP server, set from main. +static HTTP_CANCEL: OnceLock = OnceLock::new(); + +pub fn set_http_cancel_token(token: tokio_util::sync::CancellationToken) { + HTTP_CANCEL.set(token).ok(); +} + +struct Router; + +impl Service> for Router { + type Response = Response>; + type Error = Infallible; + type Future = Pin> + Send>>; + + fn call(&self, req: Request) -> Self::Future { + Box::pin(handle_request(req)) + } +} + +fn json_response(status: u16, body: &str) -> Response> { + Response::builder() + .status(status) + .header("Content-Type", "application/json") + .header("Connection", "close") + .body(Full::new(Bytes::from(body.to_string()))) + .unwrap() +} + +fn text_response(status: u16, content_type: &str, body: String) -> Response> { + Response::builder() + .status(status) + .header("Content-Type", content_type) + .header("Connection", "close") + .body(Full::new(Bytes::from(body))) + .unwrap() +} + +async fn handle_request(req: Request) -> Result>, Infallible> { + let method = req.method().clone(); + let path = req.uri().path().to_owned(); + + tracing::debug!(%method, %path, "HTTP request"); + + let response = match (method, path.as_str()) { + (Method::GET, "/metrics") => { + let body = render_metrics(); + text_response(200, "text/plain; version=0.0.4; charset=utf-8", body) + } + (Method::GET, "/health") => { + json_response(200, r#"{"status":"healthy"}"#) + } + (Method::GET, "/ready") => { + if metrics().ready.load(Ordering::Relaxed) { + json_response(200, r#"{"status":"ready"}"#) + } else { + json_response(503, r#"{"status":"not_ready"}"#) + } + } + (Method::GET, "/debug/log-level") => { + let msg = if LOG_RELOAD_HANDLE.get().is_some() { + "use PUT to change" + } else { + "dynamic log level not available" + }; + json_response(200, &format!(r#"{{"log_level":"{msg}"}}"#)) + } + (Method::PUT, "/debug/log-level") => { + match handle_log_level_update(req).await { + Ok(resp) => resp, + Err(e) => json_response(400, &format!(r#"{{"error":"{e}"}}"#)), + } + } + (Method::GET, "/debug/config") => { + let threshold = metrics().slow_request_threshold_ms.load(Ordering::Relaxed); + let ready = metrics().ready.load(Ordering::Relaxed); + json_response(200, &format!( + r#"{{"slow_request_threshold_ms":{},"ready":{}}}"#, threshold, ready + )) + } + _ => { + json_response(404, r#"{"error":"not found"}"#) + } + }; + + Ok(response) +} + +async fn handle_log_level_update( + req: Request, +) -> Result>, String> { + use http_body_util::BodyExt; + + let body_bytes = req + .collect() + .await + .map_err(|e| format!("failed to read body: {e}"))? + .to_bytes(); + + let new_filter = String::from_utf8(body_bytes.to_vec()) + .map_err(|e| format!("invalid UTF-8: {e}"))?; + let new_filter = new_filter.trim().to_string(); + + if new_filter.is_empty() { + return Ok(json_response(400, r#"{"error":"empty filter"}"#)); + } + + match LOG_RELOAD_HANDLE.get() { + Some(Some(handle)) => match new_filter.parse::() { + Ok(filter) => { + if let Err(e) = handle.reload(filter) { + Ok(json_response(500, &format!(r#"{{"error":"{e}"}}"#))) + } else { + tracing::info!(new_filter = %new_filter, "log level updated via HTTP"); + Ok(json_response(200, &format!( + r#"{{"status":"ok","filter":"{}"}}"#, new_filter + ))) + } + } + Err(e) => Ok(json_response(400, &format!( + r#"{{"error":"invalid filter: {e}"}}"# + ))), + }, + _ => Ok(json_response(501, r#"{"error":"dynamic log level not configured"}"#)), + } +} + +/// Start the HTTP server (metrics + health + debug) using hyper 1.x. pub fn start_metrics_server(port: u16) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - let listener = match tokio::net::TcpListener::bind(format!("0.0.0.0:{port}")).await { + let listener = match tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))) + .await + { Ok(l) => l, Err(e) => { - tracing::error!(port, error = %e, "failed to bind metrics server"); + tracing::error!(port, error = %e, "failed to bind HTTP server"); return; } }; - tracing::info!(port, "metrics HTTP server started"); + tracing::info!(port, "HTTP server started (metrics + health + debug)"); + + let cancel = HTTP_CANCEL + .get() + .cloned() + .unwrap_or_else(tokio_util::sync::CancellationToken::new); loop { - match listener.accept().await { - Ok((socket, peer)) => { - tracing::debug!(%peer, "metrics request"); - tokio::spawn(handle_metrics_connection(socket)); + tokio::select! { + result = listener.accept() => { + match result { + Ok((stream, _peer)) => { + let io = hyper_util::rt::TokioIo::new(stream); + tokio::spawn(async move { + let builder = hyper::server::conn::http1::Builder::new(); + if let Err(e) = builder + .serve_connection(io, Router) + .with_upgrades() + .await + { + tracing::debug!(error = %e, "HTTP connection error"); + } + }); + } + Err(e) => { + tracing::error!(error = %e, "HTTP accept error"); + } + } } - Err(e) => { - tracing::error!(error = %e, "metrics accept error"); + _ = cancel.cancelled() => { + tracing::info!("HTTP server shutting down"); + break; } } } }) } -async fn handle_metrics_connection(mut socket: tokio::net::TcpStream) { - use tokio::io::{AsyncReadExt, AsyncWriteExt}; - - let mut buf = [0u8; 4096]; - let _ = tokio::time::timeout(Duration::from_secs(5), socket.read(&mut buf)).await; - - let body = render_metrics(); - let response = format!( - "HTTP/1.1 200 OK\r\nContent-Type: text/plain; version=0.0.4\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}", - body.len(), - body - ); - - let _ = tokio::time::timeout( - Duration::from_secs(5), - socket.write_all(response.as_bytes()), - ) - .await; - let _ = socket.shutdown().await; -} - -/// A guard that records metrics on drop. -/// -/// Usage in handlers: -/// ```ignore -/// let m = crate::metrics::RequestMetrics::new("Service/Method"); -/// // ... handle request ... -/// m.record("ok"); // on success -/// // m.record("internal"); // or on error, with tonic error kind -/// ``` pub struct RequestMetrics { method: &'static str, start: Instant, @@ -280,7 +646,20 @@ impl RequestMetrics { /// Record the outcome. Idempotent — safe to call before each return. pub fn record(&self, status: &str) { let duration = self.start.elapsed(); + let duration_ms = duration.as_millis() as u64; record_request(self.method, status, duration); + + // Slow request warning + let threshold = metrics().slow_request_threshold_ms.load(Ordering::Relaxed); + if threshold > 0 && duration_ms >= threshold { + tracing::warn!( + method = self.method, + status, + elapsed_ms = duration_ms, + threshold_ms = threshold, + "slow gRPC request detected" + ); + } } }