use std::path::PathBuf; use std::time::Duration; use gitks::actor::init_actor_cluster; use gitks::cluster::{ClusterConfig, ClusterManager}; use gitks::disk_cache::DiskCache; use gitks::hooks::HookManager; use gitks::metrics; use gitks::server::{GitksService, serve}; const DEFAULT_HOST: &str = "0.0.0.0"; const DEFAULT_PORT: &str = "50051"; const DEFAULT_STORAGE_NAME: &str = "default"; fn env_or(key: &str, default: &str) -> String { std::env::var(key).unwrap_or_else(|_| default.into()) } fn env_bool(key: &str, default: bool) -> bool { match std::env::var(key).as_deref() { Ok("true" | "1" | "yes") => true, Ok("false" | "0" | "no") => false, Ok(_) => default, Err(_) => default, } } fn env_u64(key: &str, default: u64) -> u64 { std::env::var(key) .ok() .and_then(|v| v.parse().ok()) .unwrap_or(default) } #[tokio::main] async fn main() -> Result<(), Box> { dotenvy::dotenv().ok(); tracing_subscriber::fmt().init(); tracing::info!(version = env!("CARGO_PKG_VERSION"), "gitks starting up"); let host = env_or("GITKS_HOST", DEFAULT_HOST); let port = env_or("GITKS_PORT", DEFAULT_PORT); let storage_name = env_or("STORAGE_NAME", DEFAULT_STORAGE_NAME); let grpc_addr = std::env::var("GITKS_ADVERTISE_ADDR").unwrap_or_else(|_| format!("http://{host}:{port}")); let repo_prefix = std::env::var("REPO_PREFIX_PATH") .map_err(|_| "REPO_PREFIX_PATH environment variable is required (e.g. /data/repos)")?; let repo_prefix = PathBuf::from(&repo_prefix); if !repo_prefix.is_absolute() { return Err("REPO_PREFIX_PATH must be an absolute path".into()); } if !repo_prefix.exists() { tracing::info!(path = %repo_prefix.display(), "creating repo prefix directory"); std::fs::create_dir_all(&repo_prefix)?; } // Disk cache configuration let disk_cache_enabled = env_bool("GITKS_DISK_CACHE_ENABLED", false); let disk_cache_max_age = env_u64("GITKS_DISK_CACHE_MAX_AGE", 300); let disk_cache = DiskCache::new( repo_prefix.clone(), env!("CARGO_PKG_VERSION").to_string(), disk_cache_max_age, disk_cache_enabled, ); if disk_cache_enabled { tracing::info!("disk cache enabled, max_age={disk_cache_max_age}s"); disk_cache.cleanup_on_startup()?; gitks::disk_cache::start_cache_cleanup_task(disk_cache.clone(), Duration::from_secs(300)); } else { tracing::info!("disk cache disabled"); } // Pack cache configuration let pack_cache_enabled = env_bool("GITKS_PACK_CACHE_ENABLED", false); let pack_backpressure = env_bool("GITKS_PACK_CACHE_BACKPRESSURE", true); // Pack cache: needs disk_cache. If disk_cache is enabled, info/refs cache // is always available via PackCache wrapper. pack-objects caching is // additionally controlled by GITKS_PACK_CACHE_ENABLED. let pack_cache = if disk_cache_enabled { tracing::info!( "pack cache wrapper enabled, pack-objects cache={pack_cache_enabled}, backpressure={pack_backpressure}" ); Some(gitks::pack_cache::PackCache::new( disk_cache.clone(), pack_backpressure, )) } else { None }; // Hook manager configuration let hooks_enabled = env_bool("GITKS_HOOKS_ENABLED", true); let server_hooks_dir = std::env::var("GITKS_SERVER_HOOKS_DIR") .ok() .map(PathBuf::from); let hook_callback_addr = std::env::var("GITKS_HOOK_CALLBACK_ADDR").ok(); let hook_timeout = env_u64("GITKS_HOOK_TIMEOUT", 30); let allow_custom_hooks = env_bool("GITKS_ALLOW_CUSTOM_HOOKS", true); let hook_manager = if hooks_enabled { tracing::info!("hooks enabled, timeout={hook_timeout}s, custom_hooks={allow_custom_hooks}"); Some(HookManager::new( repo_prefix.clone(), server_hooks_dir, hook_callback_addr, Duration::from_secs(hook_timeout), allow_custom_hooks, )) } else { tracing::info!("hooks disabled"); None }; // Health check / election configuration let health_check_interval = env_u64("GITKS_HEALTH_CHECK_INTERVAL", 1); let max_health_failures = env_u64("GITKS_MAX_HEALTH_FAILURES", 10); tracing::info!( "health check: interval={health_check_interval}s, max_failures={max_health_failures}" ); let metrics_port = env_u64("GITKS_METRICS_PORT", 9100) as u16; let _metrics_handle = metrics::start_metrics_server(metrics_port); tracing::info!("metrics server on port {metrics_port}"); // // When GITKS_ETCD_ENDPOINTS is set, the node: // 1. Starts a ractor_cluster NodeServer (TCP listener) // 2. Connects to etcd and registers itself // 3. Discovers existing peers → establishes ractor_cluster TCP connections // 4. Watches etcd for future peer join/leave events // // Once ractor_cluster connections are up, pg::get_members() automatically // returns remote actors — no changes needed in actor/handler.rs. // // When GITKS_ETCD_ENDPOINTS is unset or etcd is unreachable, the node // falls back to standalone mode (existing local-only behavior). let etcd_endpoints = std::env::var("GITKS_ETCD_ENDPOINTS") .ok() .filter(|s| !s.is_empty()) .map(|s| { s.split(',') .map(str::trim) .map(String::from) .collect::>() }); let cluster_port = env_or("GITKS_CLUSTER_PORT", "4697") .parse::() .unwrap_or(4697); let cluster_cookie = env_or("GITKS_CLUSTER_COOKIE", "gitks-default-cookie"); let lease_ttl = env_u64("GITKS_LEASE_TTL", 15) as i64; let connect_timeout_ms = env_u64("GITKS_ETCD_CONNECT_TIMEOUT", 5000); // Resolve the hostname/address other nodes use to reach our NodeServer. // Priority: GITKS_CLUSTER_HOSTNAME > POD_IP (K8s) > HOSTNAME env > "localhost" let cluster_hostname = std::env::var("GITKS_CLUSTER_HOSTNAME") .or_else(|_| std::env::var("POD_IP")) .or_else(|_| std::env::var("HOSTNAME")) .unwrap_or_else(|_| "localhost".to_string()); let _cluster: Option = if let Some(endpoints) = etcd_endpoints { tracing::info!( endpoints = ?endpoints, cluster_port = cluster_port, cluster_hostname = %cluster_hostname, "starting cluster discovery via etcd" ); let config = ClusterConfig { etcd_endpoints: endpoints, storage_name: storage_name.clone(), grpc_addr: grpc_addr.clone(), cluster_port, cookie: cluster_cookie, lease_ttl_secs: lease_ttl, connect_timeout_ms, cluster_hostname, }; match ClusterManager::start(config).await { Ok(cm) => { tracing::info!("cluster discovery active"); Some(cm) } Err(e) => { tracing::warn!(error = %e, "etcd unavailable, running in standalone mode"); None } } } else { tracing::info!("GITKS_ETCD_ENDPOINTS not set, running in standalone mode"); None }; let addr: std::net::SocketAddr = format!("{host}:{port}").parse()?; let mut svc = GitksService::new(repo_prefix.clone()); if disk_cache_enabled { svc = svc.with_disk_cache(disk_cache); } if let Some(pc) = pack_cache { svc = svc.with_pack_cache(pc); } if let Some(hm) = hook_manager { svc = svc.with_hook_manager(hm); } let (node_actor, node_handle) = init_actor_cluster(svc.clone(), storage_name.clone(), grpc_addr.clone()).await?; let svc = svc .with_actor(node_actor.clone()) .with_grpc_addr(grpc_addr.clone()); tracing::info!( "starting gitks gRPC server on {addr}, repo prefix: {}, storage: {storage_name}, advertise: {grpc_addr}", repo_prefix.display() ); serve(addr, svc).await?; node_actor.stop(None); node_handle.await?; tracing::info!("gitks shut down"); Ok(()) }