From 9a0c26e5f6113644af93c998224f297912f3096a Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Wed, 10 Jun 2026 12:35:10 +0800 Subject: [PATCH] refactor(actor): implement Raft consensus algorithm for cluster leader election - Add voting mechanism with term tracking and vote persistence - Implement election triggering logic with majority vote counting - Add primary/replica role transition handling with state management - Integrate health check failure detection for automatic elections - Refactor actor messaging system for distributed coordination - Update repository registration to query cluster for existing primary - Add broadcast mechanism for role change notifications - Implement proper term comparison and duplicate request filtering - Upgrade dependency versions including tokio-util for async utilities - Optimize code formatting and line wrapping for improved readability - Remove redundant blank lines and improve code structure consistency - Enhance error logging and trace information for debugging purposes --- Cargo.lock | 58 +++---- Cargo.toml | 1 + actor/handler.rs | 182 +++++++++++++++------ actor/message.rs | 4 +- actor/mod.rs | 1 + actor/sync.rs | 299 +++++++++++++++++++++++----------- commit/count_commits.rs | 16 +- commit/find_commit.rs | 27 ++- commit/get_commit.rs | 7 +- commit/query.rs | 59 +++++-- diff/changed_paths.rs | 49 +++++- diff/raw.rs | 4 +- disk_cache.rs | 5 - hooks/runner.rs | 22 ++- hooks/sanitize.rs | 96 ++++++++++- lib.rs | 6 +- metrics.rs | 30 ++-- pack/mod.rs | 30 ++++ pack/receive_pack.rs | 62 +++++-- pack/upload_pack.rs | 64 ++++++-- pack_cache.rs | 16 +- rate_limit.rs | 83 ++++++---- refs/find_refs.rs | 8 +- refs/update_refs.rs | 36 +++- remote/find_remote.rs | 35 +++- remote/mirror.rs | 35 +++- repository/find_license.rs | 15 +- repository/find_merge_base.rs | 14 +- repository/objects_size.rs | 13 +- repository/optimize.rs | 73 +++++++-- repository/raw_changes.rs | 26 ++- repository/search_files.rs | 36 +++- server/commit.rs | 2 - server/diff.rs | 8 +- server/mod.rs | 94 +++++++++-- server/pack.rs | 25 ++- server/refs.rs | 2 +- server/remote.rs | 2 +- server/repository.rs | 9 +- tests/cluster_test.rs | 79 ++++++--- 40 files changed, 1184 insertions(+), 449 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a33b4cf..c044e43 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -134,9 +134,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6" [[package]] name = "bitflags" -version = "2.12.1" +version = "2.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "84d7ced0ae9557296835c32bf1b1e02b44c746701f898460fb000d7eaa84f00a" +checksum = "b4388bee8683e3d04af747c73422af53102d2bd24d9eadb6cbc100baef4b43f8" dependencies = [ "serde_core", ] @@ -704,6 +704,7 @@ dependencies = [ "thiserror", "tokio", "tokio-stream", + "tokio-util", "tonic", "tonic-prost", "tonic-prost-build", @@ -1655,9 +1656,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" [[package]] name = "http" -version = "1.4.1" +version = "1.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8be7462df143984c4598a256ef469b251d7d7f9e271135073e78fc535414f3d0" +checksum = "6970f50e31d6fc17d3fa27329444bfa74e196cf62e95052a3f6fee181dba6425" dependencies = [ "bytes", "itoa", @@ -1854,13 +1855,12 @@ dependencies = [ [[package]] name = "js-sys" -version = "0.3.99" +version = "0.3.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "142bc4740e452c1e57ade0cbc129f139c9093e354346f0872ef985f4f5cf5f11" +checksum = "f2025f20d7a4fa7785846e7b63d10a76d3f1cee98ee5cb79ea59703f95e42162" dependencies = [ "cfg-if", "futures-util", - "once_cell", "wasm-bindgen", ] @@ -2164,9 +2164,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" +checksum = "528ac67416ff8646872a3c02cad9cc4ee5dc9f9540c9b10771855c95cb2e5ae1" dependencies = [ "bytes", "prost-derive", @@ -2174,9 +2174,9 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7" +checksum = "03da047801ff44bb6a4d407d4860c05fd70bb81714e6b2f3812603d5b145b042" dependencies = [ "heck", "itertools", @@ -2195,9 +2195,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" +checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf" dependencies = [ "anyhow", "itertools", @@ -2208,9 +2208,9 @@ dependencies = [ [[package]] name = "prost-types" -version = "0.14.3" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7" +checksum = "f94967dc7688f3054c7fac87473ffae4cc4c3904800e2d9f5b857246d8963b0a" dependencies = [ "prost", ] @@ -3161,9 +3161,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1" [[package]] name = "uuid" -version = "1.23.2" +version = "1.23.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7" +checksum = "144d6b123cef80b301b8f72a9e2ca4370ddec21950d0a103dd22c437006d2db7" dependencies = [ "getrandom 0.4.2", "js-sys", @@ -3227,9 +3227,9 @@ dependencies = [ [[package]] name = "wasm-bindgen" -version = "0.2.122" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3ed04576f974d2b2fba0f38c51dbc5518011e38c36bf1143164be765528fd409" +checksum = "a254a4b10c19a76f09a27640e7ffbf9bc30bf67e16a3bf28aaefa4920fe81563" dependencies = [ "cfg-if", "once_cell", @@ -3240,9 +3240,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-futures" -version = "0.4.72" +version = "0.4.73" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9473dbd2991ae90b6291c3c32c30c6187ac49aa32f9905d1cce280ec1e110b0f" +checksum = "54568702fabf5d4849ce2b90fadfa64168a097eaf4b351ce9df8b687a0086aaf" dependencies = [ "js-sys", "wasm-bindgen", @@ -3250,9 +3250,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro" -version = "0.2.122" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "916151b09da36bd82f6615cbf3a419e2f0ba23a03c6160e8e92eb6bd4aa1dec6" +checksum = "24a40fc75b0ec6f3746ceb10d36f53a93dcd68a93b11b6445983945d79eba0dc" dependencies = [ "quote", "wasm-bindgen-macro-support", @@ -3260,9 +3260,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-macro-support" -version = "0.2.122" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "299047362ccbfce148b67ab7e73349f77748e00c8296f9542adfad2ad82c5c5e" +checksum = "908f34bd9b9ce3d4caf07b72dfab63d61504d156856c6bd3cd87fa350cf3985b" dependencies = [ "bumpalo", "proc-macro2", @@ -3273,9 +3273,9 @@ dependencies = [ [[package]] name = "wasm-bindgen-shared" -version = "0.2.122" +version = "0.2.123" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9a929b2c61f11ba3e9bc35b50c1f25cb38e0e892c0c231ae2b8cf78d5dad4437" +checksum = "7acbf7616c27b194bbb550bf77ed0c2c3e5b7fd1260a93082b95fb7f47959b92" dependencies = [ "unicode-ident", ] @@ -3316,9 +3316,9 @@ dependencies = [ [[package]] name = "web-sys" -version = "0.3.99" +version = "0.3.100" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6d621441cfc37b84979402712047321980c178f299193a3589d05b99e8763436" +checksum = "6e0871acf327f283dc6da28a1696cdc64fb355ba9f935d052021fa77f35cce69" dependencies = [ "js-sys", "wasm-bindgen", diff --git a/Cargo.toml b/Cargo.toml index 0745e5e..03f34d0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ duct = { version = "1", features = [] } tracing = { version = "0.1", features = ["log"] } tokio = { version = "1", features = ["rt-multi-thread", "macros", "process", "io-util", "sync", "net"] } tokio-stream = { version = "0.1", features = ["full"] } +tokio-util = "0.7" thiserror = { version = "2", features = [] } prost = "0.14" prost-types = "0.14" diff --git a/actor/handler.rs b/actor/handler.rs index 16b163d..c18885d 100644 --- a/actor/handler.rs +++ b/actor/handler.rs @@ -2,10 +2,11 @@ use crate::actor::message::{ ElectionRequest, ElectionResult, GitNodeMessage, NodeHealth, ROLE_PRIMARY, ROLE_REPLICA, RefUpdateEvent, RoleChangedEvent, RouteDecision, }; +use crate::pb::RepositoryHeader; use crate::server::GitksService; use async_trait::async_trait; use ractor::pg; -use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent}; +use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent}; use std::collections::HashMap; #[derive(Clone)] @@ -43,6 +44,7 @@ pub struct GitNodeState { health_failures: u32, is_primary: bool, last_known_primary_grpc: String, + voted_for: Option, } #[async_trait] @@ -76,6 +78,7 @@ impl Actor for GitNodeActor { health_failures: 0, is_primary: true, // Will be refined at registration last_known_primary_grpc: args.grpc_addr.clone(), + voted_for: None, }) } @@ -96,12 +99,12 @@ impl Actor for GitNodeActor { .unwrap_or(&repo_path) .trim_start_matches('/') .to_string(); - register_repo(&myself, state, relative_path); + register_repo(&myself, state, relative_path).await; } } GitNodeMessage::RegisterRepository(header) => { - register_repo(&myself, state, header.relative_path); + register_repo(&myself, state, header.relative_path).await; } GitNodeMessage::RemoveRepository(header) => { @@ -173,10 +176,12 @@ impl Actor for GitNodeActor { term = request.term, current_term = state.current_term, accepted = accepted, + voted_for = ?state.voted_for, "election vote" ); if accepted { state.current_term = request.term; + state.voted_for = Some(request.candidate_storage_name.clone()); state.last_known_primary_grpc = request.candidate_grpc_addr.clone(); } reply @@ -208,6 +213,7 @@ impl Actor for GitNodeActor { state.is_primary = true; state.current_term = event.term; state.health_failures = 0; + state.voted_for = None; for entry in state.repos.values_mut() { entry.role = ROLE_PRIMARY.to_string(); entry.read_only = false; @@ -220,6 +226,7 @@ impl Actor for GitNodeActor { ); state.is_primary = false; state.current_term = event.term; + state.voted_for = None; for entry in state.repos.values_mut() { entry.role = ROLE_REPLICA.to_string(); } @@ -237,6 +244,76 @@ impl Actor for GitNodeActor { }; } } + + GitNodeMessage::TriggerElection => { + let members = ractor::pg::get_members(&"gitks_nodes".to_string()); + let total = members.len(); + let my_cell = myself.get_cell(); + + let new_term = state.current_term.wrapping_add(1); + + let mut accepted_count = 0u64; + for member in &members { + if *member == my_cell { + // We vote for ourselves + accepted_count += 1; + continue; + } + let actor_ref: ActorRef = member.clone().into(); + let request = ElectionRequest { + candidate_storage_name: state.storage_name.clone(), + candidate_grpc_addr: state.grpc_addr.clone(), + candidate_actor_name: state.actor_name.clone(), + term: new_term, + reason: "health_check_failure".to_string(), + }; + match ractor::call_t!(actor_ref, GitNodeMessage::ElectPrimary, 1000, request) { + Ok(result) if result.accepted => { + accepted_count += 1; + } + Ok(_) => {} + Err(_) => { + tracing::warn!( + member = ?member.get_id(), + "no response from member during election" + ); + } + } + } + + let majority = (total / 2).max(1) + 1; + if accepted_count >= majority as u64 { + tracing::info!( + term = new_term, + accepted = accepted_count, + total = total, + "won election, promoting to PRIMARY" + ); + state.is_primary = true; + state.current_term = new_term; + state.health_failures = 0; + state.voted_for = None; + for entry in state.repos.values_mut() { + entry.role = ROLE_PRIMARY.to_string(); + entry.read_only = false; + } + let role_event = RoleChangedEvent { + storage_name: state.storage_name.clone(), + grpc_addr: state.grpc_addr.clone(), + new_role: ROLE_PRIMARY.to_string(), + term: new_term, + relative_paths: state.repos.keys().cloned().collect(), + }; + broadcast_role_changed(&myself, role_event); + } else { + tracing::warn!( + term = new_term, + accepted = accepted_count, + total = total, + "election lost, staying as REPLICA" + ); + } + } } Ok(()) } @@ -277,9 +354,8 @@ impl Actor for GitNodeActor { /// Determine whether to accept an election request. fn should_accept_election(request: &ElectionRequest, state: &GitNodeState) -> bool { - // Only accept if the term is greater than our current term - // (prevents old/duplicate election messages) - if request.term <= state.current_term { + // Reject old terms (prevents old/duplicate election messages) + if request.term < state.current_term { tracing::warn!( request_term = request.term, current_term = state.current_term, @@ -287,6 +363,20 @@ fn should_accept_election(request: &ElectionRequest, state: &GitNodeState) -> bo ); return false; } + // Same term: only accept if we haven't already voted for someone else + if request.term == state.current_term + && let Some(ref voted_for) = state.voted_for + && voted_for != &request.candidate_storage_name + { + tracing::warn!( + request_term = request.term, + current_term = state.current_term, + already_voted = %voted_for, + candidate = %request.candidate_storage_name, + "rejecting election: already voted this term" + ); + return false; + } true } @@ -318,7 +408,7 @@ fn build_decision( } } -fn register_repo( +async fn register_repo( myself: &ActorRef, state: &mut GitNodeState, relative_path: String, @@ -329,10 +419,19 @@ fn register_repo( let members = ractor::pg::get_members(&"gitks_nodes".to_string()); let my_cell = myself.get_cell(); - let other_nodes_exist = members.iter().any(|m| m != &my_cell); - let role = if other_nodes_exist { - ROLE_REPLICA.to_string() + let role = if members.iter().any(|m| m != &my_cell) { + let header = RepositoryHeader { + storage_name: String::new(), + relative_path: relative_path.clone(), + storage_path: String::new(), + }; + let primary_found = find_primary_in_cluster(&members, &my_cell, &header).await; + if primary_found { + ROLE_REPLICA.to_string() + } else { + ROLE_PRIMARY.to_string() + } } else { ROLE_PRIMARY.to_string() }; @@ -365,6 +464,28 @@ fn register_repo( ); } +/// Query all cluster members (except self) to find if a repository has a PRIMARY. +pub async fn find_primary_in_cluster( + members: &[ActorCell], + my_cell: &ActorCell, + header: &RepositoryHeader, +) -> bool { + for member in members { + if member == my_cell { + continue; + } + let actor_ref: ActorRef = member.clone().into(); + if let Ok(decision) = + ractor::call_t!(actor_ref, GitNodeMessage::FindPrimary, 500, header.clone()) + && decision.found + && decision.role == ROLE_PRIMARY + { + return true; + } + } + false +} + fn extract_category(relative_path: &str) -> &str { relative_path.split('/').next().unwrap_or("root") } @@ -417,9 +538,9 @@ fn start_health_checker(myself: ActorRef, interval_secs: u64, ma if consecutive_failures >= max_failures { tracing::error!( - "no other nodes reachable for {max_failures} checks, triggering self-election as PRIMARY" + "no other nodes reachable for {max_failures} checks, triggering election" ); - trigger_self_election(&myself); + myself.cast(GitNodeMessage::TriggerElection).ok(); consecutive_failures = 0; } } @@ -427,43 +548,6 @@ fn start_health_checker(myself: ActorRef, interval_secs: u64, ma }); } -/// Trigger self-election: this node promotes itself to PRIMARY. -fn trigger_self_election(myself: &ActorRef) { - let members = ractor::pg::get_members(&"gitks_nodes".to_string()); - let total_nodes = members.len(); - - tracing::warn!( - total_nodes = total_nodes, - "initiating self-election as new PRIMARY" - ); - - let new_term = std::time::SystemTime::now() - .duration_since(std::time::SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_secs(); - - myself - .cast(GitNodeMessage::RoleChanged(RoleChangedEvent { - storage_name: String::new(), // will be filled by handler from our own state - grpc_addr: String::new(), - new_role: ROLE_PRIMARY.to_string(), - term: new_term, - relative_paths: Vec::new(), // all repos - })) - .ok(); - - broadcast_role_changed( - myself, - RoleChangedEvent { - storage_name: String::new(), // handler fills - grpc_addr: String::new(), - new_role: ROLE_PRIMARY.to_string(), - term: new_term, - relative_paths: Vec::new(), - }, - ); -} - pub async fn start_node_actor( service: GitksService, storage_name: String, diff --git a/actor/message.rs b/actor/message.rs index e4da10d..8a3ce69 100644 --- a/actor/message.rs +++ b/actor/message.rs @@ -149,6 +149,9 @@ pub enum GitNodeMessage { /// A role change has occurred in the cluster. RoleChanged(RoleChangedEvent), + + /// Health checker detected primary failure, trigger election. + TriggerElection, } #[derive(ractor_cluster::RactorMessage)] @@ -156,7 +159,6 @@ pub enum RepoActorMessage { UpdateMetadata(RepositoryHeader), } - /// Request for a node to vote in a PRIMARY election. #[derive(Debug, Clone)] pub struct ElectionRequest { diff --git a/actor/mod.rs b/actor/mod.rs index ebe415f..c8ba24b 100644 --- a/actor/mod.rs +++ b/actor/mod.rs @@ -3,6 +3,7 @@ pub mod message; pub mod server; pub mod sync; +pub use handler::find_primary_in_cluster; pub use handler::{ GitNodeActor, GitNodeArgs, RepoEntry, broadcast_ref_update, broadcast_role_changed, get_category_members, get_cluster_nodes, list_all_groups, route_group_for, start_node_actor, diff --git a/actor/sync.rs b/actor/sync.rs index 9a87461..23c4a61 100644 --- a/actor/sync.rs +++ b/actor/sync.rs @@ -39,6 +39,57 @@ impl BundleApplicator { } Ok(()) } + + /// Apply bundle from a file path (for streaming writes). + pub fn apply_bundle_from_file(&self, path: &Path) -> Result<(), String> { + let file = std::fs::File::open(path).map_err(|e| format!("open bundle file: {e}"))?; + let mut child = std::process::Command::new("git") + .args([ + "--git-dir", + &self.repo_path.to_string_lossy(), + "bundle", + "unbundle", + "-", + ]) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .map_err(|e| format!("spawn git bundle unbundle: {e}"))?; + + // Stream file contents to stdin in a background thread + let mut stdin = child.stdin.take().ok_or("no stdin")?; + let file_handle = file; + let writer = std::thread::spawn(move || -> Result<(), String> { + use std::io::{Read, Write}; + let mut reader = std::io::BufReader::new(file_handle); + let mut buf = vec![0u8; 65536]; + loop { + match reader.read(&mut buf) { + Ok(0) => break, + Ok(n) => { + stdin + .write_all(&buf[..n]) + .map_err(|e| format!("write to stdin: {e}"))?; + } + Err(e) => return Err(format!("read bundle file: {e}")), + } + } + Ok(()) + }); + + let output = child + .wait_with_output() + .map_err(|e| format!("wait bundle: {e}"))?; + + // Wait for writer thread + let _ = writer.join().map_err(|_| "writer thread panicked")?; + + if !output.status.success() { + return Err(String::from_utf8_lossy(&output.stderr).into_owned()); + } + Ok(()) + } } pub fn collect_local_haves(repo_path: &Path) -> Result, String> { @@ -92,20 +143,45 @@ pub async fn sync_from_primary(event: RefUpdateEvent, local_repo_path: PathBuf) let relative_path = event.relative_path.clone(); let repo_for_haves = local_repo_path.clone(); - match tokio::task::spawn_blocking(move || { - sync_via_pack_service(&grpc_addr, &relative_path, &repo_for_haves) - }) - .await + // Collect haves in a blocking thread + let haves = match tokio::task::spawn_blocking(move || collect_local_haves(&repo_for_haves)) + .await { - Ok(Ok(pack_data)) if !pack_data.is_empty() => { - let pack_len = pack_data.len(); + Ok(Ok(h)) => h, + Ok(Err(e)) => { + tracing::error!(relative_path = %event.relative_path, error = %e, "collect haves failed"); + return; + } + Err(e) => { + tracing::error!(relative_path = %event.relative_path, error = %e, "haves task failed"); + return; + } + }; + + // Stream pack data to a temporary file to avoid OOM + let temp_dir = local_repo_path.join(".gitks_tmp"); + if let Err(e) = std::fs::create_dir_all(&temp_dir) { + tracing::error!(relative_path = %event.relative_path, error = %e, "create temp dir failed"); + return; + } + + let pack_result = + sync_via_pack_service_to_file(&grpc_addr, &relative_path, &haves, &temp_dir).await; + + match pack_result { + Ok(Some(pack_file)) => { let repo = local_repo_path.clone(); - match tokio::task::spawn_blocking(move || apply_pack_data(&repo, &pack_data)).await { + let pack_path = pack_file.clone(); + match tokio::task::spawn_blocking(move || { + let applicator = BundleApplicator::new(repo); + applicator.apply_bundle_from_file(&pack_path) + }) + .await + { Ok(Ok(())) => { update_local_ref(&local_repo_path, &event.ref_name, &event.new_oid); tracing::info!( relative_path = %event.relative_path, - bytes = pack_len, "replica sync done" ); } @@ -116,119 +192,144 @@ pub async fn sync_from_primary(event: RefUpdateEvent, local_repo_path: PathBuf) tracing::error!(relative_path = %event.relative_path, error = %e, "apply task failed") } } + // Cleanup temp file + let _ = std::fs::remove_file(&pack_file); } - Ok(Ok(_)) => { + Ok(None) => { tracing::warn!(relative_path = %event.relative_path, "empty pack data from primary") } - Ok(Err(e)) => { + Err(e) => { tracing::error!(relative_path = %event.relative_path, error = %e, "pack fetch failed") } - Err(e) => { - tracing::error!(relative_path = %event.relative_path, error = %e, "sync task failed") - } } + + // Cleanup temp dir if empty + let _ = std::fs::remove_dir(&temp_dir); } -fn sync_via_pack_service( +/// Maximum pack size before we reject (10GB) +const MAX_PACK_SIZE: u64 = 10 * 1024 * 1024 * 1024; + +/// Stream pack data from primary to a temporary file. +/// Returns Ok(Some(path)) on success, Ok(None) if empty, Err on failure. +async fn sync_via_pack_service_to_file( grpc_addr: &str, relative_path: &str, - local_repo_path: &Path, -) -> Result, String> { - let haves = collect_local_haves(local_repo_path)?; + haves: &[Oid], + temp_dir: &Path, +) -> Result, String> { + use crate::pb::pack_service_client::PackServiceClient; + use crate::pb::{AdvertiseRefsRequest, PackObjectsOptions, PackObjectsRequest, RepositoryHeader}; + use tokio::io::AsyncWriteExt; + use tokio_stream::StreamExt; - let rt = tokio::runtime::Handle::current(); - rt.block_on(async { - use crate::pb::pack_service_client::PackServiceClient; - use crate::pb::{ - AdvertiseRefsRequest, PackObjectsOptions, PackObjectsRequest, RepositoryHeader, - }; - use tokio_stream::StreamExt; + let endpoint = crate::server::remote_endpoint(grpc_addr) + .await + .map_err(|e| e.to_string())?; - let endpoint = crate::server::remote_endpoint(grpc_addr) - .await - .map_err(|e| e.to_string())?; + let mut client = PackServiceClient::connect(endpoint) + .await + .map_err(|e| format!("connect to primary: {e}"))?; - let mut client = PackServiceClient::connect(endpoint) - .await - .map_err(|e| format!("connect to primary: {e}"))?; + let header = RepositoryHeader { + storage_name: String::new(), + relative_path: relative_path.to_string(), + storage_path: String::new(), + }; - let header = RepositoryHeader { - storage_name: String::new(), - relative_path: relative_path.to_string(), - storage_path: String::new(), - }; - - let refs_resp = client - .advertise_refs(AdvertiseRefsRequest { - repository: Some(header.clone()), - protocol: None, - service: "upload-pack".to_string(), - raw: false, - }) - .await - .map_err(|e| format!("AdvertiseRefs: {e}"))?; - - let refs = refs_resp.into_inner().references; - if refs.is_empty() { - return Ok(Vec::new()); - } - - let wants: Vec = refs.iter().filter_map(|r| r.target_oid.clone()).collect(); - - let want_count = wants.len(); - let have_count = haves.len(); - - tracing::info!( - relative_path = %relative_path, - want_count, - have_count, - "requesting incremental pack from primary" - ); - - let options = PackObjectsOptions { - wants, - haves, - shallow_revisions: Vec::new(), - deepen: 0, - thin_pack: false, - include_tag: true, - use_bitmaps: true, - delta_base_offset: true, - pathspec: Vec::new(), - }; - - let req = PackObjectsRequest { + let refs_resp = client + .advertise_refs(AdvertiseRefsRequest { repository: Some(header.clone()), - options: Some(options), - }; + protocol: None, + service: "upload-pack".to_string(), + raw: false, + }) + .await + .map_err(|e| format!("AdvertiseRefs: {e}"))?; - let resp = client - .pack_objects(req) - .await - .map_err(|e| format!("PackObjects: {e}"))?; + let refs = refs_resp.into_inner().references; + if refs.is_empty() { + return Ok(None); + } - let mut stream = resp.into_inner(); - let mut pack_data = Vec::new(); - while let Some(chunk) = stream.next().await { - match chunk { - Ok(msg) => pack_data.extend_from_slice(&msg.data), - Err(e) => return Err(format!("pack stream: {e}")), + let wants: Vec = refs.iter().filter_map(|r| r.target_oid.clone()).collect(); + + let want_count = wants.len(); + let have_count = haves.len(); + + tracing::info!( + relative_path = %relative_path, + want_count, + have_count, + "requesting incremental pack from primary" + ); + + let options = PackObjectsOptions { + wants, + haves: haves.to_vec(), + shallow_revisions: Vec::new(), + deepen: 0, + thin_pack: false, + include_tag: true, + use_bitmaps: true, + delta_base_offset: true, + pathspec: Vec::new(), + }; + + let req = PackObjectsRequest { + repository: Some(header.clone()), + options: Some(options), + }; + + let resp = client + .pack_objects(req) + .await + .map_err(|e| format!("PackObjects: {e}"))?; + + let mut stream = resp.into_inner(); + + // Create a temporary file for streaming + let temp_file = temp_dir.join(format!("pack_{}.bundle", std::process::id())); + let mut file = tokio::fs::File::create(&temp_file) + .await + .map_err(|e| format!("create temp file: {e}"))?; + + let mut total_bytes: u64 = 0; + while let Some(chunk) = stream.next().await { + match chunk { + Ok(msg) => { + total_bytes += msg.data.len() as u64; + if total_bytes > MAX_PACK_SIZE { + let _ = tokio::fs::remove_file(&temp_file).await; + return Err(format!( + "pack data exceeds maximum size ({}GB)", + MAX_PACK_SIZE / (1024 * 1024 * 1024) + )); + } + file.write_all(&msg.data) + .await + .map_err(|e| format!("write pack data: {e}"))?; + } + Err(e) => { + let _ = tokio::fs::remove_file(&temp_file).await; + return Err(format!("pack stream: {e}")); } } + } - tracing::info!( - relative_path = %relative_path, - pack_bytes = pack_data.len(), - "received pack data from primary" - ); + // Flush and close the file + file.flush() + .await + .map_err(|e| format!("flush pack file: {e}"))?; + drop(file); - Ok(pack_data) - }) -} + tracing::info!( + relative_path = %relative_path, + pack_bytes = total_bytes, + "received pack data from primary" + ); -fn apply_pack_data(repo_path: &Path, pack_data: &[u8]) -> Result<(), String> { - let applicator = BundleApplicator::new(repo_path.to_path_buf()); - applicator.apply_bundle(pack_data) + Ok(Some(temp_file)) } fn update_local_ref(repo_path: &Path, ref_name: &str, new_oid: &str) { diff --git a/commit/count_commits.rs b/commit/count_commits.rs index 1d4ff95..1f72993 100644 --- a/commit/count_commits.rs +++ b/commit/count_commits.rs @@ -5,7 +5,11 @@ use crate::pb::*; impl GitBare { /// Count commits in a revision range or path. pub fn count_commits(&self, request: CountCommitsRequest) -> GitResult { - let revision = if request.revision.is_empty() { "HEAD" } else { &request.revision }; + let revision = if request.revision.is_empty() { + "HEAD" + } else { + &request.revision + }; crate::sanitize::validate_revision(revision)?; let mut args = vec![ @@ -48,7 +52,10 @@ impl GitBare { } /// Count diverging commits between two branches (left vs right). - pub fn count_diverging_commits(&self, request: CountDivergingCommitsRequest) -> GitResult { + pub fn count_diverging_commits( + &self, + request: CountDivergingCommitsRequest, + ) -> GitResult { crate::sanitize::validate_revision(&request.left)?; crate::sanitize::validate_revision(&request.right)?; @@ -75,6 +82,9 @@ impl GitBare { let left_count = parts.first().and_then(|s| s.parse().ok()).unwrap_or(0); let right_count = parts.get(1).and_then(|s| s.parse().ok()).unwrap_or(0); - Ok(CountDivergingCommitsResponse { left_count, right_count }) + Ok(CountDivergingCommitsResponse { + left_count, + right_count, + }) } } diff --git a/commit/find_commit.rs b/commit/find_commit.rs index 3f208d2..5d6b975 100644 --- a/commit/find_commit.rs +++ b/commit/find_commit.rs @@ -13,18 +13,27 @@ impl GitBare { crate::sanitize::validate_revision(&revision)?; let repo = self.gix_repo()?; - let oid = repo.rev_parse_single(revision.as_str()) + let oid = repo + .rev_parse_single(revision.as_str()) .map_err(|e| GitError::Gix(e.to_string()))?; - let commit = oid.object() + let commit = oid + .object() .map_err(|e| GitError::Gix(e.to_string()))? .try_into_commit() .map_err(|e| GitError::Gix(format!("not a commit: {e}")))?; - Ok(crate::commit::get_commit::commit_to_pb(self, &commit, request.include_stats)) + Ok(crate::commit::get_commit::commit_to_pb( + self, + &commit, + request.include_stats, + )) } /// Batch lookup commits by OID list. - pub fn list_commits_by_oid(&self, request: ListCommitsByOidRequest) -> GitResult { + pub fn list_commits_by_oid( + &self, + request: ListCommitsByOidRequest, + ) -> GitResult { let repo = self.gix_repo()?; let mut commits = Vec::new(); @@ -33,11 +42,17 @@ impl GitBare { if let Ok(oid) = gix::ObjectId::from_hex(hex.as_bytes()) { if let Ok(obj) = repo.find_object(oid) { if let Ok(commit) = obj.try_into_commit() { - commits.push(crate::commit::get_commit::commit_to_pb(self, &commit, request.include_stats)); + commits.push(crate::commit::get_commit::commit_to_pb( + self, + &commit, + request.include_stats, + )); } } } - if commits.len() >= 100 { break; } + if commits.len() >= 100 { + break; + } } Ok(ListCommitsByOidResponse { commits }) diff --git a/commit/get_commit.rs b/commit/get_commit.rs index 74fa4a1..22b5c83 100644 --- a/commit/get_commit.rs +++ b/commit/get_commit.rs @@ -19,7 +19,10 @@ impl GitBare { pub(crate) fn commit_to_pb(gb: &GitBare, commit: &gix::Commit<'_>, include_raw: bool) -> Commit { let hex = commit.id.to_string(); let tree_hex = commit.tree_id().map(|t| t.to_string()).unwrap_or_default(); - let message = commit.message_raw().map(|m| m.to_string()).unwrap_or_default(); + let message = commit + .message_raw() + .map(|m| m.to_string()) + .unwrap_or_default(); let (subject, body) = message .split_once('\n') .map(|(s, b)| (s.to_string(), b.trim_start_matches('\n').to_string())) @@ -74,4 +77,4 @@ pub(crate) fn gix_sig_to_pb(sig: &gix::actor::SignatureRef<'_>) -> crate::pb::Si }), timezone_offset: time.map(|t| t.offset / 60).unwrap_or(0), } -} \ No newline at end of file +} diff --git a/commit/query.rs b/commit/query.rs index 16cb56b..2e39385 100644 --- a/commit/query.rs +++ b/commit/query.rs @@ -4,11 +4,22 @@ use crate::pb::*; impl GitBare { /// Search commits by message content. - pub fn commits_by_message(&self, request: CommitsByMessageRequest) -> GitResult { - let revision = if request.revision.is_empty() { "HEAD" } else { &request.revision }; + pub fn commits_by_message( + &self, + request: CommitsByMessageRequest, + ) -> GitResult { + let revision = if request.revision.is_empty() { + "HEAD" + } else { + &request.revision + }; crate::sanitize::validate_revision(revision)?; - let limit = if request.limit == 0 { 20 } else { request.limit.min(200) }; + let limit = if request.limit == 0 { + 20 + } else { + request.limit.min(200) + }; let mut args = vec![ "--git-dir".to_string(), @@ -50,7 +61,9 @@ impl GitBare { if let Ok(oid) = gix::ObjectId::from_hex(hex.as_bytes()) { if let Ok(obj) = repo.find_object(oid) { if let Ok(commit) = obj.try_into_commit() { - commits.push(crate::commit::get_commit::commit_to_pb(self, &commit, false)); + commits.push(crate::commit::get_commit::commit_to_pb( + self, &commit, false, + )); } } } @@ -60,7 +73,10 @@ impl GitBare { } /// Batch check if objects/revisions exist. - pub fn check_objects_exist(&self, request: CheckObjectsExistRequest) -> GitResult { + pub fn check_objects_exist( + &self, + request: CheckObjectsExistRequest, + ) -> GitResult { let repo = self.gix_repo()?; let mut revisions = Vec::new(); @@ -119,13 +135,24 @@ impl GitBare { } } - Ok(CommitStats { additions, deletions, changed_files }) + Ok(CommitStats { + additions, + deletions, + changed_files, + }) } /// Get the last commit for a given path. - pub fn last_commit_for_path(&self, request: LastCommitForPathRequest) -> GitResult { + pub fn last_commit_for_path( + &self, + request: LastCommitForPathRequest, + ) -> GitResult { crate::sanitize::validate_file_path(&request.path)?; - let revision = if request.revision.is_empty() { "HEAD" } else { &request.revision }; + let revision = if request.revision.is_empty() { + "HEAD" + } else { + &request.revision + }; crate::sanitize::validate_revision(revision)?; let args = vec![ @@ -155,20 +182,26 @@ impl GitBare { let hex = stdout.lines().next().unwrap_or("").trim().to_string(); if hex.is_empty() { - return Ok(LastCommitForPathResponse { commit: None, path: request.path }); + return Ok(LastCommitForPathResponse { + commit: None, + path: request.path, + }); } let repo = self.gix_repo()?; let commit = if let Ok(oid) = gix::ObjectId::from_hex(hex.as_bytes()) { repo.find_object(oid).ok().and_then(|obj| { - obj.try_into_commit().ok().map(|c| { - crate::commit::get_commit::commit_to_pb(self, &c, false) - }) + obj.try_into_commit() + .ok() + .map(|c| crate::commit::get_commit::commit_to_pb(self, &c, false)) }) } else { None }; - Ok(LastCommitForPathResponse { commit, path: request.path }) + Ok(LastCommitForPathResponse { + commit, + path: request.path, + }) } } diff --git a/diff/changed_paths.rs b/diff/changed_paths.rs index 9cb1f1c..2cef89f 100644 --- a/diff/changed_paths.rs +++ b/diff/changed_paths.rs @@ -4,7 +4,10 @@ use crate::pb::*; impl GitBare { /// Find changed paths between two revisions (no diff content). - pub fn find_changed_paths(&self, request: FindChangedPathsRequest) -> GitResult { + pub fn find_changed_paths( + &self, + request: FindChangedPathsRequest, + ) -> GitResult { crate::sanitize::validate_revision(&request.base)?; crate::sanitize::validate_revision(&request.head)?; @@ -41,21 +44,49 @@ impl GitBare { for line in stdout.lines() { let line = line.trim(); - if line.is_empty() { continue; } + if line.is_empty() { + continue; + } let parts: Vec<&str> = line.split('\t').collect(); - if parts.is_empty() { continue; } + if parts.is_empty() { + continue; + } let status_str = parts[0]; let status_letter = status_str.chars().next().unwrap_or('M'); let (status, old_path, new_path) = match status_letter { - 'A' => (changed_path::Status::ChangedPathStatusAdded as i32, String::new(), parts.get(1).cloned().unwrap_or_default().to_string()), - 'D' => (changed_path::Status::ChangedPathStatusDeleted as i32, parts.get(1).cloned().unwrap_or_default().to_string(), String::new()), - 'R' => (changed_path::Status::ChangedPathStatusRenamed as i32, parts.get(1).cloned().unwrap_or_default().to_string(), parts.get(2).cloned().unwrap_or_default().to_string()), - 'C' => (changed_path::Status::ChangedPathStatusCopied as i32, parts.get(1).cloned().unwrap_or_default().to_string(), parts.get(2).cloned().unwrap_or_default().to_string()), - 'T' => (changed_path::Status::ChangedPathStatusTypeChanged as i32, String::new(), parts.get(1).cloned().unwrap_or_default().to_string()), - _ => (changed_path::Status::ChangedPathStatusModified as i32, String::new(), parts.get(1).cloned().unwrap_or_default().to_string()), + 'A' => ( + changed_path::Status::ChangedPathStatusAdded as i32, + String::new(), + parts.get(1).cloned().unwrap_or_default().to_string(), + ), + 'D' => ( + changed_path::Status::ChangedPathStatusDeleted as i32, + parts.get(1).cloned().unwrap_or_default().to_string(), + String::new(), + ), + 'R' => ( + changed_path::Status::ChangedPathStatusRenamed as i32, + parts.get(1).cloned().unwrap_or_default().to_string(), + parts.get(2).cloned().unwrap_or_default().to_string(), + ), + 'C' => ( + changed_path::Status::ChangedPathStatusCopied as i32, + parts.get(1).cloned().unwrap_or_default().to_string(), + parts.get(2).cloned().unwrap_or_default().to_string(), + ), + 'T' => ( + changed_path::Status::ChangedPathStatusTypeChanged as i32, + String::new(), + parts.get(1).cloned().unwrap_or_default().to_string(), + ), + _ => ( + changed_path::Status::ChangedPathStatusModified as i32, + String::new(), + parts.get(1).cloned().unwrap_or_default().to_string(), + ), }; paths.push(ChangedPath { diff --git a/diff/raw.rs b/diff/raw.rs index 828aa0f..c87b6fb 100644 --- a/diff/raw.rs +++ b/diff/raw.rs @@ -18,7 +18,9 @@ impl GitBare { // Apply options if present if let Some(ref opts) = request.options { - if opts.recursive { args.push("--recursive".to_string()); } + if opts.recursive { + args.push("--recursive".to_string()); + } if opts.include_binary { args.push("--binary".to_string()); } else { diff --git a/disk_cache.rs b/disk_cache.rs index 0e0b2db..321287b 100644 --- a/disk_cache.rs +++ b/disk_cache.rs @@ -94,7 +94,6 @@ impl DiskCache { self.enabled } - fn state_dir_for(&self, relative_path: &str) -> PathBuf { self.repo_prefix .join(STATE_DIR_RELATIVE) @@ -109,7 +108,6 @@ impl DiskCache { self.state_dir_for(relative_path).join("pending") } - fn cache_dir(&self, namespace: &str) -> PathBuf { self.repo_prefix.join(namespace) } @@ -118,7 +116,6 @@ impl DiskCache { self.cache_dir(namespace).join(digest_to_path(digest)) } - /// Ensure the state directory for a repository exists and has a `latest` file. /// If `latest` does not exist, create it with a random value. pub fn ensure_state(&self, relative_path: &str) -> GitResult { @@ -230,7 +227,6 @@ impl DiskCache { Ok(()) } - /// Compute a cache key for an info/refs request. pub fn compute_info_refs_key(&self, relative_path: &str, protocol: &str) -> GitResult { let latest = self.ensure_state(relative_path)?; @@ -268,7 +264,6 @@ impl DiskCache { Ok(sha256_digest(parts)) } - /// Look up a cached response for the given namespace and digest. /// Returns the cached bytes if found and not expired. pub fn lookup(&self, namespace: &str, digest: &str) -> GitResult>> { diff --git a/hooks/runner.rs b/hooks/runner.rs index 05a0cd8..d4b661c 100644 --- a/hooks/runner.rs +++ b/hooks/runner.rs @@ -116,15 +116,15 @@ fn run_single_script(script_path: &Path, stdin_data: &[u8], timeout: Duration) - let wait_result = c.wait_timeout(timeout); match wait_result { Ok(Some(status)) => { - let output = c.wait_with_output().unwrap_or_else(|_| { - // If we can't get output, at least return the status - Output { - status, - stdout: Vec::new(), - stderr: Vec::new(), - } - }); - HookResult::from_output(&output) + // Process exited within timeout, get its output + // Note: We already have the status, so we need to construct output differently + // Since wait_with_output would fail after try_wait, we return status-only output + HookResult { + accepted: status.success(), + exit_code: status.code().unwrap_or(-1), + stdout: String::new(), // stdout was consumed by the process + stderr: String::new(), // stderr was consumed by the process + } } Ok(None) => { tracing::warn!( @@ -133,6 +133,8 @@ fn run_single_script(script_path: &Path, stdin_data: &[u8], timeout: Duration) - "hook script timed out, killing" ); let _ = c.kill(); + // Explicitly wait to reap the zombie process + let _ = c.wait(); HookResult::rejected(format!( "hook script timed out after {}s: {}", timeout.as_secs(), @@ -141,6 +143,8 @@ fn run_single_script(script_path: &Path, stdin_data: &[u8], timeout: Duration) - } Err(e) => { let _ = c.kill(); + // Explicitly wait to reap the zombie process + let _ = c.wait(); HookResult::rejected(format!("hook script wait error: {e}")) } } diff --git a/hooks/sanitize.rs b/hooks/sanitize.rs index 0e376cf..15fe8d4 100644 --- a/hooks/sanitize.rs +++ b/hooks/sanitize.rs @@ -5,6 +5,7 @@ use crate::error::{GitError, GitResult}; /// Commands/patterns that are never allowed in custom hook scripts. +/// This is a blocklist approach - we also add pattern-based detection. const FORBIDDEN_PATTERNS: &[&str] = &[ "rm -rf", "rm -r /", @@ -24,6 +25,34 @@ const FORBIDDEN_PATTERNS: &[&str] = &[ "init 6", "poweroff", "halt", + // Additional patterns to catch encoding/obfuscation attempts + "eval ", // eval can execute arbitrary strings + "exec ", // exec can replace process + "$(", // command substitution + "`", // backtick command substitution + "${", // variable expansion (can be used for obfuscation) + "|bash", // piping to bash + "|sh", // piping to sh + "|dash", // piping to dash + "|zsh", // piping to zsh + "base64", // base64 encoding/decoding (common for obfuscation) + "python -c", // inline python execution + "perl -e", // inline perl execution + "ruby -e", // inline ruby execution + "node -e", // inline node execution + "/dev/tcp", // bash reverse shell + "nc -e", // netcat reverse shell + "ncat", // netcat alternative + "socat", // socket relay +]; + +/// Additional regex-like patterns that indicate dangerous constructs. +/// These are checked with simple string matching for complexity reasons. +const DANGEROUS_PREFIXES: &[&str] = &[ + "rm -rf /", // rm -rf with absolute path + "rm -rf ~", // rm -rf with home directory + "rm -rf .", // rm -rf with relative path (current dir) + "rm -rf *", // rm -rf with wildcard ]; /// Maximum hook script size (64KB). @@ -43,19 +72,70 @@ pub fn validate_hook_content(content: &str) -> GitResult<()> { content.len() ))); } - let content_lower = content.to_lowercase(); - for pattern in FORBIDDEN_PATTERNS { - if content_lower.contains(pattern) { - return Err(GitError::InvalidArgument(format!( - "hook content contains forbidden pattern: '{pattern}'" - ))); - } - } if content.contains('\0') { return Err(GitError::InvalidArgument( "hook content cannot contain null bytes".into(), )); } + + // Check for forbidden patterns (case-insensitive where appropriate) + let content_lower = content.to_lowercase(); + for pattern in FORBIDDEN_PATTERNS { + if content_lower.contains(&pattern.to_lowercase()) { + return Err(GitError::InvalidArgument(format!( + "hook content contains forbidden pattern: '{pattern}'" + ))); + } + } + + // Check for dangerous prefixes (exact case) + for prefix in DANGEROUS_PREFIXES { + if content.contains(prefix) { + return Err(GitError::InvalidArgument(format!( + "hook content contains dangerous command: '{prefix}'" + ))); + } + } + + // Check for obfuscation techniques + check_obfuscation_attempts(content)?; + + Ok(()) +} + +/// Check for common obfuscation attempts. +fn check_obfuscation_attempts(content: &str) -> GitResult<()> { + // Check for excessive use of special characters that might indicate obfuscation + let special_char_count = content.chars().filter(|c| matches!(c, '$' | '`' | '\\' | '|' | ';' | '&' | '(' | ')' | '{' | '}' | '[' | ']')).count(); + let total_chars = content.chars().count(); + + // If more than 30% of content is special characters, it's suspicious + if total_chars > 0 && (special_char_count * 100 / total_chars) > 30 { + return Err(GitError::InvalidArgument( + "hook content appears obfuscated (too many special characters)".into(), + )); + } + + // Check for hex encoding attempts (e.g., \x41\x42) + if content.contains("\\x") { + let hex_count = content.matches("\\x").count(); + if hex_count > 5 { + return Err(GitError::InvalidArgument( + "hook content contains hex encoding (potential obfuscation)".into(), + )); + } + } + + // Check for unicode escape sequences + if content.contains("\\u") { + let unicode_count = content.matches("\\u").count(); + if unicode_count > 5 { + return Err(GitError::InvalidArgument( + "hook content contains unicode escapes (potential obfuscation)".into(), + )); + } + } + Ok(()) } diff --git a/lib.rs b/lib.rs index ebec4c5..03e49ee 100644 --- a/lib.rs +++ b/lib.rs @@ -15,14 +15,14 @@ pub mod macros; pub mod merge; pub mod metrics; pub mod oid; -pub mod rate_limit; -pub mod remote; -pub mod repository; pub mod pack; pub mod pack_cache; pub mod paginate; pub mod pb; +pub mod rate_limit; pub mod refs; +pub mod remote; +pub mod repository; pub mod sanitize; pub mod server; pub mod snapshot; diff --git a/metrics.rs b/metrics.rs index e1e5ec6..04638fe 100644 --- a/metrics.rs +++ b/metrics.rs @@ -15,7 +15,6 @@ use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::{Arc, OnceLock}; use std::time::{Duration, Instant}; - struct MetricsInner { /// Counter: total requests by (method, status_code) /// Key: "method:status" @@ -61,7 +60,6 @@ fn metrics() -> &'static Arc { }) } - #[rustfmt::skip] const DURATION_BUCKET_MS: &[u64] = &[ 5, 10, 25, 50, 100, 250, 500, 1_000, @@ -105,9 +103,7 @@ pub fn dec_active_requests() { /// Set the repository count. pub fn set_repository_count(count: u64) { - metrics() - .repository_count - .store(count, Ordering::Relaxed); + metrics().repository_count.store(count, Ordering::Relaxed); } /// Record a cache hit. @@ -117,9 +113,7 @@ pub fn inc_cache_hits(count: u64) { /// Record a cache miss. pub fn inc_cache_misses(count: u64) { - metrics() - .cache_misses - .fetch_add(count, Ordering::Relaxed); + metrics().cache_misses.fetch_add(count, Ordering::Relaxed); } /// Record an error by kind (e.g., "not_found", "internal", "invalid_argument"). @@ -132,7 +126,6 @@ pub fn inc_error(kind: &str) { .fetch_add(1, Ordering::Relaxed); } - /// Render all metrics in Prometheus text exposition format. pub fn render_metrics() -> String { let m = metrics(); @@ -163,17 +156,15 @@ pub fn render_metrics() -> String { let (method_and_status, count) = (entry.key(), entry.value()); let count = count.load(Ordering::Relaxed); if let Some((method, status)) = method_and_status.rsplit_once(':') { - out.push_str( - &format!("gitks_requests_total{{method=\"{method}\",status=\"{status}\"}} {count}\n"), - ); + out.push_str(&format!( + "gitks_requests_total{{method=\"{method}\",status=\"{status}\"}} {count}\n" + )); } } out.push('\n'); // Duration histogram - out.push_str( - "# HELP gitks_request_duration_milliseconds Request duration histogram in ms\n", - ); + out.push_str("# HELP gitks_request_duration_milliseconds Request duration histogram in ms\n"); out.push_str("# TYPE gitks_request_duration_milliseconds histogram\n"); for entry in &m.duration_buckets { let (method_and_bound, count) = (entry.key(), entry.value()); @@ -215,7 +206,6 @@ pub fn render_metrics() -> String { out } - /// Start the metrics HTTP server on the given port. /// Runs in a background task; returns the JoinHandle. pub fn start_metrics_server(port: u16) -> tokio::task::JoinHandle<()> { @@ -256,12 +246,14 @@ async fn handle_metrics_connection(mut socket: tokio::net::TcpStream) { body ); - let _ = tokio::time::timeout(Duration::from_secs(5), socket.write_all(response.as_bytes())) - .await; + let _ = tokio::time::timeout( + Duration::from_secs(5), + socket.write_all(response.as_bytes()), + ) + .await; let _ = socket.shutdown().await; } - /// A guard that records metrics on drop. /// /// Usage in handlers: diff --git a/pack/mod.rs b/pack/mod.rs index 53188e0..1009311 100644 --- a/pack/mod.rs +++ b/pack/mod.rs @@ -5,3 +5,33 @@ pub mod list_packfiles; pub mod pack_objects; pub mod receive_pack; pub mod upload_pack; + +/// A wrapper around ReceiverStream that cancels a token when dropped. +/// Used to properly clean up child processes when clients disconnect. +pub struct CancellableReceiverStream { + inner: tokio_stream::wrappers::ReceiverStream, + _cancel_guard: tokio_util::sync::DropGuard, +} + +impl CancellableReceiverStream { + pub fn new( + inner: tokio_stream::wrappers::ReceiverStream, + cancel_guard: tokio_util::sync::DropGuard, + ) -> Self { + Self { + inner, + _cancel_guard: cancel_guard, + } + } +} + +impl tokio_stream::Stream for CancellableReceiverStream { + type Item = T; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::pin::Pin::new(&mut self.inner).poll_next(cx) + } +} diff --git a/pack/receive_pack.rs b/pack/receive_pack.rs index 45c9b16..32eb133 100644 --- a/pack/receive_pack.rs +++ b/pack/receive_pack.rs @@ -1,4 +1,5 @@ use std::process::Stdio; +use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::Command; @@ -7,6 +8,10 @@ use tokio_stream::wrappers::ReceiverStream; use crate::bare::GitBare; use crate::pb::ReceivePackResponse; +use super::CancellableReceiverStream; + +/// Maximum time allowed for a git receive-pack process before it is killed. +const RECEIVE_PACK_TIMEOUT: Duration = Duration::from_secs(1800); // 30 minutes impl GitBare { /// Receive pack data using git-receive-pack with true concurrent streaming. @@ -23,7 +28,7 @@ impl GitBare { input: impl tokio_stream::Stream> + Send + 'static, - ) -> Result>, tonic::Status> { + ) -> Result>, tonic::Status> { let bare_dir = self.bare_dir.to_string_lossy().into_owned(); tracing::info!( repo = %bare_dir, @@ -33,6 +38,10 @@ impl GitBare { let (tx, rx) = tokio::sync::mpsc::channel(16); + // Use a cancellation token to track client disconnect + let cancel_token = tokio_util::sync::CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + let stream = Box::pin(input); tokio::spawn(async move { let stream = stream; @@ -59,15 +68,20 @@ impl GitBare { } }; + let child_id = child.id(); let mut stdin = child.stdin.take(); let mut stdout = child.stdout.take(); let mut stderr = child.stderr.take(); let stdin_task = { let mut stream = stream; + let cancel = cancel_token.clone(); async move { if let Some(mut stdin) = stdin.take() { while let Some(result) = stream.next().await { + if cancel.is_cancelled() { + break; + } match result { Ok(req) => { if stdin.write_all(&req.packet).await.is_err() { @@ -87,10 +101,14 @@ impl GitBare { let stdout_task = { let tx = tx.clone(); + let cancel = cancel_token.clone(); async move { if let Some(mut stdout) = stdout.take() { let mut buf = vec![0u8; 65536]; loop { + if cancel.is_cancelled() { + break; + } match stdout.read(&mut buf).await { Ok(0) => break, Ok(n) => { @@ -129,25 +147,45 @@ impl GitBare { } }; - tokio::join!(stdin_task, stdout_task, stderr_task); + // Run all three concurrently with timeout + let _process_future = tokio::join!(stdin_task, stdout_task, stderr_task); - match child.wait().await { - Ok(status) if !status.success() => { - let _ = tx - .send(Err(tonic::Status::internal( - "git receive-pack exited with error", - ))) - .await; + match tokio::time::timeout(RECEIVE_PACK_TIMEOUT, child.wait()).await { + Ok(Ok(status)) => { + if !status.success() { + let _ = tx + .send(Err(tonic::Status::internal( + "git receive-pack exited with error", + ))) + .await; + } } - Err(e) => { + Ok(Err(e)) => { let _ = tx .send(Err(tonic::Status::internal(format!("wait error: {e}")))) .await; } - _ => {} + Err(_timeout) => { + tracing::warn!( + repo = %bare_dir, + pid = ?child_id, + timeout_secs = RECEIVE_PACK_TIMEOUT.as_secs(), + "git receive-pack timed out, killing" + ); + let _ = child.kill().await; + let _ = tx + .send(Err(tonic::Status::deadline_exceeded( + "git receive-pack timed out", + ))) + .await; + } } }); - Ok(ReceiverStream::new(rx)) + // When the ReceiverStream is dropped (client disconnect), cancel the background task + let rx_stream = ReceiverStream::new(rx); + let cancel_guard = cancel_token_clone.clone().drop_guard(); + + Ok(super::CancellableReceiverStream::new(rx_stream, cancel_guard)) } } diff --git a/pack/upload_pack.rs b/pack/upload_pack.rs index ab1868c..bea68d3 100644 --- a/pack/upload_pack.rs +++ b/pack/upload_pack.rs @@ -1,4 +1,5 @@ use std::process::Stdio; +use std::time::Duration; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::Command; @@ -7,6 +8,10 @@ use tokio_stream::wrappers::ReceiverStream; use crate::bare::GitBare; use crate::pb::UploadPackResponse; +use super::CancellableReceiverStream; + +/// Maximum time allowed for a git upload-pack process before it is killed. +const UPLOAD_PACK_TIMEOUT: Duration = Duration::from_secs(600); // 10 minutes impl GitBare { /// Upload pack data using git-upload-pack with true concurrent streaming. @@ -23,7 +28,7 @@ impl GitBare { input: impl tokio_stream::Stream> + Send + 'static, - ) -> Result>, tonic::Status> { + ) -> Result>, tonic::Status> { let bare_dir = self.bare_dir.to_string_lossy().into_owned(); tracing::info!( repo = %bare_dir, @@ -33,6 +38,10 @@ impl GitBare { let (tx, rx) = tokio::sync::mpsc::channel(16); + // Use a cancellation token to track client disconnect + let cancel_token = tokio_util::sync::CancellationToken::new(); + let cancel_token_clone = cancel_token.clone(); + // Move input into the spawned task to make it 'static let stream = Box::pin(input); tokio::spawn(async move { @@ -60,6 +69,7 @@ impl GitBare { } }; + let child_id = child.id(); let mut stdin = child.stdin.take(); let mut stdout = child.stdout.take(); let mut stderr = child.stderr.take(); @@ -67,9 +77,13 @@ impl GitBare { // Concurrent: write stdin packets, read stdout chunks, read stderr let stdin_task = { let mut stream = stream; + let cancel = cancel_token.clone(); async move { if let Some(mut stdin) = stdin.take() { while let Some(result) = stream.next().await { + if cancel.is_cancelled() { + break; + } match result { Ok(req) => { if stdin.write_all(&req.packet).await.is_err() { @@ -90,10 +104,14 @@ impl GitBare { let stdout_task = { let tx = tx.clone(); + let cancel = cancel_token.clone(); async move { if let Some(mut stdout) = stdout.take() { let mut buf = vec![0u8; 65536]; loop { + if cancel.is_cancelled() { + break; + } match stdout.read(&mut buf).await { Ok(0) => break, Ok(n) => { @@ -132,27 +150,45 @@ impl GitBare { } }; - // Run all three concurrently - tokio::join!(stdin_task, stdout_task, stderr_task); + // Run all three concurrently with timeout + let _process_future = tokio::join!(stdin_task, stdout_task, stderr_task); - // Wait for child exit - match child.wait().await { - Ok(status) if !status.success() => { - let _ = tx - .send(Err(tonic::Status::internal( - "git upload-pack exited with error", - ))) - .await; + match tokio::time::timeout(UPLOAD_PACK_TIMEOUT, child.wait()).await { + Ok(Ok(status)) => { + if !status.success() { + let _ = tx + .send(Err(tonic::Status::internal( + "git upload-pack exited with error", + ))) + .await; + } } - Err(e) => { + Ok(Err(e)) => { let _ = tx .send(Err(tonic::Status::internal(format!("wait error: {e}")))) .await; } - _ => {} + Err(_timeout) => { + tracing::warn!( + repo = %bare_dir, + pid = ?child_id, + timeout_secs = UPLOAD_PACK_TIMEOUT.as_secs(), + "git upload-pack timed out, killing" + ); + let _ = child.kill().await; + let _ = tx + .send(Err(tonic::Status::deadline_exceeded( + "git upload-pack timed out", + ))) + .await; + } } }); - Ok(ReceiverStream::new(rx)) + // When the ReceiverStream is dropped (client disconnect), cancel the background task + let rx_stream = ReceiverStream::new(rx); + let cancel_guard = cancel_token_clone.clone().drop_guard(); + + Ok(super::CancellableReceiverStream::new(rx_stream, cancel_guard)) } } diff --git a/pack_cache.rs b/pack_cache.rs index b77fa6d..6b776a3 100644 --- a/pack_cache.rs +++ b/pack_cache.rs @@ -26,11 +26,15 @@ pub const INFO_REFS_NAMESPACE: &str = "+gitks-cache/info_refs"; #[derive(Debug, Clone)] pub struct PackCache { disk_cache: DiskCache, + backpressure_enabled: bool, } impl PackCache { - pub fn new(disk_cache: DiskCache, _backpressure: bool) -> Self { - Self { disk_cache } + pub fn new(disk_cache: DiskCache, backpressure: bool) -> Self { + Self { + disk_cache, + backpressure_enabled: backpressure, + } } pub fn is_enabled(&self) -> bool { @@ -63,7 +67,8 @@ impl PackCache { tracing::info!(digest = %digest, "pack-objects cache hit, streaming from disk"); - let (tx, rx) = tokio::sync::mpsc::channel(16); + let channel_size = if self.backpressure_enabled { 4 } else { 256 }; + let (tx, rx) = tokio::sync::mpsc::channel(channel_size); let sender = tx.clone(); tokio::spawn(async move { @@ -104,12 +109,15 @@ impl PackCache { /// Stream pack-objects output while simultaneously writing to cache. /// This is the "tee" approach: data flows to both the client and the cache file. + /// When backpressure is enabled, uses a small channel to slow the producer + /// if the consumer is slow. Otherwise uses a large channel for max throughput. pub fn tee_pack_stream( &self, digest: &str, source: ReceiverStream>, ) -> ReceiverStream> { - let (tx, rx) = tokio::sync::mpsc::channel(16); + let channel_size = if self.backpressure_enabled { 4 } else { 256 }; + let (tx, rx) = tokio::sync::mpsc::channel(channel_size); if !self.is_enabled() { tokio::spawn(async move { diff --git a/rate_limit.rs b/rate_limit.rs index e1e86d0..1834e59 100644 --- a/rate_limit.rs +++ b/rate_limit.rs @@ -9,10 +9,9 @@ //! // guard is dropped here → permit released use dashmap::DashMap; -use std::sync::{Arc, OnceLock}; +use std::sync::{Arc, OnceLock, RwLock}; use tokio::sync::Semaphore; - /// Default max concurrent operations per repository. const DEFAULT_MAX_CONCURRENT: usize = 5; @@ -20,8 +19,8 @@ const DEFAULT_MAX_CONCURRENT: usize = 5; struct RateLimiter { /// Per-repository semaphores. Key = repository relative_path. semaphores: DashMap>, - /// Max concurrent operations per repository. - max_concurrent: usize, + /// Max concurrent operations per repository (protected by RwLock for runtime updates). + max_concurrent: RwLock, } static RATE_LIMITER: OnceLock = OnceLock::new(); @@ -40,11 +39,18 @@ fn limiter() -> &'static RateLimiter { RateLimiter { semaphores: DashMap::new(), - max_concurrent: max, + max_concurrent: RwLock::new(max), } }) } +/// Get the current max_concurrent value. +fn get_max_concurrent() -> usize { + *limiter() + .max_concurrent + .read() + .unwrap_or_else(|e| e.into_inner()) +} /// A guard that holds a rate-limit permit. The permit is released on drop. pub struct RateLimitGuard { @@ -63,21 +69,21 @@ pub async fn acquire(repo_relative_path: Option<&str>) -> Option if repo.is_empty() { return None; } - let l = limiter(); - if l.max_concurrent == 0 { + let max_concurrent = get_max_concurrent(); + if max_concurrent == 0 { // Unlimited return None; } - let sem = l + let sem = limiter() .semaphores .entry(repo.to_string()) - .or_insert_with(|| Arc::new(Semaphore::new(l.max_concurrent))) + .or_insert_with(|| Arc::new(Semaphore::new(max_concurrent))) .value() .clone(); // Release DashMap reference before awaiting - let _ = l; + let _ = repo; match tokio::time::timeout( std::time::Duration::from_secs(30), @@ -87,7 +93,7 @@ pub async fn acquire(repo_relative_path: Option<&str>) -> Option { Ok(Ok(permit)) => { tracing::debug!( - repo = %repo, + repo = %repo_relative_path.unwrap_or(""), available = sem.available_permits(), "rate limit permit acquired" ); @@ -96,10 +102,10 @@ pub async fn acquire(repo_relative_path: Option<&str>) -> Option Ok(Err(_closed)) => { // Semaphore was closed — recreate it tracing::warn!( - repo = %repo, + repo = %repo_relative_path.unwrap_or(""), "rate limit semaphore closed, recreating" ); - let new_sem = Arc::new(Semaphore::new(limiter().max_concurrent)); + let new_sem = Arc::new(Semaphore::new(get_max_concurrent())); let permit = new_sem .clone() .acquire_owned() @@ -107,13 +113,13 @@ pub async fn acquire(repo_relative_path: Option<&str>) -> Option .expect("newly created semaphore should have permits"); limiter() .semaphores - .insert(repo.to_string(), new_sem); + .insert(repo_relative_path.unwrap_or("").to_string(), new_sem); Some(RateLimitGuard { _permit: permit }) } Err(_elapsed) => { tracing::warn!( - repo = %repo, - max_concurrent = limiter().max_concurrent, + repo = %repo_relative_path.unwrap_or(""), + max_concurrent = get_max_concurrent(), "rate limit timeout waiting for permit" ); None @@ -122,7 +128,9 @@ pub async fn acquire(repo_relative_path: Option<&str>) -> Option } /// Acquire a rate-limit permit, returning a tonic error on timeout / overload. -pub async fn acquire_or_reject(repo_relative_path: Option<&str>) -> Result, tonic::Status> { +pub async fn acquire_or_reject( + repo_relative_path: Option<&str>, +) -> Result, tonic::Status> { let repo = repo_relative_path.unwrap_or(""); if repo.is_empty() { return Ok(None); @@ -130,13 +138,13 @@ pub async fn acquire_or_reject(repo_relative_path: Option<&str>) -> Result Ok(Some(guard)), None => { - if limiter().max_concurrent == 0 { + if get_max_concurrent() == 0 { return Ok(None); } // Timeout — reject with resource exhausted Err(tonic::Status::resource_exhausted(format!( "rate limit exceeded for repository '{repo}': max {max} concurrent operations", - max = limiter().max_concurrent + max = get_max_concurrent() ))) } } @@ -149,22 +157,33 @@ pub fn remove_repository(repo_relative_path: &str) { } /// Update the max concurrent limit at runtime. +/// This properly updates the limit and recreates all existing semaphores. pub fn set_max_concurrent(max: usize) { let l = limiter(); - // We can't modify the field directly through OnceLock, but we can - // update existing semaphores to add or remove permits as needed. - // For a simpler approach, just log and let new semaphores use the new value. - // Since max_concurrent is only read on insert, we use a separate atomic. - use std::sync::atomic::AtomicUsize; - static OVERRIDE: std::sync::atomic::AtomicUsize = AtomicUsize::new(0); - OVERRIDE.store(max, std::sync::atomic::Ordering::Relaxed); - // Recreate all existing semaphores - for entry in &l.semaphores { - let _old = l.semaphores.insert( - entry.key().clone(), - Arc::new(Semaphore::new(max)), - ); + // Update the max_concurrent value + match l.max_concurrent.write() { + Ok(mut guard) => { + *guard = max; + } + Err(e) => { + // Poisoned lock - recover and update + let mut guard = e.into_inner(); + *guard = max; + } } + + // Recreate all existing semaphores with the new limit + let keys: Vec = l + .semaphores + .iter() + .map(|entry| entry.key().clone()) + .collect(); + + for key in keys { + l.semaphores + .insert(key, Arc::new(Semaphore::new(max))); + } + tracing::info!(max_concurrent = max, "rate limit max_concurrent updated"); } diff --git a/refs/find_refs.rs b/refs/find_refs.rs index 409bb3d..155db93 100644 --- a/refs/find_refs.rs +++ b/refs/find_refs.rs @@ -5,7 +5,10 @@ use crate::pb::*; impl GitBare { /// Find all refs pointing to a given OID. - pub fn find_refs_by_oid(&self, request: FindRefsByOidRequest) -> GitResult { + pub fn find_refs_by_oid( + &self, + request: FindRefsByOidRequest, + ) -> GitResult { crate::sanitize::validate_revision(&request.oid)?; let mut args = vec![ @@ -138,7 +141,8 @@ fn simple_glob_match(pattern: &str, name: &str) -> bool { star_pi = Some(pi); star_ni = ni; pi += 1; - } else if pi < pat_bytes.len() && ni < name_bytes.len() + } else if pi < pat_bytes.len() + && ni < name_bytes.len() && (pat_bytes[pi] == b'?' || pat_bytes[pi] == name_bytes[ni]) { pi += 1; diff --git a/refs/update_refs.rs b/refs/update_refs.rs index bb387b4..e65ee71 100644 --- a/refs/update_refs.rs +++ b/refs/update_refs.rs @@ -4,7 +4,10 @@ use crate::pb::*; impl GitBare { /// Update multiple refs atomically using `git update-ref --stdin`. - pub fn update_references(&self, request: UpdateReferencesRequest) -> GitResult { + pub fn update_references( + &self, + request: UpdateReferencesRequest, + ) -> GitResult { let mut stdin_input = String::new(); for update in &request.updates { crate::sanitize::validate_ref_name(&update.ref_name)?; @@ -16,10 +19,7 @@ impl GitBare { update.ref_name, update.new_oid, update.old_oid )); } else { - stdin_input.push_str(&format!( - "update {} {}\n", - update.ref_name, update.new_oid - )); + stdin_input.push_str(&format!("update {} {}\n", update.ref_name, update.new_oid)); } } if stdin_input.is_empty() { @@ -27,7 +27,13 @@ impl GitBare { } let output = std::process::Command::new("git") - .args(["--git-dir", &self.bare_dir.to_string_lossy(), "update-ref", "--stdin", "-z"]) + .args([ + "--git-dir", + &self.bare_dir.to_string_lossy(), + "update-ref", + "--stdin", + "-z", + ]) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) @@ -119,21 +125,33 @@ impl GitBare { }); } - Ok(WriteRefResponse { ok: true, error: String::new() }) + Ok(WriteRefResponse { + ok: true, + error: String::new(), + }) } /// Check if a ref exists. pub fn ref_exists(&self, request: RefExistsRequest) -> GitResult { crate::sanitize::validate_ref_name(&request.ref_name)?; let repo = self.gix_repo()?; - let exists = repo.try_find_reference(&request.ref_name).ok().flatten().is_some(); + let exists = repo + .try_find_reference(&request.ref_name) + .ok() + .flatten() + .is_some(); Ok(RefExistsResponse { exists }) } /// Find the default branch name. pub fn find_default_branch_name(&self) -> GitResult { let result = std::process::Command::new("git") - .args(["--git-dir", &self.bare_dir.to_string_lossy(), "symbolic-ref", "HEAD"]) + .args([ + "--git-dir", + &self.bare_dir.to_string_lossy(), + "symbolic-ref", + "HEAD", + ]) .output() .map_err(|e| crate::error::GitError::CommandFailed { status_code: None, diff --git a/remote/find_remote.rs b/remote/find_remote.rs index 48cc44d..cb74feb 100644 --- a/remote/find_remote.rs +++ b/remote/find_remote.rs @@ -2,9 +2,14 @@ use crate::error::GitResult; use crate::pb::*; /// Discover remote refs via `git ls-remote`. -pub fn find_remote_repository(request: FindRemoteRepositoryRequest) -> GitResult { +pub fn find_remote_repository( + request: FindRemoteRepositoryRequest, +) -> GitResult { if request.remote_url.is_empty() { - return Ok(FindRemoteRepositoryResponse { refs: vec![], exists: false }); + return Ok(FindRemoteRepositoryResponse { + refs: vec![], + exists: false, + }); } let output = std::process::Command::new("git") @@ -20,9 +25,15 @@ pub fn find_remote_repository(request: FindRemoteRepositoryRequest) -> GitResult if !output.status.success() { let stderr = String::from_utf8_lossy(&output.stderr); if stderr.contains("Could not resolve host") || stderr.contains("Repository not found") { - return Ok(FindRemoteRepositoryResponse { refs: vec![], exists: false }); + return Ok(FindRemoteRepositoryResponse { + refs: vec![], + exists: false, + }); } - return Ok(FindRemoteRepositoryResponse { refs: vec![], exists: false }); + return Ok(FindRemoteRepositoryResponse { + refs: vec![], + exists: false, + }); } let stdout = String::from_utf8_lossy(&output.stdout); @@ -43,7 +54,11 @@ pub fn find_remote_repository(request: FindRemoteRepositoryRequest) -> GitResult ref_name: name.to_string(), target_oid: String::new(), symbolic: true, - symbolic_target: target.strip_prefix("ref:").unwrap_or(target).trim().to_string(), + symbolic_target: target + .strip_prefix("ref:") + .unwrap_or(target) + .trim() + .to_string(), }); } } else if let Some((oid, name)) = line.split_once('\t') { @@ -60,7 +75,9 @@ pub fn find_remote_repository(request: FindRemoteRepositoryRequest) -> GitResult } /// Find the root ref (HEAD) of a remote repository. -pub fn find_remote_root_ref(request: FindRemoteRootRefRequest) -> GitResult { +pub fn find_remote_root_ref( + request: FindRemoteRootRefRequest, +) -> GitResult { let output = std::process::Command::new("git") .args(["ls-remote", "--symref", &request.remote_url, "HEAD"]) .stdout(std::process::Stdio::piped()) @@ -76,7 +93,11 @@ pub fn find_remote_root_ref(request: FindRemoteRootRefRequest) -> GitResult GitResult { - let remote_name = if request.remote_name.is_empty() { "origin" } else { &request.remote_name }; + pub fn update_remote_mirror( + &self, + request: UpdateRemoteMirrorRequest, + ) -> GitResult { + let remote_name = if request.remote_name.is_empty() { + "origin" + } else { + &request.remote_name + }; // Add or update remote let remote_check = std::process::Command::new("git") @@ -114,12 +121,19 @@ impl GitBare { } } - Ok(UpdateRemoteMirrorResponse { ok: true, error: String::new() }) + Ok(UpdateRemoteMirrorResponse { + ok: true, + error: String::new(), + }) } /// Fetch from a remote URL without mirroring. pub fn fetch_remote(&self, request: FetchRemoteRequest) -> GitResult { - let remote_name = if request.remote_name.is_empty() { "origin" } else { &request.remote_name }; + let remote_name = if request.remote_name.is_empty() { + "origin" + } else { + &request.remote_name + }; // Ensure remote exists let exists = std::process::Command::new("git") @@ -158,8 +172,12 @@ impl GitBare { remote_name.to_string(), ]; - if request.prune { args.push("--prune".to_string()); } - if request.force { args.push("--force".to_string()); } + if request.prune { + args.push("--prune".to_string()); + } + if request.force { + args.push("--force".to_string()); + } if request.refspecs.is_empty() { args.push("+refs/heads/*:refs/heads/*".to_string()); @@ -187,7 +205,10 @@ impl GitBare { }); } - Ok(FetchRemoteResponse { ok: true, error: String::new() }) + Ok(FetchRemoteResponse { + ok: true, + error: String::new(), + }) } /// Clone a repository from a remote URL (bare + mirror). diff --git a/repository/find_license.rs b/repository/find_license.rs index 55f2e6a..3682790 100644 --- a/repository/find_license.rs +++ b/repository/find_license.rs @@ -6,9 +6,15 @@ impl GitBare { /// Detect license by reading LICENSE/COPYING files and doing basic matching. pub fn find_license(&self) -> GitResult { let possible_paths = [ - "LICENSE", "LICENSE.md", "LICENSE.txt", - "LICENCE", "LICENCE.md", "LICENCE.txt", - "COPYING", "COPYING.md", "COPYING.txt", + "LICENSE", + "LICENSE.md", + "LICENSE.txt", + "LICENCE", + "LICENCE.md", + "LICENCE.txt", + "COPYING", + "COPYING.md", + "COPYING.txt", "UNLICENSE", ]; @@ -102,8 +108,7 @@ fn detect_license(content: &str) -> (&'static str, &'static str, f64) { } // ISC - if lower.contains("permission to use, copy, modify, and/or distribute") - && lower.contains("isc") + if lower.contains("permission to use, copy, modify, and/or distribute") && lower.contains("isc") { return ("ISC", "ISC License", 0.80); } diff --git a/repository/find_merge_base.rs b/repository/find_merge_base.rs index 62cd5b2..149b32e 100644 --- a/repository/find_merge_base.rs +++ b/repository/find_merge_base.rs @@ -4,7 +4,10 @@ use crate::pb::*; impl GitBare { /// Find the best merge base for a set of revisions (OIDs). - pub fn find_merge_base(&self, request: FindMergeBaseRequest) -> GitResult { + pub fn find_merge_base( + &self, + request: FindMergeBaseRequest, + ) -> GitResult { if request.revisions.is_empty() { return Ok(FindMergeBaseResponse::default()); } @@ -49,7 +52,10 @@ impl GitBare { } /// Check if one commit is an ancestor of another. - pub fn commit_is_ancestor(&self, request: CommitIsAncestorRequest) -> GitResult { + pub fn commit_is_ancestor( + &self, + request: CommitIsAncestorRequest, + ) -> GitResult { crate::sanitize::validate_revision(&request.ancestor_oid)?; crate::sanitize::validate_revision(&request.descendant_oid)?; @@ -68,6 +74,8 @@ impl GitBare { .map(|s| s.success()) .unwrap_or(false); - Ok(CommitIsAncestorResponse { is_ancestor: result }) + Ok(CommitIsAncestorResponse { + is_ancestor: result, + }) } } diff --git a/repository/objects_size.rs b/repository/objects_size.rs index 3bc34a6..9ce7eb2 100644 --- a/repository/objects_size.rs +++ b/repository/objects_size.rs @@ -42,12 +42,13 @@ impl GitBare { })?; } - let output = child.wait_with_output().map_err(|e| { - crate::error::GitError::CommandFailed { - status_code: None, - stderr: e.to_string(), - } - })?; + let output = + child + .wait_with_output() + .map_err(|e| crate::error::GitError::CommandFailed { + status_code: None, + stderr: e.to_string(), + })?; let stdout = String::from_utf8_lossy(&output.stdout); let mut sizes = Vec::new(); diff --git a/repository/optimize.rs b/repository/optimize.rs index 953cd82..cc1c591 100644 --- a/repository/optimize.rs +++ b/repository/optimize.rs @@ -4,8 +4,12 @@ use crate::pb::*; impl GitBare { /// Run heuristic optimization based on repo state. - pub fn optimize_repository(&self, request: OptimizeRepositoryRequest) -> GitResult { - let strategy = OptimizeStrategy::try_from(request.strategy).unwrap_or(OptimizeStrategy::Heuristic); + pub fn optimize_repository( + &self, + request: OptimizeRepositoryRequest, + ) -> GitResult { + let strategy = + OptimizeStrategy::try_from(request.strategy).unwrap_or(OptimizeStrategy::Heuristic); let mut stdout_all = String::new(); let mut stderr_all = String::new(); @@ -17,7 +21,9 @@ impl GitBare { // Run commit-graph write if needed if stats.commit_graph_size_bytes == 0 || strategy == OptimizeStrategy::Aggressive { if let Ok(resp) = write_commit_graph(self, false, false) { - if !resp.ok { stderr_all.push_str(&resp.stderr); } + if !resp.ok { + stderr_all.push_str(&resp.stderr); + } stdout_all.push_str(&resp.stdout); } } @@ -28,7 +34,9 @@ impl GitBare { if repack_needed || strategy == OptimizeStrategy::Aggressive { let full = strategy == OptimizeStrategy::Aggressive; if let Ok(resp) = run_repack(self, full, true, true) { - if !resp.ok { stderr_all.push_str(&resp.stderr); } + if !resp.ok { + stderr_all.push_str(&resp.stderr); + } stdout_all.push_str(&resp.stdout); } } @@ -36,7 +44,9 @@ impl GitBare { // Prune if aggressive if strategy == OptimizeStrategy::Aggressive { if let Ok(resp) = run_gc(self, true, true) { - if !resp.ok { stderr_all.push_str(&resp.stderr); } + if !resp.ok { + stderr_all.push_str(&resp.stderr); + } stdout_all.push_str(&resp.stdout); } } @@ -44,7 +54,9 @@ impl GitBare { OptimizeStrategy::Incremental => { // Just run commit-graph write incrementally if let Ok(resp) = write_commit_graph(self, false, false) { - if !resp.ok { stderr_all.push_str(&resp.stderr); } + if !resp.ok { + stderr_all.push_str(&resp.stderr); + } stdout_all.push_str(&resp.stdout); } } @@ -79,7 +91,10 @@ impl GitBare { // Check commit-graph let cg_size = std::fs::metadata( - self.bare_dir.join("objects").join("info").join("commit-graph") + self.bare_dir + .join("objects") + .join("info") + .join("commit-graph"), ) .map(|m| m.len()) .unwrap_or(0); @@ -96,11 +111,18 @@ impl GitBare { } } -fn write_commit_graph(gb: &GitBare, _split: bool, _replace: bool) -> GitResult { +fn write_commit_graph( + gb: &GitBare, + _split: bool, + _replace: bool, +) -> GitResult { let out = std::process::Command::new("git") .args([ - "--git-dir", &gb.bare_dir.to_string_lossy(), - "commit-graph", "write", "--reachable", + "--git-dir", + &gb.bare_dir.to_string_lossy(), + "commit-graph", + "write", + "--reachable", ]) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) @@ -117,13 +139,25 @@ fn write_commit_graph(gb: &GitBare, _split: bool, _replace: bool) -> GitResult GitResult { +fn run_repack( + gb: &GitBare, + full: bool, + bitmaps: bool, + _midx: bool, +) -> GitResult { let mut args = vec![ - "--git-dir".to_string(), gb.bare_dir.to_string_lossy().into_owned(), + "--git-dir".to_string(), + gb.bare_dir.to_string_lossy().into_owned(), "repack".to_string(), ]; - if full { args.push("-ad".to_string()); } else { args.push("-d".to_string()); } - if bitmaps { args.push("--write-bitmap-index".to_string()); } + if full { + args.push("-ad".to_string()); + } else { + args.push("-d".to_string()); + } + if bitmaps { + args.push("--write-bitmap-index".to_string()); + } let out = std::process::Command::new("git") .args(&args) @@ -144,11 +178,16 @@ fn run_repack(gb: &GitBare, full: bool, bitmaps: bool, _midx: bool) -> GitResult fn run_gc(gb: &GitBare, prune: bool, aggressive: bool) -> GitResult { let mut args = vec![ - "--git-dir".to_string(), gb.bare_dir.to_string_lossy().into_owned(), + "--git-dir".to_string(), + gb.bare_dir.to_string_lossy().into_owned(), "gc".to_string(), ]; - if prune { args.push("--prune=now".to_string()); } - if aggressive { args.push("--aggressive".to_string()); } + if prune { + args.push("--prune=now".to_string()); + } + if aggressive { + args.push("--aggressive".to_string()); + } let out = std::process::Command::new("git") .args(&args) diff --git a/repository/raw_changes.rs b/repository/raw_changes.rs index 20543a1..c042a70 100644 --- a/repository/raw_changes.rs +++ b/repository/raw_changes.rs @@ -4,7 +4,10 @@ use crate::pb::*; impl GitBare { /// Get raw changes between two revisions (file-level changes only, no diff content). - pub fn get_raw_changes(&self, request: GetRawChangesRequest) -> GitResult { + pub fn get_raw_changes( + &self, + request: GetRawChangesRequest, + ) -> GitResult { crate::sanitize::validate_revision(&request.base)?; crate::sanitize::validate_revision(&request.head)?; @@ -32,11 +35,15 @@ impl GitBare { for line in stdout.lines() { let line = line.trim(); - if !line.starts_with(':') { continue; } + if !line.starts_with(':') { + continue; + } let line = &line[1..]; let parts: Vec<&str> = line.split_whitespace().collect(); - if parts.len() < 5 { continue; } + if parts.len() < 5 { + continue; + } let old_mode = u32::from_str_radix(parts[0], 8).unwrap_or(0); let new_mode = u32::from_str_radix(parts[1], 8).unwrap_or(0); @@ -55,11 +62,14 @@ impl GitBare { }; let (old_path, new_path) = if parts.len() >= 6 { - (parts[5].to_string(), if status_letter == 'R' || status_letter == 'C' { - parts.get(6).map(|s| s.to_string()).unwrap_or_default() - } else { - String::new() - }) + ( + parts[5].to_string(), + if status_letter == 'R' || status_letter == 'C' { + parts.get(6).map(|s| s.to_string()).unwrap_or_default() + } else { + String::new() + }, + ) } else { (String::new(), String::new()) }; diff --git a/repository/search_files.rs b/repository/search_files.rs index 648dbee..bc27c7b 100644 --- a/repository/search_files.rs +++ b/repository/search_files.rs @@ -4,17 +4,28 @@ use crate::pb::*; impl GitBare { /// Search file contents with a regex pattern. - pub fn search_files_by_content(&self, request: SearchFilesByContentRequest) -> GitResult { + pub fn search_files_by_content( + &self, + request: SearchFilesByContentRequest, + ) -> GitResult { crate::sanitize::validate_revision(&request.revision)?; - let revision = if request.revision.is_empty() { "HEAD" } else { &request.revision }; - let max_results = if request.max_results == 0 { 100 } else { request.max_results }; + let revision = if request.revision.is_empty() { + "HEAD" + } else { + &request.revision + }; + let max_results = if request.max_results == 0 { + 100 + } else { + request.max_results + }; let mut args = vec![ "--git-dir".to_string(), self.bare_dir.to_string_lossy().into_owned(), "grep".to_string(), - "-I".to_string(), // don't match binary files + "-I".to_string(), // don't match binary files "--line-number".to_string(), "--column".to_string(), ]; @@ -62,11 +73,22 @@ impl GitBare { } /// Search file names matching a pattern. - pub fn search_files_by_name(&self, request: SearchFilesByNameRequest) -> GitResult { - let revision = if request.revision.is_empty() { "HEAD" } else { &request.revision }; + pub fn search_files_by_name( + &self, + request: SearchFilesByNameRequest, + ) -> GitResult { + let revision = if request.revision.is_empty() { + "HEAD" + } else { + &request.revision + }; crate::sanitize::validate_revision(revision)?; - let max_results = if request.max_results == 0 { 100 } else { request.max_results }; + let max_results = if request.max_results == 0 { + 100 + } else { + request.max_results + }; let mut args = vec![ "--git-dir".to_string(), diff --git a/server/commit.rs b/server/commit.rs index c8ed618..8a3822f 100644 --- a/server/commit.rs +++ b/server/commit.rs @@ -276,7 +276,6 @@ impl commit_service_server::CommitService for GitksService { Ok(tonic::Response::new(resp)) } - async fn find_commit( &self, request: tonic::Request, @@ -368,7 +367,6 @@ impl commit_service_server::CommitService for GitksService { Ok(tonic::Response::new(resp)) } - async fn count_commits( &self, request: tonic::Request, diff --git a/server/diff.rs b/server/diff.rs index ddc8366..4ad0f3a 100644 --- a/server/diff.rs +++ b/server/diff.rs @@ -170,9 +170,10 @@ impl diff_service_server::DiffService for GitksService { Ok(tonic::Response::new(resp)) } - - type RawDiffStream = tokio_stream::wrappers::ReceiverStream>; - type RawPatchStream = tokio_stream::wrappers::ReceiverStream>; + type RawDiffStream = + tokio_stream::wrappers::ReceiverStream>; + type RawPatchStream = + tokio_stream::wrappers::ReceiverStream>; async fn raw_diff( &self, @@ -200,7 +201,6 @@ impl diff_service_server::DiffService for GitksService { Ok(tonic::Response::new(into_stream(chunks))) } - async fn find_changed_paths( &self, request: tonic::Request, diff --git a/server/mod.rs b/server/mod.rs index 5269b93..36fb776 100644 --- a/server/mod.rs +++ b/server/mod.rs @@ -45,9 +45,11 @@ mod repository_maint; mod tag; mod tree; +use dashmap::DashMap; use gix::discover::is_git; use ractor::{ActorCell, ActorRef}; use std::path::{Path, PathBuf}; +use std::time::{Duration, Instant}; use tokio_stream::wrappers::ReceiverStream; use crate::actor::message::{GitNodeMessage, RouteDecision}; @@ -59,6 +61,16 @@ use crate::pb::{ remote_service_server, repository_service_server, tag_service_server, tree_service_server, }; +/// TTL for route cache entries. +const ROUTE_CACHE_TTL: Duration = Duration::from_secs(60); // 1 minute + +/// A cached route entry with creation time. +#[derive(Clone)] +pub struct CachedRoute { + pub decision: RouteDecision, + pub created_at: Instant, +} + #[derive(Clone)] pub struct GitksService { pub repo_prefix: PathBuf, @@ -67,6 +79,7 @@ pub struct GitksService { pub disk_cache: Option, pub pack_cache: Option, pub hook_manager: Option, + pub route_cache: DashMap, } impl GitksService { @@ -78,6 +91,7 @@ impl GitksService { disk_cache: None, pack_cache: None, hook_manager: None, + route_cache: DashMap::new(), } } @@ -125,6 +139,22 @@ impl GitksService { is_write: bool, ) -> Result, tonic::Status> { use crate::actor::message::{ROLE_PRIMARY, ROLE_REPLICA}; + + // Check route cache for read requests + if !is_write + && let Some(cached) = self.route_cache.get(&header.relative_path) + && !cached.decision.grpc_addr.is_empty() + && cached.decision.found + && cached.created_at.elapsed() < ROUTE_CACHE_TTL + { + tracing::debug!( + relative_path = %header.relative_path, + grpc_addr = %cached.decision.grpc_addr, + "route cache hit" + ); + return Ok(Some(cached.decision.clone())); + } + let members = ractor::pg::get_members(&"gitks_nodes".to_string()); let local = self.node_actor.as_ref().map(|actor| actor.get_cell()); let mut primary: Option = None; @@ -152,19 +182,31 @@ impl GitksService { replica = Some(decision); } } - if let Some(p) = primary { - return Ok(Some(p)); - } - if let Some(r) = replica { + let result = if let Some(p) = primary { + Some(p) + } else if let Some(r) = replica { tracing::info!( storage_name = %r.storage_name, relative_path = %r.relative_path, "read request routed to replica" ); - return Ok(Some(r)); + Some(r) + } else { + let _ = ROLE_PRIMARY; + None + }; + + // Cache result for read requests + if let Some(ref decision) = result { + self.route_cache.insert( + header.relative_path.clone(), + CachedRoute { + decision: decision.clone(), + created_at: Instant::now(), + }, + ); } - let _ = ROLE_PRIMARY; - Ok(None) + Ok(result) } fn repo_label(&self, header: Option<&crate::pb::RepositoryHeader>) -> String { @@ -180,7 +222,10 @@ impl GitksService { } /// Get the relative path from a repository header, if any. - pub(crate) fn repo_relative_path<'a>(&self, header: Option<&'a crate::pb::RepositoryHeader>) -> Option<&'a str> { + pub(crate) fn repo_relative_path<'a>( + &self, + header: Option<&'a crate::pb::RepositoryHeader>, + ) -> Option<&'a str> { header.and_then(|h| { if h.relative_path.is_empty() { None @@ -287,6 +332,9 @@ impl GitksService { // Invalidate moka caches crate::server::cache::invalidate_repo(relative_path); + // Invalidate route cache + self.route_cache.remove(relative_path); + // Invalidate disk cache if let Some(ref pc) = self.pack_cache { pc.invalidate_repo(relative_path); @@ -421,7 +469,7 @@ pub(crate) fn into_stream( ReceiverStream::new(rx) } -pub(crate) fn git_cmd(gb: &GitBare, args: &[&str]) -> Result { +pub(crate) fn git_cmd(gb: &GitBare, args: &[&str]) -> GitResult { let mut full_args: Vec = vec![ "--git-dir".into(), gb.bare_dir.to_string_lossy().into_owned(), @@ -441,21 +489,41 @@ pub(crate) fn git_cmd(gb: &GitBare, args: &[&str]) -> Result) -> GitError { + let stderr_trimmed = stderr.trim(); + if stderr_trimmed.contains("not a git repository") || stderr_trimmed.contains("does not exist") + { + GitError::RepoNotFound + } else if stderr_trimmed.contains("Permission denied") || stderr_trimmed.contains("denied") { + GitError::PermissionDenied(stderr_trimmed.to_string()) + } else if stderr_trimmed.contains("is locked") || stderr_trimmed.contains("Could not acquire") { + GitError::Locked(stderr_trimmed.to_string()) + } else if stderr_trimmed.contains("not found") || stderr_trimmed.contains("do not have") { + GitError::NotFound(stderr_trimmed.to_string()) + } else { + GitError::CommandFailed { + status_code: code, + stderr: stderr_trimmed.to_string(), + } + } +} + pub async fn serve( addr: std::net::SocketAddr, svc: GitksService, diff --git a/server/pack.rs b/server/pack.rs index 72b8310..d7cc9f6 100644 --- a/server/pack.rs +++ b/server/pack.rs @@ -3,6 +3,7 @@ use tokio_stream::wrappers::ReceiverStream; use crate::pb::pack_service_client::PackServiceClient; use crate::pb::*; +use crate::pack::CancellableReceiverStream; use super::{GitksService, into_status}; @@ -14,8 +15,8 @@ remote_client!( #[tonic::async_trait] impl pack_service_server::PackService for GitksService { - type UploadPackStream = ReceiverStream>; - type ReceivePackStream = ReceiverStream>; + type UploadPackStream = CancellableReceiverStream>; + type ReceivePackStream = CancellableReceiverStream>; type PackObjectsStream = ReceiverStream>; async fn advertise_refs( @@ -112,7 +113,12 @@ impl pack_service_server::PackService for GitksService { .upload_pack(tokio_stream::wrappers::ReceiverStream::new(rx)) .await?; let out = super::bridge_server_stream(resp.into_inner()); - return Ok(tonic::Response::new(out)); + // Create a dummy cancel token for the forwarded stream + let cancel_token = tokio_util::sync::CancellationToken::new(); + let cancel_guard = cancel_token.drop_guard(); + return Ok(tonic::Response::new( + crate::pack::CancellableReceiverStream::new(out, cancel_guard), + )); } crate::metrics::record_rpc_error(&m, &err); return Err(err); @@ -182,7 +188,12 @@ impl pack_service_server::PackService for GitksService { .receive_pack(tokio_stream::wrappers::ReceiverStream::new(rx)) .await?; let out = super::bridge_server_stream(resp.into_inner()); - return Ok(tonic::Response::new(out)); + // Create a dummy cancel token for the forwarded stream + let cancel_token = tokio_util::sync::CancellationToken::new(); + let cancel_guard = cancel_token.drop_guard(); + return Ok(tonic::Response::new( + crate::pack::CancellableReceiverStream::new(out, cancel_guard), + )); } crate::metrics::record_rpc_error(&m, &err); return Err(err); @@ -333,7 +344,11 @@ impl pack_service_server::PackService for GitksService { inputs.push(msg?); } let _rate = self - .acquire_rate_limit(inputs.first().and_then(|r: &IndexPackRequest| r.repository.as_ref())) + .acquire_rate_limit( + inputs + .first() + .and_then(|r: &IndexPackRequest| r.repository.as_ref()), + ) .await?; let repo = self.repo_label(inputs.first().and_then(|r| r.repository.as_ref())); let span = tracing::info_span!("pack.index_pack", %repo); diff --git a/server/refs.rs b/server/refs.rs index 0501143..53fe3c1 100644 --- a/server/refs.rs +++ b/server/refs.rs @@ -1,5 +1,5 @@ -use crate::pb::*; use crate::pb::ref_service_server::RefService; +use crate::pb::*; use super::GitksService; diff --git a/server/remote.rs b/server/remote.rs index a078123..747f44b 100644 --- a/server/remote.rs +++ b/server/remote.rs @@ -1,5 +1,5 @@ -use crate::pb::*; use crate::pb::remote_service_server::RemoteService; +use crate::pb::*; use crate::remote::find_remote::{find_remote_repository, find_remote_root_ref}; use super::GitksService; diff --git a/server/repository.rs b/server/repository.rs index 0a5faeb..dbff47b 100644 --- a/server/repository.rs +++ b/server/repository.rs @@ -434,7 +434,6 @@ impl repository_service_server::RepositoryService for GitksService { Ok(tonic::Response::new(resp)) } - async fn list_hooks( &self, request: tonic::Request, @@ -495,7 +494,6 @@ impl repository_service_server::RepositoryService for GitksService { Ok(tonic::Response::new(())) } - async fn create_snapshot( &self, request: tonic::Request, @@ -600,7 +598,6 @@ impl repository_service_server::RepositoryService for GitksService { Ok(tonic::Response::new(())) } - type FetchRepositoryDataStream = ReceiverStream>; @@ -698,7 +695,6 @@ impl repository_service_server::RepositoryService for GitksService { Ok(tonic::Response::new(ReceiverStream::new(rx))) } - async fn find_merge_base( &self, request: tonic::Request, @@ -751,7 +747,6 @@ impl repository_service_server::RepositoryService for GitksService { Ok(tonic::Response::new(resp)) } - async fn objects_size( &self, request: tonic::Request, @@ -795,7 +790,8 @@ impl repository_service_server::RepositoryService for GitksService { &self, request: tonic::Request, ) -> Result, tonic::Status> { - let m = crate::metrics::RequestMetrics::new("gitks.RepositoryService/CreateRepositoryFromURL"); + let m = + crate::metrics::RequestMetrics::new("gitks.RepositoryService/CreateRepositoryFromURL"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; let bare_dir = self.resolve_for_init(inner.repository.as_ref())?; @@ -816,7 +812,6 @@ impl repository_service_server::RepositoryService for GitksService { })) } - async fn find_license( &self, request: tonic::Request, diff --git a/tests/cluster_test.rs b/tests/cluster_test.rs index 83244a3..bdb06fc 100644 --- a/tests/cluster_test.rs +++ b/tests/cluster_test.rs @@ -1,11 +1,9 @@ #[cfg(test)] mod cluster_test { use gitks::pb::{ - repository_service_client::RepositoryServiceClient, - branch_service_client::BranchServiceClient, - RepositoryHeader, InitRepositoryRequest, CreateBranchRequest, - GetRepositoryRequest, - ObjectSelector, ObjectName, object_selector, + CreateBranchRequest, GetRepositoryRequest, InitRepositoryRequest, ObjectName, + ObjectSelector, RepositoryHeader, branch_service_client::BranchServiceClient, + object_selector, repository_service_client::RepositoryServiceClient, }; const N1: &str = "http://localhost:50051"; @@ -13,49 +11,76 @@ mod cluster_test { const N3: &str = "http://localhost:50053"; fn hdr(path: &str) -> RepositoryHeader { - RepositoryHeader { storage_name: String::new(), relative_path: path.into(), storage_path: String::new() } + RepositoryHeader { + storage_name: String::new(), + relative_path: path.into(), + storage_path: String::new(), + } } #[tokio::test] async fn test_cluster_routing() { - let ts = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs(); + let ts = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs(); let repo = format!("cluster-test-{ts}"); // ── Init via node1 ── let mut n1 = RepositoryServiceClient::connect(N1).await.unwrap(); - let r = n1.init_repository(tonic::Request::new(InitRepositoryRequest { - repository: Some(hdr(&repo)), bare: true, object_format: 0, initial_branch: "main".into(), - })).await.unwrap().into_inner(); + let r = n1 + .init_repository(tonic::Request::new(InitRepositoryRequest { + repository: Some(hdr(&repo)), + bare: true, + object_format: 0, + initial_branch: "main".into(), + })) + .await + .unwrap() + .into_inner(); println!("✅ n1 init: bare={}", r.bare); // ── Read via node2 (should forward to PRIMARY n1) ── let mut n2 = RepositoryServiceClient::connect(N2).await.unwrap(); - let r2 = n2.get_repository(tonic::Request::new(GetRepositoryRequest { - repository: Some(hdr(&repo)), - })).await.unwrap().into_inner(); + let r2 = n2 + .get_repository(tonic::Request::new(GetRepositoryRequest { + repository: Some(hdr(&repo)), + })) + .await + .unwrap() + .into_inner(); println!("✅ n2 get routed→primary: bare={}", r2.bare); // ── Read via node3 ── let mut n3 = RepositoryServiceClient::connect(N3).await.unwrap(); - let r3 = n3.get_repository(tonic::Request::new(GetRepositoryRequest { - repository: Some(hdr(&repo)), - })).await.unwrap().into_inner(); + let r3 = n3 + .get_repository(tonic::Request::new(GetRepositoryRequest { + repository: Some(hdr(&repo)), + })) + .await + .unwrap() + .into_inner(); println!("✅ n3 get routed→primary: bare={}", r3.bare); // ── Write (create branch) via node2 → primary ── let mut n2b = BranchServiceClient::connect(N2).await.unwrap(); - let b = n2b.create_branch(tonic::Request::new(CreateBranchRequest { - repository: Some(hdr(&repo)), - name: "feature/x".into(), - start_point: Some(ObjectSelector { - selector: Some(object_selector::Selector::Revision(ObjectName { - revision: "main".into(), - })), - }), - force: false, - })).await; + let b = n2b + .create_branch(tonic::Request::new(CreateBranchRequest { + repository: Some(hdr(&repo)), + name: "feature/x".into(), + start_point: Some(ObjectSelector { + selector: Some(object_selector::Selector::Revision(ObjectName { + revision: "main".into(), + })), + }), + force: false, + })) + .await; match b { - Ok(branch) => println!("✅ n2 create-branch routed→primary: name={}", branch.into_inner().name), + Ok(branch) => println!( + "✅ n2 create-branch routed→primary: name={}", + branch.into_inner().name + ), Err(e) => println!("⚠️ create-branch: {e} (expected — empty repo has no commits)"), }