66afd932ed
- Add FindCommit, ListCommitsByOid, CommitIsAncestor RPCs to CommitService - Add CheckObjectsExist, CommitsByMessage, GetCommitStats RPCs to CommitService - Add LastCommitForPath, CountCommits, CountDivergingCommits RPCs to CommitService - Add RawDiff, RawPatch, FindChangedPaths RPCs to DiffService - Add FindMergeBase, WriteRef, SearchFilesByContent RPCs to RepositoryService - Add SearchFilesByName, ObjectsSize, RepositorySize RPCs to RepositoryService - Add FindLicense, OptimizeRepository, GetRawChanges RPCs to RepositoryService - Add FetchRemote, CreateRepositoryFromURL RPCs to RepositoryService - Implement server handlers for all new RPC methods - Add new modules for commit counting, finding, and querying features - Add new modules for diff changed paths and raw operations - Add new modules for refs and remote operations - Remove unnecessary comments from various source files - Update proto definitions with new message types and service methods
534 lines
18 KiB
Rust
534 lines
18 KiB
Rust
//! Disk-based cache infrastructure for GitKS.
//!
//! Implements the Gitaly-inspired diskcache design:
//! - Per-repository state directory with `latest` (repo state hash) and `pending/` (lease files)
//! - Cache key = SHA256(latest + request digest + version)
//! - Cached responses stored at `${CACHE_DIR}/${digest:0:2}/${digest:2}`
//! - Lease-based invalidation: mutator RPCs create lease files, update `latest` on completion
//! - Background cleanup of stale cache entries
|
|
|
|
use std::path::{Path, PathBuf};
|
|
use std::time::{Duration, SystemTime};
|
|
|
|
use sha2;
|
|
|
|
use crate::error::{GitError, GitResult};
|
|
|
|
/// Age, in seconds, past which a lease file in a `pending` directory is
/// considered stale and eligible for removal.
const LEASE_STALE_THRESHOLD_SECS: u64 = 30;

/// Location of the per-repository state tree, relative to the repo prefix.
const STATE_DIR_RELATIVE: &str = "+gitks-cache/state";

/// Location of the cached-response tree, relative to the repo prefix.
const CACHE_DIR_RELATIVE: &str = "+gitks-cache/cache";

/// Location of the info-refs cache tree, relative to the repo prefix.
const INFO_REFS_DIR_RELATIVE: &str = "+gitks-cache/info_refs";
|
|
|
|
/// Produce a fresh 32-character lowercase hex token.
///
/// The token is the little-endian bytes of the low 64 bits of the current
/// wall-clock time in nanoseconds, followed by the little-endian bytes of a
/// process-wide counter. It is unique per call within a process, but it is
/// not cryptographically random.
fn random_value() -> String {
    use std::fmt::Write;
    use std::sync::atomic::{AtomicU64, Ordering};

    static COUNTER: AtomicU64 = AtomicU64::new(0);

    // A clock set before the epoch falls back to a zero duration.
    let since_epoch = SystemTime::now()
        .duration_since(SystemTime::UNIX_EPOCH)
        .unwrap_or_default();
    let time_bytes = (since_epoch.as_nanos() as u64).to_le_bytes();
    let sequence_bytes = COUNTER.fetch_add(1, Ordering::Relaxed).to_le_bytes();

    let mut token = String::with_capacity(32);
    for byte in time_bytes.iter().chain(sequence_bytes.iter()) {
        // Writing into a `String` cannot fail.
        write!(token, "{byte:02x}").unwrap();
    }
    token
}
|
|
|
|
/// Compute SHA256 digest from multiple input parts.
|
|
fn sha256_digest(parts: &[&str]) -> String {
|
|
use sha2::Digest;
|
|
let mut hasher = sha2::Sha256::new();
|
|
for part in parts {
|
|
hasher.update(part.as_bytes());
|
|
}
|
|
let result = hasher.finalize();
|
|
let mut s = String::with_capacity(64);
|
|
for byte in result {
|
|
use std::fmt::Write;
|
|
write!(s, "{byte:02x}").unwrap();
|
|
}
|
|
s
|
|
}
|
|
|
|
/// Convert a digest into a two-level relative path: `${digest:0:2}/${digest:2}`.
///
/// The first two bytes become the directory name and the remainder becomes
/// the file name. A digest that cannot be split after two bytes (shorter than
/// two bytes, or byte 2 is not a character boundary) is returned as a
/// single-component path instead of panicking.
pub fn digest_to_path(digest: &str) -> PathBuf {
    // `str::get` returns `None` where slicing with `[..]` would panic.
    match (digest.get(..2), digest.get(2..)) {
        (Some(prefix), Some(rest)) => PathBuf::from(prefix).join(rest),
        _ => PathBuf::from(digest),
    }
}
|
|
|
|
/// Handle for the on-disk cache: per-repository state plus cached response
/// files, all located under `repo_prefix`.
#[derive(Debug)]
pub struct DiskCache {
    /// Root directory that the state and cache trees are joined onto.
    pub repo_prefix: PathBuf,
    /// Entries whose modification time is older than this are treated as expired.
    max_age: Duration,
    /// Version string mixed into every computed cache key.
    version: String,
    /// Switch consulted by most operations before they touch the disk.
    enabled: bool,
}
|
|
|
|
impl DiskCache {
|
|
/// Create a new DiskCache.
|
|
pub fn new(repo_prefix: PathBuf, version: String, max_age_secs: u64, enabled: bool) -> Self {
|
|
Self {
|
|
repo_prefix,
|
|
max_age: Duration::from_secs(max_age_secs),
|
|
version,
|
|
enabled,
|
|
}
|
|
}
|
|
|
|
/// Whether the cache is enabled.
|
|
pub fn is_enabled(&self) -> bool {
|
|
self.enabled
|
|
}
|
|
|
|
|
|
fn state_dir_for(&self, relative_path: &str) -> PathBuf {
|
|
self.repo_prefix
|
|
.join(STATE_DIR_RELATIVE)
|
|
.join(relative_path)
|
|
}
|
|
|
|
fn latest_path_for(&self, relative_path: &str) -> PathBuf {
|
|
self.state_dir_for(relative_path).join("latest")
|
|
}
|
|
|
|
fn pending_dir_for(&self, relative_path: &str) -> PathBuf {
|
|
self.state_dir_for(relative_path).join("pending")
|
|
}
|
|
|
|
|
|
fn cache_dir(&self, namespace: &str) -> PathBuf {
|
|
self.repo_prefix.join(namespace)
|
|
}
|
|
|
|
fn cache_file_path(&self, namespace: &str, digest: &str) -> PathBuf {
|
|
self.cache_dir(namespace).join(digest_to_path(digest))
|
|
}
|
|
|
|
|
|
/// Ensure the state directory for a repository exists and has a `latest` file.
|
|
/// If `latest` does not exist, create it with a random value.
|
|
pub fn ensure_state(&self, relative_path: &str) -> GitResult<String> {
|
|
if !self.enabled {
|
|
return Ok(random_value());
|
|
}
|
|
let state_dir = self.state_dir_for(relative_path);
|
|
std::fs::create_dir_all(&state_dir).map_err(GitError::Io)?;
|
|
let pending_dir = self.pending_dir_for(relative_path);
|
|
std::fs::create_dir_all(&pending_dir).map_err(GitError::Io)?;
|
|
|
|
let latest_path = self.latest_path_for(relative_path);
|
|
if latest_path.exists() {
|
|
let val = std::fs::read_to_string(&latest_path).map_err(GitError::Io)?;
|
|
Ok(val.trim().to_string())
|
|
} else {
|
|
let val = random_value();
|
|
std::fs::write(&latest_path, &val).map_err(GitError::Io)?;
|
|
Ok(val)
|
|
}
|
|
}
|
|
|
|
/// Create a lease file for a mutating RPC.
|
|
/// Returns a `LeaseGuard` that removes the lease on drop and updates `latest`.
|
|
pub fn create_lease(&self, relative_path: &str) -> GitResult<LeaseGuard> {
|
|
if !self.enabled {
|
|
let rp = relative_path.to_string();
|
|
return Ok(LeaseGuard {
|
|
cache: self.clone(),
|
|
relative_path: rp,
|
|
lease_path: PathBuf::new(),
|
|
is_dummy: true,
|
|
});
|
|
}
|
|
let pending_dir = self.pending_dir_for(relative_path);
|
|
std::fs::create_dir_all(&pending_dir).map_err(GitError::Io)?;
|
|
let lease_name = random_value();
|
|
let lease_path = pending_dir.join(&lease_name);
|
|
std::fs::write(&lease_path, &lease_name).map_err(GitError::Io)?;
|
|
tracing::debug!(
|
|
relative_path = %relative_path,
|
|
lease = %lease_name,
|
|
"lease created"
|
|
);
|
|
Ok(LeaseGuard {
|
|
cache: self.clone(),
|
|
relative_path: relative_path.to_string(),
|
|
lease_path,
|
|
is_dummy: false,
|
|
})
|
|
}
|
|
|
|
/// Check if a repository is in a deterministic state (no active leases).
|
|
/// Returns the `latest` value if deterministic, or None if indeterminate.
|
|
pub fn get_repo_state(&self, relative_path: &str) -> GitResult<Option<String>> {
|
|
if !self.enabled {
|
|
return Ok(Some(random_value()));
|
|
}
|
|
self.cleanup_stale_leases(relative_path)?;
|
|
|
|
let pending_dir = self.pending_dir_for(relative_path);
|
|
if pending_dir.exists() {
|
|
let entries = std::fs::read_dir(&pending_dir).map_err(GitError::Io)?;
|
|
let count = entries.count();
|
|
if count > 0 {
|
|
tracing::debug!(
|
|
relative_path = %relative_path,
|
|
pending = count,
|
|
"repo has in-flight mutator, cache state indeterminate"
|
|
);
|
|
return Ok(None);
|
|
}
|
|
}
|
|
|
|
let latest_path = self.latest_path_for(relative_path);
|
|
if latest_path.exists() {
|
|
let val = std::fs::read_to_string(&latest_path).map_err(GitError::Io)?;
|
|
Ok(Some(val.trim().to_string()))
|
|
} else {
|
|
// No latest file → create one
|
|
Ok(Some(self.ensure_state(relative_path)?))
|
|
}
|
|
}
|
|
|
|
/// Remove stale lease files (older than LEASE_STALE_THRESHOLD_SECS).
|
|
fn cleanup_stale_leases(&self, relative_path: &str) -> GitResult<()> {
|
|
let pending_dir = self.pending_dir_for(relative_path);
|
|
if !pending_dir.exists() {
|
|
return Ok(());
|
|
}
|
|
let now = SystemTime::now();
|
|
let threshold = Duration::from_secs(LEASE_STALE_THRESHOLD_SECS);
|
|
for entry in std::fs::read_dir(&pending_dir).map_err(GitError::Io)? {
|
|
let entry = entry.map_err(GitError::Io)?;
|
|
let path = entry.path();
|
|
if let Ok(metadata) = entry.metadata()
|
|
&& let Ok(modified) = metadata.modified()
|
|
&& let Ok(age) = now.duration_since(modified)
|
|
&& age > threshold
|
|
{
|
|
tracing::warn!(
|
|
path = %path.display(),
|
|
age_secs = age.as_secs(),
|
|
"removing stale lease file"
|
|
);
|
|
std::fs::remove_file(&path).ok();
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
|
|
/// Compute a cache key for an info/refs request.
|
|
pub fn compute_info_refs_key(&self, relative_path: &str, protocol: &str) -> GitResult<String> {
|
|
let latest = self.ensure_state(relative_path)?;
|
|
let parts: &[&str] = &[&latest, "advertise_refs", protocol, &self.version];
|
|
Ok(sha256_digest(parts))
|
|
}
|
|
|
|
/// Compute a cache key for a pack-objects request.
|
|
pub fn compute_pack_objects_key(
|
|
&self,
|
|
relative_path: &str,
|
|
wants_hex: &[String],
|
|
haves_hex: &[String],
|
|
thin_pack: bool,
|
|
use_bitmaps: bool,
|
|
delta_base_offset: bool,
|
|
) -> GitResult<String> {
|
|
let latest = self.ensure_state(relative_path)?;
|
|
// Sort wants and haves for deterministic key
|
|
let mut wants_sorted = wants_hex.to_vec();
|
|
wants_sorted.sort();
|
|
let mut haves_sorted = haves_hex.to_vec();
|
|
haves_sorted.sort();
|
|
let wants_str = wants_sorted.join(",");
|
|
let haves_str = haves_sorted.join(",");
|
|
let flags = format!("thin={thin_pack},bitmaps={use_bitmaps},dbo={delta_base_offset}");
|
|
let parts: &[&str] = &[
|
|
&latest,
|
|
"pack_objects",
|
|
&wants_str,
|
|
&haves_str,
|
|
&flags,
|
|
&self.version,
|
|
];
|
|
Ok(sha256_digest(parts))
|
|
}
|
|
|
|
|
|
/// Look up a cached response for the given namespace and digest.
|
|
/// Returns the cached bytes if found and not expired.
|
|
pub fn lookup(&self, namespace: &str, digest: &str) -> GitResult<Option<Vec<u8>>> {
|
|
if !self.enabled {
|
|
return Ok(None);
|
|
}
|
|
let path = self.cache_file_path(namespace, digest);
|
|
if !path.exists() {
|
|
return Ok(None);
|
|
}
|
|
if let Ok(metadata) = std::fs::metadata(&path)
|
|
&& let Ok(modified) = metadata.modified()
|
|
&& let Ok(age) = SystemTime::now().duration_since(modified)
|
|
&& age > self.max_age
|
|
{
|
|
tracing::debug!(
|
|
path = %path.display(),
|
|
age_secs = age.as_secs(),
|
|
"cached entry expired, removing"
|
|
);
|
|
std::fs::remove_file(&path).ok();
|
|
if let Some(parent) = path.parent() {
|
|
std::fs::remove_dir(parent).ok();
|
|
}
|
|
return Ok(None);
|
|
}
|
|
let data = std::fs::read(&path).map_err(GitError::Io)?;
|
|
tracing::debug!(
|
|
namespace = %namespace,
|
|
digest = %digest,
|
|
size = data.len(),
|
|
"cache hit"
|
|
);
|
|
Ok(Some(data))
|
|
}
|
|
|
|
/// Insert a cached response for the given namespace and digest.
|
|
pub fn insert(&self, namespace: &str, digest: &str, data: &[u8]) -> GitResult<()> {
|
|
if !self.enabled {
|
|
return Ok(());
|
|
}
|
|
let path = self.cache_file_path(namespace, digest);
|
|
if let Some(parent) = path.parent() {
|
|
std::fs::create_dir_all(parent).map_err(GitError::Io)?;
|
|
}
|
|
let tmp_path = path.with_extension("tmp");
|
|
std::fs::write(&tmp_path, data).map_err(GitError::Io)?;
|
|
std::fs::rename(&tmp_path, &path).map_err(GitError::Io)?;
|
|
tracing::debug!(
|
|
namespace = %namespace,
|
|
digest = %digest,
|
|
size = data.len(),
|
|
"cache entry written"
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
/// Open a cache file for streaming read.
|
|
pub fn open_stream_read(
|
|
&self,
|
|
namespace: &str,
|
|
digest: &str,
|
|
) -> GitResult<Option<std::fs::File>> {
|
|
if !self.enabled {
|
|
return Ok(None);
|
|
}
|
|
let path = self.cache_file_path(namespace, digest);
|
|
if !path.exists() {
|
|
return Ok(None);
|
|
}
|
|
if let Ok(metadata) = std::fs::metadata(&path)
|
|
&& let Ok(modified) = metadata.modified()
|
|
&& let Ok(age) = SystemTime::now().duration_since(modified)
|
|
&& age > self.max_age
|
|
{
|
|
std::fs::remove_file(&path).ok();
|
|
return Ok(None);
|
|
}
|
|
let file = std::fs::File::open(&path).map_err(GitError::Io)?;
|
|
Ok(Some(file))
|
|
}
|
|
|
|
/// Open a cache file for streaming write.
|
|
/// Returns the file handle and the final path.
|
|
pub fn open_stream_write(
|
|
&self,
|
|
namespace: &str,
|
|
digest: &str,
|
|
) -> GitResult<(std::fs::File, PathBuf)> {
|
|
if !self.enabled {
|
|
return Err(GitError::Internal("disk cache not enabled".into()));
|
|
}
|
|
let path = self.cache_file_path(namespace, digest);
|
|
if let Some(parent) = path.parent() {
|
|
std::fs::create_dir_all(parent).map_err(GitError::Io)?;
|
|
}
|
|
let tmp_path = path.with_extension("tmp_streaming");
|
|
let file = std::fs::File::create(&tmp_path).map_err(GitError::Io)?;
|
|
Ok((file, path))
|
|
}
|
|
|
|
/// Finalize a streaming write by renaming the temp file to the final path.
|
|
pub fn finalize_stream_write(&self, tmp_path: &Path, final_path: &Path) -> GitResult<()> {
|
|
let actual_tmp = tmp_path.with_extension("tmp_streaming");
|
|
if actual_tmp.exists() {
|
|
std::fs::rename(&actual_tmp, final_path).map_err(GitError::Io)?;
|
|
}
|
|
tracing::debug!(
|
|
path = %final_path.display(),
|
|
"streaming cache entry finalized"
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
/// Invalidate all cache entries for a repository by updating the `latest` file.
|
|
/// This is called after any mutator RPC (create branch, create commit, etc.)
|
|
pub fn invalidate_repo(&self, relative_path: &str) {
|
|
if !self.enabled {
|
|
return;
|
|
}
|
|
let latest_path = self.latest_path_for(relative_path);
|
|
let new_val = random_value();
|
|
if let Err(e) = std::fs::write(&latest_path, &new_val) {
|
|
tracing::warn!(
|
|
relative_path = %relative_path,
|
|
error = %e,
|
|
"failed to update latest for cache invalidation"
|
|
);
|
|
} else {
|
|
tracing::debug!(
|
|
relative_path = %relative_path,
|
|
new_state = %new_val,
|
|
"cache invalidated for repository"
|
|
);
|
|
}
|
|
}
|
|
|
|
/// Remove all cache entries on startup (guard against inconsistent state from previous run).
|
|
pub fn cleanup_on_startup(&self) -> GitResult<()> {
|
|
if !self.enabled {
|
|
return Ok(());
|
|
}
|
|
for namespace in &[CACHE_DIR_RELATIVE, INFO_REFS_DIR_RELATIVE] {
|
|
let dir = self.repo_prefix.join(namespace);
|
|
if dir.exists() {
|
|
tracing::info!(dir = %dir.display(), "cleaning cache directory on startup");
|
|
std::fs::remove_dir_all(&dir).map_err(GitError::Io)?;
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Background cleanup: remove expired cache entries.
|
|
/// Should be called periodically (e.g., every 5 minutes).
|
|
pub fn cleanup_expired(&self) -> GitResult<u64> {
|
|
if !self.enabled {
|
|
return Ok(0);
|
|
}
|
|
let now = SystemTime::now();
|
|
let mut removed = 0u64;
|
|
|
|
for namespace in &[CACHE_DIR_RELATIVE, INFO_REFS_DIR_RELATIVE] {
|
|
let dir = self.repo_prefix.join(namespace);
|
|
if !dir.exists() {
|
|
continue;
|
|
}
|
|
for prefix_entry in std::fs::read_dir(&dir).map_err(GitError::Io)? {
|
|
let prefix_entry = prefix_entry.map_err(GitError::Io)?;
|
|
let prefix_dir = prefix_entry.path();
|
|
if !prefix_dir.is_dir() {
|
|
continue;
|
|
}
|
|
for entry in std::fs::read_dir(&prefix_dir).map_err(GitError::Io)? {
|
|
let entry = entry.map_err(GitError::Io)?;
|
|
let path = entry.path();
|
|
if let Ok(metadata) = entry.metadata()
|
|
&& let Ok(modified) = metadata.modified()
|
|
&& let Ok(age) = now.duration_since(modified)
|
|
&& age > self.max_age
|
|
{
|
|
tracing::debug!(
|
|
path = %path.display(),
|
|
age_secs = age.as_secs(),
|
|
"removing expired cache entry"
|
|
);
|
|
std::fs::remove_file(&path).ok();
|
|
removed += 1;
|
|
}
|
|
}
|
|
std::fs::remove_dir(&prefix_dir).ok();
|
|
}
|
|
}
|
|
if removed > 0 {
|
|
tracing::info!(
|
|
entries_removed = removed,
|
|
"expired cache entries cleaned up"
|
|
);
|
|
}
|
|
Ok(removed)
|
|
}
|
|
}
|
|
|
|
impl Clone for DiskCache {
|
|
fn clone(&self) -> Self {
|
|
Self {
|
|
repo_prefix: self.repo_prefix.clone(),
|
|
max_age: self.max_age,
|
|
version: self.version.clone(),
|
|
enabled: self.enabled,
|
|
}
|
|
}
|
|
}
|
|
|
|
/// A lease guard that removes the lease file on drop and updates `latest`.
|
|
pub struct LeaseGuard {
|
|
cache: DiskCache,
|
|
relative_path: String,
|
|
lease_path: PathBuf,
|
|
is_dummy: bool,
|
|
}
|
|
|
|
impl LeaseGuard {
|
|
/// Update the `latest` file to invalidate cached responses.
|
|
/// Called automatically on drop, but can also be called manually.
|
|
pub fn commit(&mut self) {
|
|
if self.is_dummy {
|
|
self.cache.invalidate_repo(&self.relative_path);
|
|
self.is_dummy = false; // Don't double-commit on drop
|
|
return;
|
|
}
|
|
if self.lease_path.exists() {
|
|
std::fs::remove_file(&self.lease_path).ok();
|
|
}
|
|
self.cache.invalidate_repo(&self.relative_path);
|
|
self.is_dummy = false;
|
|
}
|
|
}
|
|
|
|
impl Drop for LeaseGuard {
|
|
fn drop(&mut self) {
|
|
if self.is_dummy || self.lease_path.exists() {
|
|
self.commit();
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Start a background task that periodically cleans up expired cache entries.
|
|
pub fn start_cache_cleanup_task(
|
|
cache: DiskCache,
|
|
interval: Duration,
|
|
) -> tokio::task::JoinHandle<()> {
|
|
tokio::spawn(async move {
|
|
let mut ticker = tokio::time::interval(interval);
|
|
ticker.tick().await; // First tick is immediate
|
|
loop {
|
|
ticker.tick().await;
|
|
if let Err(e) = cache.cleanup_expired() {
|
|
tracing::warn!(error = %e, "cache cleanup task failed");
|
|
}
|
|
}
|
|
})
|
|
}
|