Files
gitks/pack_cache.rs
T
zhenyi 10a4398e81 refactor(bare): enhance security and performance optimizations
- Remove unnecessary sorting in advertise_refs for deterministic output
- Add path traversal detection and validation in bare_dir construction
- Implement symlink resolution checks to prevent security vulnerabilities
- Refactor cache system with CRC validation and improved metrics
- Integrate repo-specific cache invalidation using indexed keys
- Add comprehensive unit tests for commit operations and diff functionality
- Move configuration constants to centralized config module
- Optimize string operations in disk cache random value generation
- Enhance license detection algorithm with cleaner matching logic
- Streamline argument processing in various git operations
- Update dependencies including crc32fast and flate2 for performance
- Add signal handling capability to tokio runtime configuration
2026-06-12 15:04:12 +08:00

237 lines
8.0 KiB
Rust

//! Pack-objects cache module.
//!
//! Caches `git pack-objects` output on local disk to reduce CPU load
//! during repeated clone/fetch operations (especially CI traffic).
//!
//! Uses the DiskCache infrastructure for state management and invalidation.
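//! Cache misses are "teed": each chunk is forwarded to the requesting client while
//! also being written to a temporary file, which is renamed into the cache only if
//! the whole stream was delivered to the client and written successfully. Cache hits
//! are streamed from disk through a bounded channel, so a slow client throttles the
//! reader (the channel is kept small when backpressure is enabled).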
use std::io::Write;
use std::time::Duration;
use prost::Message;
use tokio_stream::wrappers::ReceiverStream;
use crate::disk_cache::DiskCache;
use crate::pb::PackfileChunk;
/// Disk-cache namespace under which `git pack-objects` output is stored and looked up.
pub const PACK_CACHE_NAMESPACE: &str = "+gitks-cache/cache";
/// Disk-cache namespace under which encoded info/refs responses are stored.
pub const INFO_REFS_NAMESPACE: &str = "+gitks-cache/info_refs";
/// Pack-objects and info/refs cache layered on top of a [`DiskCache`].
#[derive(Clone, Debug)]
pub struct PackCache {
    /// Backing on-disk store; its own `is_enabled()` decides whether caching is active.
    disk_cache: DiskCache,
    /// Chooses a small (backpressure) rather than a large (throughput) channel
    /// for pack streams.
    backpressure_enabled: bool,
}
impl PackCache {
pub fn new(disk_cache: DiskCache, backpressure: bool) -> Self {
Self {
disk_cache,
backpressure_enabled: backpressure,
}
}
pub fn is_enabled(&self) -> bool {
self.disk_cache.is_enabled()
}
pub fn disk_cache(&self) -> &DiskCache {
&self.disk_cache
}
/// Try to serve a cached pack-objects response as a stream.
/// Reads the cache file in streaming fashion, emitting chunks as they are read
/// to avoid buffering the entire pack file in memory.
pub fn lookup_pack_stream(
&self,
digest: &str,
) -> Option<ReceiverStream<Result<PackfileChunk, tonic::Status>>> {
if !self.is_enabled() {
return None;
}
let file = match self
.disk_cache
.open_stream_read(PACK_CACHE_NAMESPACE, digest)
{
Ok(Some(f)) => f,
Ok(None) => return None,
Err(_) => return None,
};
tracing::info!(digest = %digest, "pack-objects cache hit, streaming from disk");
let channel_size = if self.backpressure_enabled { 4 } else { 256 };
let (tx, rx) = tokio::sync::mpsc::channel(channel_size);
let sender = tx.clone();
tokio::spawn(async move {
let result = tokio::task::spawn_blocking(move || {
use std::io::Read;
let mut file = file;
let mut buf = vec![0u8; 65536];
loop {
match file.read(&mut buf) {
Ok(0) => return Ok(()),
Ok(n) => {
if sender
.blocking_send(Ok(PackfileChunk {
data: buf[..n].to_vec(),
}))
.is_err()
{
return Ok(());
}
}
Err(e) => {
let _ = sender.blocking_send(Err(tonic::Status::internal(format!(
"cache read error: {e}"
))));
return Err(());
}
}
}
})
.await;
if result.is_err() {
}
});
Some(ReceiverStream::new(rx))
}
/// Stream pack-objects output while simultaneously writing to cache.
/// This is the "tee" approach: data flows to both the client and the cache file.
/// When backpressure is enabled, uses a small channel to slow the producer
/// if the consumer is slow. Otherwise uses a large channel for max throughput.
pub fn tee_pack_stream(
&self,
digest: &str,
source: ReceiverStream<Result<PackfileChunk, tonic::Status>>,
) -> ReceiverStream<Result<PackfileChunk, tonic::Status>> {
let channel_size = if self.backpressure_enabled { 4 } else { 256 };
let (tx, rx) = tokio::sync::mpsc::channel(channel_size);
if !self.is_enabled() {
tokio::spawn(async move {
use tokio_stream::StreamExt;
let mut source = source;
while let Some(item) = source.next().await {
if tx.send(item).await.is_err() {
break;
}
}
});
return ReceiverStream::new(rx);
}
let cache_dir = self.disk_cache.repo_prefix.join(PACK_CACHE_NAMESPACE);
let rel_path = crate::disk_cache::digest_to_path(digest);
let cache_path = cache_dir.join(&rel_path);
let tmp_path = cache_path.with_extension("tmp_streaming");
tokio::spawn(async move {
use tokio_stream::StreamExt;
if let Some(parent) = cache_path.parent() {
let _ = std::fs::create_dir_all(parent);
}
let mut cache_file = std::fs::File::create(&tmp_path).ok();
let mut cache_write_ok = true;
let mut source = source;
while let Some(item) = source.next().await {
if tx.send(item.clone()).await.is_err() {
cache_write_ok = false; // client disconnected, don't cache partial
break;
}
if cache_write_ok
&& let Some(ref mut f) = cache_file
&& let Ok(chunk) = &item
&& f.write_all(&chunk.data).is_err()
{
tracing::warn!("cache write failed, dropping cache file handle");
cache_file = None;
cache_write_ok = false;
}
}
if cache_write_ok && cache_file.is_some() {
let _ = std::fs::rename(&tmp_path, &cache_path);
tracing::info!(
path = %cache_path.display(),
"pack-objects cache entry written"
);
} else if tmp_path.exists() {
std::fs::remove_file(&tmp_path).ok();
}
});
ReceiverStream::new(rx)
}
/// Look up cached info/refs response.
pub fn lookup_info_refs<T: Message + Default>(&self, digest: &str) -> Option<T> {
if !self.is_enabled() {
return None;
}
match self.disk_cache.lookup(INFO_REFS_NAMESPACE, digest) {
Ok(Some(bytes)) => {
if let Ok(resp) = T::decode(bytes.as_slice()) {
tracing::debug!(digest = %digest, "info/refs cache hit");
return Some(resp);
}
tracing::warn!(digest = %digest, "info/refs cache decode failed");
None
}
Ok(None) => None,
Err(e) => {
tracing::warn!(error = %e, "info/refs cache lookup error");
None
}
}
}
/// Store an info/refs response in cache.
pub fn store_info_refs<T: Message>(&self, digest: &str, resp: &T) {
if !self.is_enabled() {
return;
}
let mut bytes = Vec::with_capacity(resp.encoded_len());
if resp.encode(&mut bytes).is_ok()
&& let Err(e) = self.disk_cache.insert(INFO_REFS_NAMESPACE, digest, &bytes)
{
tracing::warn!(error = %e, "info/refs cache write failed");
}
}
/// Invalidate cache for a repository (called after mutator RPCs).
pub fn invalidate_repo(&self, relative_path: &str) {
self.disk_cache.invalidate_repo(relative_path);
}
/// Create a lease for a mutating operation.
pub fn create_lease(
&self,
relative_path: &str,
) -> Result<crate::disk_cache::LeaseGuard, crate::error::GitError> {
self.disk_cache.create_lease(relative_path)
}
}
/// Start the background cache cleanup task for `cache`.
///
/// Hands the underlying [`DiskCache`] and an interval of `interval_secs` seconds to
/// `crate::disk_cache::start_cache_cleanup_task` and returns that task's join handle.
pub fn start_cleanup_task(cache: PackCache, interval_secs: u64) -> tokio::task::JoinHandle<()> {
    // `cache` is owned, so its DiskCache can be moved out instead of cloned.
    crate::disk_cache::start_cache_cleanup_task(
        cache.disk_cache,
        Duration::from_secs(interval_secs),
    )
}