diff --git a/lib.rs b/lib.rs index 1ec7140..079aaf7 100644 --- a/lib.rs +++ b/lib.rs @@ -29,3 +29,416 @@ pub mod server; pub mod snapshot; pub mod tag; pub mod tree; + +use std::future::Future; +use std::net::SocketAddr; +use std::path::PathBuf; +use std::time::Duration; + +use server::GitksService; + +/// Configuration for building a [`GitksServer`]. +/// +/// Use [`GitksConfig::from_env`] to read from environment variables, +/// or construct manually when embedding as a library. +#[derive(Debug, Clone)] +pub struct GitksConfig { + /// Repository storage prefix path (required). + pub repo_prefix: PathBuf, + /// gRPC listen host. + pub host: String, + /// gRPC listen port. + pub port: String, + /// Storage name used in repository headers. + pub storage_name: String, + /// Advertised gRPC address for other services. + pub grpc_addr: String, + /// Enable disk-based cache. + pub disk_cache_enabled: bool, + /// Max age for disk cache entries (seconds). + pub disk_cache_max_age: u64, + /// Enable pack-objects cache wrapper. + pub pack_cache_enabled: bool, + /// Enable pack cache backpressure. + pub pack_cache_backpressure: bool, + /// Enable hook support. + pub hooks_enabled: bool, + /// Server-side hooks directory. + pub server_hooks_dir: Option, + /// Callback address for hooks. + pub hook_callback_addr: Option, + /// Hook execution timeout (seconds). + pub hook_timeout: u64, + /// Allow custom hooks from repository config. + pub allow_custom_hooks: bool, + /// Prometheus metrics port. + pub metrics_port: u16, + /// Slow request detection threshold (ms). + pub slow_request_threshold: u64, + /// Log format: "pretty" or "json". + pub log_format: String, + /// Optional log directory for file output. + pub log_dir: Option, + /// Log rotation strategy. + pub log_rotation: String, + /// Max log files to retain. + pub log_retention: usize, +} + +/// A ready-to-run gitks gRPC server. +pub struct GitksServer { + service: GitksService, + addr: SocketAddr, + http_cancel: tokio_util::sync::CancellationToken, + _log_guard: Option, + _metrics_handle: tokio::task::JoinHandle<()>, + _semaphore_cleanup: tokio::task::JoinHandle<()>, +} + +/// Builder for [`GitksServer`]. +/// +/// # Examples +/// +/// ```no_run +/// use gitks::{GitksServer, GitksConfig}; +/// +/// # async fn example() -> Result<(), Box> { +/// let config = GitksConfig::from_env()?; +/// let server = GitksServer::builder() +/// .config(config) +/// .build()?; +/// server.serve().await?; +/// # Ok(()) +/// # } +/// ``` +pub struct GitksServerBuilder { + config: Option, +} + +impl GitksConfig { + /// Build config from environment variables. + pub fn from_env() -> Result> { + dotenvy::dotenv().ok(); + Ok(Self { + repo_prefix: PathBuf::from( + std::env::var("REPO_PREFIX_PATH") + .map_err(|_| "REPO_PREFIX_PATH is required (e.g. /data/repos)")?, + ), + host: env_or("GITKS_HOST", "0.0.0.0"), + port: env_or("GITKS_PORT", "50051"), + storage_name: env_or("STORAGE_NAME", "default"), + grpc_addr: std::env::var("GITKS_ADVERTISE_ADDR") + .unwrap_or_else(|_| format!("http://{}:{}", env_or("GITKS_HOST", "0.0.0.0"), env_or("GITKS_PORT", "50051"))), + disk_cache_enabled: env_bool("GITKS_DISK_CACHE_ENABLED", false), + disk_cache_max_age: env_u64("GITKS_DISK_CACHE_MAX_AGE", 300), + pack_cache_enabled: env_bool("GITKS_PACK_CACHE_ENABLED", false), + pack_cache_backpressure: env_bool("GITKS_PACK_CACHE_BACKPRESSURE", true), + hooks_enabled: env_bool("GITKS_HOOKS_ENABLED", true), + server_hooks_dir: std::env::var("GITKS_SERVER_HOOKS_DIR").ok().map(PathBuf::from), + hook_callback_addr: std::env::var("GITKS_HOOK_CALLBACK_ADDR").ok(), + hook_timeout: env_u64("GITKS_HOOK_TIMEOUT", 30), + allow_custom_hooks: env_bool("GITKS_ALLOW_CUSTOM_HOOKS", true), + metrics_port: env_u64("GITKS_METRICS_PORT", 9100) as u16, + slow_request_threshold: env_u64("GITKS_SLOW_REQUEST_THRESHOLD_MS", 5000), + log_format: env_or("GITKS_LOG_FORMAT", "pretty"), + log_dir: std::env::var("GITKS_LOG_DIR").ok(), + log_rotation: env_or("GITKS_LOG_ROTATION", "daily"), + log_retention: env_u64("GITKS_LOG_RETENTION", 7) as usize, + }) + } +} + +impl GitksServer { + /// Create a new builder. + pub fn builder() -> GitksServerBuilder { + GitksServerBuilder::default() + } + + /// Start the gRPC server and block until Ctrl+C (or SIGTERM on Unix). + pub async fn serve(self) -> Result<(), Box> { + self.serve_with_shutdown(shutdown_signal()).await + } + + /// Start the gRPC server and block until the provided `shutdown` future resolves. + pub async fn serve_with_shutdown( + self, + shutdown: impl Future, + ) -> Result<(), Box> { + metrics::set_ready(true); + + server::serve_with_shutdown(self.addr, self.service, shutdown).await?; + + metrics::set_ready(false); + self.http_cancel.cancel(); + + tracing::info!("gitks shut down complete"); + Ok(()) + } +} + +impl GitksServerBuilder { + /// Set the server configuration. + pub fn config(mut self, config: GitksConfig) -> Self { + self.config = Some(config); + self + } + + /// Build the server. + pub fn build(self) -> Result> { + let config = self.config.unwrap_or_else(|| { + GitksConfig::from_env().expect("failed to load gitks config") + }); + + // Validate repo_prefix + if !config.repo_prefix.is_absolute() { + return Err("REPO_PREFIX_PATH must be an absolute path".into()); + } + if !config.repo_prefix.exists() { + tracing::info!(path = %config.repo_prefix.display(), "creating repo prefix directory"); + std::fs::create_dir_all(&config.repo_prefix)?; + } + + // Init logging if log_dir is set + let log_guard = init_tracing_from_config(&config); + + tracing::info!( + version = env!("CARGO_PKG_VERSION"), + log_format = %config.log_format, + "gitks starting up" + ); + + // Disk cache + let disk_cache = disk_cache::DiskCache::new( + config.repo_prefix.clone(), + env!("CARGO_PKG_VERSION").to_string(), + config.disk_cache_max_age, + config.disk_cache_enabled, + ); + + if config.disk_cache_enabled { + tracing::info!(max_age_secs = config.disk_cache_max_age, "disk cache enabled"); + disk_cache.cleanup_on_startup()?; + disk_cache::start_cache_cleanup_task(disk_cache.clone(), Duration::from_secs(300)); + } else { + tracing::info!("disk cache disabled"); + } + + // Pack cache + let pack_cache = if config.disk_cache_enabled { + tracing::info!( + pack_objects_cache = config.pack_cache_enabled, + backpressure = config.pack_cache_backpressure, + "pack cache wrapper enabled" + ); + Some(pack_cache::PackCache::new( + disk_cache.clone(), + config.pack_cache_backpressure, + )) + } else { + None + }; + + // Hook manager + let hook_manager = if config.hooks_enabled { + tracing::info!( + timeout_secs = config.hook_timeout, + custom_hooks = config.allow_custom_hooks, + "hooks enabled" + ); + Some(hooks::HookManager::new( + config.repo_prefix.clone(), + config.server_hooks_dir, + config.hook_callback_addr, + Duration::from_secs(config.hook_timeout), + config.allow_custom_hooks, + )) + } else { + tracing::info!("hooks disabled"); + None + }; + + // Metrics + let http_cancel = tokio_util::sync::CancellationToken::new(); + metrics::set_http_cancel_token(http_cancel.clone()); + let metrics_handle = metrics::start_metrics_server(config.metrics_port); + tracing::info!(port = config.metrics_port, "metrics server started"); + + // Rate limiter cleanup + let semaphore_cleanup = rate_limit::start_semaphore_cleanup_task(); + + // Slow request detection + metrics::set_slow_request_threshold(config.slow_request_threshold); + tracing::info!( + threshold_ms = config.slow_request_threshold, + "slow request detection configured" + ); + + // Build service + let addr: SocketAddr = format!("{}:{}", config.host, config.port).parse()?; + let mut svc = GitksService::new(config.repo_prefix.clone()); + + if config.disk_cache_enabled { + svc = svc.with_disk_cache(disk_cache); + } + if let Some(pc) = pack_cache { + svc = svc.with_pack_cache(pc); + } + if let Some(hm) = hook_manager { + svc = svc.with_hook_manager(hm); + } + + let svc = svc.with_grpc_addr(config.grpc_addr.clone()); + + tracing::info!( + addr = %addr, + repo_prefix = %config.repo_prefix.display(), + storage = %config.storage_name, + advertise = %config.grpc_addr, + "starting gitks gRPC server" + ); + + Ok(GitksServer { + service: svc, + addr, + http_cancel, + _log_guard: log_guard, + _metrics_handle: metrics_handle, + _semaphore_cleanup: semaphore_cleanup, + }) + } +} + +impl Default for GitksServerBuilder { + fn default() -> Self { + Self { config: None } + } +} + +fn env_or(key: &str, default: &str) -> String { + std::env::var(key).unwrap_or_else(|_| default.into()) +} + +fn env_bool(key: &str, default: bool) -> bool { + match std::env::var(key).as_deref() { + Ok("true" | "1" | "yes") => true, + Ok("false" | "0" | "no") => false, + _ => default, + } +} + +fn env_u64(key: &str, default: u64) -> u64 { + std::env::var(key) + .ok() + .and_then(|v| v.parse().ok()) + .unwrap_or(default) +} + +fn init_tracing_from_config( + config: &GitksConfig, +) -> Option { + use tracing_subscriber::EnvFilter; + use tracing_subscriber::fmt; + use tracing_subscriber::prelude::*; + + let env_filter = + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); + + let fmt_layer = match config.log_format.as_str() { + "json" => fmt::layer() + .json() + .with_target(true) + .with_file(true) + .with_line_number(true) + .with_thread_ids(true) + .with_span_events(fmt::format::FmtSpan::NEW | fmt::format::FmtSpan::CLOSE) + .boxed(), + _ => fmt::layer() + .pretty() + .with_target(true) + .with_file(true) + .with_line_number(true) + .boxed(), + }; + + if let Some(ref log_dir) = config.log_dir { + let rotation = match config.log_rotation.as_str() { + "hourly" => tracing_appender::rolling::Rotation::HOURLY, + "never" => tracing_appender::rolling::Rotation::NEVER, + _ => tracing_appender::rolling::Rotation::DAILY, + }; + let retention = config.log_retention; + + let mut builder = tracing_appender::rolling::Builder::new() + .rotation(rotation) + .filename_prefix("gitks") + .filename_suffix("log"); + + if retention > 0 { + builder = builder.max_log_files(retention); + } + + let file_appender = match builder.build(log_dir) { + Ok(file_appender) => file_appender, + Err(err) => { + eprintln!("failed to create log directory '{log_dir}': {err}"); + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .init(); + return None; + } + }; + let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); + + let file_layer = fmt::layer() + .json() + .with_target(true) + .with_file(true) + .with_line_number(true) + .with_writer(non_blocking) + .with_filter(EnvFilter::new("info")) + .boxed(); + + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .with(file_layer) + .init(); + + Some(guard) + } else { + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .init(); + None + } +} + +async fn shutdown_signal() { + let ctrl_c = async { + tokio::signal::ctrl_c() + .await + .expect("failed to install Ctrl+C handler"); + }; + + #[cfg(unix)] + let terminate = async { + tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) + .expect("failed to install SIGTERM handler") + .recv() + .await; + }; + + #[cfg(not(unix))] + let terminate = std::future::pending::<()>(); + + tokio::select! { + _ = ctrl_c => { + tracing::info!("received Ctrl+C, starting graceful shutdown"); + } + _ = terminate => { + tracing::info!("received SIGTERM, starting graceful shutdown"); + } + } +} diff --git a/main.rs b/main.rs index d7f6cb5..8c1c4af 100644 --- a/main.rs +++ b/main.rs @@ -1,25 +1,10 @@ //! Copyright (c) 2022-2026 GitDataAi All rights reserved. -use std::path::PathBuf; use std::sync::Arc; -use std::time::Duration; - -use gitks::disk_cache::DiskCache; -use gitks::hooks::HookManager; -use gitks::metrics; -use gitks::server::{GitksService, serve_with_shutdown}; use etcd_client::{Client, PutOptions}; use tokio::sync::Mutex; -use tracing_subscriber::EnvFilter; -use tracing_subscriber::fmt; -use tracing_subscriber::prelude::*; - -const DEFAULT_HOST: &str = "0.0.0.0"; -const DEFAULT_PORT: &str = "50051"; -const DEFAULT_STORAGE_NAME: &str = "default"; - /// etcd-backed config reader. Priority: etcd > env > default. struct EtcdConfig { client: Arc>, @@ -37,7 +22,6 @@ impl EtcdConfig { }) } - /// Get config: etcd first, env second, default last. async fn get(&self, key: &str, default: &str) -> String { let etcd_key = format!("{}config/{}", self.prefix, key); if let Ok(mut c) = self.client.try_lock() @@ -51,7 +35,6 @@ impl EtcdConfig { std::env::var(key).unwrap_or_else(|_| default.to_string()) } - /// Register this service under the common prefix for discovery by other services. async fn register(&self, service_name: &str, addr: &str) -> Result<(), String> { let instance_id = uuid::Uuid::now_v7().to_string(); let addr = addr.to_string(); @@ -102,117 +85,13 @@ impl EtcdConfig { } } -fn env_or(key: &str, default: &str) -> String { - std::env::var(key).unwrap_or_else(|_| default.into()) -} - -fn env_bool(key: &str, default: bool) -> bool { - match std::env::var(key).as_deref() { - Ok("true" | "1" | "yes") => true, - Ok("false" | "0" | "no") => false, - _ => default, - } -} - -fn env_u64(key: &str, default: u64) -> u64 { - std::env::var(key) - .ok() - .and_then(|v| v.parse().ok()) - .unwrap_or(default) -} - -fn init_tracing() -> Option { - let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")); - - let log_format = env_or("GITKS_LOG_FORMAT", "pretty"); - - let fmt_layer = match log_format.as_str() { - "json" => fmt::layer() - .json() - .with_target(true) - .with_file(true) - .with_line_number(true) - .with_thread_ids(true) - .with_span_events(fmt::format::FmtSpan::NEW | fmt::format::FmtSpan::CLOSE) - .boxed(), - _ => fmt::layer() - .pretty() - .with_target(true) - .with_file(true) - .with_line_number(true) - .boxed(), - }; - - if let Ok(log_dir) = std::env::var("GITKS_LOG_DIR") { - let rotation = match env_or("GITKS_LOG_ROTATION", "daily").as_str() { - "hourly" => tracing_appender::rolling::Rotation::HOURLY, - "never" => tracing_appender::rolling::Rotation::NEVER, - _ => tracing_appender::rolling::Rotation::DAILY, - }; - let retention = env_u64("GITKS_LOG_RETENTION", 7) as usize; - - let mut builder = tracing_appender::rolling::Builder::new() - .rotation(rotation) - .filename_prefix("gitks") - .filename_suffix("log"); - - if retention > 0 { - builder = builder.max_log_files(retention); - } - - let file_appender = match builder.build(&log_dir) { - Ok(file_appender) => file_appender, - Err(err) => { - eprintln!("failed to create log directory '{log_dir}': {err}"); - tracing_subscriber::registry() - .with(env_filter) - .with(fmt_layer) - .init(); - return None; - } - }; - let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); - - let file_layer = fmt::layer() - .json() - .with_target(true) - .with_file(true) - .with_line_number(true) - .with_writer(non_blocking) - .with_filter(EnvFilter::new("info")) - .boxed(); - - tracing_subscriber::registry() - .with(env_filter) - .with(fmt_layer) - .with(file_layer) - .init(); - - Some(guard) - } else { - tracing_subscriber::registry() - .with(env_filter) - .with(fmt_layer) - .init(); - None - } -} - #[tokio::main] async fn main() -> Result<(), Box> { dotenvy::dotenv().ok(); - let _log_guard = init_tracing(); - tracing::info!( - version = env!("CARGO_PKG_VERSION"), - log_format = %env_or("GITKS_LOG_FORMAT", "pretty"), - "gitks starting up" - ); - - let host = env_or("GITKS_HOST", DEFAULT_HOST); - let port = env_or("GITKS_PORT", DEFAULT_PORT); - let storage_name = env_or("STORAGE_NAME", DEFAULT_STORAGE_NAME); + let mut config = gitks::GitksConfig::from_env()?; + // Overlay etcd config (etcd > env > default) let etcd_endpoints: Vec = std::env::var("GITKS_ETCD_ENDPOINTS") .ok() .filter(|s| !s.is_empty()) @@ -220,177 +99,20 @@ async fn main() -> Result<(), Box> { .unwrap_or_else(|| vec!["http://localhost:2379".to_string()]); let etcd_prefix = std::env::var("ETCD_KEY_PREFIX").unwrap_or_else(|_| "/appks/".to_string()); - let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix).await.ok(); - let host = if let Some(ref e) = etcd { - e.get("GITKS_HOST", &host).await - } else { - host - }; - let port = if let Some(ref e) = etcd { - e.get("GITKS_PORT", &port).await - } else { - port - }; - let storage_name = if let Some(ref e) = etcd { - e.get("GITKS_STORAGE_NAME", &storage_name).await - } else { - storage_name - }; - let grpc_addr = - std::env::var("GITKS_ADVERTISE_ADDR").unwrap_or_else(|_| format!("http://{host}:{port}")); + if let Ok(etcd) = EtcdConfig::connect(etcd_endpoints, &etcd_prefix).await { + config.host = etcd.get("GITKS_HOST", &config.host).await; + config.port = etcd.get("GITKS_PORT", &config.port).await; + config.storage_name = etcd.get("GITKS_STORAGE_NAME", &config.storage_name).await; + config.grpc_addr = etcd + .get("GITKS_ADVERTISE_ADDR", &config.grpc_addr) + .await; - if let Some(ref e) = etcd { - let addr_str = format!("{host}:{port}"); - e.register("gitks", &addr_str).await.ok(); + let addr_str = format!("{}:{}", config.host, config.port); + etcd.register("gitks", &addr_str).await.ok(); } - let repo_prefix = std::env::var("REPO_PREFIX_PATH") - .map_err(|_| "REPO_PREFIX_PATH environment variable is required (e.g. /data/repos)")?; - let repo_prefix = PathBuf::from(&repo_prefix); - if !repo_prefix.is_absolute() { - return Err("REPO_PREFIX_PATH must be an absolute path".into()); - } - if !repo_prefix.exists() { - tracing::info!(path = %repo_prefix.display(), "creating repo prefix directory"); - std::fs::create_dir_all(&repo_prefix)?; - } + let server = gitks::GitksServer::builder().config(config).build()?; + server.serve().await?; - let disk_cache_enabled = env_bool("GITKS_DISK_CACHE_ENABLED", false); - let disk_cache_max_age = env_u64("GITKS_DISK_CACHE_MAX_AGE", 300); - - let disk_cache = DiskCache::new( - repo_prefix.clone(), - env!("CARGO_PKG_VERSION").to_string(), - disk_cache_max_age, - disk_cache_enabled, - ); - - if disk_cache_enabled { - tracing::info!(max_age_secs = disk_cache_max_age, "disk cache enabled"); - disk_cache.cleanup_on_startup()?; - gitks::disk_cache::start_cache_cleanup_task(disk_cache.clone(), Duration::from_secs(300)); - } else { - tracing::info!("disk cache disabled"); - } - - let pack_cache_enabled = env_bool("GITKS_PACK_CACHE_ENABLED", false); - let pack_backpressure = env_bool("GITKS_PACK_CACHE_BACKPRESSURE", true); - - let pack_cache = if disk_cache_enabled { - tracing::info!( - pack_objects_cache = pack_cache_enabled, - backpressure = pack_backpressure, - "pack cache wrapper enabled" - ); - Some(gitks::pack_cache::PackCache::new( - disk_cache.clone(), - pack_backpressure, - )) - } else { - None - }; - - let hooks_enabled = env_bool("GITKS_HOOKS_ENABLED", true); - let server_hooks_dir = std::env::var("GITKS_SERVER_HOOKS_DIR") - .ok() - .map(PathBuf::from); - let hook_callback_addr = std::env::var("GITKS_HOOK_CALLBACK_ADDR").ok(); - let hook_timeout = env_u64("GITKS_HOOK_TIMEOUT", 30); - let allow_custom_hooks = env_bool("GITKS_ALLOW_CUSTOM_HOOKS", true); - - let hook_manager = if hooks_enabled { - tracing::info!( - timeout_secs = hook_timeout, - custom_hooks = allow_custom_hooks, - "hooks enabled" - ); - Some(HookManager::new( - repo_prefix.clone(), - server_hooks_dir, - hook_callback_addr, - Duration::from_secs(hook_timeout), - allow_custom_hooks, - )) - } else { - tracing::info!("hooks disabled"); - None - }; - - let metrics_port = env_u64("GITKS_METRICS_PORT", 9100) as u16; - let http_cancel = tokio_util::sync::CancellationToken::new(); - metrics::set_http_cancel_token(http_cancel.clone()); - let _metrics_handle = metrics::start_metrics_server(metrics_port); - tracing::info!(port = metrics_port, "metrics server started"); - - let _semaphore_cleanup = gitks::rate_limit::start_semaphore_cleanup_task(); - - let slow_request_threshold = env_u64("GITKS_SLOW_REQUEST_THRESHOLD_MS", 5000); - metrics::set_slow_request_threshold(slow_request_threshold); - tracing::info!( - threshold_ms = slow_request_threshold, - "slow request detection configured" - ); - - let addr: std::net::SocketAddr = format!("{host}:{port}").parse()?; - let mut svc = GitksService::new(repo_prefix.clone()); - - if disk_cache_enabled { - svc = svc.with_disk_cache(disk_cache); - } - if let Some(pc) = pack_cache { - svc = svc.with_pack_cache(pc); - } - if let Some(hm) = hook_manager { - svc = svc.with_hook_manager(hm); - } - - let svc = svc.with_grpc_addr(grpc_addr.clone()); - - tracing::info!( - addr = %addr, - repo_prefix = %repo_prefix.display(), - storage = %storage_name, - advertise = %grpc_addr, - "starting gitks gRPC server" - ); - - metrics::set_ready(true); - - serve_with_shutdown(addr, svc, shutdown_signal()).await?; - - metrics::set_ready(false); - - http_cancel.cancel(); - - tracing::info!("gitks shut down complete"); Ok(()) } - -/// Resolves when the process receives SIGTERM or SIGINT (Ctrl+C). -async fn shutdown_signal() { - let ctrl_c = async { - tokio::signal::ctrl_c() - .await - .expect("failed to install Ctrl+C handler"); - }; - - #[cfg(unix)] - let terminate = async { - tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) - .expect("failed to install SIGTERM handler") - .recv() - .await; - }; - - #[cfg(not(unix))] - let terminate = std::future::pending::<()>(); - - tokio::select! { - _ = ctrl_c => { - tracing::info!("received Ctrl+C, starting graceful shutdown"); - } - _ = terminate => { - tracing::info!("received SIGTERM, starting graceful shutdown"); - } - } -}