//! Disk-based cache infrastructure for GitKS. //! //! Implements the Gitaly-inspired diskcache design: //! - Per-repository state directory with `latest` (repo state hash) and `pending/` (lease files) //! - Cache key = SHA256(latest + request digest + version) //! - Cached responses stored at `${CACHE_DIR}/${digest:0:2}/${digest:2}` //! - Lease-based invalidation: mutator RPCs create lease files, update `latest` on completion //! - Background cleanup of stale cache entries use std::path::{Path, PathBuf}; use std::time::{Duration, SystemTime}; use sha2; use crate::error::{GitError, GitResult}; /// Lease stale threshold: leases older than this are considered stale. const LEASE_STALE_THRESHOLD_SECS: u64 = 30; /// State directory relative path under repo prefix. const STATE_DIR_RELATIVE: &str = "+gitks-cache/state"; /// Cache directory relative path under repo prefix. const CACHE_DIR_RELATIVE: &str = "+gitks-cache/cache"; /// Info-refs cache directory relative path under repo prefix. const INFO_REFS_DIR_RELATIVE: &str = "+gitks-cache/info_refs"; fn random_value() -> String { use std::fmt::Write; use std::sync::atomic::{AtomicU64, Ordering}; use std::time::{SystemTime, UNIX_EPOCH}; let nanos = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_nanos() as u64; static COUNTER: AtomicU64 = AtomicU64::new(0); let counter = COUNTER.fetch_add(1, Ordering::Relaxed); let mut buf = [0u8; 16]; buf[..8].copy_from_slice(&nanos.to_le_bytes()); buf[8..].copy_from_slice(&counter.to_le_bytes()); let mut s = String::with_capacity(32); for byte in &buf { let _ = write!(s, "{byte:02x}"); } s } /// Compute SHA256 digest from multiple input parts. 
fn sha256_digest(parts: &[&str]) -> String { use sha2::Digest; let mut hasher = sha2::Sha256::new(); for part in parts { hasher.update(part.as_bytes()); } let result = hasher.finalize(); let mut s = String::with_capacity(64); for byte in result { use std::fmt::Write; let _ = write!(s, "{byte:02x}"); } s } /// Convert a digest into a two-level file path: `${digest:0:2}/${digest:2}`. pub fn digest_to_path(digest: &str) -> PathBuf { match (digest.get(..2), digest.get(2..)) { (Some(prefix), Some(rest)) => PathBuf::from(prefix).join(rest), _ => PathBuf::from(digest), } } /// DiskCache manages per-repository state and cached response files on local disk. #[derive(Debug)] pub struct DiskCache { pub repo_prefix: PathBuf, max_age: Duration, version: String, enabled: bool, } impl DiskCache { /// Create a new DiskCache. pub fn new(repo_prefix: PathBuf, version: String, max_age_secs: u64, enabled: bool) -> Self { Self { repo_prefix, max_age: Duration::from_secs(max_age_secs), version, enabled, } } /// Whether the cache is enabled. pub fn is_enabled(&self) -> bool { self.enabled } fn state_dir_for(&self, relative_path: &str) -> PathBuf { self.repo_prefix .join(STATE_DIR_RELATIVE) .join(relative_path) } fn latest_path_for(&self, relative_path: &str) -> PathBuf { self.state_dir_for(relative_path).join("latest") } fn pending_dir_for(&self, relative_path: &str) -> PathBuf { self.state_dir_for(relative_path).join("pending") } fn cache_dir(&self, namespace: &str) -> PathBuf { self.repo_prefix.join(namespace) } fn cache_file_path(&self, namespace: &str, digest: &str) -> PathBuf { self.cache_dir(namespace).join(digest_to_path(digest)) } /// Ensure the state directory for a repository exists and has a `latest` file. /// If `latest` does not exist, create it atomically with a random value. 
pub fn ensure_state(&self, relative_path: &str) -> GitResult { if !self.enabled { return Ok(random_value()); } let state_dir = self.state_dir_for(relative_path); std::fs::create_dir_all(&state_dir).map_err(GitError::Io)?; let pending_dir = self.pending_dir_for(relative_path); std::fs::create_dir_all(&pending_dir).map_err(GitError::Io)?; let latest_path = self.latest_path_for(relative_path); if latest_path.exists() { let val = std::fs::read_to_string(&latest_path).map_err(GitError::Io)?; return Ok(val.trim().to_string()); } // Atomic write: create temp file, then rename into place let val = random_value(); let tmp_path = latest_path.with_extension("tmp"); std::fs::write(&tmp_path, &val).map_err(GitError::Io)?; std::fs::rename(&tmp_path, &latest_path).map_err(GitError::Io)?; Ok(val) } /// Create a lease file for a mutating RPC. /// Returns a `LeaseGuard` that removes the lease on drop and updates `latest`. pub fn create_lease(&self, relative_path: &str) -> GitResult { if !self.enabled { let rp = relative_path.to_string(); return Ok(LeaseGuard { cache: self.clone(), relative_path: rp, lease_path: PathBuf::new(), is_dummy: true, }); } let pending_dir = self.pending_dir_for(relative_path); std::fs::create_dir_all(&pending_dir).map_err(GitError::Io)?; let lease_name = random_value(); let lease_path = pending_dir.join(&lease_name); std::fs::write(&lease_path, &lease_name).map_err(GitError::Io)?; tracing::debug!( relative_path = %relative_path, lease = %lease_name, "lease created" ); Ok(LeaseGuard { cache: self.clone(), relative_path: relative_path.to_string(), lease_path, is_dummy: false, }) } /// Check if a repository is in a deterministic state (no active leases). /// Returns the `latest` value if deterministic, or None if indeterminate. 
pub fn get_repo_state(&self, relative_path: &str) -> GitResult> { if !self.enabled { return Ok(Some(random_value())); } self.cleanup_stale_leases(relative_path)?; let pending_dir = self.pending_dir_for(relative_path); if pending_dir.exists() { let entries = std::fs::read_dir(&pending_dir).map_err(GitError::Io)?; let count = entries.count(); if count > 0 { tracing::debug!( relative_path = %relative_path, pending = count, "repo has in-flight mutator, cache state indeterminate" ); return Ok(None); } } let latest_path = self.latest_path_for(relative_path); if latest_path.exists() { let val = std::fs::read_to_string(&latest_path).map_err(GitError::Io)?; Ok(Some(val.trim().to_string())) } else { // No latest file → create one Ok(Some(self.ensure_state(relative_path)?)) } } /// Remove stale lease files (older than LEASE_STALE_THRESHOLD_SECS). fn cleanup_stale_leases(&self, relative_path: &str) -> GitResult<()> { let pending_dir = self.pending_dir_for(relative_path); if !pending_dir.exists() { return Ok(()); } let now = SystemTime::now(); let threshold = Duration::from_secs(LEASE_STALE_THRESHOLD_SECS); for entry in std::fs::read_dir(&pending_dir).map_err(GitError::Io)? { let entry = entry.map_err(GitError::Io)?; let path = entry.path(); if let Ok(metadata) = entry.metadata() && let Ok(modified) = metadata.modified() && let Ok(age) = now.duration_since(modified) && age > threshold { tracing::warn!( path = %path.display(), age_secs = age.as_secs(), "removing stale lease file" ); std::fs::remove_file(&path).ok(); } } Ok(()) } /// Compute a cache key for an info/refs request. pub fn compute_info_refs_key(&self, relative_path: &str, protocol: &str) -> GitResult { let latest = self.ensure_state(relative_path)?; let parts: &[&str] = &[&latest, "advertise_refs", protocol, &self.version]; Ok(sha256_digest(parts)) } /// Compute a cache key for a pack-objects request. 
pub fn compute_pack_objects_key( &self, relative_path: &str, wants_hex: &[String], haves_hex: &[String], thin_pack: bool, use_bitmaps: bool, delta_base_offset: bool, ) -> GitResult { let latest = self.ensure_state(relative_path)?; // Sort wants and haves for deterministic key let mut wants_sorted = wants_hex.to_vec(); wants_sorted.sort(); let mut haves_sorted = haves_hex.to_vec(); haves_sorted.sort(); let wants_str = wants_sorted.join(","); let haves_str = haves_sorted.join(","); let flags = format!("thin={thin_pack},bitmaps={use_bitmaps},dbo={delta_base_offset}"); let parts: &[&str] = &[ &latest, "pack_objects", &wants_str, &haves_str, &flags, &self.version, ]; Ok(sha256_digest(parts)) } /// Look up a cached response for the given namespace and digest. /// Returns the cached bytes if found and not expired. pub fn lookup(&self, namespace: &str, digest: &str) -> GitResult>> { if !self.enabled { return Ok(None); } let start = std::time::Instant::now(); let path = self.cache_file_path(namespace, digest); // Check expiry before reading, but handle concurrent deletion gracefully. if let Ok(metadata) = std::fs::metadata(&path) && let Ok(modified) = metadata.modified() && let Ok(age) = SystemTime::now().duration_since(modified) && age > self.max_age { tracing::debug!( path = %path.display(), age_secs = age.as_secs(), "cached entry expired, removing" ); std::fs::remove_file(&path).ok(); if let Some(parent) = path.parent() { std::fs::remove_dir(parent).ok(); } crate::metrics::record_cache_op("disk", "expired", start.elapsed()); return Ok(None); } match std::fs::read(&path) { Ok(data) => { tracing::debug!( namespace = %namespace, digest = %digest, size = data.len(), elapsed_ms = start.elapsed().as_millis() as u64, "cache hit" ); crate::metrics::record_cache_op("disk", "hit", start.elapsed()); Ok(Some(data)) } Err(e) if e.kind() == std::io::ErrorKind::NotFound => { // File was deleted between metadata check and read — treat as miss. 
crate::metrics::record_cache_op("disk", "miss", start.elapsed()); Ok(None) } Err(e) => Err(GitError::Io(e)), } } /// Insert a cached response for the given namespace and digest. pub fn insert(&self, namespace: &str, digest: &str, data: &[u8]) -> GitResult<()> { if !self.enabled { return Ok(()); } let start = std::time::Instant::now(); let path = self.cache_file_path(namespace, digest); if let Some(parent) = path.parent() { std::fs::create_dir_all(parent).map_err(GitError::Io)?; } let tmp_path = path.with_extension("tmp"); std::fs::write(&tmp_path, data).map_err(GitError::Io)?; std::fs::rename(&tmp_path, &path).map_err(GitError::Io)?; tracing::debug!( namespace = %namespace, digest = %digest, size = data.len(), elapsed_ms = start.elapsed().as_millis() as u64, "cache entry written" ); crate::metrics::record_cache_op("disk", "write", start.elapsed()); Ok(()) } /// Open a cache file for streaming read. pub fn open_stream_read( &self, namespace: &str, digest: &str, ) -> GitResult> { if !self.enabled { return Ok(None); } let path = self.cache_file_path(namespace, digest); // Check expiry; handle concurrent deletion gracefully. if let Ok(metadata) = std::fs::metadata(&path) && let Ok(modified) = metadata.modified() && let Ok(age) = SystemTime::now().duration_since(modified) && age > self.max_age { std::fs::remove_file(&path).ok(); return Ok(None); } match std::fs::File::open(&path) { Ok(file) => Ok(Some(file)), Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), Err(e) => Err(GitError::Io(e)), } } /// Open a cache file for streaming write. /// Returns the file handle and the final path. 
///
    /// The handle refers to a temp file at the final path with its extension
    /// set to `tmp_streaming`; pass the returned path to
    /// `finalize_stream_write` to publish it.
    ///
    /// # Errors
    ///
    /// Returns `GitError::Internal` when the cache is disabled and
    /// `GitError::Io` if the directory or temp file cannot be created.
    pub fn open_stream_write(
        &self,
        namespace: &str,
        digest: &str,
    ) -> GitResult<(std::fs::File, PathBuf)> {
        if !self.enabled {
            return Err(GitError::Internal("disk cache not enabled".into()));
        }

        let path = self.cache_file_path(namespace, digest);
        if let Some(parent) = path.parent() {
            std::fs::create_dir_all(parent).map_err(GitError::Io)?;
        }

        let tmp_path = path.with_extension("tmp_streaming");
        let file = std::fs::File::create(&tmp_path).map_err(GitError::Io)?;
        Ok((file, path))
    }

    /// Finalize a streaming write by renaming the temp file to the final path.
    ///
    /// The temp file is located by setting the extension of `tmp_path` to
    /// `tmp_streaming`, so the final path returned by `open_stream_write` may
    /// be passed for both arguments. If no temp file exists the call does
    /// nothing and still returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns `GitError::Io` if the rename fails.
    pub fn finalize_stream_write(&self, tmp_path: &Path, final_path: &Path) -> GitResult<()> {
        let actual_tmp = tmp_path.with_extension("tmp_streaming");
        if !actual_tmp.exists() {
            // Nothing was renamed, so do not claim the entry was finalized.
            tracing::debug!(
                path = %final_path.display(),
                "no streaming temp file found, nothing to finalize"
            );
            return Ok(());
        }

        std::fs::rename(&actual_tmp, final_path).map_err(GitError::Io)?;
        tracing::debug!(
            path = %final_path.display(),
            "streaming cache entry finalized"
        );
        Ok(())
    }

    /// Invalidate all cache entries for a repository by updating the `latest` file.
    /// This is called after any mutator RPC (create branch, create commit, etc.)
    ///
    /// Failures are logged and otherwise ignored. A disabled cache ignores the call.
    pub fn invalidate_repo(&self, relative_path: &str) {
        if !self.enabled {
            return;
        }

        let latest_path = self.latest_path_for(relative_path);
        let new_val = random_value();

        // Write to a unique temp file and rename it over `latest`, so readers
        // never observe an empty or partially written state value.
        let tmp_path = latest_path.with_extension(format!("tmp.{new_val}"));
        let replaced = std::fs::write(&tmp_path, &new_val)
            .and_then(|()| std::fs::rename(&tmp_path, &latest_path));

        match replaced {
            Ok(()) => {
                tracing::debug!(
                    relative_path = %relative_path,
                    new_state = %new_val,
                    "cache invalidated for repository"
                );
            }
            Err(e) => {
                // Best effort: do not leave the orphaned temp file behind.
                std::fs::remove_file(&tmp_path).ok();
                tracing::warn!(
                    relative_path = %relative_path,
                    error = %e,
                    "failed to update latest for cache invalidation"
                );
            }
        }
    }

    /// Remove all cache entries on startup (guard against inconsistent state from previous run).
///
    /// Deletes the cache and info-refs directories under `repo_prefix`; the
    /// state directory is not touched.
    ///
    /// # Errors
    ///
    /// Returns `GitError::Io` if a directory cannot be removed.
    pub fn cleanup_on_startup(&self) -> GitResult<()> {
        if !self.enabled {
            return Ok(());
        }

        for namespace in &[CACHE_DIR_RELATIVE, INFO_REFS_DIR_RELATIVE] {
            let dir = self.repo_prefix.join(namespace);
            if dir.exists() {
                tracing::info!(dir = %dir.display(), "cleaning cache directory on startup");
                std::fs::remove_dir_all(&dir).map_err(GitError::Io)?;
            }
        }
        Ok(())
    }

    /// Background cleanup: remove expired cache entries.
    /// Should be called periodically (e.g., every 5 minutes).
    ///
    /// Walks the two-level layout of the cache and info-refs directories,
    /// deletes files older than `max_age` and removes prefix directories that
    /// end up empty. Returns the number of files actually deleted. Unreadable
    /// directories and entries are skipped.
    pub fn cleanup_expired(&self) -> GitResult<u64> {
        if !self.enabled {
            return Ok(0);
        }

        let now = SystemTime::now();
        let mut removed = 0u64;

        for namespace in &[CACHE_DIR_RELATIVE, INFO_REFS_DIR_RELATIVE] {
            let dir = self.repo_prefix.join(namespace);
            if !dir.exists() {
                continue;
            }

            let prefix_iter = match std::fs::read_dir(&dir) {
                Ok(iter) => iter,
                Err(_) => continue,
            };

            for prefix_entry in prefix_iter {
                let prefix_dir = match prefix_entry {
                    Ok(e) => e.path(),
                    Err(_) => continue,
                };

                if !prefix_dir.is_dir() {
                    continue;
                }

                // Process all entries in this prefix directory
                let entries = match std::fs::read_dir(&prefix_dir) {
                    Ok(iter) => iter,
                    Err(_) => continue,
                };

                let mut prefix_empty = true;
                for entry in entries {
                    let path = match entry {
                        Ok(e) => e.path(),
                        Err(_) => continue,
                    };

                    // Unknown or future timestamps count as "not expired".
                    let expired = match std::fs::metadata(&path) {
                        Ok(meta) => meta
                            .modified()
                            .ok()
                            .and_then(|mtime| now.duration_since(mtime).ok())
                            .is_some_and(|age| age > self.max_age),
                        Err(_) => false,
                    };

                    if !expired {
                        prefix_empty = false;
                        continue;
                    }

                    tracing::debug!(
                        path = %path.display(),
                        "removing expired cache entry"
                    );
                    match std::fs::remove_file(&path) {
                        // Count only entries this call really deleted.
                        Ok(()) => removed += 1,
                        // Someone else removed it first; it is gone either way.
                        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
                        // Still on disk, so the prefix directory is not empty.
                        Err(_) => prefix_empty = false,
                    }
                }

                // Remove empty prefix directory
                if prefix_empty {
                    std::fs::remove_dir(&prefix_dir).ok();
                }
            }
        }

        if removed > 0 {
            tracing::info!(
                entries_removed = removed,
                "expired cache entries cleaned up"
            );
        }

        Ok(removed)
    }
}

impl Clone for DiskCache {
    /// Field-by-field copy; the clone refers to the same on-disk directories.
    fn clone(&self) -> Self {
        Self {
            repo_prefix: self.repo_prefix.clone(),
            max_age: self.max_age,
            version: self.version.clone(),
            enabled: self.enabled,
        }
    }
}

/// A lease guard that removes the lease file on drop and updates `latest`.
pub struct LeaseGuard {
    /// Cache handle used to update `latest`.
    cache: DiskCache,
    /// Repository the lease belongs to.
    relative_path: String,
    /// Lease file to remove; empty for dummy guards and after `commit`.
    lease_path: PathBuf,
    /// `true` for a guard without a lease file that has not been committed yet.
    is_dummy: bool,
}

impl LeaseGuard {
    /// Update the `latest` file to invalidate cached responses.
    /// Called automatically on drop, but can also be called manually.
    ///
    /// After this call the guard is marked as committed, so dropping it does
    /// not invalidate a second time.
    pub fn commit(&mut self) {
        // Update `latest` while the lease still exists: a reader that checks
        // for leases in between sees "indeterminate" rather than the old state.
        self.cache.invalidate_repo(&self.relative_path);

        if !self.lease_path.as_os_str().is_empty() {
            std::fs::remove_file(&self.lease_path).ok();
        }

        // Mark as committed. This must not depend on the lease file still
        // existing, because stale-lease cleanup may already have removed it.
        self.lease_path = PathBuf::new();
        self.is_dummy = false; // Don't double-commit on drop
    }
}

impl Drop for LeaseGuard {
    /// Commits the lease unless `commit` was already called.
    fn drop(&mut self) {
        // A non-empty lease path means "not committed yet", even when the file
        // itself has been reaped as stale during a long-running mutation.
        if self.is_dummy || !self.lease_path.as_os_str().is_empty() {
            self.commit();
        }
    }
}

/// Start a background task that periodically cleans up expired cache entries.
///
/// Spawns a tokio task that runs `cleanup_expired` once per `interval`,
/// skipping the immediate first tick, and logs failures. The loop never exits
/// on its own.
///
/// NOTE(review): `cleanup_expired` performs blocking filesystem calls inside
/// the async task — consider `spawn_blocking` if cache directories grow large.
pub fn start_cache_cleanup_task(
    cache: DiskCache,
    interval: Duration,
) -> tokio::task::JoinHandle<()> {
    tokio::spawn(async move {
        let mut ticker = tokio::time::interval(interval);
        ticker.tick().await; // First tick is immediate
        loop {
            ticker.tick().await;
            if let Err(e) = cache.cleanup_expired() {
                tracing::warn!(error = %e, "cache cleanup task failed");
            }
        }
    })
}