8f472a0443
- Integrate etcd-client for distributed coordination and leader election - Add remote client macros with proper formatting for all services - Implement RequestMetrics for tracking RPC performance and errors - Add rate limiting mechanism across all service endpoints - Create ElectionRequest and ElectionResult message types for leader election - Add role management with primary/replica switching capabilities - Implement health checker with automatic failover detection - Add repository count metrics for cluster monitoring - Update Cargo.toml with etcd-client and dashmap dependencies - Modify RepoEntry to include read_only flag for replica handling - Implement should_accept_election logic to prevent duplicate elections - Add RoleChangedEvent handling for cluster role updates
230 lines
7.7 KiB
Rust
230 lines
7.7 KiB
Rust
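//! Pack-objects cache module.
//!
//! Caches `git pack-objects` output on local disk to reduce CPU load
//! during repeated clone/fetch operations (especially CI traffic).
//!
//! Uses the DiskCache infrastructure for state management and invalidation.
//! On a cache miss the generated pack is teed: each chunk is forwarded to the
//! client and appended to a temporary file. When the stream ends, that file is
//! renamed into place, unless the entry was abandoned (e.g. the client
//! disconnected or a cache write failed). Cache hits are streamed back from
//! disk in chunks of up to 64 KiB through a bounded channel, so a slow client
//! applies backpressure to the disk reader.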
|
|
|
|
use std::io::Write;
|
|
use std::time::Duration;
|
|
|
|
use prost::Message;
|
|
use tokio_stream::wrappers::ReceiverStream;
|
|
|
|
use crate::disk_cache::DiskCache;
|
|
use crate::pb::PackfileChunk;
|
|
|
|
/// Namespace for pack-objects cache entries in the disk cache.
///
/// Passed to `DiskCache::open_stream_read` on lookup, and joined onto the disk
/// cache's `repo_prefix` to form the directory that newly teed packs are
/// written into.
pub const PACK_CACHE_NAMESPACE: &str = "+gitks-cache/cache";

/// Namespace for info/refs cache entries in the disk cache.
///
/// Passed to `DiskCache::lookup` / `DiskCache::insert` when reading or storing
/// encoded info/refs responses.
pub const INFO_REFS_NAMESPACE: &str = "+gitks-cache/info_refs";
|
|
|
|
/// Pack-objects and info/refs cache built on top of [`DiskCache`].
///
/// All storage, lookup, invalidation and lease handling is delegated to the
/// wrapped `DiskCache`. When that cache is disabled, lookups miss and stores
/// are skipped.
#[derive(Debug, Clone)]
pub struct PackCache {
    /// Backing on-disk cache.
    disk_cache: DiskCache,
}
|
|
|
|
impl PackCache {
|
|
pub fn new(disk_cache: DiskCache, _backpressure: bool) -> Self {
|
|
Self { disk_cache }
|
|
}
|
|
|
|
pub fn is_enabled(&self) -> bool {
|
|
self.disk_cache.is_enabled()
|
|
}
|
|
|
|
pub fn disk_cache(&self) -> &DiskCache {
|
|
&self.disk_cache
|
|
}
|
|
|
|
/// Try to serve a cached pack-objects response as a stream.
|
|
/// Reads the cache file in streaming fashion, emitting chunks as they are read
|
|
/// to avoid buffering the entire pack file in memory.
|
|
pub fn lookup_pack_stream(
|
|
&self,
|
|
digest: &str,
|
|
) -> Option<ReceiverStream<Result<PackfileChunk, tonic::Status>>> {
|
|
if !self.is_enabled() {
|
|
return None;
|
|
}
|
|
|
|
let file = match self
|
|
.disk_cache
|
|
.open_stream_read(PACK_CACHE_NAMESPACE, digest)
|
|
{
|
|
Ok(Some(f)) => f,
|
|
Ok(None) => return None,
|
|
Err(_) => return None,
|
|
};
|
|
|
|
tracing::info!(digest = %digest, "pack-objects cache hit, streaming from disk");
|
|
|
|
let (tx, rx) = tokio::sync::mpsc::channel(16);
|
|
|
|
let sender = tx.clone();
|
|
tokio::spawn(async move {
|
|
let result = tokio::task::spawn_blocking(move || {
|
|
use std::io::Read;
|
|
let mut file = file;
|
|
let mut buf = vec![0u8; 65536];
|
|
loop {
|
|
match file.read(&mut buf) {
|
|
Ok(0) => return Ok(()),
|
|
Ok(n) => {
|
|
if sender
|
|
.blocking_send(Ok(PackfileChunk {
|
|
data: buf[..n].to_vec(),
|
|
}))
|
|
.is_err()
|
|
{
|
|
return Ok(());
|
|
}
|
|
}
|
|
Err(e) => {
|
|
let _ = sender.blocking_send(Err(tonic::Status::internal(format!(
|
|
"cache read error: {e}"
|
|
))));
|
|
return Err(());
|
|
}
|
|
}
|
|
}
|
|
})
|
|
.await;
|
|
if result.is_err() {
|
|
// Task join error or I/O error already sent
|
|
}
|
|
});
|
|
|
|
Some(ReceiverStream::new(rx))
|
|
}
|
|
|
|
/// Stream pack-objects output while simultaneously writing to cache.
|
|
/// This is the "tee" approach: data flows to both the client and the cache file.
|
|
pub fn tee_pack_stream(
|
|
&self,
|
|
digest: &str,
|
|
source: ReceiverStream<Result<PackfileChunk, tonic::Status>>,
|
|
) -> ReceiverStream<Result<PackfileChunk, tonic::Status>> {
|
|
let (tx, rx) = tokio::sync::mpsc::channel(16);
|
|
|
|
if !self.is_enabled() {
|
|
tokio::spawn(async move {
|
|
use tokio_stream::StreamExt;
|
|
let mut source = source;
|
|
while let Some(item) = source.next().await {
|
|
if tx.send(item).await.is_err() {
|
|
break;
|
|
}
|
|
}
|
|
});
|
|
return ReceiverStream::new(rx);
|
|
}
|
|
|
|
let cache_dir = self.disk_cache.repo_prefix.join(PACK_CACHE_NAMESPACE);
|
|
let rel_path = crate::disk_cache::digest_to_path(digest);
|
|
let cache_path = cache_dir.join(&rel_path);
|
|
let tmp_path = cache_path.with_extension("tmp_streaming");
|
|
|
|
tokio::spawn(async move {
|
|
use tokio_stream::StreamExt;
|
|
|
|
if let Some(parent) = cache_path.parent() {
|
|
let _ = std::fs::create_dir_all(parent);
|
|
}
|
|
|
|
let mut cache_file = std::fs::File::create(&tmp_path).ok();
|
|
let mut cache_write_ok = true;
|
|
|
|
let mut source = source;
|
|
while let Some(item) = source.next().await {
|
|
if tx.send(item.clone()).await.is_err() {
|
|
cache_write_ok = false; // client disconnected, don't cache partial
|
|
break;
|
|
}
|
|
|
|
if cache_write_ok
|
|
&& let Some(ref mut f) = cache_file
|
|
&& let Ok(chunk) = &item
|
|
&& f.write_all(&chunk.data).is_err()
|
|
{
|
|
tracing::warn!("cache write failed, dropping cache file handle");
|
|
cache_file = None;
|
|
cache_write_ok = false;
|
|
}
|
|
}
|
|
|
|
if cache_write_ok && cache_file.is_some() {
|
|
let _ = std::fs::rename(&tmp_path, &cache_path);
|
|
tracing::info!(
|
|
path = %cache_path.display(),
|
|
"pack-objects cache entry written"
|
|
);
|
|
} else if tmp_path.exists() {
|
|
std::fs::remove_file(&tmp_path).ok();
|
|
}
|
|
});
|
|
|
|
ReceiverStream::new(rx)
|
|
}
|
|
|
|
/// Look up cached info/refs response.
|
|
pub fn lookup_info_refs<T: Message + Default>(&self, digest: &str) -> Option<T> {
|
|
if !self.is_enabled() {
|
|
return None;
|
|
}
|
|
match self.disk_cache.lookup(INFO_REFS_NAMESPACE, digest) {
|
|
Ok(Some(bytes)) => {
|
|
if let Ok(resp) = T::decode(bytes.as_slice()) {
|
|
tracing::debug!(digest = %digest, "info/refs cache hit");
|
|
return Some(resp);
|
|
}
|
|
tracing::warn!(digest = %digest, "info/refs cache decode failed");
|
|
None
|
|
}
|
|
Ok(None) => None,
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "info/refs cache lookup error");
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Store an info/refs response in cache.
|
|
pub fn store_info_refs<T: Message>(&self, digest: &str, resp: &T) {
|
|
if !self.is_enabled() {
|
|
return;
|
|
}
|
|
let mut bytes = Vec::with_capacity(resp.encoded_len());
|
|
if resp.encode(&mut bytes).is_ok()
|
|
&& let Err(e) = self.disk_cache.insert(INFO_REFS_NAMESPACE, digest, &bytes)
|
|
{
|
|
tracing::warn!(error = %e, "info/refs cache write failed");
|
|
}
|
|
}
|
|
|
|
/// Invalidate cache for a repository (called after mutator RPCs).
|
|
pub fn invalidate_repo(&self, relative_path: &str) {
|
|
self.disk_cache.invalidate_repo(relative_path);
|
|
}
|
|
|
|
/// Create a lease for a mutating operation.
|
|
pub fn create_lease(
|
|
&self,
|
|
relative_path: &str,
|
|
) -> Result<crate::disk_cache::LeaseGuard, crate::error::GitError> {
|
|
self.disk_cache.create_lease(relative_path)
|
|
}
|
|
}
|
|
|
|
/// Start background cache cleanup task.
|
|
pub fn start_cleanup_task(cache: PackCache, interval_secs: u64) -> tokio::task::JoinHandle<()> {
|
|
crate::disk_cache::start_cache_cleanup_task(
|
|
cache.disk_cache.clone(),
|
|
Duration::from_secs(interval_secs),
|
|
)
|
|
}
|