//! Copyright (c) 2022-2026 GitDataAi All rights reserved. //! Pack-objects cache module. //! //! Caches `git pack-objects` output on local disk to reduce CPU load //! during repeated clone/fetch operations (especially CI traffic). //! //! Uses the DiskCache infrastructure for state management and invalidation. //! Implements streaming with backpressure: the producer writes to a cache file //! while consumers read from it concurrently. use std::io::Write; use std::time::Duration; use prost::Message; use tokio_stream::wrappers::ReceiverStream; use crate::disk_cache::DiskCache; use crate::pb::PackfileChunk; /// Namespace for pack-objects cache entries in the disk cache. pub const PACK_CACHE_NAMESPACE: &str = "+gitks-cache/cache"; /// Namespace for info/refs cache entries. pub const INFO_REFS_NAMESPACE: &str = "+gitks-cache/info_refs"; /// Pack-objects cache wrapper around DiskCache. #[derive(Debug, Clone)] pub struct PackCache { disk_cache: DiskCache, backpressure_enabled: bool, } impl PackCache { pub fn new(disk_cache: DiskCache, backpressure: bool) -> Self { Self { disk_cache, backpressure_enabled: backpressure, } } pub fn is_enabled(&self) -> bool { self.disk_cache.is_enabled() } pub fn disk_cache(&self) -> &DiskCache { &self.disk_cache } /// Try to serve a cached pack-objects response as a stream. /// Reads the cache file in streaming fashion, emitting chunks as they are read /// to avoid buffering the entire pack file in memory. 
pub fn lookup_pack_stream( &self, digest: &str, ) -> Option>> { if !self.is_enabled() { return None; } let file = match self .disk_cache .open_stream_read(PACK_CACHE_NAMESPACE, digest) { Ok(Some(f)) => f, Ok(None) => return None, Err(_) => return None, }; tracing::info!(digest = %digest, "pack-objects cache hit, streaming from disk"); let channel_size = if self.backpressure_enabled { 4 } else { 256 }; let (tx, rx) = tokio::sync::mpsc::channel(channel_size); let sender = tx.clone(); tokio::spawn(async move { let result = tokio::task::spawn_blocking(move || { use std::io::Read; let mut file = file; let mut buf = vec![0u8; 65536]; loop { match file.read(&mut buf) { Ok(0) => return Ok(()), Ok(n) => { if sender .blocking_send(Ok(PackfileChunk { data: buf[..n].to_vec(), })) .is_err() { return Ok(()); } } Err(e) => { let _ = sender.blocking_send(Err(tonic::Status::internal(format!( "cache read error: {e}" )))); return Err(()); } } } }) .await; if result.is_err() {} }); Some(ReceiverStream::new(rx)) } /// Stream pack-objects output while simultaneously writing to cache. /// This is the "tee" approach: data flows to both the client and the cache file. /// When backpressure is enabled, uses a small channel to slow the producer /// if the consumer is slow. Otherwise uses a large channel for max throughput. 
pub fn tee_pack_stream( &self, digest: &str, source: ReceiverStream>, ) -> ReceiverStream> { let channel_size = if self.backpressure_enabled { 4 } else { 256 }; let (tx, rx) = tokio::sync::mpsc::channel(channel_size); if !self.is_enabled() { tokio::spawn(async move { use tokio_stream::StreamExt; let mut source = source; while let Some(item) = source.next().await { if tx.send(item).await.is_err() { break; } } }); return ReceiverStream::new(rx); } let cache_dir = self.disk_cache.repo_prefix.join(PACK_CACHE_NAMESPACE); let rel_path = crate::disk_cache::digest_to_path(digest); let cache_path = cache_dir.join(&rel_path); let tmp_path = cache_path.with_extension("tmp_streaming"); tokio::spawn(async move { use tokio_stream::StreamExt; if let Some(parent) = cache_path.parent() { let _ = std::fs::create_dir_all(parent); } let mut cache_file = std::fs::File::create(&tmp_path).ok(); let mut cache_write_ok = true; let mut source = source; while let Some(item) = source.next().await { if tx.send(item.clone()).await.is_err() { cache_write_ok = false; // client disconnected, don't cache partial break; } if cache_write_ok && let Some(ref mut f) = cache_file && let Ok(chunk) = &item && f.write_all(&chunk.data).is_err() { tracing::warn!("cache write failed, dropping cache file handle"); cache_file = None; cache_write_ok = false; } } if cache_write_ok && cache_file.is_some() { let _ = std::fs::rename(&tmp_path, &cache_path); tracing::info!( path = %cache_path.display(), "pack-objects cache entry written" ); } else if tmp_path.exists() { std::fs::remove_file(&tmp_path).ok(); } }); ReceiverStream::new(rx) } /// Look up cached info/refs response. 
///
/// Returns `None` in any of these cases:
/// - the cache is disabled;
/// - the entry is missing;
/// - the stored bytes fail to decode as `T`;
/// - the disk cache lookup returns an error.
///
/// Decode failures and lookup errors are logged at `warn` level.
/// NOTE(review): a corrupt entry is not removed here, so later lookups for the same
/// digest will keep hitting the decode failure until it is invalidated elsewhere.
pub fn lookup_info_refs(&self, digest: &str) -> Option {
    if !self.is_enabled() {
        return None;
    }
    match self.disk_cache.lookup(INFO_REFS_NAMESPACE, digest) {
        Ok(Some(bytes)) => {
            if let Ok(resp) = T::decode(bytes.as_slice()) {
                tracing::debug!(digest = %digest, "info/refs cache hit");
                return Some(resp);
            }
            // Entry exists but is not a valid encoding of `T`; report a miss.
            tracing::warn!(digest = %digest, "info/refs cache decode failed");
            None
        }
        Ok(None) => None,
        Err(e) => {
            tracing::warn!(error = %e, "info/refs cache lookup error");
            None
        }
    }
}

/// Store an info/refs response in cache.
///
/// Best-effort: this does nothing when the cache is disabled. Insert failures are
/// logged at `warn` level and otherwise ignored.
pub fn store_info_refs(&self, digest: &str, resp: &T) {
    if !self.is_enabled() {
        return;
    }
    // Pre-size the buffer to the message's encoded length.
    let mut bytes = Vec::with_capacity(resp.encoded_len());
    // If encoding fails, the insert is skipped without logging. Encoding into a
    // growable Vec is not expected to fail.
    if resp.encode(&mut bytes).is_ok()
        && let Err(e) = self.disk_cache.insert(INFO_REFS_NAMESPACE, digest, &bytes)
    {
        tracing::warn!(error = %e, "info/refs cache write failed");
    }
}

/// Invalidate cache for a repository (called after mutator RPCs).
///
/// Delegates directly to the underlying disk cache's `invalidate_repo`.
pub fn invalidate_repo(&self, relative_path: &str) {
    self.disk_cache.invalidate_repo(relative_path);
}

/// Create a lease for a mutating operation.
///
/// Thin delegation to the underlying disk cache's `create_lease`. See that method
/// for the lease's semantics and error type.
pub fn create_lease(
    &self,
    relative_path: &str,
) -> Result {
    self.disk_cache.create_lease(relative_path)
}
}

/// Start background cache cleanup task.
///
/// Hands the pack cache's underlying disk cache to
/// `crate::disk_cache::start_cache_cleanup_task`, passing `interval_secs` as a
/// `Duration` (presumably the period between cleanup passes — confirm in
/// disk_cache). Returns that task's join handle.
/// NOTE(review): `cache` is taken by value, so `cache.disk_cache` could be moved
/// instead of cloned.
pub fn start_cleanup_task(cache: PackCache, interval_secs: u64) -> tokio::task::JoinHandle<()> {
    crate::disk_cache::start_cache_cleanup_task(
        cache.disk_cache.clone(),
        Duration::from_secs(interval_secs),
    )
}