934858bebf
- Add repo_path parameter to cached_response and cached_vec_response functions - Implement structured cache key format with namespace, repo_path, and request proto - Replace global cache with Moka in-memory cache using weight-based eviction - Set 256MB memory cap with 10-minute TTL and 2-minute TTI policy - Add metrics collection for cache operations and evictions - Implement efficient repo-scoped invalidation using key structure - Add detailed documentation comments explaining cache architecture - Remove outdated dependencies and update dependency versions - Add error handling for encoding failures in cache operations - Optimize Vec responses with length-delimited encoding and pre-allocation
387 lines
12 KiB
Rust
387 lines
12 KiB
Rust
//! In-memory response cache layer for GitKS.
|
|
//!
|
|
//! Two-tier architecture:
|
|
//! 1. **Moka in-memory cache** (this module) — sub-microsecond lookups for hot data
|
|
//! 2. **Disk cache** (disk_cache.rs) — persistent cache for pack-objects / info-refs
|
|
//!
|
|
//! # Cache Key Format
|
|
//!
|
|
//! Keys are structured to enable efficient repo-scoped invalidation:
|
|
//!
|
|
//! ```text
|
|
//! [namespace_len: u8][namespace: &[u8]][repo_path_len: u16 LE][repo_path: &[u8]][request_proto: &[u8]]
|
|
//! ```
|
|
//!
|
|
//! This allows `invalidate_repo` to extract and match the repo_path without
|
|
//! protobuf decoding or substring scanning.
|
|
//!
|
|
//! # Eviction Policy
|
|
//!
|
|
//! - **Weight-based**: total memory capped at 256 MB (weighed by key+value capacity)
|
|
//! - **TTI** (time-to-idle): 2 minutes — frequently accessed entries stay hot
|
|
//! - **TTL** (time-to-live): 10 minutes — hard upper bound for safety
|
|
//! - Evictions are tracked via metrics for observability
|
|
|
|
use std::sync::OnceLock;
|
|
use std::time::Duration;
|
|
|
|
use moka::sync::Cache;
|
|
use prost::Message;
|
|
|
|
/// Upper bound on the summed weight of all entries (key + value allocated
/// bytes plus per-entry overhead): 256 MB.
const CACHE_MAX_WEIGHT: u64 = 256 << 20;

/// Time-to-live: an entry is evicted once it reaches this age, no matter how
/// often it is read.
const CACHE_MAX_TTL: Duration = Duration::from_secs(10 * 60); // 10 min

/// Time-to-idle: an entry is evicted when it has not been accessed for this
/// long. Hot entries can therefore live until the TTL, while cold entries
/// expire early.
const CACHE_TTI: Duration = Duration::from_secs(2 * 60); // 2 min

/// Flat per-entry allowance (an estimate for Moka's internal Arc + metadata)
/// that the weigher adds so entries are not under-counted.
const ENTRY_OVERHEAD: u32 = 128;
|
|
|
|
struct CacheState {
|
|
store: Cache<Vec<u8>, Vec<u8>>,
|
|
}
|
|
|
|
static CACHE: OnceLock<CacheState> = OnceLock::new();
|
|
|
|
fn state() -> &'static CacheState {
|
|
CACHE.get_or_init(|| {
|
|
let store = Cache::builder()
|
|
.weigher(|key: &Vec<u8>, value: &Vec<u8>| -> u32 {
|
|
// capacity() reflects actual allocation including spare capacity
|
|
key.capacity() as u32 + value.capacity() as u32 + ENTRY_OVERHEAD
|
|
})
|
|
.max_capacity(CACHE_MAX_WEIGHT)
|
|
.time_to_live(CACHE_MAX_TTL)
|
|
.time_to_idle(CACHE_TTI)
|
|
.eviction_listener(|key: std::sync::Arc<Vec<u8>>, _value: Vec<u8>, cause| {
|
|
let cause_str = match cause {
|
|
moka::notification::RemovalCause::Expired => "expired",
|
|
moka::notification::RemovalCause::Explicit => "explicit",
|
|
moka::notification::RemovalCause::Replaced => "replaced",
|
|
moka::notification::RemovalCause::Size => "size",
|
|
};
|
|
// Extract namespace for per-namespace metrics
|
|
let namespace = decode_namespace(&key);
|
|
crate::metrics::record_cache_eviction(namespace, cause_str);
|
|
})
|
|
.build();
|
|
|
|
tracing::info!(
|
|
max_weight_mb = CACHE_MAX_WEIGHT / (1024 * 1024),
|
|
ttl_secs = CACHE_MAX_TTL.as_secs(),
|
|
tti_secs = CACHE_TTI.as_secs(),
|
|
"Moka in-memory cache initialized"
|
|
);
|
|
|
|
CacheState { store }
|
|
})
|
|
}
|
|
|
|
#[inline]
|
|
fn cache() -> &'static Cache<Vec<u8>, Vec<u8>> {
|
|
&state().store
|
|
}
|
|
|
|
// Key encoding
|
|
|
|
/// Encode a structured cache key.
|
|
///
|
|
/// Format: `namespace_len(u8) + namespace + repo_path_len(u16 LE) + repo_path + request_proto`
|
|
///
|
|
fn encode_key(namespace: &str, repo_path: &str, request_bytes: &[u8]) -> Option<Vec<u8>> {
|
|
let ns = namespace.as_bytes();
|
|
let rp = repo_path.as_bytes();
|
|
if ns.len() > u8::MAX as usize || rp.len() > u16::MAX as usize {
|
|
tracing::warn!(
|
|
namespace_len = ns.len(),
|
|
repo_path_len = rp.len(),
|
|
"cache key too long, bypassing cache"
|
|
);
|
|
return None;
|
|
}
|
|
|
|
let total = 1 + ns.len() + 2 + rp.len() + request_bytes.len();
|
|
let mut key = Vec::with_capacity(total);
|
|
key.push(ns.len() as u8);
|
|
key.extend_from_slice(ns);
|
|
key.extend_from_slice(&(rp.len() as u16).to_le_bytes());
|
|
key.extend_from_slice(rp);
|
|
key.extend_from_slice(request_bytes);
|
|
Some(key)
|
|
}
|
|
|
|
/// Read the namespace string back out of a structured cache key.
///
/// The first byte is the namespace length; the namespace follows it. If the
/// key is shorter than the prefix claims, the available bytes are used.
/// Returns `"unknown"` for an empty key or a namespace that is not valid UTF-8.
fn decode_namespace(key: &[u8]) -> &str {
    const FALLBACK: &str = "unknown";

    let Some((&len_byte, rest)) = key.split_first() else {
        return FALLBACK;
    };
    // Clamp to what is actually present so a truncated key cannot panic.
    let take = usize::from(len_byte).min(rest.len());
    std::str::from_utf8(&rest[..take]).unwrap_or(FALLBACK)
}
|
|
|
|
/// Borrow the repo_path segment of a structured cache key.
///
/// Walks the layout `ns_len(u8) | namespace | rp_len(u16 LE) | repo_path | ...`
/// using checked slicing, and returns `None` whenever the key is too short for
/// any of the fields it claims to contain.
fn extract_repo_path_bytes(key: &[u8]) -> Option<&[u8]> {
    let (&ns_len, after_prefix) = key.split_first()?;
    // Skip the namespace bytes.
    let after_ns = after_prefix.get(usize::from(ns_len)..)?;
    // Two-byte little-endian repo path length.
    let len_field = after_ns.get(..2)?;
    let rp_len = usize::from(u16::from_le_bytes([len_field[0], len_field[1]]));
    after_ns.get(2..)?.get(..rp_len)
}
|
|
|
|
/// Check if a cache key belongs to the given repository.
|
|
fn key_matches_repo(key: &[u8], target_repo: &[u8]) -> bool {
|
|
extract_repo_path_bytes(key).is_some_and(|rp| rp == target_repo)
|
|
}
|
|
|
|
// Single-message cache
|
|
|
|
/// Cache a single protobuf response.
|
|
///
|
|
/// On cache hit, decodes and returns the cached response.
|
|
/// On cache miss, calls `build`, caches the result, and returns it.
|
|
///
|
|
/// `repo_path` should be the repository's relative path (used for scoped invalidation).
|
|
pub(crate) fn cached_response<Req, Res, E, F>(
|
|
namespace: &'static str,
|
|
repo_path: &str,
|
|
request: &Req,
|
|
build: F,
|
|
) -> Result<Res, E>
|
|
where
|
|
Req: Message,
|
|
Res: Message + Default,
|
|
F: FnOnce() -> Result<Res, E>,
|
|
{
|
|
let req_bytes = encode_request(request);
|
|
let Some(key) = encode_key(namespace, repo_path, &req_bytes) else {
|
|
return build();
|
|
};
|
|
|
|
if let Some(bytes) = cache().get(&key)
|
|
&& let Ok(response) = Res::decode(bytes.as_slice())
|
|
{
|
|
let elapsed = std::time::Duration::ZERO; // Moka get is memory-only, effectively instant
|
|
crate::metrics::record_cache_op("moka", "hit", elapsed);
|
|
tracing::debug!(
|
|
namespace = %namespace,
|
|
repo = %repo_path,
|
|
key_len = key.len(),
|
|
value_len = bytes.len(),
|
|
"cache hit"
|
|
);
|
|
return Ok(response);
|
|
}
|
|
|
|
tracing::debug!(
|
|
namespace = %namespace,
|
|
repo = %repo_path,
|
|
key_len = key.len(),
|
|
"cache miss, building response"
|
|
);
|
|
|
|
let start = std::time::Instant::now();
|
|
let response = build()?;
|
|
let build_elapsed = start.elapsed();
|
|
|
|
let mut bytes = Vec::with_capacity(response.encoded_len());
|
|
if let Err(err) = response.encode(&mut bytes) {
|
|
tracing::warn!(
|
|
namespace = %namespace,
|
|
repo = %repo_path,
|
|
error = %err,
|
|
"failed to encode cache response"
|
|
);
|
|
} else {
|
|
cache().insert(key, bytes);
|
|
}
|
|
|
|
crate::metrics::record_cache_op("moka", "miss", build_elapsed);
|
|
Ok(response)
|
|
}
|
|
|
|
// Vec-message cache
|
|
|
|
/// Cache a `Vec<Item>` protobuf response using length-delimited encoding.
|
|
///
|
|
/// Each item is stored sequentially with length-delimited framing, allowing
|
|
/// partial decode resilience: if any single item fails to decode, the entire
|
|
/// entry is discarded and rebuilt.
|
|
pub(crate) fn cached_vec_response<Req, Item, E, F>(
|
|
namespace: &'static str,
|
|
repo_path: &str,
|
|
request: &Req,
|
|
build: F,
|
|
) -> Result<Vec<Item>, E>
|
|
where
|
|
Req: Message,
|
|
Item: Message + Default,
|
|
F: FnOnce() -> Result<Vec<Item>, E>,
|
|
{
|
|
let req_bytes = encode_request(request);
|
|
let Some(key) = encode_key(namespace, repo_path, &req_bytes) else {
|
|
return build();
|
|
};
|
|
|
|
// Try cache hit
|
|
if let Some(bytes) = cache().get(&key) {
|
|
let mut items = Vec::new();
|
|
let mut remaining = bytes.as_slice();
|
|
let mut valid = true;
|
|
|
|
// Pre-allocate based on first size hint
|
|
if let Ok(first) = Item::decode_length_delimited(&mut remaining) {
|
|
items.push(first);
|
|
while !remaining.is_empty() {
|
|
match Item::decode_length_delimited(&mut remaining) {
|
|
Ok(item) => items.push(item),
|
|
Err(_) => {
|
|
valid = false;
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
} else if !remaining.is_empty() {
|
|
valid = false;
|
|
}
|
|
|
|
if valid {
|
|
crate::metrics::record_cache_op("moka", "hit", std::time::Duration::ZERO);
|
|
tracing::debug!(
|
|
namespace = %namespace,
|
|
repo = %repo_path,
|
|
item_count = items.len(),
|
|
"vec cache hit"
|
|
);
|
|
return Ok(items);
|
|
}
|
|
|
|
tracing::warn!(
|
|
namespace = %namespace,
|
|
repo = %repo_path,
|
|
"vec cache decode failed, rebuilding"
|
|
);
|
|
// Invalidate the corrupt entry
|
|
cache().invalidate(&key);
|
|
}
|
|
|
|
tracing::debug!(
|
|
namespace = %namespace,
|
|
repo = %repo_path,
|
|
"vec cache miss, building response"
|
|
);
|
|
|
|
let start = std::time::Instant::now();
|
|
let response = build()?;
|
|
let build_elapsed = start.elapsed();
|
|
|
|
// Encode all items into a single buffer with length-delimited framing
|
|
let total_est: usize = response
|
|
.iter()
|
|
.map(|item| item.encoded_len() + 10) // 10 = prost length-delimited overhead
|
|
.sum();
|
|
let mut bytes = Vec::with_capacity(total_est);
|
|
let mut encode_ok = true;
|
|
for item in &response {
|
|
if let Err(err) = item.encode_length_delimited(&mut bytes) {
|
|
tracing::warn!(
|
|
namespace = %namespace,
|
|
repo = %repo_path,
|
|
error = %err,
|
|
"failed to encode vec cache item"
|
|
);
|
|
encode_ok = false;
|
|
break;
|
|
}
|
|
}
|
|
|
|
if encode_ok {
|
|
cache().insert(key, bytes);
|
|
}
|
|
crate::metrics::record_cache_op("moka", "miss", build_elapsed);
|
|
Ok(response)
|
|
}
|
|
|
|
// Request encoding helpers
|
|
|
|
/// Encode a protobuf request into a byte vector.
|
|
#[inline]
|
|
fn encode_request<Req: Message>(request: &Req) -> Vec<u8> {
|
|
let mut buf = Vec::with_capacity(request.encoded_len());
|
|
if let Err(err) = request.encode(&mut buf) {
|
|
tracing::warn!(error = %err, "failed to encode cache request");
|
|
}
|
|
buf
|
|
}
|
|
|
|
// Repository-scoped invalidation
|
|
|
|
/// Invalidate all cache entries for a specific repository.
|
|
///
|
|
/// Uses the structured key format to extract and match repository paths
|
|
/// without protobuf decoding or substring scanning. O(n) where n is the
|
|
/// number of cached entries, with O(1) per-key comparison.
|
|
///
|
|
/// Called by `notify_ref_update` after any mutator RPC (create commit,
|
|
/// create branch, etc.) to prevent serving stale data.
|
|
pub(crate) fn invalidate_repo(relative_path: &str) {
|
|
let c = cache();
|
|
let target = relative_path.as_bytes();
|
|
let mut keys_to_remove: Vec<std::sync::Arc<Vec<u8>>> = Vec::with_capacity(64);
|
|
|
|
for (key, _value) in c.iter() {
|
|
if key_matches_repo(&key, target) {
|
|
keys_to_remove.push(key);
|
|
}
|
|
}
|
|
|
|
let removed = keys_to_remove.len();
|
|
for key in &keys_to_remove {
|
|
c.invalidate(key.as_ref());
|
|
}
|
|
|
|
if removed > 0 {
|
|
tracing::debug!(
|
|
relative_path = %relative_path,
|
|
entries_removed = removed,
|
|
"cache invalidated for repository"
|
|
);
|
|
}
|
|
}
|
|
|
|
// Selector helpers
|
|
|
|
use crate::pb::{ObjectSelector, object_selector};
|
|
|
|
/// Returns true if the selector is an OID-based reference.
|
|
/// OID-based selectors are cacheable because they are immutable.
|
|
pub(crate) fn selector_is_oid(selector: &Option<ObjectSelector>) -> bool {
|
|
matches!(
|
|
selector.as_ref().and_then(|s| s.selector.as_ref()),
|
|
Some(object_selector::Selector::Oid(_))
|
|
)
|
|
}
|
|
|
|
/// Returns true if both selectors are OID-based.
|
|
pub(crate) fn selectors_are_oid(
|
|
left: &Option<ObjectSelector>,
|
|
right: &Option<ObjectSelector>,
|
|
) -> bool {
|
|
selector_is_oid(left) && selector_is_oid(right)
|
|
}
|