6f40921576
- Replace explicit match statement with unwrap_or_else for cleaner error handling - Maintain same error logging behavior when response building fails - Reduce code complexity and improve readability - Keep identical fallback response creation on builder errors
771 lines
25 KiB
Rust
771 lines
25 KiB
Rust
//! Copyright (c) 2022-2026 GitDataAi All rights reserved.
|
|
|
|
//! Prometheus-compatible metrics for GitKS.
|
|
//!
|
|
//! Tracks:
|
|
//! - Request counts by gRPC method + status code
|
|
//! - Request duration histogram by method
|
|
//! - Active requests gauge
|
|
//! - Repository count
|
|
//! - Cache hits / misses
|
|
//! - Error counts by error type
|
|
//! - Git subprocess duration
|
|
//! - Cache operation duration
|
|
//! - Slow request detection
|
|
//!
|
|
//! Exposes HTTP endpoints on a configurable port (default 9100):
|
|
//! - GET /metrics — Prometheus metrics
|
|
//! - GET /health — Liveness probe
|
|
//! - GET /ready — Readiness probe
|
|
//! - GET /debug/log-level — Current log level filter
|
|
//! - PUT /debug/log-level — Update log level filter (body: "gitks=debug")
|
|
//! - GET /debug/config — Runtime configuration
|
|
|
|
use dashmap::DashMap;
|
|
use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
|
|
use std::sync::{Arc, OnceLock};
|
|
use std::time::{Duration, Instant};
|
|
|
|
struct MetricsInner {
|
|
/// Counter: total requests by (method, status_code)
|
|
request_count: DashMap<String, AtomicU64>,
|
|
/// Histogram buckets for request duration (ms)
|
|
duration_buckets: DashMap<String, AtomicU64>,
|
|
/// Gauge: number of currently in-flight requests
|
|
active_requests: AtomicU64,
|
|
/// Gauge: total number of registered repositories
|
|
repository_count: AtomicU64,
|
|
/// Counter: cache hits
|
|
cache_hits: AtomicU64,
|
|
/// Counter: cache misses
|
|
cache_misses: AtomicU64,
|
|
/// Counter: errors by error kind
|
|
error_count: DashMap<String, AtomicU64>,
|
|
/// Start timestamp
|
|
start_time: Instant,
|
|
/// Slow request threshold in ms
|
|
slow_request_threshold_ms: AtomicU64,
|
|
/// Server ready flag
|
|
ready: AtomicBool,
|
|
/// Histogram buckets for git subprocess duration (ms)
|
|
git_cmd_duration_buckets: DashMap<String, AtomicU64>,
|
|
/// Counter: git subprocess calls by command
|
|
git_cmd_count: DashMap<String, AtomicU64>,
|
|
/// Histogram buckets for cache operation duration (ms)
|
|
cache_op_duration_buckets: DashMap<String, AtomicU64>,
|
|
/// Counter: cache operations by (cache, result)
|
|
cache_op_count: DashMap<String, AtomicU64>,
|
|
/// Histogram: hook execution duration (ms)
|
|
hook_duration_buckets: DashMap<String, AtomicU64>,
|
|
/// Counter: hook executions by (hook_type, result)
|
|
hook_count: DashMap<String, AtomicU64>,
|
|
/// Counter: slow requests by method
|
|
slow_request_count: DashMap<String, AtomicU64>,
|
|
/// Counter: cache evictions by (cause, namespace)
|
|
cache_eviction_count: DashMap<String, AtomicU64>,
|
|
/// Counter: cache hits by namespace
|
|
cache_hit_by_namespace: DashMap<String, AtomicU64>,
|
|
/// Counter: cache misses by namespace
|
|
cache_miss_by_namespace: DashMap<String, AtomicU64>,
|
|
/// Histogram: cache value size in bytes
|
|
cache_value_size_buckets: DashMap<String, AtomicU64>,
|
|
/// Counter: rate-limit rejections by repository
|
|
rate_limit_reject_count: DashMap<String, AtomicU64>,
|
|
/// Counter: rate-limit acquires by repository
|
|
rate_limit_acquire_count: DashMap<String, AtomicU64>,
|
|
}
|
|
|
|
static METRICS: OnceLock<Arc<MetricsInner>> = OnceLock::new();
|
|
|
|
/// Handle for dynamic log level reload.
|
|
static LOG_RELOAD_HANDLE: OnceLock<
|
|
Option<tracing_subscriber::reload::Handle<EnvFilter, tracing_subscriber::Registry>>,
|
|
> = OnceLock::new();
|
|
|
|
use tracing_subscriber::EnvFilter;
|
|
|
|
fn metrics() -> &'static Arc<MetricsInner> {
|
|
METRICS.get_or_init(|| {
|
|
Arc::new(MetricsInner {
|
|
request_count: DashMap::new(),
|
|
duration_buckets: DashMap::new(),
|
|
active_requests: AtomicU64::new(0),
|
|
repository_count: AtomicU64::new(0),
|
|
cache_hits: AtomicU64::new(0),
|
|
cache_misses: AtomicU64::new(0),
|
|
error_count: DashMap::new(),
|
|
start_time: Instant::now(),
|
|
slow_request_threshold_ms: AtomicU64::new(5000),
|
|
ready: AtomicBool::new(false),
|
|
git_cmd_duration_buckets: DashMap::new(),
|
|
git_cmd_count: DashMap::new(),
|
|
cache_op_duration_buckets: DashMap::new(),
|
|
cache_op_count: DashMap::new(),
|
|
hook_duration_buckets: DashMap::new(),
|
|
hook_count: DashMap::new(),
|
|
slow_request_count: DashMap::new(),
|
|
cache_eviction_count: DashMap::new(),
|
|
cache_hit_by_namespace: DashMap::new(),
|
|
cache_miss_by_namespace: DashMap::new(),
|
|
cache_value_size_buckets: DashMap::new(),
|
|
rate_limit_reject_count: DashMap::new(),
|
|
rate_limit_acquire_count: DashMap::new(),
|
|
})
|
|
})
|
|
}
|
|
|
|
#[rustfmt::skip]
|
|
const DURATION_BUCKET_MS: &[u64] = &[
|
|
5, 10, 25, 50, 100, 250, 500, 1_000,
|
|
2_500, 5_000, 10_000, 30_000, 60_000, u64::MAX,
|
|
];
|
|
|
|
const BUCKET_INF: u64 = u64::MAX;
|
|
|
|
fn record_duration_bucket(map: &DashMap<String, AtomicU64>, key_prefix: &str, duration_ms: u64) {
|
|
for &bound_ms in DURATION_BUCKET_MS {
|
|
if duration_ms <= bound_ms || bound_ms == BUCKET_INF {
|
|
let bucket_key = format!("{key_prefix}:{bound_ms}");
|
|
map.entry(bucket_key)
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn set_slow_request_threshold(ms: u64) {
|
|
metrics()
|
|
.slow_request_threshold_ms
|
|
.store(ms, Ordering::Relaxed);
|
|
}
|
|
|
|
pub fn set_ready(ready: bool) {
|
|
metrics().ready.store(ready, Ordering::Relaxed);
|
|
}
|
|
|
|
pub fn set_log_reload_handle(
|
|
handle: tracing_subscriber::reload::Handle<EnvFilter, tracing_subscriber::Registry>,
|
|
) {
|
|
LOG_RELOAD_HANDLE.set(Some(handle)).ok();
|
|
}
|
|
|
|
/// Record a gRPC request.
|
|
pub fn record_request(method: &str, status_code: &str, duration: Duration) {
|
|
let m = metrics();
|
|
let duration_ms = duration.as_millis() as u64;
|
|
|
|
let key = format!("{method}:{status_code}");
|
|
m.request_count
|
|
.entry(key)
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
|
|
record_duration_bucket(&m.duration_buckets, method, duration_ms);
|
|
|
|
let threshold = m.slow_request_threshold_ms.load(Ordering::Relaxed);
|
|
if threshold > 0 && duration_ms >= threshold {
|
|
m.slow_request_count
|
|
.entry(method.to_string())
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
}
|
|
|
|
pub fn inc_active_requests() {
|
|
metrics().active_requests.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
|
|
pub fn dec_active_requests() {
|
|
metrics().active_requests.fetch_sub(1, Ordering::Relaxed);
|
|
}
|
|
|
|
pub fn set_repository_count(count: u64) {
|
|
metrics().repository_count.store(count, Ordering::Relaxed);
|
|
}
|
|
|
|
pub fn inc_cache_hits(count: u64) {
|
|
metrics().cache_hits.fetch_add(count, Ordering::Relaxed);
|
|
}
|
|
|
|
pub fn inc_cache_misses(count: u64) {
|
|
metrics().cache_misses.fetch_add(count, Ordering::Relaxed);
|
|
}
|
|
|
|
pub fn inc_error(kind: &str) {
|
|
metrics()
|
|
.error_count
|
|
.entry(kind.to_string())
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
|
|
/// Record a git subprocess execution.
|
|
pub fn record_git_cmd(command: &str, duration: Duration) {
|
|
let m = metrics();
|
|
let duration_ms = duration.as_millis() as u64;
|
|
|
|
m.git_cmd_count
|
|
.entry(command.to_string())
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
|
|
record_duration_bucket(&m.git_cmd_duration_buckets, command, duration_ms);
|
|
}
|
|
|
|
/// Record a cache operation.
|
|
pub fn record_cache_op(cache: &str, result: &str, duration: Duration) {
|
|
let m = metrics();
|
|
let duration_ms = duration.as_millis() as u64;
|
|
let key = format!("{cache}:{result}");
|
|
|
|
m.cache_op_count
|
|
.entry(key)
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
|
|
record_duration_bucket(&m.cache_op_duration_buckets, cache, duration_ms);
|
|
}
|
|
|
|
/// Record a cache entry eviction.
|
|
pub fn record_cache_eviction(namespace: &str, cause: &str) {
|
|
let m = metrics();
|
|
let key = format!("{cause}:{namespace}");
|
|
m.cache_eviction_count
|
|
.entry(key)
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
|
|
/// Record a per-namespace cache hit.
|
|
pub fn record_cache_hit_ns(namespace: &str) {
|
|
metrics()
|
|
.cache_hit_by_namespace
|
|
.entry(namespace.to_string())
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
|
|
/// Record a per-namespace cache miss.
|
|
pub fn record_cache_miss_ns(namespace: &str) {
|
|
metrics()
|
|
.cache_miss_by_namespace
|
|
.entry(namespace.to_string())
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
|
|
/// Record a hook execution.
|
|
pub fn record_hook_execution(hook_type: &str, result: &str, duration: Duration) {
|
|
let m = metrics();
|
|
let duration_ms = duration.as_millis() as u64;
|
|
let key = format!("{hook_type}:{result}");
|
|
|
|
m.hook_count
|
|
.entry(key)
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
|
|
record_duration_bucket(&m.hook_duration_buckets, hook_type, duration_ms);
|
|
}
|
|
|
|
/// Record cache value size distribution (in bytes).
|
|
pub fn record_cache_value_size(namespace: &str, size: usize) {
|
|
let m = metrics();
|
|
record_size_bucket(&m.cache_value_size_buckets, namespace, size as u64);
|
|
}
|
|
|
|
/// Record a rate-limit rejection event.
|
|
pub fn record_rate_limit_reject(repo: &str) {
|
|
let m = metrics();
|
|
m.rate_limit_reject_count
|
|
.entry(repo.to_string())
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
|
|
/// Record a rate-limit acquire event.
|
|
pub fn record_rate_limit_acquire(repo: &str) {
|
|
let m = metrics();
|
|
m.rate_limit_acquire_count
|
|
.entry(repo.to_string())
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
|
|
/// Record size distribution buckets (log2-based: 1KB, 4KB, 16KB, ..., 1GB).
|
|
fn record_size_bucket(map: &DashMap<String, AtomicU64>, label: &str, size: u64) {
|
|
let buckets = [1024, 4096, 16384, 65536, 262144, 1048576, 4194304, 16777216, 67108864, 268435456, 1073741824];
|
|
for &bound in &buckets {
|
|
let key = format!("{label}:le_{bound}");
|
|
if size <= bound {
|
|
map.entry(key)
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Escape a string for use as a Prometheus label value.
|
|
/// Replaces `\` → `\\`, `"` → `\"`, `\n` → `\n` per the Prometheus spec.
|
|
fn prom_escape(value: &str) -> String {
|
|
let mut out = String::with_capacity(value.len());
|
|
for ch in value.chars() {
|
|
match ch {
|
|
'\\' => out.push_str("\\\\"),
|
|
'"' => out.push_str("\\\""),
|
|
'\n' => out.push_str("\\n"),
|
|
_ => out.push(ch),
|
|
}
|
|
}
|
|
out
|
|
}
|
|
|
|
fn render_counter_map(
|
|
out: &mut String,
|
|
name: &str,
|
|
help: &str,
|
|
map: &DashMap<String, AtomicU64>,
|
|
labels: &[&str],
|
|
) {
|
|
out.push_str(&format!("# HELP {name} {help}\n"));
|
|
out.push_str(&format!("# TYPE {name} counter\n"));
|
|
for entry in map {
|
|
let (key, count) = (entry.key(), entry.value().load(Ordering::Relaxed));
|
|
let parts: Vec<&str> = key.split(':').collect();
|
|
if parts.len() == labels.len() {
|
|
let label_str: String = labels
|
|
.iter()
|
|
.zip(parts.iter())
|
|
.map(|(l, v)| format!("{l}=\"{}\"", prom_escape(v)))
|
|
.collect::<Vec<_>>()
|
|
.join(",");
|
|
out.push_str(&format!("{name}{{{label_str}}} {count}\n"));
|
|
}
|
|
}
|
|
out.push('\n');
|
|
}
|
|
|
|
fn render_histogram(out: &mut String, name: &str, help: &str, map: &DashMap<String, AtomicU64>) {
|
|
out.push_str(&format!("# HELP {name} {help}\n"));
|
|
out.push_str(&format!("# TYPE {name} histogram\n"));
|
|
for entry in map {
|
|
let (key_and_bound, count) = (entry.key(), entry.value().load(Ordering::Relaxed));
|
|
if let Some((key, bound_str)) = key_and_bound.rsplit_once(':') {
|
|
let le = if bound_str.parse::<u64>() == Ok(BUCKET_INF) {
|
|
"+Inf".to_string()
|
|
} else {
|
|
bound_str.to_string()
|
|
};
|
|
out.push_str(&format!(
|
|
"{name}_bucket{{method=\"{}\",le=\"{le}\"}} {count}\n",
|
|
prom_escape(key)
|
|
));
|
|
}
|
|
}
|
|
out.push('\n');
|
|
}
|
|
|
|
pub fn render_metrics() -> String {
|
|
let m = metrics();
|
|
let mut out = String::with_capacity(8192);
|
|
|
|
let uptime = m.start_time.elapsed().as_secs();
|
|
out.push_str("# HELP gitks_uptime_seconds Time since gitks started\n");
|
|
out.push_str("# TYPE gitks_uptime_seconds gauge\n");
|
|
out.push_str(&format!("gitks_uptime_seconds {uptime}\n\n"));
|
|
|
|
let active = m.active_requests.load(Ordering::Relaxed);
|
|
out.push_str("# HELP gitks_active_requests Currently in-flight requests\n");
|
|
out.push_str("# TYPE gitks_active_requests gauge\n");
|
|
out.push_str(&format!("gitks_active_requests {active}\n\n"));
|
|
|
|
let repos = m.repository_count.load(Ordering::Relaxed);
|
|
out.push_str("# HELP gitks_repository_count Number of registered repositories\n");
|
|
out.push_str("# TYPE gitks_repository_count gauge\n");
|
|
out.push_str(&format!("gitks_repository_count {repos}\n\n"));
|
|
|
|
render_counter_map(
|
|
&mut out,
|
|
"gitks_requests_total",
|
|
"Total gRPC requests by method and status",
|
|
&m.request_count,
|
|
&["method", "status"],
|
|
);
|
|
|
|
render_histogram(
|
|
&mut out,
|
|
"gitks_request_duration_milliseconds",
|
|
"Request duration histogram in ms",
|
|
&m.duration_buckets,
|
|
);
|
|
|
|
render_counter_map(
|
|
&mut out,
|
|
"gitks_slow_requests_total",
|
|
"Slow gRPC requests by method",
|
|
&m.slow_request_count,
|
|
&["method"],
|
|
);
|
|
|
|
let hits = m.cache_hits.load(Ordering::Relaxed);
|
|
let misses = m.cache_misses.load(Ordering::Relaxed);
|
|
out.push_str("# HELP gitks_cache_hits_total Cache hit count\n");
|
|
out.push_str("# TYPE gitks_cache_hits_total counter\n");
|
|
out.push_str(&format!("gitks_cache_hits_total {hits}\n\n"));
|
|
out.push_str("# HELP gitks_cache_misses_total Cache miss count\n");
|
|
out.push_str("# TYPE gitks_cache_misses_total counter\n");
|
|
out.push_str(&format!("gitks_cache_misses_total {misses}\n\n"));
|
|
|
|
render_counter_map(
|
|
&mut out,
|
|
"gitks_errors_total",
|
|
"Total errors by kind",
|
|
&m.error_count,
|
|
&["kind"],
|
|
);
|
|
|
|
render_counter_map(
|
|
&mut out,
|
|
"gitks_git_cmd_total",
|
|
"Git subprocess calls by command",
|
|
&m.git_cmd_count,
|
|
&["command"],
|
|
);
|
|
render_histogram(
|
|
&mut out,
|
|
"gitks_git_cmd_duration_milliseconds",
|
|
"Git subprocess duration in ms",
|
|
&m.git_cmd_duration_buckets,
|
|
);
|
|
|
|
render_counter_map(
|
|
&mut out,
|
|
"gitks_cache_ops_total",
|
|
"Cache operations by cache and result",
|
|
&m.cache_op_count,
|
|
&["cache", "result"],
|
|
);
|
|
render_histogram(
|
|
&mut out,
|
|
"gitks_cache_op_duration_milliseconds",
|
|
"Cache operation duration in ms",
|
|
&m.cache_op_duration_buckets,
|
|
);
|
|
|
|
render_counter_map(
|
|
&mut out,
|
|
"gitks_cache_evictions_total",
|
|
"Cache evictions by cause and namespace",
|
|
&m.cache_eviction_count,
|
|
&["cause", "namespace"],
|
|
);
|
|
|
|
render_counter_map(
|
|
&mut out,
|
|
"gitks_cache_hits_by_namespace_total",
|
|
"Cache hits by namespace",
|
|
&m.cache_hit_by_namespace,
|
|
&["namespace"],
|
|
);
|
|
|
|
render_counter_map(
|
|
&mut out,
|
|
"gitks_cache_misses_by_namespace_total",
|
|
"Cache misses by namespace",
|
|
&m.cache_miss_by_namespace,
|
|
&["namespace"],
|
|
);
|
|
|
|
render_counter_map(
|
|
&mut out,
|
|
"gitks_hook_executions_total",
|
|
"Hook executions by type and result",
|
|
&m.hook_count,
|
|
&["hook_type", "result"],
|
|
);
|
|
render_histogram(
|
|
&mut out,
|
|
"gitks_hook_duration_milliseconds",
|
|
"Hook execution duration in ms",
|
|
&m.hook_duration_buckets,
|
|
);
|
|
|
|
render_histogram(
|
|
&mut out,
|
|
"gitks_cache_value_size_bytes",
|
|
"Cache value size distribution in bytes",
|
|
&m.cache_value_size_buckets,
|
|
);
|
|
|
|
render_counter_map(
|
|
&mut out,
|
|
"gitks_rate_limit_rejects_total",
|
|
"Rate-limit rejections by repository",
|
|
&m.rate_limit_reject_count,
|
|
&["repo"],
|
|
);
|
|
render_counter_map(
|
|
&mut out,
|
|
"gitks_rate_limit_acquires_total",
|
|
"Rate-limit acquires by repository",
|
|
&m.rate_limit_acquire_count,
|
|
&["repo"],
|
|
);
|
|
|
|
out
|
|
}
|
|
|
|
use bytes::Bytes;
|
|
use http_body_util::Full;
|
|
use hyper::body::Incoming;
|
|
use hyper::service::Service;
|
|
use hyper::{Method, Request, Response};
|
|
use std::convert::Infallible;
|
|
use std::future::Future;
|
|
use std::net::SocketAddr;
|
|
use std::pin::Pin;
|
|
|
|
/// Global cancel token for the HTTP server, set from main.
|
|
static HTTP_CANCEL: OnceLock<tokio_util::sync::CancellationToken> = OnceLock::new();
|
|
|
|
pub fn set_http_cancel_token(token: tokio_util::sync::CancellationToken) {
|
|
HTTP_CANCEL.set(token).ok();
|
|
}
|
|
|
|
struct Router;
|
|
|
|
impl Service<Request<Incoming>> for Router {
|
|
type Response = Response<Full<Bytes>>;
|
|
type Error = Infallible;
|
|
type Future = Pin<Box<dyn Future<Output = Result<Self::Response, Self::Error>> + Send>>;
|
|
|
|
fn call(&self, req: Request<Incoming>) -> Self::Future {
|
|
Box::pin(handle_request(req))
|
|
}
|
|
}
|
|
|
|
fn json_response(status: u16, body: &str) -> Response<Full<Bytes>> {
|
|
match Response::builder()
|
|
.status(status)
|
|
.header("Content-Type", "application/json")
|
|
.header("Connection", "close")
|
|
.body(Full::new(Bytes::from(body.to_string())))
|
|
{
|
|
Ok(response) => response,
|
|
Err(err) => {
|
|
tracing::error!(error = %err, "failed to build JSON response");
|
|
Response::new(Full::new(Bytes::from_static(
|
|
br#"{"error":"response build failed"}"#,
|
|
)))
|
|
}
|
|
}
|
|
}
|
|
|
|
fn text_response(status: u16, content_type: &str, body: String) -> Response<Full<Bytes>> {
|
|
Response::builder()
|
|
.status(status)
|
|
.header("Content-Type", content_type)
|
|
.header("Connection", "close")
|
|
.body(Full::new(Bytes::from(body))).unwrap_or_else(|err| {
|
|
tracing::error!(error = %err, "failed to build text response");
|
|
Response::new(Full::new(Bytes::from_static(b"response build failed")))
|
|
})
|
|
}
|
|
|
|
async fn handle_request(req: Request<Incoming>) -> Result<Response<Full<Bytes>>, Infallible> {
|
|
let method = req.method().clone();
|
|
let path = req.uri().path().to_owned();
|
|
|
|
tracing::debug!(%method, %path, "HTTP request");
|
|
|
|
let response = match (method, path.as_str()) {
|
|
(Method::GET, "/metrics") => {
|
|
let body = render_metrics();
|
|
text_response(200, "text/plain; version=0.0.4; charset=utf-8", body)
|
|
}
|
|
(Method::GET, "/health") => json_response(200, r#"{"status":"healthy"}"#),
|
|
(Method::GET, "/ready") => {
|
|
if metrics().ready.load(Ordering::Relaxed) {
|
|
json_response(200, r#"{"status":"ready"}"#)
|
|
} else {
|
|
json_response(503, r#"{"status":"not_ready"}"#)
|
|
}
|
|
}
|
|
(Method::GET, "/debug/log-level") => {
|
|
let msg = if LOG_RELOAD_HANDLE.get().is_some() {
|
|
"use PUT to change"
|
|
} else {
|
|
"dynamic log level not available"
|
|
};
|
|
json_response(200, &format!(r#"{{"log_level":"{msg}"}}"#))
|
|
}
|
|
(Method::PUT, "/debug/log-level") => handle_log_level_update(req)
|
|
.await
|
|
.unwrap_or_else(|e| json_response(400, &format!(r#"{{"error":"{e}"}}"#))),
|
|
(Method::GET, "/debug/config") => {
|
|
let threshold = metrics().slow_request_threshold_ms.load(Ordering::Relaxed);
|
|
let ready = metrics().ready.load(Ordering::Relaxed);
|
|
json_response(
|
|
200,
|
|
&format!(
|
|
r#"{{"slow_request_threshold_ms":{},"ready":{}}}"#,
|
|
threshold, ready
|
|
),
|
|
)
|
|
}
|
|
_ => json_response(404, r#"{"error":"not found"}"#),
|
|
};
|
|
|
|
Ok(response)
|
|
}
|
|
|
|
async fn handle_log_level_update(req: Request<Incoming>) -> Result<Response<Full<Bytes>>, String> {
|
|
use http_body_util::BodyExt;
|
|
|
|
let body_bytes = req
|
|
.collect()
|
|
.await
|
|
.map_err(|e| format!("failed to read body: {e}"))?
|
|
.to_bytes();
|
|
|
|
let new_filter =
|
|
String::from_utf8(body_bytes.to_vec()).map_err(|e| format!("invalid UTF-8: {e}"))?;
|
|
let new_filter = new_filter.trim().to_string();
|
|
|
|
if new_filter.is_empty() {
|
|
return Ok(json_response(400, r#"{"error":"empty filter"}"#));
|
|
}
|
|
|
|
match LOG_RELOAD_HANDLE.get() {
|
|
Some(Some(handle)) => match new_filter.parse::<EnvFilter>() {
|
|
Ok(filter) => {
|
|
if let Err(e) = handle.reload(filter) {
|
|
Ok(json_response(500, &format!(r#"{{"error":"{e}"}}"#)))
|
|
} else {
|
|
tracing::info!(new_filter = %new_filter, "log level updated via HTTP");
|
|
Ok(json_response(
|
|
200,
|
|
&format!(r#"{{"status":"ok","filter":"{}"}}"#, new_filter),
|
|
))
|
|
}
|
|
}
|
|
Err(e) => Ok(json_response(
|
|
400,
|
|
&format!(r#"{{"error":"invalid filter: {e}"}}"#),
|
|
)),
|
|
},
|
|
_ => Ok(json_response(
|
|
501,
|
|
r#"{"error":"dynamic log level not configured"}"#,
|
|
)),
|
|
}
|
|
}
|
|
|
|
/// Start the HTTP server (metrics + health + debug) using hyper 1.x.
|
|
pub fn start_metrics_server(port: u16) -> tokio::task::JoinHandle<()> {
|
|
tokio::spawn(async move {
|
|
let listener =
|
|
match tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await {
|
|
Ok(l) => l,
|
|
Err(e) => {
|
|
tracing::error!(port, error = %e, "failed to bind HTTP server");
|
|
return;
|
|
}
|
|
};
|
|
tracing::info!(port, "HTTP server started (metrics + health + debug)");
|
|
|
|
let cancel = HTTP_CANCEL
|
|
.get()
|
|
.cloned()
|
|
.unwrap_or_else(tokio_util::sync::CancellationToken::new);
|
|
|
|
loop {
|
|
tokio::select! {
|
|
result = listener.accept() => {
|
|
match result {
|
|
Ok((stream, _peer)) => {
|
|
let io = hyper_util::rt::TokioIo::new(stream);
|
|
tokio::spawn(async move {
|
|
let builder = hyper::server::conn::http1::Builder::new();
|
|
if let Err(e) = builder
|
|
.serve_connection(io, Router)
|
|
.with_upgrades()
|
|
.await
|
|
{
|
|
tracing::debug!(error = %e, "HTTP connection error");
|
|
}
|
|
});
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(error = %e, "HTTP accept error");
|
|
}
|
|
}
|
|
}
|
|
_ = cancel.cancelled() => {
|
|
tracing::info!("HTTP server shutting down");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
pub struct RequestMetrics {
|
|
method: &'static str,
|
|
start: Instant,
|
|
}
|
|
|
|
impl RequestMetrics {
|
|
pub fn new(method: &'static str) -> Self {
|
|
inc_active_requests();
|
|
Self {
|
|
method,
|
|
start: Instant::now(),
|
|
}
|
|
}
|
|
|
|
/// Record the outcome. Idempotent — safe to call before each return.
|
|
pub fn record(&self, status: &str) {
|
|
let duration = self.start.elapsed();
|
|
let duration_ms = duration.as_millis() as u64;
|
|
record_request(self.method, status, duration);
|
|
|
|
let threshold = metrics().slow_request_threshold_ms.load(Ordering::Relaxed);
|
|
if threshold > 0 && duration_ms >= threshold {
|
|
tracing::warn!(
|
|
method = self.method,
|
|
status,
|
|
elapsed_ms = duration_ms,
|
|
threshold_ms = threshold,
|
|
"slow gRPC request detected"
|
|
);
|
|
}
|
|
}
|
|
}
|
|
|
|
impl Drop for RequestMetrics {
|
|
fn drop(&mut self) {
|
|
dec_active_requests();
|
|
}
|
|
}
|
|
|
|
/// Convenience: record an error from a tonic Status.
|
|
pub fn record_rpc_error(m: &RequestMetrics, status: &tonic::Status) {
|
|
let kind = status.code().description();
|
|
inc_error(kind);
|
|
m.record(kind);
|
|
}
|