refactor(cache): redesign cache system with structured keys and improved performance

- Add repo_path parameter to cached_response and cached_vec_response functions
- Implement structured cache key format with namespace, repo_path, and request proto
- Replace global cache with Moka in-memory cache using weight-based eviction
- Set 256MB memory cap with 10-minute TTL and 2-minute TTI policy
- Add metrics collection for cache operations and evictions
- Implement efficient repo-scoped invalidation using key structure
- Add detailed documentation comments explaining cache architecture
- Remove outdated dependencies and update dependency versions
- Add error handling for encoding failures in cache operations
- Optimize Vec responses with length-delimited encoding and pre-allocation
This commit is contained in:
zhenyi
2026-06-12 12:53:23 +08:00
parent a40da90ef9
commit 934858bebf
82 changed files with 1273 additions and 4969 deletions
+129 -85
View File
@@ -1,13 +1,15 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use gitks::actor::init_actor_cluster;
use gitks::cluster::{ClusterConfig, ClusterManager};
use gitks::disk_cache::DiskCache;
use gitks::hooks::HookManager;
use gitks::metrics;
use gitks::server::{GitksService, serve};
use etcd_client::{Client, PutOptions};
use tokio::sync::Mutex;
use tracing_subscriber::EnvFilter;
use tracing_subscriber::fmt;
use tracing_subscriber::prelude::*;
@@ -16,6 +18,88 @@ const DEFAULT_HOST: &str = "0.0.0.0";
const DEFAULT_PORT: &str = "50051";
const DEFAULT_STORAGE_NAME: &str = "default";
/// etcd-backed config reader. Priority: etcd > env > default.
struct EtcdConfig {
client: Arc<Mutex<Client>>,
prefix: String,
}
impl EtcdConfig {
async fn connect(endpoints: Vec<String>, prefix: &str) -> Result<Self, String> {
let client = Client::connect(endpoints, None)
.await
.map_err(|e| format!("etcd connect: {e}"))?;
Ok(Self {
client: Arc::new(Mutex::new(client)),
prefix: prefix.to_string(),
})
}
/// Get config: etcd first, env second, default last.
async fn get(&self, key: &str, default: &str) -> String {
let etcd_key = format!("{}config/{}", self.prefix, key);
if let Ok(mut c) = self.client.try_lock()
&& let Ok(resp) = c.get(etcd_key.as_str(), None).await
&& let Some(kv) = resp.kvs().first()
&& let Ok(v) = kv.value_str()
&& !v.is_empty()
{
return v.to_string();
}
std::env::var(key).unwrap_or_else(|_| default.to_string())
}
/// Register this service under the common prefix for discovery by other services.
async fn register(&self, service_name: &str, addr: &str) -> Result<(), String> {
let instance_id = uuid::Uuid::now_v7().to_string();
let addr = addr.to_string();
let key = format!("{}services/{}/{}", self.prefix, service_name, instance_id);
let instance =
serde_json::json!({"addr": &addr, "port": 0, "version": env!("CARGO_PKG_VERSION")});
let value = serde_json::to_string(&instance).map_err(|e| format!("json: {e}"))?;
let lease = {
let mut c = self.client.lock().await;
c.lease_grant(15, None)
.await
.map_err(|e| format!("lease: {e}"))?
};
{
let mut c = self.client.lock().await;
let opts = PutOptions::new().with_lease(lease.id());
c.put(key.clone(), value, Some(opts))
.await
.map_err(|e| format!("put: {e}"))?;
}
tracing::info!(service = service_name, addr = %addr, "registered in etcd");
let c = self.client.clone();
tokio::spawn(async move {
loop {
let r = {
let mut cl = c.lock().await;
cl.lease_keep_alive(lease.id()).await
};
drop(r);
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
if let Ok(lr) = {
let mut cl = c.lock().await;
cl.lease_grant(15, None).await
} {
let inst = serde_json::json!({"addr": &addr, "port": 0, "version": env!("CARGO_PKG_VERSION")});
if let Ok(v) = serde_json::to_string(&inst) {
let mut cl = c.lock().await;
let _ = cl
.put(key.clone(), v, Some(PutOptions::new().with_lease(lr.id())))
.await;
}
}
}
});
Ok(())
}
}
fn env_or(key: &str, default: &str) -> String {
std::env::var(key).unwrap_or_else(|_| default.into())
}
@@ -75,9 +159,17 @@ fn init_tracing() -> Option<tracing_appender::non_blocking::WorkerGuard> {
builder = builder.max_log_files(retention);
}
let file_appender = builder
.build(&log_dir)
.expect("failed to create log directory");
let file_appender = match builder.build(&log_dir) {
Ok(file_appender) => file_appender,
Err(err) => {
eprintln!("failed to create log directory '{log_dir}': {err}");
tracing_subscriber::registry()
.with(env_filter)
.with(fmt_layer)
.init();
return None;
}
};
let (non_blocking, guard) = tracing_appender::non_blocking(file_appender);
let file_layer = fmt::layer()
@@ -119,9 +211,40 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let host = env_or("GITKS_HOST", DEFAULT_HOST);
let port = env_or("GITKS_PORT", DEFAULT_PORT);
let storage_name = env_or("STORAGE_NAME", DEFAULT_STORAGE_NAME);
// --- etcd config overlay: connect etcd, override key settings ---
let etcd_endpoints: Vec<String> = std::env::var("GITKS_ETCD_ENDPOINTS")
.ok()
.filter(|s| !s.is_empty())
.map(|s| s.split(',').map(str::trim).map(String::from).collect())
.unwrap_or_else(|| vec!["http://localhost:2379".to_string()]);
let etcd_prefix = std::env::var("ETCD_KEY_PREFIX").unwrap_or_else(|_| "/appks/".to_string());
let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix).await.ok();
let host = if let Some(ref e) = etcd {
e.get("GITKS_HOST", &host).await
} else {
host
};
let port = if let Some(ref e) = etcd {
e.get("GITKS_PORT", &port).await
} else {
port
};
let storage_name = if let Some(ref e) = etcd {
e.get("GITKS_STORAGE_NAME", &storage_name).await
} else {
storage_name
};
let grpc_addr =
std::env::var("GITKS_ADVERTISE_ADDR").unwrap_or_else(|_| format!("http://{host}:{port}"));
// Register this service so other services (appks) can discover us
if let Some(ref e) = etcd {
let addr_str = format!("{host}:{port}");
e.register("gitks", &addr_str).await.ok();
}
let repo_prefix = std::env::var("REPO_PREFIX_PATH")
.map_err(|_| "REPO_PREFIX_PATH environment variable is required (e.g. /data/repos)")?;
let repo_prefix = PathBuf::from(&repo_prefix);
@@ -197,16 +320,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
None
};
// Health check / election configuration
let health_check_interval = env_u64("GITKS_HEALTH_CHECK_INTERVAL", 1);
let max_health_failures = env_u64("GITKS_MAX_HEALTH_FAILURES", 10);
tracing::info!(
interval_secs = health_check_interval,
max_failures = max_health_failures,
"health check configured"
);
let metrics_port = env_u64("GITKS_METRICS_PORT", 9100) as u16;
let http_cancel = tokio_util::sync::CancellationToken::new();
metrics::set_http_cancel_token(http_cancel.clone());
@@ -221,60 +334,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"slow request detection configured"
);
let etcd_endpoints = std::env::var("GITKS_ETCD_ENDPOINTS")
.ok()
.filter(|s| !s.is_empty())
.map(|s| {
s.split(',')
.map(str::trim)
.map(String::from)
.collect::<Vec<_>>()
});
let cluster_port = env_or("GITKS_CLUSTER_PORT", "4697")
.parse::<u16>()
.unwrap_or(4697);
let cluster_cookie = env_or("GITKS_CLUSTER_COOKIE", "gitks-default-cookie");
let lease_ttl = env_u64("GITKS_LEASE_TTL", 15) as i64;
let connect_timeout_ms = env_u64("GITKS_ETCD_CONNECT_TIMEOUT", 5000);
let cluster_hostname = std::env::var("GITKS_CLUSTER_HOSTNAME")
.or_else(|_| std::env::var("POD_IP"))
.or_else(|_| std::env::var("HOSTNAME"))
.unwrap_or_else(|_| "localhost".to_string());
let _cluster: Option<ClusterManager> = if let Some(endpoints) = etcd_endpoints {
tracing::info!(
?endpoints,
cluster_port,
cluster_hostname = %cluster_hostname,
"starting cluster discovery via etcd"
);
let config = ClusterConfig {
etcd_endpoints: endpoints,
storage_name: storage_name.clone(),
grpc_addr: grpc_addr.clone(),
cluster_port,
cookie: cluster_cookie,
lease_ttl_secs: lease_ttl,
connect_timeout_ms,
cluster_hostname,
};
match ClusterManager::start(config).await {
Ok(cm) => {
tracing::info!("cluster discovery active");
Some(cm)
}
Err(e) => {
tracing::warn!(error = %e, "etcd unavailable, running in standalone mode");
None
}
}
} else {
tracing::info!("GITKS_ETCD_ENDPOINTS not set, running in standalone mode");
None
};
let addr: std::net::SocketAddr = format!("{host}:{port}").parse()?;
let mut svc = GitksService::new(repo_prefix.clone());
@@ -288,17 +347,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
svc = svc.with_hook_manager(hm);
}
let raft_data_dir = repo_prefix.join(".gitks_raft");
let (node_actor, node_handle) = init_actor_cluster(
svc.clone(),
storage_name.clone(),
grpc_addr.clone(),
raft_data_dir,
)
.await?;
let svc = svc
.with_actor(node_actor.clone())
.with_grpc_addr(grpc_addr.clone());
let svc = svc.with_grpc_addr(grpc_addr.clone());
tracing::info!(
addr = %addr,
@@ -308,16 +357,11 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
"starting gitks gRPC server"
);
let _route_cache_cleanup = gitks::server::GitksService::start_route_cache_cleanup(svc.clone());
serve(addr, svc).await?;
// Gracefully shut down the HTTP metrics server
http_cancel.cancel();
node_actor.stop(None);
node_handle.await?;
tracing::info!("gitks shut down");
Ok(())
}