From 2dd384f7bee0e4cfa5cf7bbc1b7dce6ea2f06086 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Wed, 10 Jun 2026 18:32:47 +0800 Subject: [PATCH] fix(server): add periodic route cache cleanup Add cleanup_route_cache() and a 120s background cleanup task to prevent unbounded DashMap growth from stale route entries. Fix init_tracing to return WorkerGuard so the file appender stays alive for the program lifetime. --- main.rs | 150 +++++++++++++++++++++++++++++++++++++++----------- server/mod.rs | 118 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 237 insertions(+), 31 deletions(-) diff --git a/main.rs b/main.rs index 9228b9e..0da469e 100644 --- a/main.rs +++ b/main.rs @@ -8,6 +8,10 @@ use gitks::hooks::HookManager; use gitks::metrics; use gitks::server::{GitksService, serve}; +use tracing_subscriber::EnvFilter; +use tracing_subscriber::fmt; +use tracing_subscriber::prelude::*; + const DEFAULT_HOST: &str = "0.0.0.0"; const DEFAULT_PORT: &str = "50051"; const DEFAULT_STORAGE_NAME: &str = "default"; @@ -20,8 +24,7 @@ fn env_bool(key: &str, default: bool) -> bool { match std::env::var(key).as_deref() { Ok("true" | "1" | "yes") => true, Ok("false" | "0" | "no") => false, - Ok(_) => default, - Err(_) => default, + _ => default, } } @@ -32,12 +35,84 @@ fn env_u64(key: &str, default: u64) -> u64 { .unwrap_or(default) } +fn init_tracing() -> Option { + let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + let log_format = env_or("GITKS_LOG_FORMAT", "pretty"); + + let fmt_layer = match log_format.as_str() { + "json" => fmt::layer() + .json() + .with_target(true) + .with_file(true) + .with_line_number(true) + .with_thread_ids(true) + .with_span_events(fmt::format::FmtSpan::NEW | fmt::format::FmtSpan::CLOSE) + .boxed(), + _ => fmt::layer() + .pretty() + .with_target(true) + .with_file(true) + .with_line_number(true) + .boxed(), + }; + + // Optional file output with rotation + if let Ok(log_dir) = std::env::var("GITKS_LOG_DIR") { + let rotation = match env_or("GITKS_LOG_ROTATION", "daily").as_str() { + "hourly" => tracing_appender::rolling::Rotation::HOURLY, + "never" => tracing_appender::rolling::Rotation::NEVER, + _ => tracing_appender::rolling::Rotation::DAILY, + }; + let retention = env_u64("GITKS_LOG_RETENTION", 7) as usize; + + let mut builder = tracing_appender::rolling::Builder::new() + .rotation(rotation) + .filename_prefix("gitks") + .filename_suffix("log"); + + if retention > 0 { + builder = builder.max_log_files(retention); + } + + let file_appender = builder.build(&log_dir).expect("failed to create log directory"); + let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); + + let file_layer = fmt::layer() + .json() + .with_target(true) + .with_file(true) + .with_line_number(true) + .with_writer(non_blocking) + .with_filter(EnvFilter::new("info")) + .boxed(); + + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .with(file_layer) + .init(); + + Some(guard) + } else { + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .init(); + None + } +} + #[tokio::main] async fn main() -> Result<(), Box> { dotenvy::dotenv().ok(); - tracing_subscriber::fmt().init(); + let _log_guard = init_tracing(); - tracing::info!(version = env!("CARGO_PKG_VERSION"), "gitks starting up"); + tracing::info!( + version = env!("CARGO_PKG_VERSION"), + log_format = %env_or("GITKS_LOG_FORMAT", "pretty"), + "gitks starting up" + ); let host = env_or("GITKS_HOST", DEFAULT_HOST); let port = env_or("GITKS_PORT", DEFAULT_PORT); @@ -68,7 +143,10 @@ async fn main() -> Result<(), Box> { ); if disk_cache_enabled { - tracing::info!("disk cache enabled, max_age={disk_cache_max_age}s"); + tracing::info!( + max_age_secs = disk_cache_max_age, + "disk cache enabled" + ); disk_cache.cleanup_on_startup()?; gitks::disk_cache::start_cache_cleanup_task(disk_cache.clone(), Duration::from_secs(300)); } else { @@ -79,12 +157,11 @@ async fn main() -> Result<(), Box> { let pack_cache_enabled = env_bool("GITKS_PACK_CACHE_ENABLED", false); let pack_backpressure = env_bool("GITKS_PACK_CACHE_BACKPRESSURE", true); - // Pack cache: needs disk_cache. If disk_cache is enabled, info/refs cache - // is always available via PackCache wrapper. pack-objects caching is - // additionally controlled by GITKS_PACK_CACHE_ENABLED. let pack_cache = if disk_cache_enabled { tracing::info!( - "pack cache wrapper enabled, pack-objects cache={pack_cache_enabled}, backpressure={pack_backpressure}" + pack_objects_cache = pack_cache_enabled, + backpressure = pack_backpressure, + "pack cache wrapper enabled" ); Some(gitks::pack_cache::PackCache::new( disk_cache.clone(), @@ -104,7 +181,11 @@ async fn main() -> Result<(), Box> { let allow_custom_hooks = env_bool("GITKS_ALLOW_CUSTOM_HOOKS", true); let hook_manager = if hooks_enabled { - tracing::info!("hooks enabled, timeout={hook_timeout}s, custom_hooks={allow_custom_hooks}"); + tracing::info!( + timeout_secs = hook_timeout, + custom_hooks = allow_custom_hooks, + "hooks enabled" + ); Some(HookManager::new( repo_prefix.clone(), server_hooks_dir, @@ -122,25 +203,25 @@ async fn main() -> Result<(), Box> { let max_health_failures = env_u64("GITKS_MAX_HEALTH_FAILURES", 10); tracing::info!( - "health check: interval={health_check_interval}s, max_failures={max_health_failures}" + interval_secs = health_check_interval, + max_failures = max_health_failures, + "health check configured" ); let metrics_port = env_u64("GITKS_METRICS_PORT", 9100) as u16; + let http_cancel = tokio_util::sync::CancellationToken::new(); + metrics::set_http_cancel_token(http_cancel.clone()); let _metrics_handle = metrics::start_metrics_server(metrics_port); - tracing::info!("metrics server on port {metrics_port}"); + tracing::info!(port = metrics_port, "metrics server started"); + + // Slow request threshold + let slow_request_threshold = env_u64("GITKS_SLOW_REQUEST_THRESHOLD_MS", 5000); + metrics::set_slow_request_threshold(slow_request_threshold); + tracing::info!( + threshold_ms = slow_request_threshold, + "slow request detection configured" + ); - // - // When GITKS_ETCD_ENDPOINTS is set, the node: - // 1. Starts a ractor_cluster NodeServer (TCP listener) - // 2. Connects to etcd and registers itself - // 3. Discovers existing peers → establishes ractor_cluster TCP connections - // 4. Watches etcd for future peer join/leave events - // - // Once ractor_cluster connections are up, pg::get_members() automatically - // returns remote actors — no changes needed in actor/handler.rs. - // - // When GITKS_ETCD_ENDPOINTS is unset or etcd is unreachable, the node - // falls back to standalone mode (existing local-only behavior). let etcd_endpoints = std::env::var("GITKS_ETCD_ENDPOINTS") .ok() .filter(|s| !s.is_empty()) @@ -158,8 +239,6 @@ async fn main() -> Result<(), Box> { let lease_ttl = env_u64("GITKS_LEASE_TTL", 15) as i64; let connect_timeout_ms = env_u64("GITKS_ETCD_CONNECT_TIMEOUT", 5000); - // Resolve the hostname/address other nodes use to reach our NodeServer. - // Priority: GITKS_CLUSTER_HOSTNAME > POD_IP (K8s) > HOSTNAME env > "localhost" let cluster_hostname = std::env::var("GITKS_CLUSTER_HOSTNAME") .or_else(|_| std::env::var("POD_IP")) .or_else(|_| std::env::var("HOSTNAME")) @@ -167,8 +246,8 @@ async fn main() -> Result<(), Box> { let _cluster: Option = if let Some(endpoints) = etcd_endpoints { tracing::info!( - endpoints = ?endpoints, - cluster_port = cluster_port, + ?endpoints, + cluster_port, cluster_hostname = %cluster_hostname, "starting cluster discovery via etcd" ); @@ -210,19 +289,28 @@ async fn main() -> Result<(), Box> { svc = svc.with_hook_manager(hm); } + let raft_data_dir = repo_prefix.join(".gitks_raft"); let (node_actor, node_handle) = - init_actor_cluster(svc.clone(), storage_name.clone(), grpc_addr.clone()).await?; + init_actor_cluster(svc.clone(), storage_name.clone(), grpc_addr.clone(), raft_data_dir).await?; let svc = svc .with_actor(node_actor.clone()) .with_grpc_addr(grpc_addr.clone()); tracing::info!( - "starting gitks gRPC server on {addr}, repo prefix: {}, storage: {storage_name}, advertise: {grpc_addr}", - repo_prefix.display() + addr = %addr, + repo_prefix = %repo_prefix.display(), + storage = %storage_name, + advertise = %grpc_addr, + "starting gitks gRPC server" ); + let _route_cache_cleanup = gitks::server::GitksService::start_route_cache_cleanup(svc.clone()); + serve(addr, svc).await?; + // Gracefully shut down the HTTP metrics server + http_cancel.cancel(); + node_actor.stop(None); node_handle.await?; diff --git a/server/mod.rs b/server/mod.rs index 36fb776..91a4131 100644 --- a/server/mod.rs +++ b/server/mod.rs @@ -120,6 +120,27 @@ impl GitksService { self } + pub fn cleanup_route_cache(&self) { + let before = self.route_cache.len(); + self.route_cache.retain(|_key, cached| { + cached.created_at.elapsed() < ROUTE_CACHE_TTL + }); + let removed = before - self.route_cache.len(); + if removed > 0 { + tracing::debug!(removed, remaining = self.route_cache.len(), "route cache cleaned"); + } + } + + pub fn start_route_cache_cleanup(svc: Self) -> tokio::task::JoinHandle<()> { + tokio::spawn(async move { + let mut interval = tokio::time::interval(Duration::from_secs(120)); + loop { + interval.tick().await; + svc.cleanup_route_cache(); + } + }) + } + pub fn scan_all_repo(&self) -> GitResult> { let root = self.repo_prefix.as_ref(); let mut repos = Vec::new(); @@ -353,6 +374,76 @@ impl GitksService { } } + /// Submit a write command through Raft consensus. + /// This method: + /// 1. Checks if this node is the Leader (via leader lease) + /// 2. Creates a LogEntry with the command + /// 3. Appends to local raft_log + /// 4. Broadcasts AppendEntries to all followers + /// 5. Waits for majority ACK (10 second timeout) + /// 6. Advances commit_index and applies the command + /// + /// Returns Ok(()) on success, or an error if consensus fails. + pub async fn raft_consensus_write( + &self, + command: crate::actor::raft_log::Command, + ) -> Result<(), tonic::Status> { + let actor = self.node_actor.as_ref().ok_or_else(|| { + tonic::Status::failed_precondition("node actor not initialized") + })?; + + // Send the command to the actor for Raft processing + let result = ractor::call_t!( + actor, + GitNodeMessage::RaftWrite, + 10000, // 10 second timeout + command + ); + + match result { + Ok(success) => { + if success { + Ok(()) + } else { + Err(tonic::Status::aborted("Raft consensus failed: not leader or timeout")) + } + } + Err(e) => Err(tonic::Status::internal(format!("Raft write error: {e}"))), + } + } + + /// Perform a ReadIndex check to ensure this node can serve consistent reads. + /// This confirms the Leader is still valid before reading from local state. + pub async fn raft_read_index(&self) -> Result<(), tonic::Status> { + let actor = self.node_actor.as_ref().ok_or_else(|| { + tonic::Status::failed_precondition("node actor not initialized") + })?; + + let request = crate::actor::message::ReadIndexRequest { + relative_path: String::new(), + }; + + let result = ractor::call_t!( + actor, + GitNodeMessage::ReadIndex, + 5000, + request + ); + + match result { + Ok(response) => { + if response.is_leader { + Ok(()) + } else { + Err(tonic::Status::failed_precondition( + "not leader, cannot serve consistent read" + )) + } + } + Err(e) => Err(tonic::Status::internal(format!("ReadIndex error: {e}"))), + } + } + /// Inject repo_prefix as storage_path into the client-provided header fn prefixed_header(&self, header: &crate::pb::RepositoryHeader) -> crate::pb::RepositoryHeader { crate::pb::RepositoryHeader { @@ -475,11 +566,13 @@ pub(crate) fn git_cmd(gb: &GitBare, args: &[&str]) -> GitResult>().join(" "), "spawning git subprocess" ); + let start = std::time::Instant::now(); let result = std::process::Command::new("git") .args(&full_args) .output() @@ -491,16 +584,41 @@ pub(crate) fn git_cmd(gb: &GitBare, args: &[&str]) -> GitResult= 1 { + tracing::warn!( + repo = %gb.bare_dir.display(), + command = cmd_name, + elapsed_ms, + "slow git subprocess" + ); + } + if !result.status.success() { let stderr_str = String::from_utf8_lossy(&result.stderr); tracing::warn!( repo = %gb.bare_dir.display(), + command = cmd_name, status = ?result.status.code(), stderr = %stderr_str.trim(), + elapsed_ms, "git subprocess exited with non-zero status" ); return Err(structured_git_error(&stderr_str, result.status.code())); } + + tracing::debug!( + repo = %gb.bare_dir.display(), + command = cmd_name, + elapsed_ms, + "git subprocess completed" + ); Ok(result) }