Files
gitks/disk_cache.rs
T

536 lines
18 KiB
Rust

//! Disk-based cache infrastructure for GitKS.
//!
//! Implements the Gitaly-inspired diskcache design:
//! - Per-repository state directory with `latest` (repo state hash) and `pending/` (lease files)
//! - Cache key = SHA256(latest + request digest + version)
//! - Cached responses stored at `${CACHE_DIR}/${digest:0:2}/${digest:2}`
//! - Lease-based invalidation: mutator RPCs create lease files, update `latest` on completion
//! - Background cleanup of stale cache entries
use std::path::{Path, PathBuf};
use std::time::{Duration, SystemTime};
use sha2;
use crate::error::{GitError, GitResult};
/// Lease stale threshold in seconds: lease files whose modification time is older than this
/// are removed by `cleanup_stale_leases`.
const LEASE_STALE_THRESHOLD_SECS: u64 = 30;
/// State directory path, relative to the repo prefix; each repository's `latest` file and
/// `pending/` directory live beneath it.
const STATE_DIR_RELATIVE: &str = "+gitks-cache/state";
/// Cache directory path, relative to the repo prefix. Swept by `cleanup_on_startup` and
/// `cleanup_expired`.
const CACHE_DIR_RELATIVE: &str = "+gitks-cache/cache";
/// Info-refs cache directory path, relative to the repo prefix. Swept by `cleanup_on_startup`
/// and `cleanup_expired`.
const INFO_REFS_DIR_RELATIVE: &str = "+gitks-cache/info_refs";
/// Produce a 32-character lowercase hex token that is unique within this process.
///
/// The first 16 hex characters encode the current wall-clock time (nanoseconds since the Unix
/// epoch, truncated to `u64`, little-endian); the last 16 encode a process-wide counter
/// (little-endian). Despite the name, the value is not cryptographically random.
fn random_value() -> String {
    use std::fmt::Write;
    use std::sync::atomic::{AtomicU64, Ordering};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default();
    let stamp = (since_epoch.as_nanos() as u64).to_le_bytes();
    let sequence = COUNTER.fetch_add(1, Ordering::Relaxed).to_le_bytes();

    let mut token = String::with_capacity(2 * (stamp.len() + sequence.len()));
    for octet in stamp.iter().chain(sequence.iter()) {
        write!(token, "{octet:02x}").unwrap();
    }
    token
}
/// Compute the SHA-256 digest of `parts`, returned as a 64-character lowercase hex string.
///
/// Every part is fed to the hasher preceded by its byte length (little-endian `u64`). Without
/// that framing the parts would simply be concatenated, so different splits of the same bytes
/// (`["ab", "c"]` vs `["a", "bc"]`, or an empty part next to a non-empty one) would hash to the
/// same key and could alias unrelated cache entries.
fn sha256_digest(parts: &[&str]) -> String {
    use sha2::Digest;
    use std::fmt::Write;

    let mut hasher = sha2::Sha256::new();
    for part in parts {
        // Length prefix makes the part boundaries part of the hashed input.
        hasher.update((part.len() as u64).to_le_bytes());
        hasher.update(part.as_bytes());
    }

    let mut hex = String::with_capacity(64);
    for byte in hasher.finalize() {
        write!(hex, "{byte:02x}").expect("writing to a String cannot fail");
    }
    hex
}
/// Convert a digest into a two-level file path: `${digest:0:2}/${digest:2}`.
///
/// The first two bytes become the directory name and the remainder the file name. A digest
/// that cannot be split there (shorter than two bytes, or byte offset 2 is not a character
/// boundary) is returned unchanged as a single-component path instead of panicking.
pub fn digest_to_path(digest: &str) -> PathBuf {
    match (digest.get(..2), digest.get(2..)) {
        (Some(prefix), Some(rest)) => PathBuf::from(prefix).join(rest),
        // Checked slicing failed: fall back to a flat path rather than panic on bad input.
        _ => PathBuf::from(digest),
    }
}
/// DiskCache manages per-repository state and cached response files on local disk.
#[derive(Debug)]
pub struct DiskCache {
/// Base directory that every state and cache path is joined onto.
pub repo_prefix: PathBuf,
/// Maximum age of a cache file (by modification time) before it is treated as expired.
max_age: Duration,
/// Version string mixed into every computed cache key.
version: String,
/// When `false`, the methods that check this flag return early without touching the disk.
enabled: bool,
}
impl DiskCache {
/// Build a `DiskCache` rooted at `repo_prefix`.
///
/// `version` is mixed into every cache key, `max_age_secs` is the entry lifetime in seconds,
/// and `enabled` switches the cache on or off.
pub fn new(repo_prefix: PathBuf, version: String, max_age_secs: u64, enabled: bool) -> Self {
    let max_age = Duration::from_secs(max_age_secs);
    Self {
        repo_prefix,
        max_age,
        version,
        enabled,
    }
}
/// Report whether the cache is enabled.
///
/// Returns the `enabled` flag stored on this instance.
pub fn is_enabled(&self) -> bool {
self.enabled
}
/// State directory for one repository: `<repo_prefix>/<STATE_DIR_RELATIVE>/<relative_path>`.
fn state_dir_for(&self, relative_path: &str) -> PathBuf {
    let mut dir = self.repo_prefix.join(STATE_DIR_RELATIVE);
    dir.push(relative_path);
    dir
}
/// Path of the `latest` file inside the repository's state directory.
fn latest_path_for(&self, relative_path: &str) -> PathBuf {
    let mut path = self.state_dir_for(relative_path);
    path.push("latest");
    path
}
/// Path of the `pending` directory (lease files) inside the repository's state directory.
fn pending_dir_for(&self, relative_path: &str) -> PathBuf {
    let mut dir = self.state_dir_for(relative_path);
    dir.push("pending");
    dir
}
/// Directory holding cached responses for `namespace`: `<repo_prefix>/<namespace>`.
fn cache_dir(&self, namespace: &str) -> PathBuf {
    let mut dir = self.repo_prefix.clone();
    dir.push(namespace);
    dir
}
/// Full path of the cache file for `digest` within `namespace`, using the two-level layout
/// produced by `digest_to_path`.
fn cache_file_path(&self, namespace: &str, digest: &str) -> PathBuf {
    let mut path = self.cache_dir(namespace);
    path.push(digest_to_path(digest));
    path
}
/// Ensure the state directory for a repository exists and has a `latest` file.
///
/// Returns the trimmed contents of `latest`. If the file does not exist yet it is created
/// with a fresh value from `random_value`. When the cache is disabled, nothing is touched on
/// disk and a fresh value is returned on every call.
///
/// # Errors
///
/// Returns `GitError::Io` if the directories cannot be created or `latest` cannot be read or
/// created.
pub fn ensure_state(&self, relative_path: &str) -> GitResult<String> {
    use std::io::{ErrorKind, Write};

    if !self.enabled {
        return Ok(random_value());
    }

    // `pending/` lives inside the state directory, so this creates both levels.
    let pending_dir = self.pending_dir_for(relative_path);
    std::fs::create_dir_all(&pending_dir).map_err(GitError::Io)?;

    let latest_path = self.latest_path_for(relative_path);

    // Read first instead of `exists()` + read, so a concurrent removal cannot slip in between
    // the check and the use.
    match std::fs::read_to_string(&latest_path) {
        Ok(existing) => return Ok(existing.trim().to_string()),
        Err(e) if e.kind() == ErrorKind::NotFound => {}
        Err(e) => return Err(GitError::Io(e)),
    }

    // `create_new` fails if somebody else initialized the file in the meantime, so two
    // concurrent initializers can no longer overwrite each other's value.
    let fresh = random_value();
    match std::fs::OpenOptions::new()
        .write(true)
        .create_new(true)
        .open(&latest_path)
    {
        Ok(mut file) => {
            file.write_all(fresh.as_bytes()).map_err(GitError::Io)?;
            Ok(fresh)
        }
        Err(e) if e.kind() == ErrorKind::AlreadyExists => {
            // Lost the race: adopt the value written by the winner.
            let existing = std::fs::read_to_string(&latest_path).map_err(GitError::Io)?;
            Ok(existing.trim().to_string())
        }
        Err(e) => Err(GitError::Io(e)),
    }
}
/// Create a lease file for a mutating RPC.
///
/// Writes a uniquely named file into the repository's `pending/` directory and returns a
/// `LeaseGuard` that removes the lease and updates `latest` when committed or dropped. When
/// the cache is disabled no file is written and a dummy guard is returned.
///
/// # Errors
///
/// Returns `GitError::Io` if the pending directory or the lease file cannot be created.
pub fn create_lease(&self, relative_path: &str) -> GitResult<LeaseGuard> {
    // Both outcomes build the same guard; only the lease path and the dummy flag differ.
    let make_guard = |lease_path: PathBuf, is_dummy: bool| LeaseGuard {
        cache: self.clone(),
        relative_path: relative_path.to_string(),
        lease_path,
        is_dummy,
    };

    if !self.enabled {
        return Ok(make_guard(PathBuf::new(), true));
    }

    let pending_dir = self.pending_dir_for(relative_path);
    std::fs::create_dir_all(&pending_dir).map_err(GitError::Io)?;

    let lease_name = random_value();
    let lease_path = pending_dir.join(&lease_name);
    std::fs::write(&lease_path, &lease_name).map_err(GitError::Io)?;

    tracing::debug!(
        relative_path = %relative_path,
        lease = %lease_name,
        "lease created"
    );

    Ok(make_guard(lease_path, false))
}
/// Check whether a repository is in a deterministic state (no active leases).
///
/// Stale leases are swept first. Returns `Ok(None)` while any lease file remains in
/// `pending/`; otherwise returns the trimmed `latest` value, creating it through
/// `ensure_state` if it does not exist. When the cache is disabled a fresh random value is
/// returned.
///
/// # Errors
///
/// Returns `GitError::Io` if the pending directory or the `latest` file cannot be read.
pub fn get_repo_state(&self, relative_path: &str) -> GitResult<Option<String>> {
    if !self.enabled {
        return Ok(Some(random_value()));
    }

    self.cleanup_stale_leases(relative_path)?;

    let pending_dir = self.pending_dir_for(relative_path);
    let pending = if pending_dir.exists() {
        std::fs::read_dir(&pending_dir)
            .map_err(GitError::Io)?
            .count()
    } else {
        0
    };
    if pending > 0 {
        tracing::debug!(
            relative_path = %relative_path,
            pending = pending,
            "repo has in-flight mutator, cache state indeterminate"
        );
        return Ok(None);
    }

    let latest_path = self.latest_path_for(relative_path);
    if !latest_path.exists() {
        // No latest file yet: create one.
        return self.ensure_state(relative_path).map(Some);
    }
    let raw = std::fs::read_to_string(&latest_path).map_err(GitError::Io)?;
    Ok(Some(raw.trim().to_string()))
}
/// Remove stale lease files (older than `LEASE_STALE_THRESHOLD_SECS`).
///
/// Age is measured from each file's modification time. Entries whose metadata or timestamp
/// cannot be read are left alone, and removal failures are ignored.
///
/// # Errors
///
/// Returns `GitError::Io` if the pending directory cannot be listed.
fn cleanup_stale_leases(&self, relative_path: &str) -> GitResult<()> {
    let pending_dir = self.pending_dir_for(relative_path);
    if !pending_dir.exists() {
        return Ok(());
    }

    let now = SystemTime::now();
    let threshold = Duration::from_secs(LEASE_STALE_THRESHOLD_SECS);

    for entry in std::fs::read_dir(&pending_dir).map_err(GitError::Io)? {
        let entry = entry.map_err(GitError::Io)?;
        let path = entry.path();

        // `None` whenever any step fails (no metadata, no mtime, or mtime in the future).
        let age = entry
            .metadata()
            .ok()
            .and_then(|metadata| metadata.modified().ok())
            .and_then(|modified| now.duration_since(modified).ok());

        match age {
            Some(age) if age > threshold => {
                tracing::warn!(
                    path = %path.display(),
                    age_secs = age.as_secs(),
                    "removing stale lease file"
                );
                std::fs::remove_file(&path).ok();
            }
            _ => {}
        }
    }
    Ok(())
}
/// Compute the cache key for an info/refs request.
///
/// The key is the digest of the repository's `latest` value, the literal `"advertise_refs"`,
/// `protocol`, and the cache version.
///
/// # Errors
///
/// Propagates any error from `ensure_state`.
pub fn compute_info_refs_key(&self, relative_path: &str, protocol: &str) -> GitResult<String> {
    self.ensure_state(relative_path).map(|latest| {
        sha256_digest(&[
            latest.as_str(),
            "advertise_refs",
            protocol,
            self.version.as_str(),
        ])
    })
}
/// Compute the cache key for a pack-objects request.
///
/// The key is the digest of the repository's `latest` value, the literal `"pack_objects"`,
/// the sorted comma-joined wants, the sorted comma-joined haves, the option flags, and the
/// cache version. Sorting makes the key independent of the order in which object ids arrive.
///
/// # Errors
///
/// Propagates any error from `ensure_state`.
pub fn compute_pack_objects_key(
    &self,
    relative_path: &str,
    wants_hex: &[String],
    haves_hex: &[String],
    thin_pack: bool,
    use_bitmaps: bool,
    delta_base_offset: bool,
) -> GitResult<String> {
    // Canonical form of an id list: sorted, then comma-joined.
    let canonical = |ids: &[String]| -> String {
        let mut sorted: Vec<&str> = ids.iter().map(String::as_str).collect();
        sorted.sort();
        sorted.join(",")
    };

    let latest = self.ensure_state(relative_path)?;
    let wants = canonical(wants_hex);
    let haves = canonical(haves_hex);
    let flags = format!("thin={thin_pack},bitmaps={use_bitmaps},dbo={delta_base_offset}");

    Ok(sha256_digest(&[
        latest.as_str(),
        "pack_objects",
        wants.as_str(),
        haves.as_str(),
        flags.as_str(),
        self.version.as_str(),
    ]))
}
/// Look up a cached response for the given namespace and digest.
///
/// Returns the cached bytes if the entry exists and is not older than `max_age`. An expired
/// entry is removed (together with its prefix directory if that becomes empty) and reported
/// as a miss. An entry that disappears between the metadata check and the read, for example
/// because a concurrent cleanup removed it, is also a miss rather than an error.
///
/// # Errors
///
/// Returns `GitError::Io` if the entry exists but cannot be read.
pub fn lookup(&self, namespace: &str, digest: &str) -> GitResult<Option<Vec<u8>>> {
    if !self.enabled {
        return Ok(None);
    }
    let start = std::time::Instant::now();
    let path = self.cache_file_path(namespace, digest);

    // One metadata call serves as both the existence check and the source of the mtime.
    let Ok(metadata) = std::fs::metadata(&path) else {
        crate::metrics::record_cache_op("disk", "miss", start.elapsed());
        return Ok(None);
    };

    let age = metadata
        .modified()
        .ok()
        .and_then(|modified| SystemTime::now().duration_since(modified).ok());
    match age {
        Some(age) if age > self.max_age => {
            tracing::debug!(
                path = %path.display(),
                age_secs = age.as_secs(),
                "cached entry expired, removing"
            );
            std::fs::remove_file(&path).ok();
            if let Some(parent) = path.parent() {
                // Only succeeds when the prefix directory is now empty.
                std::fs::remove_dir(parent).ok();
            }
            crate::metrics::record_cache_op("disk", "expired", start.elapsed());
            return Ok(None);
        }
        _ => {}
    }

    let data = match std::fs::read(&path) {
        Ok(data) => data,
        // Removed by a concurrent cleanup after the metadata check: treat as a miss.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
            crate::metrics::record_cache_op("disk", "miss", start.elapsed());
            return Ok(None);
        }
        Err(e) => return Err(GitError::Io(e)),
    };

    tracing::debug!(
        namespace = %namespace,
        digest = %digest,
        size = data.len(),
        elapsed_ms = start.elapsed().as_millis() as u64,
        "cache hit"
    );
    crate::metrics::record_cache_op("disk", "hit", start.elapsed());
    Ok(Some(data))
}
/// Insert a cached response for the given namespace and digest.
pub fn insert(&self, namespace: &str, digest: &str, data: &[u8]) -> GitResult<()> {
if !self.enabled {
return Ok(());
}
let start = std::time::Instant::now();
let path = self.cache_file_path(namespace, digest);
if let Some(parent) = path.parent() {
std::fs::create_dir_all(parent).map_err(GitError::Io)?;
}
let tmp_path = path.with_extension("tmp");
std::fs::write(&tmp_path, data).map_err(GitError::Io)?;
std::fs::rename(&tmp_path, &path).map_err(GitError::Io)?;
tracing::debug!(
namespace = %namespace,
digest = %digest,
size = data.len(),
elapsed_ms = start.elapsed().as_millis() as u64,
"cache entry written"
);
crate::metrics::record_cache_op("disk", "write", start.elapsed());
Ok(())
}
/// Open a cache file for streaming read.
///
/// Returns `Ok(None)` when the cache is disabled, the entry does not exist, or the entry is
/// older than `max_age` (in which case it is removed). An entry that disappears between the
/// metadata check and the open is reported as `Ok(None)` as well.
///
/// # Errors
///
/// Returns `GitError::Io` if the entry exists but cannot be opened.
pub fn open_stream_read(
    &self,
    namespace: &str,
    digest: &str,
) -> GitResult<Option<std::fs::File>> {
    if !self.enabled {
        return Ok(None);
    }
    let path = self.cache_file_path(namespace, digest);

    // One metadata call serves as both the existence check and the source of the mtime.
    let Ok(metadata) = std::fs::metadata(&path) else {
        return Ok(None);
    };

    let expired = metadata
        .modified()
        .ok()
        .and_then(|modified| SystemTime::now().duration_since(modified).ok())
        .is_some_and(|age| age > self.max_age);
    if expired {
        std::fs::remove_file(&path).ok();
        return Ok(None);
    }

    match std::fs::File::open(&path) {
        Ok(file) => Ok(Some(file)),
        // Removed by a concurrent cleanup after the metadata check: treat as a miss.
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
        Err(e) => Err(GitError::Io(e)),
    }
}
/// Open a cache file for streaming write.
///
/// Creates (or truncates) a temporary file named after the final path with the
/// `tmp_streaming` extension and returns its handle together with the *final* path. The
/// entry only becomes visible once `finalize_stream_write` renames the temporary file.
///
/// # Errors
///
/// Returns `GitError::Internal` when the cache is disabled, or `GitError::Io` if the
/// directory or the temporary file cannot be created.
pub fn open_stream_write(
    &self,
    namespace: &str,
    digest: &str,
) -> GitResult<(std::fs::File, PathBuf)> {
    if !self.enabled {
        return Err(GitError::Internal("disk cache not enabled".into()));
    }

    let final_path = self.cache_file_path(namespace, digest);
    if let Some(dir) = final_path.parent() {
        std::fs::create_dir_all(dir).map_err(GitError::Io)?;
    }

    let staging_path = final_path.with_extension("tmp_streaming");
    std::fs::File::create(&staging_path)
        .map(|file| (file, final_path))
        .map_err(GitError::Io)
}
/// Finalize a streaming write by renaming the temp file to the final path.
///
/// The temp file is located by applying the `tmp_streaming` extension to `tmp_path`. If that
/// file does not exist, nothing is renamed: a warning is logged and `Ok(())` is returned, so
/// the log no longer claims an entry was finalized when none was.
///
/// # Errors
///
/// Returns `GitError::Io` if the rename fails.
pub fn finalize_stream_write(&self, tmp_path: &Path, final_path: &Path) -> GitResult<()> {
    let actual_tmp = tmp_path.with_extension("tmp_streaming");
    if !actual_tmp.exists() {
        tracing::warn!(
            tmp = %actual_tmp.display(),
            path = %final_path.display(),
            "streaming temp file missing, nothing to finalize"
        );
        return Ok(());
    }

    std::fs::rename(&actual_tmp, final_path).map_err(GitError::Io)?;
    tracing::debug!(
        path = %final_path.display(),
        "streaming cache entry finalized"
    );
    Ok(())
}
/// Invalidate all cache entries for a repository by updating the `latest` file.
/// This is called after any mutator RPC (create branch, create commit, etc.)
///
/// The new value is written to a temporary file in the state directory and renamed over
/// `latest`, so a concurrent reader sees either the old or the new value and never a
/// truncated file. Failures are logged and otherwise ignored; the temporary file is removed
/// on failure. Does nothing when the cache is disabled.
pub fn invalidate_repo(&self, relative_path: &str) {
    if !self.enabled {
        return;
    }
    let latest_path = self.latest_path_for(relative_path);
    let new_val = random_value();

    // Write-then-rename: an in-place `fs::write` truncates first, exposing an empty `latest`.
    let tmp_path = latest_path.with_extension(format!("{new_val}.tmp"));
    let replaced = std::fs::write(&tmp_path, &new_val)
        .and_then(|()| std::fs::rename(&tmp_path, &latest_path));

    match replaced {
        Ok(()) => {
            tracing::debug!(
                relative_path = %relative_path,
                new_state = %new_val,
                "cache invalidated for repository"
            );
        }
        Err(e) => {
            std::fs::remove_file(&tmp_path).ok();
            tracing::warn!(
                relative_path = %relative_path,
                error = %e,
                "failed to update latest for cache invalidation"
            );
        }
    }
}
/// Remove all cache entries on startup (guards against inconsistent state from a previous
/// run).
///
/// Deletes the cache and info-refs directories under the repo prefix if they exist. Does
/// nothing when the cache is disabled.
///
/// # Errors
///
/// Returns `GitError::Io` if a directory exists but cannot be removed.
pub fn cleanup_on_startup(&self) -> GitResult<()> {
    if !self.enabled {
        return Ok(());
    }

    [CACHE_DIR_RELATIVE, INFO_REFS_DIR_RELATIVE]
        .iter()
        .map(|namespace| self.repo_prefix.join(namespace))
        .filter(|dir| dir.exists())
        .try_for_each(|dir| {
            tracing::info!(dir = %dir.display(), "cleaning cache directory on startup");
            std::fs::remove_dir_all(&dir).map_err(GitError::Io)
        })
}
/// Background cleanup: remove expired cache entries.
/// Should be called periodically (e.g., every 5 minutes).
///
/// Walks the two-level layout of the cache and info-refs directories and deletes every file
/// whose modification time is older than `max_age`; prefix directories left empty are removed
/// too. Returns the number of entries that were actually deleted. A prefix directory that
/// vanishes while the sweep is running is skipped instead of aborting the sweep.
///
/// # Errors
///
/// Returns `GitError::Io` if a directory cannot be listed for a reason other than having
/// been removed concurrently.
pub fn cleanup_expired(&self) -> GitResult<u64> {
    if !self.enabled {
        return Ok(0);
    }
    let now = SystemTime::now();
    let mut removed = 0u64;

    for namespace in &[CACHE_DIR_RELATIVE, INFO_REFS_DIR_RELATIVE] {
        let dir = self.repo_prefix.join(namespace);
        if !dir.exists() {
            continue;
        }
        for prefix_entry in std::fs::read_dir(&dir).map_err(GitError::Io)? {
            let prefix_entry = prefix_entry.map_err(GitError::Io)?;
            let prefix_dir = prefix_entry.path();
            if !prefix_dir.is_dir() {
                continue;
            }

            let entries = match std::fs::read_dir(&prefix_dir) {
                Ok(entries) => entries,
                // Removed concurrently (e.g. by `lookup` expiring its last entry): skip it.
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => continue,
                Err(e) => return Err(GitError::Io(e)),
            };

            for entry in entries {
                let entry = entry.map_err(GitError::Io)?;
                let path = entry.path();

                let age = entry
                    .metadata()
                    .ok()
                    .and_then(|metadata| metadata.modified().ok())
                    .and_then(|modified| now.duration_since(modified).ok());
                let Some(age) = age.filter(|age| *age > self.max_age) else {
                    continue;
                };

                tracing::debug!(
                    path = %path.display(),
                    age_secs = age.as_secs(),
                    "removing expired cache entry"
                );
                // Count only entries that were really deleted.
                if std::fs::remove_file(&path).is_ok() {
                    removed += 1;
                }
            }

            // Only succeeds when the prefix directory is now empty.
            std::fs::remove_dir(&prefix_dir).ok();
        }
    }

    if removed > 0 {
        tracing::info!(
            entries_removed = removed,
            "expired cache entries cleaned up"
        );
    }
    Ok(removed)
}
}
/// Field-by-field clone of the cache configuration.
impl Clone for DiskCache {
    fn clone(&self) -> Self {
        // Destructure so that adding a field to the struct forces this impl to be updated.
        let Self {
            repo_prefix,
            max_age,
            version,
            enabled,
        } = self;
        Self {
            repo_prefix: repo_prefix.clone(),
            max_age: *max_age,
            version: version.clone(),
            enabled: *enabled,
        }
    }
}
/// A lease guard that removes the lease file on drop and updates `latest`.
///
/// An empty `lease_path` marks a guard with no outstanding lease: either a dummy guard or one
/// that has already been committed.
pub struct LeaseGuard {
    /// Cache whose `latest` file is updated on commit.
    cache: DiskCache,
    /// Repository the lease belongs to.
    relative_path: String,
    /// Path of the lease file; empty for dummy guards and after commit.
    lease_path: PathBuf,
    /// `true` for guards created without a lease file that have not been committed yet.
    is_dummy: bool,
}
impl LeaseGuard {
    /// Remove the lease file (if any) and update the `latest` file to invalidate cached
    /// responses.
    /// Called automatically on drop, but can also be called manually.
    pub fn commit(&mut self) {
        // Taking the path marks the guard as committed, so `drop` will not commit again.
        let lease_path = std::mem::take(&mut self.lease_path);
        self.is_dummy = false;

        if !lease_path.as_os_str().is_empty() {
            // The file may already be gone if the stale-lease sweep removed it; ignore that.
            std::fs::remove_file(&lease_path).ok();
        }
        self.cache.invalidate_repo(&self.relative_path);
    }
}
impl Drop for LeaseGuard {
    fn drop(&mut self) {
        // Commit based on the guard's own state, not on whether the lease file still exists:
        // a lease that outlived the stale threshold may have been swept from disk, and the
        // mutation it covered must still invalidate the cache.
        let uncommitted = self.is_dummy || !self.lease_path.as_os_str().is_empty();
        if uncommitted {
            self.commit();
        }
    }
}
/// Start a background task that periodically cleans up expired cache entries.
pub fn start_cache_cleanup_task(
cache: DiskCache,
interval: Duration,
) -> tokio::task::JoinHandle<()> {
tokio::spawn(async move {
let mut ticker = tokio::time::interval(interval);
ticker.tick().await; // First tick is immediate
loop {
ticker.tick().await;
if let Err(e) = cache.cleanup_expired() {
tracing::warn!(error = %e, "cache cleanup task failed");
}
}
})
}