66afd932ed
- Add FindCommit, ListCommitsByOid, CommitIsAncestor RPCs to CommitService - Add CheckObjectsExist, CommitsByMessage, GetCommitStats RPCs to CommitService - Add LastCommitForPath, CountCommits, CountDivergingCommits RPCs to CommitService - Add RawDiff, RawPatch, FindChangedPaths RPCs to DiffService - Add FindMergeBase, WriteRef, SearchFilesByContent RPCs to RepositoryService - Add SearchFilesByName, ObjectsSize, RepositorySize RPCs to RepositoryService - Add FindLicense, OptimizeRepository, GetRawChanges RPCs to RepositoryService - Add FetchRemote, CreateRepositoryFromURL RPCs to RepositoryService - Implement server handlers for all new RPC methods - Add new modules for commit counting, finding, and querying features - Add new modules for diff changed paths and raw operations - Add new modules for refs and remote operations - Remove unnecessary comments from various source files - Update proto definitions with new message types and service methods
307 lines
9.6 KiB
Rust
307 lines
9.6 KiB
Rust
//! Prometheus-compatible metrics for GitKS.
|
|
//!
|
|
//! Tracks:
|
|
//! - Request counts by gRPC method + status code
|
|
//! - Request duration histogram by method
|
|
//! - Active requests gauge
|
|
//! - Repository count
|
|
//! - Cache hits / misses
|
|
//! - Error counts by error type
|
|
//!
|
|
//! Exposes a `/metrics` HTTP endpoint on a configurable port (default 9100).
|
|
|
|
use dashmap::DashMap;
|
|
use std::sync::atomic::{AtomicU64, Ordering};
|
|
use std::sync::{Arc, OnceLock};
|
|
use std::time::{Duration, Instant};
|
|
|
|
|
|
struct MetricsInner {
|
|
/// Counter: total requests by (method, status_code)
|
|
/// Key: "method:status"
|
|
request_count: DashMap<String, AtomicU64>,
|
|
|
|
/// Histogram buckets for request duration (seconds).
|
|
/// Each bucket: (method, le_bound_ms) → count
|
|
duration_buckets: DashMap<String, AtomicU64>,
|
|
|
|
/// Gauge: number of currently in-flight requests
|
|
active_requests: AtomicU64,
|
|
|
|
/// Gauge: total number of registered repositories
|
|
repository_count: AtomicU64,
|
|
|
|
/// Counter: cache hits
|
|
cache_hits: AtomicU64,
|
|
|
|
/// Counter: cache misses
|
|
cache_misses: AtomicU64,
|
|
|
|
/// Counter: errors by error kind
|
|
error_count: DashMap<String, AtomicU64>,
|
|
|
|
/// Start timestamp (seconds since Unix epoch)
|
|
start_time: Instant,
|
|
}
|
|
|
|
static METRICS: OnceLock<Arc<MetricsInner>> = OnceLock::new();
|
|
|
|
fn metrics() -> &'static Arc<MetricsInner> {
|
|
METRICS.get_or_init(|| {
|
|
Arc::new(MetricsInner {
|
|
request_count: DashMap::new(),
|
|
duration_buckets: DashMap::new(),
|
|
active_requests: AtomicU64::new(0),
|
|
repository_count: AtomicU64::new(0),
|
|
cache_hits: AtomicU64::new(0),
|
|
cache_misses: AtomicU64::new(0),
|
|
error_count: DashMap::new(),
|
|
start_time: Instant::now(),
|
|
})
|
|
})
|
|
}
|
|
|
|
|
|
/// Sentinel upper bound standing for the `+Inf` histogram bucket.
const BUCKET_INF: u64 = u64::MAX;

/// Upper bounds, in milliseconds, of the request-duration histogram buckets.
///
/// Bounds are strictly ascending. The last entry is [`BUCKET_INF`], so every
/// observation falls into at least one bucket.
#[rustfmt::skip]
const DURATION_BUCKET_MS: &[u64] = &[
    5, 10, 25, 50, 100, 250, 500, 1_000,
    2_500, 5_000, 10_000, 30_000, 60_000, BUCKET_INF,
];
|
|
|
|
/// Record a request.
|
|
pub fn record_request(method: &str, status_code: &str, duration: Duration) {
|
|
let m = metrics();
|
|
let key = format!("{method}:{status_code}");
|
|
m.request_count
|
|
.entry(key)
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
|
|
let duration_ms = duration.as_millis() as u64;
|
|
for &bound_ms in DURATION_BUCKET_MS {
|
|
if duration_ms <= bound_ms || bound_ms == BUCKET_INF {
|
|
let bucket_key = format!("{method}:{bound_ms}");
|
|
m.duration_buckets
|
|
.entry(bucket_key)
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Increment the active request gauge.
|
|
pub fn inc_active_requests() {
|
|
metrics().active_requests.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
|
|
/// Decrement the active request gauge.
|
|
pub fn dec_active_requests() {
|
|
metrics().active_requests.fetch_sub(1, Ordering::Relaxed);
|
|
}
|
|
|
|
/// Set the repository count.
|
|
pub fn set_repository_count(count: u64) {
|
|
metrics()
|
|
.repository_count
|
|
.store(count, Ordering::Relaxed);
|
|
}
|
|
|
|
/// Record a cache hit.
|
|
pub fn inc_cache_hits(count: u64) {
|
|
metrics().cache_hits.fetch_add(count, Ordering::Relaxed);
|
|
}
|
|
|
|
/// Record a cache miss.
|
|
pub fn inc_cache_misses(count: u64) {
|
|
metrics()
|
|
.cache_misses
|
|
.fetch_add(count, Ordering::Relaxed);
|
|
}
|
|
|
|
/// Record an error by kind (e.g., "not_found", "internal", "invalid_argument").
|
|
pub fn inc_error(kind: &str) {
|
|
metrics()
|
|
.error_count
|
|
.entry(kind.to_string())
|
|
.or_insert_with(|| AtomicU64::new(0))
|
|
.value()
|
|
.fetch_add(1, Ordering::Relaxed);
|
|
}
|
|
|
|
|
|
/// Render all metrics in Prometheus text exposition format.
|
|
pub fn render_metrics() -> String {
|
|
let m = metrics();
|
|
let mut out = String::with_capacity(4096);
|
|
|
|
// Header
|
|
let uptime = m.start_time.elapsed().as_secs();
|
|
out.push_str("# HELP gitks_uptime_seconds Time since gitks started\n");
|
|
out.push_str("# TYPE gitks_uptime_seconds gauge\n");
|
|
out.push_str(&format!("gitks_uptime_seconds {uptime}\n\n"));
|
|
|
|
// Active requests
|
|
let active = m.active_requests.load(Ordering::Relaxed);
|
|
out.push_str("# HELP gitks_active_requests Currently in-flight requests\n");
|
|
out.push_str("# TYPE gitks_active_requests gauge\n");
|
|
out.push_str(&format!("gitks_active_requests {active}\n\n"));
|
|
|
|
// Repository count
|
|
let repos = m.repository_count.load(Ordering::Relaxed);
|
|
out.push_str("# HELP gitks_repository_count Number of registered repositories\n");
|
|
out.push_str("# TYPE gitks_repository_count gauge\n");
|
|
out.push_str(&format!("gitks_repository_count {repos}\n\n"));
|
|
|
|
// Request count
|
|
out.push_str("# HELP gitks_requests_total Total gRPC requests by method and status\n");
|
|
out.push_str("# TYPE gitks_requests_total counter\n");
|
|
for entry in &m.request_count {
|
|
let (method_and_status, count) = (entry.key(), entry.value());
|
|
let count = count.load(Ordering::Relaxed);
|
|
if let Some((method, status)) = method_and_status.rsplit_once(':') {
|
|
out.push_str(
|
|
&format!("gitks_requests_total{{method=\"{method}\",status=\"{status}\"}} {count}\n"),
|
|
);
|
|
}
|
|
}
|
|
out.push('\n');
|
|
|
|
// Duration histogram
|
|
out.push_str(
|
|
"# HELP gitks_request_duration_milliseconds Request duration histogram in ms\n",
|
|
);
|
|
out.push_str("# TYPE gitks_request_duration_milliseconds histogram\n");
|
|
for entry in &m.duration_buckets {
|
|
let (method_and_bound, count) = (entry.key(), entry.value());
|
|
let count = count.load(Ordering::Relaxed);
|
|
if let Some((method, bound_str)) = method_and_bound.rsplit_once(':') {
|
|
let bound = bound_str;
|
|
let le = if bound_str.parse::<u64>() == Ok(BUCKET_INF) {
|
|
"+Inf".to_string()
|
|
} else {
|
|
bound.to_string()
|
|
};
|
|
out.push_str(
|
|
&format!("gitks_request_duration_milliseconds_bucket{{method=\"{method}\",le=\"{le}\"}} {count}\n"),
|
|
);
|
|
}
|
|
}
|
|
out.push('\n');
|
|
|
|
// Cache
|
|
let hits = m.cache_hits.load(Ordering::Relaxed);
|
|
let misses = m.cache_misses.load(Ordering::Relaxed);
|
|
out.push_str("# HELP gitks_cache_hits_total Cache hit count\n");
|
|
out.push_str("# TYPE gitks_cache_hits_total counter\n");
|
|
out.push_str(&format!("gitks_cache_hits_total {hits}\n\n"));
|
|
out.push_str("# HELP gitks_cache_misses_total Cache miss count\n");
|
|
out.push_str("# TYPE gitks_cache_misses_total counter\n");
|
|
out.push_str(&format!("gitks_cache_misses_total {misses}\n\n"));
|
|
|
|
// Errors
|
|
out.push_str("# HELP gitks_errors_total Total errors by kind\n");
|
|
out.push_str("# TYPE gitks_errors_total counter\n");
|
|
for entry in &m.error_count {
|
|
let (kind, count) = (entry.key(), entry.value());
|
|
let count = count.load(Ordering::Relaxed);
|
|
out.push_str(&format!("gitks_errors_total{{kind=\"{kind}\"}} {count}\n"));
|
|
}
|
|
out.push('\n');
|
|
|
|
out
|
|
}
|
|
|
|
|
|
/// Start the metrics HTTP server on the given port.
|
|
/// Runs in a background task; returns the JoinHandle.
|
|
pub fn start_metrics_server(port: u16) -> tokio::task::JoinHandle<()> {
|
|
tokio::spawn(async move {
|
|
let listener = match tokio::net::TcpListener::bind(format!("0.0.0.0:{port}")).await {
|
|
Ok(l) => l,
|
|
Err(e) => {
|
|
tracing::error!(port, error = %e, "failed to bind metrics server");
|
|
return;
|
|
}
|
|
};
|
|
tracing::info!(port, "metrics HTTP server started");
|
|
|
|
loop {
|
|
match listener.accept().await {
|
|
Ok((socket, peer)) => {
|
|
tracing::debug!(%peer, "metrics request");
|
|
tokio::spawn(handle_metrics_connection(socket));
|
|
}
|
|
Err(e) => {
|
|
tracing::error!(error = %e, "metrics accept error");
|
|
}
|
|
}
|
|
}
|
|
})
|
|
}
|
|
|
|
async fn handle_metrics_connection(mut socket: tokio::net::TcpStream) {
|
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
|
|
|
let mut buf = [0u8; 4096];
|
|
let _ = tokio::time::timeout(Duration::from_secs(5), socket.read(&mut buf)).await;
|
|
|
|
let body = render_metrics();
|
|
let response = format!(
|
|
"HTTP/1.1 200 OK\r\nContent-Type: text/plain; version=0.0.4\r\nContent-Length: {}\r\nConnection: close\r\n\r\n{}",
|
|
body.len(),
|
|
body
|
|
);
|
|
|
|
let _ = tokio::time::timeout(Duration::from_secs(5), socket.write_all(response.as_bytes()))
|
|
.await;
|
|
let _ = socket.shutdown().await;
|
|
}
|
|
|
|
|
|
/// Per-request guard that ties one in-flight request to the metrics registry.
///
/// Creating the guard with `RequestMetrics::new` raises the active-request
/// gauge and starts a timer; dropping it lowers the gauge again. Dropping does
/// NOT write the request counter or the duration histogram. Call `record`
/// explicitly with the outcome before returning.
///
/// Usage in handlers:
/// ```ignore
/// let m = crate::metrics::RequestMetrics::new("Service/Method");
/// // ... handle request ...
/// m.record("ok"); // on success
/// // m.record("internal"); // or on error, with tonic error kind
/// ```
#[must_use = "the guard must stay alive for the duration of the request"]
pub struct RequestMetrics {
    /// Method label reported with every recorded sample.
    method: &'static str,
    /// Instant at which the guard was created; durations are measured from it.
    start: Instant,
}
|
|
|
|
impl RequestMetrics {
|
|
pub fn new(method: &'static str) -> Self {
|
|
inc_active_requests();
|
|
Self {
|
|
method,
|
|
start: Instant::now(),
|
|
}
|
|
}
|
|
|
|
/// Record the outcome. Idempotent — safe to call before each return.
|
|
pub fn record(&self, status: &str) {
|
|
let duration = self.start.elapsed();
|
|
record_request(self.method, status, duration);
|
|
}
|
|
}
|
|
|
|
impl Drop for RequestMetrics {
|
|
fn drop(&mut self) {
|
|
dec_active_requests();
|
|
}
|
|
}
|
|
|
|
/// Convenience: record an error from a tonic Status.
|
|
pub fn record_rpc_error(m: &RequestMetrics, status: &tonic::Status) {
|
|
let kind = status.code().description();
|
|
inc_error(kind);
|
|
m.record(kind);
|
|
}
|