Files
gitks/main.rs
T

397 lines
13 KiB
Rust

//! Copyright (c) 2022-2026 GitDataAi All rights reserved.
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use gitks::disk_cache::DiskCache;
use gitks::hooks::HookManager;
use gitks::metrics;
use gitks::server::{GitksService, serve_with_shutdown};
use etcd_client::{Client, PutOptions};
use tokio::sync::Mutex;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::fmt;
use tracing_subscriber::prelude::*;
const DEFAULT_HOST: &str = "0.0.0.0";
const DEFAULT_PORT: &str = "50051";
const DEFAULT_STORAGE_NAME: &str = "default";
/// etcd-backed config reader. Priority: etcd > env > default.
struct EtcdConfig {
client: Arc<Mutex<Client>>,
prefix: String,
}
impl EtcdConfig {
async fn connect(endpoints: Vec<String>, prefix: &str) -> Result<Self, String> {
let client = Client::connect(endpoints, None)
.await
.map_err(|e| format!("etcd connect: {e}"))?;
Ok(Self {
client: Arc::new(Mutex::new(client)),
prefix: prefix.to_string(),
})
}
/// Get config: etcd first, env second, default last.
async fn get(&self, key: &str, default: &str) -> String {
let etcd_key = format!("{}config/{}", self.prefix, key);
if let Ok(mut c) = self.client.try_lock()
&& let Ok(resp) = c.get(etcd_key.as_str(), None).await
&& let Some(kv) = resp.kvs().first()
&& let Ok(v) = kv.value_str()
&& !v.is_empty()
{
return v.to_string();
}
std::env::var(key).unwrap_or_else(|_| default.to_string())
}
/// Register this service under the common prefix for discovery by other services.
async fn register(&self, service_name: &str, addr: &str) -> Result<(), String> {
let instance_id = uuid::Uuid::now_v7().to_string();
let addr = addr.to_string();
let key = format!("{}services/{}/{}", self.prefix, service_name, instance_id);
let instance =
serde_json::json!({"addr": &addr, "port": 0, "version": env!("CARGO_PKG_VERSION")});
let value = serde_json::to_string(&instance).map_err(|e| format!("json: {e}"))?;
let lease = {
let mut c = self.client.lock().await;
c.lease_grant(15, None)
.await
.map_err(|e| format!("lease: {e}"))?
};
{
let mut c = self.client.lock().await;
let opts = PutOptions::new().with_lease(lease.id());
c.put(key.clone(), value, Some(opts))
.await
.map_err(|e| format!("put: {e}"))?;
}
tracing::info!(service = service_name, addr = %addr, "registered in etcd");
let c = self.client.clone();
tokio::spawn(async move {
loop {
let r = {
let mut cl = c.lock().await;
cl.lease_keep_alive(lease.id()).await
};
drop(r);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
if let Ok(lr) = {
let mut cl = c.lock().await;
cl.lease_grant(15, None).await
} {
let inst = serde_json::json!({"addr": &addr, "port": 0, "version": env!("CARGO_PKG_VERSION")});
if let Ok(v) = serde_json::to_string(&inst) {
let mut cl = c.lock().await;
let _ = cl
.put(key.clone(), v, Some(PutOptions::new().with_lease(lr.id())))
.await;
}
}
}
});
Ok(())
}
}
fn env_or(key: &str, default: &str) -> String {
std::env::var(key).unwrap_or_else(|_| default.into())
}
fn env_bool(key: &str, default: bool) -> bool {
match std::env::var(key).as_deref() {
Ok("true" | "1" | "yes") => true,
Ok("false" | "0" | "no") => false,
_ => default,
}
}
fn env_u64(key: &str, default: u64) -> u64 {
std::env::var(key)
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(default)
}
fn init_tracing() -> Option<tracing_appender::non_blocking::WorkerGuard> {
let env_filter = EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"));
let log_format = env_or("GITKS_LOG_FORMAT", "pretty");
let fmt_layer = match log_format.as_str() {
"json" => fmt::layer()
.json()
.with_target(true)
.with_file(true)
.with_line_number(true)
.with_thread_ids(true)
.with_span_events(fmt::format::FmtSpan::NEW | fmt::format::FmtSpan::CLOSE)
.boxed(),
_ => fmt::layer()
.pretty()
.with_target(true)
.with_file(true)
.with_line_number(true)
.boxed(),
};
if let Ok(log_dir) = std::env::var("GITKS_LOG_DIR") {
let rotation = match env_or("GITKS_LOG_ROTATION", "daily").as_str() {
"hourly" => tracing_appender::rolling::Rotation::HOURLY,
"never" => tracing_appender::rolling::Rotation::NEVER,
_ => tracing_appender::rolling::Rotation::DAILY,
};
let retention = env_u64("GITKS_LOG_RETENTION", 7) as usize;
let mut builder = tracing_appender::rolling::Builder::new()
.rotation(rotation)
.filename_prefix("gitks")
.filename_suffix("log");
if retention > 0 {
builder = builder.max_log_files(retention);
}
let file_appender = match builder.build(&log_dir) {
Ok(file_appender) => file_appender,
Err(err) => {
eprintln!("failed to create log directory '{log_dir}': {err}");
tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer)
.init();
return None;
}
};
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
let file_layer = fmt::layer()
.json()
.with_target(true)
.with_file(true)
.with_line_number(true)
.with_writer(non_blocking)
.with_filter(EnvFilter::new("info"))
.boxed();
tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer)
.with(file_layer)
.init();
Some(guard)
} else {
tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer)
.init();
None
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
dotenvy::dotenv().ok();
let _log_guard = init_tracing();
tracing::info!(
version = env!("CARGO_PKG_VERSION"),
log_format = %env_or("GITKS_LOG_FORMAT", "pretty"),
"gitks starting up"
);
let host = env_or("GITKS_HOST", DEFAULT_HOST);
let port = env_or("GITKS_PORT", DEFAULT_PORT);
let storage_name = env_or("STORAGE_NAME", DEFAULT_STORAGE_NAME);
let etcd_endpoints: Vec<String> = std::env::var("GITKS_ETCD_ENDPOINTS")
.ok()
.filter(|s| !s.is_empty())
.map(|s| s.split(',').map(str::trim).map(String::from).collect())
.unwrap_or_else(|| vec!["http://localhost:2379".to_string()]);
let etcd_prefix = std::env::var("ETCD_KEY_PREFIX").unwrap_or_else(|_| "/appks/".to_string());
let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix).await.ok();
let host = if let Some(ref e) = etcd {
e.get("GITKS_HOST", &host).await
} else {
host
};
let port = if let Some(ref e) = etcd {
e.get("GITKS_PORT", &port).await
} else {
port
};
let storage_name = if let Some(ref e) = etcd {
e.get("GITKS_STORAGE_NAME", &storage_name).await
} else {
storage_name
};
let grpc_addr =
std::env::var("GITKS_ADVERTISE_ADDR").unwrap_or_else(|_| format!("http://{host}:{port}"));
if let Some(ref e) = etcd {
let addr_str = format!("{host}:{port}");
e.register("gitks", &addr_str).await.ok();
}
let repo_prefix = std::env::var("REPO_PREFIX_PATH")
.map_err(|_| "REPO_PREFIX_PATH environment variable is required (e.g. /data/repos)")?;
let repo_prefix = PathBuf::from(&repo_prefix);
if !repo_prefix.is_absolute() {
return Err("REPO_PREFIX_PATH must be an absolute path".into());
}
if !repo_prefix.exists() {
tracing::info!(path = %repo_prefix.display(), "creating repo prefix directory");
std::fs::create_dir_all(&repo_prefix)?;
}
let disk_cache_enabled = env_bool("GITKS_DISK_CACHE_ENABLED", false);
let disk_cache_max_age = env_u64("GITKS_DISK_CACHE_MAX_AGE", 300);
let disk_cache = DiskCache::new(
repo_prefix.clone(),
env!("CARGO_PKG_VERSION").to_string(),
disk_cache_max_age,
disk_cache_enabled,
);
if disk_cache_enabled {
tracing::info!(max_age_secs = disk_cache_max_age, "disk cache enabled");
disk_cache.cleanup_on_startup()?;
gitks::disk_cache::start_cache_cleanup_task(disk_cache.clone(), Duration::from_secs(300));
} else {
tracing::info!("disk cache disabled");
}
let pack_cache_enabled = env_bool("GITKS_PACK_CACHE_ENABLED", false);
let pack_backpressure = env_bool("GITKS_PACK_CACHE_BACKPRESSURE", true);
let pack_cache = if disk_cache_enabled {
tracing::info!(
pack_objects_cache = pack_cache_enabled,
backpressure = pack_backpressure,
"pack cache wrapper enabled"
);
Some(gitks::pack_cache::PackCache::new(
disk_cache.clone(),
pack_backpressure,
))
} else {
None
};
let hooks_enabled = env_bool("GITKS_HOOKS_ENABLED", true);
let server_hooks_dir = std::env::var("GITKS_SERVER_HOOKS_DIR")
.ok()
.map(PathBuf::from);
let hook_callback_addr = std::env::var("GITKS_HOOK_CALLBACK_ADDR").ok();
let hook_timeout = env_u64("GITKS_HOOK_TIMEOUT", 30);
let allow_custom_hooks = env_bool("GITKS_ALLOW_CUSTOM_HOOKS", true);
let hook_manager = if hooks_enabled {
tracing::info!(
timeout_secs = hook_timeout,
custom_hooks = allow_custom_hooks,
"hooks enabled"
);
Some(HookManager::new(
repo_prefix.clone(),
server_hooks_dir,
hook_callback_addr,
Duration::from_secs(hook_timeout),
allow_custom_hooks,
))
} else {
tracing::info!("hooks disabled");
None
};
let metrics_port = env_u64("GITKS_METRICS_PORT", 9100) as u16;
let http_cancel = tokio_util::sync::CancellationToken::new();
metrics::set_http_cancel_token(http_cancel.clone());
let _metrics_handle = metrics::start_metrics_server(metrics_port);
tracing::info!(port = metrics_port, "metrics server started");
let _semaphore_cleanup = gitks::rate_limit::start_semaphore_cleanup_task();
let slow_request_threshold = env_u64("GITKS_SLOW_REQUEST_THRESHOLD_MS", 5000);
metrics::set_slow_request_threshold(slow_request_threshold);
tracing::info!(
threshold_ms = slow_request_threshold,
"slow request detection configured"
);
let addr: std::net::SocketAddr = format!("{host}:{port}").parse()?;
let mut svc = GitksService::new(repo_prefix.clone());
if disk_cache_enabled {
svc = svc.with_disk_cache(disk_cache);
}
if let Some(pc) = pack_cache {
svc = svc.with_pack_cache(pc);
}
if let Some(hm) = hook_manager {
svc = svc.with_hook_manager(hm);
}
let svc = svc.with_grpc_addr(grpc_addr.clone());
tracing::info!(
addr = %addr,
repo_prefix = %repo_prefix.display(),
storage = %storage_name,
advertise = %grpc_addr,
"starting gitks gRPC server"
);
metrics::set_ready(true);
serve_with_shutdown(addr, svc, shutdown_signal()).await?;
metrics::set_ready(false);
http_cancel.cancel();
tracing::info!("gitks shut down complete");
Ok(())
}
/// Resolves when the process receives SIGTERM or SIGINT (Ctrl+C).
async fn shutdown_signal() {
let ctrl_c = async {
tokio::signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
};
#[cfg(unix)]
let terminate = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("failed to install SIGTERM handler")
.recv()
.await;
};
#[cfg(not(unix))]
let terminate = std::future::pending::<()>();
tokio::select! {
_ = ctrl_c => {
tracing::info!("received Ctrl+C, starting graceful shutdown");
}
_ = terminate => {
tracing::info!("received SIGTERM, starting graceful shutdown");
}
}
}