refactor(actor): implement Raft consensus algorithm for cluster leader election

- Add voting mechanism with term tracking and vote persistence
- Implement election triggering logic with majority vote counting
- Add primary/replica role transition handling with state management
- Integrate health check failure detection for automatic elections
- Refactor actor messaging system for distributed coordination
- Update repository registration to query cluster for existing primary
- Add broadcast mechanism for role change notifications
- Implement proper term comparison and duplicate request filtering
- Upgrade dependency versions including tokio-util for async utilities
- Optimize code formatting and line wrapping for improved readability
- Remove redundant blank lines and improve code structure consistency
- Enhance error logging and trace information for debugging purposes
This commit is contained in:
zhenyi
2026-06-10 12:35:10 +08:00
parent ab32e8826e
commit 9a0c26e5f6
40 changed files with 1184 additions and 449 deletions
Generated
+29 -29
View File
@@ -134,9 +134,9 @@ checksum = "72b3254f16251a8381aa12e40e3c4d2f0199f8c6508fbecb9d91f575e0fbb8c6"
[[package]]
name = "bitflags"
version = "2.12.1"
version = "2.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "84d7ced0ae9557296835c32bf1b1e02b44c746701f898460fb000d7eaa84f00a"
checksum = "b4388bee8683e3d04af747c73422af53102d2bd24d9eadb6cbc100baef4b43f8"
dependencies = [
"serde_core",
]
@@ -704,6 +704,7 @@ dependencies = [
"thiserror",
"tokio",
"tokio-stream",
"tokio-util",
"tonic",
"tonic-prost",
"tonic-prost-build",
@@ -1655,9 +1656,9 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
[[package]]
name = "http"
version = "1.4.1"
version = "1.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8be7462df143984c4598a256ef469b251d7d7f9e271135073e78fc535414f3d0"
checksum = "6970f50e31d6fc17d3fa27329444bfa74e196cf62e95052a3f6fee181dba6425"
dependencies = [
"bytes",
"itoa",
@@ -1854,13 +1855,12 @@ dependencies = [
[[package]]
name = "js-sys"
version = "0.3.99"
version = "0.3.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "142bc4740e452c1e57ade0cbc129f139c9093e354346f0872ef985f4f5cf5f11"
checksum = "f2025f20d7a4fa7785846e7b63d10a76d3f1cee98ee5cb79ea59703f95e42162"
dependencies = [
"cfg-if",
"futures-util",
"once_cell",
"wasm-bindgen",
]
@@ -2164,9 +2164,9 @@ dependencies = [
[[package]]
name = "prost"
version = "0.14.3"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568"
checksum = "528ac67416ff8646872a3c02cad9cc4ee5dc9f9540c9b10771855c95cb2e5ae1"
dependencies = [
"bytes",
"prost-derive",
@@ -2174,9 +2174,9 @@ dependencies = [
[[package]]
name = "prost-build"
version = "0.14.3"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "343d3bd7056eda839b03204e68deff7d1b13aba7af2b2fd16890697274262ee7"
checksum = "03da047801ff44bb6a4d407d4860c05fd70bb81714e6b2f3812603d5b145b042"
dependencies = [
"heck",
"itertools",
@@ -2195,9 +2195,9 @@ dependencies = [
[[package]]
name = "prost-derive"
version = "0.14.3"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b"
checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf"
dependencies = [
"anyhow",
"itertools",
@@ -2208,9 +2208,9 @@ dependencies = [
[[package]]
name = "prost-types"
version = "0.14.3"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8991c4cbdb8bc5b11f0b074ffe286c30e523de90fee5ba8132f1399f23cb3dd7"
checksum = "f94967dc7688f3054c7fac87473ffae4cc4c3904800e2d9f5b857246d8963b0a"
dependencies = [
"prost",
]
@@ -3161,9 +3161,9 @@ checksum = "8ecb6da28b8a351d773b68d5825ac39017e680750f980f3a1a85cd8dd28a47c1"
[[package]]
name = "uuid"
version = "1.23.2"
version = "1.23.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d258b83ceec21034727ecee8c382cfa6c3e133699b0742c64571814fb420c9f7"
checksum = "144d6b123cef80b301b8f72a9e2ca4370ddec21950d0a103dd22c437006d2db7"
dependencies = [
"getrandom 0.4.2",
"js-sys",
@@ -3227,9 +3227,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen"
version = "0.2.122"
version = "0.2.123"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3ed04576f974d2b2fba0f38c51dbc5518011e38c36bf1143164be765528fd409"
checksum = "a254a4b10c19a76f09a27640e7ffbf9bc30bf67e16a3bf28aaefa4920fe81563"
dependencies = [
"cfg-if",
"once_cell",
@@ -3240,9 +3240,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-futures"
version = "0.4.72"
version = "0.4.73"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9473dbd2991ae90b6291c3c32c30c6187ac49aa32f9905d1cce280ec1e110b0f"
checksum = "54568702fabf5d4849ce2b90fadfa64168a097eaf4b351ce9df8b687a0086aaf"
dependencies = [
"js-sys",
"wasm-bindgen",
@@ -3250,9 +3250,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.122"
version = "0.2.123"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "916151b09da36bd82f6615cbf3a419e2f0ba23a03c6160e8e92eb6bd4aa1dec6"
checksum = "24a40fc75b0ec6f3746ceb10d36f53a93dcd68a93b11b6445983945d79eba0dc"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
@@ -3260,9 +3260,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.122"
version = "0.2.123"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "299047362ccbfce148b67ab7e73349f77748e00c8296f9542adfad2ad82c5c5e"
checksum = "908f34bd9b9ce3d4caf07b72dfab63d61504d156856c6bd3cd87fa350cf3985b"
dependencies = [
"bumpalo",
"proc-macro2",
@@ -3273,9 +3273,9 @@ dependencies = [
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.122"
version = "0.2.123"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a929b2c61f11ba3e9bc35b50c1f25cb38e0e892c0c231ae2b8cf78d5dad4437"
checksum = "7acbf7616c27b194bbb550bf77ed0c2c3e5b7fd1260a93082b95fb7f47959b92"
dependencies = [
"unicode-ident",
]
@@ -3316,9 +3316,9 @@ dependencies = [
[[package]]
name = "web-sys"
version = "0.3.99"
version = "0.3.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d621441cfc37b84979402712047321980c178f299193a3589d05b99e8763436"
checksum = "6e0871acf327f283dc6da28a1696cdc64fb355ba9f935d052021fa77f35cce69"
dependencies = [
"js-sys",
"wasm-bindgen",
+1
View File
@@ -26,6 +26,7 @@ duct = { version = "1", features = [] }
tracing = { version = "0.1", features = ["log"] }
tokio = { version = "1", features = ["rt-multi-thread", "macros", "process", "io-util", "sync", "net"] }
tokio-stream = { version = "0.1", features = ["full"] }
tokio-util = "0.7"
thiserror = { version = "2", features = [] }
prost = "0.14"
prost-types = "0.14"
+132 -48
View File
@@ -2,10 +2,11 @@ use crate::actor::message::{
ElectionRequest, ElectionResult, GitNodeMessage, NodeHealth, ROLE_PRIMARY, ROLE_REPLICA,
RefUpdateEvent, RoleChangedEvent, RouteDecision,
};
use crate::pb::RepositoryHeader;
use crate::server::GitksService;
use async_trait::async_trait;
use ractor::pg;
use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent};
use ractor::{Actor, ActorCell, ActorProcessingErr, ActorRef, SupervisionEvent};
use std::collections::HashMap;
#[derive(Clone)]
@@ -43,6 +44,7 @@ pub struct GitNodeState {
health_failures: u32,
is_primary: bool,
last_known_primary_grpc: String,
voted_for: Option<String>,
}
#[async_trait]
@@ -76,6 +78,7 @@ impl Actor for GitNodeActor {
health_failures: 0,
is_primary: true, // Will be refined at registration
last_known_primary_grpc: args.grpc_addr.clone(),
voted_for: None,
})
}
@@ -96,12 +99,12 @@ impl Actor for GitNodeActor {
.unwrap_or(&repo_path)
.trim_start_matches('/')
.to_string();
register_repo(&myself, state, relative_path);
register_repo(&myself, state, relative_path).await;
}
}
GitNodeMessage::RegisterRepository(header) => {
register_repo(&myself, state, header.relative_path);
register_repo(&myself, state, header.relative_path).await;
}
GitNodeMessage::RemoveRepository(header) => {
@@ -173,10 +176,12 @@ impl Actor for GitNodeActor {
term = request.term,
current_term = state.current_term,
accepted = accepted,
voted_for = ?state.voted_for,
"election vote"
);
if accepted {
state.current_term = request.term;
state.voted_for = Some(request.candidate_storage_name.clone());
state.last_known_primary_grpc = request.candidate_grpc_addr.clone();
}
reply
@@ -208,6 +213,7 @@ impl Actor for GitNodeActor {
state.is_primary = true;
state.current_term = event.term;
state.health_failures = 0;
state.voted_for = None;
for entry in state.repos.values_mut() {
entry.role = ROLE_PRIMARY.to_string();
entry.read_only = false;
@@ -220,6 +226,7 @@ impl Actor for GitNodeActor {
);
state.is_primary = false;
state.current_term = event.term;
state.voted_for = None;
for entry in state.repos.values_mut() {
entry.role = ROLE_REPLICA.to_string();
}
@@ -237,6 +244,76 @@ impl Actor for GitNodeActor {
};
}
}
GitNodeMessage::TriggerElection => {
let members = ractor::pg::get_members(&"gitks_nodes".to_string());
let total = members.len();
let my_cell = myself.get_cell();
let new_term = state.current_term.wrapping_add(1);
let mut accepted_count = 0u64;
for member in &members {
if *member == my_cell {
// We vote for ourselves
accepted_count += 1;
continue;
}
let actor_ref: ActorRef<GitNodeMessage> = member.clone().into();
let request = ElectionRequest {
candidate_storage_name: state.storage_name.clone(),
candidate_grpc_addr: state.grpc_addr.clone(),
candidate_actor_name: state.actor_name.clone(),
term: new_term,
reason: "health_check_failure".to_string(),
};
match ractor::call_t!(actor_ref, GitNodeMessage::ElectPrimary, 1000, request) {
Ok(result) if result.accepted => {
accepted_count += 1;
}
Ok(_) => {}
Err(_) => {
tracing::warn!(
member = ?member.get_id(),
"no response from member during election"
);
}
}
}
let majority = (total / 2).max(1) + 1;
if accepted_count >= majority as u64 {
tracing::info!(
term = new_term,
accepted = accepted_count,
total = total,
"won election, promoting to PRIMARY"
);
state.is_primary = true;
state.current_term = new_term;
state.health_failures = 0;
state.voted_for = None;
for entry in state.repos.values_mut() {
entry.role = ROLE_PRIMARY.to_string();
entry.read_only = false;
}
let role_event = RoleChangedEvent {
storage_name: state.storage_name.clone(),
grpc_addr: state.grpc_addr.clone(),
new_role: ROLE_PRIMARY.to_string(),
term: new_term,
relative_paths: state.repos.keys().cloned().collect(),
};
broadcast_role_changed(&myself, role_event);
} else {
tracing::warn!(
term = new_term,
accepted = accepted_count,
total = total,
"election lost, staying as REPLICA"
);
}
}
}
Ok(())
}
@@ -277,9 +354,8 @@ impl Actor for GitNodeActor {
/// Determine whether to accept an election request.
fn should_accept_election(request: &ElectionRequest, state: &GitNodeState) -> bool {
// Only accept if the term is greater than our current term
// (prevents old/duplicate election messages)
if request.term <= state.current_term {
// Reject old terms (prevents old/duplicate election messages)
if request.term < state.current_term {
tracing::warn!(
request_term = request.term,
current_term = state.current_term,
@@ -287,6 +363,20 @@ fn should_accept_election(request: &ElectionRequest, state: &GitNodeState) -> bo
);
return false;
}
// Same term: only accept if we haven't already voted for someone else
if request.term == state.current_term
&& let Some(ref voted_for) = state.voted_for
&& voted_for != &request.candidate_storage_name
{
tracing::warn!(
request_term = request.term,
current_term = state.current_term,
already_voted = %voted_for,
candidate = %request.candidate_storage_name,
"rejecting election: already voted this term"
);
return false;
}
true
}
@@ -318,7 +408,7 @@ fn build_decision(
}
}
fn register_repo(
async fn register_repo(
myself: &ActorRef<GitNodeMessage>,
state: &mut GitNodeState,
relative_path: String,
@@ -329,12 +419,21 @@ fn register_repo(
let members = ractor::pg::get_members(&"gitks_nodes".to_string());
let my_cell = myself.get_cell();
let other_nodes_exist = members.iter().any(|m| m != &my_cell);
let role = if other_nodes_exist {
let role = if members.iter().any(|m| m != &my_cell) {
let header = RepositoryHeader {
storage_name: String::new(),
relative_path: relative_path.clone(),
storage_path: String::new(),
};
let primary_found = find_primary_in_cluster(&members, &my_cell, &header).await;
if primary_found {
ROLE_REPLICA.to_string()
} else {
ROLE_PRIMARY.to_string()
}
} else {
ROLE_PRIMARY.to_string()
};
if role == ROLE_PRIMARY {
@@ -365,6 +464,28 @@ fn register_repo(
);
}
/// Query all cluster members (except self) to find if a repository has a PRIMARY.
pub async fn find_primary_in_cluster(
members: &[ActorCell],
my_cell: &ActorCell,
header: &RepositoryHeader,
) -> bool {
for member in members {
if member == my_cell {
continue;
}
let actor_ref: ActorRef<GitNodeMessage> = member.clone().into();
if let Ok(decision) =
ractor::call_t!(actor_ref, GitNodeMessage::FindPrimary, 500, header.clone())
&& decision.found
&& decision.role == ROLE_PRIMARY
{
return true;
}
}
false
}
fn extract_category(relative_path: &str) -> &str {
relative_path.split('/').next().unwrap_or("root")
}
@@ -417,9 +538,9 @@ fn start_health_checker(myself: ActorRef<GitNodeMessage>, interval_secs: u64, ma
if consecutive_failures >= max_failures {
tracing::error!(
"no other nodes reachable for {max_failures} checks, triggering self-election as PRIMARY"
"no other nodes reachable for {max_failures} checks, triggering election"
);
trigger_self_election(&myself);
myself.cast(GitNodeMessage::TriggerElection).ok();
consecutive_failures = 0;
}
}
@@ -427,43 +548,6 @@ fn start_health_checker(myself: ActorRef<GitNodeMessage>, interval_secs: u64, ma
});
}
/// Trigger self-election: this node promotes itself to PRIMARY.
fn trigger_self_election(myself: &ActorRef<GitNodeMessage>) {
let members = ractor::pg::get_members(&"gitks_nodes".to_string());
let total_nodes = members.len();
tracing::warn!(
total_nodes = total_nodes,
"initiating self-election as new PRIMARY"
);
let new_term = std::time::SystemTime::now()
.duration_since(std::time::SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs();
myself
.cast(GitNodeMessage::RoleChanged(RoleChangedEvent {
storage_name: String::new(), // will be filled by handler from our own state
grpc_addr: String::new(),
new_role: ROLE_PRIMARY.to_string(),
term: new_term,
relative_paths: Vec::new(), // all repos
}))
.ok();
broadcast_role_changed(
myself,
RoleChangedEvent {
storage_name: String::new(), // handler fills
grpc_addr: String::new(),
new_role: ROLE_PRIMARY.to_string(),
term: new_term,
relative_paths: Vec::new(),
},
);
}
pub async fn start_node_actor(
service: GitksService,
storage_name: String,
+3 -1
View File
@@ -149,6 +149,9 @@ pub enum GitNodeMessage {
/// A role change has occurred in the cluster.
RoleChanged(RoleChangedEvent),
/// Health checker detected primary failure, trigger election.
TriggerElection,
}
#[derive(ractor_cluster::RactorMessage)]
@@ -156,7 +159,6 @@ pub enum RepoActorMessage {
UpdateMetadata(RepositoryHeader),
}
/// Request for a node to vote in a PRIMARY election.
#[derive(Debug, Clone)]
pub struct ElectionRequest {
+1
View File
@@ -3,6 +3,7 @@ pub mod message;
pub mod server;
pub mod sync;
pub use handler::find_primary_in_cluster;
pub use handler::{
GitNodeActor, GitNodeArgs, RepoEntry, broadcast_ref_update, broadcast_role_changed,
get_category_members, get_cluster_nodes, list_all_groups, route_group_for, start_node_actor,
+135 -34
View File
@@ -39,6 +39,57 @@ impl BundleApplicator {
}
Ok(())
}
/// Apply bundle from a file path (for streaming writes).
pub fn apply_bundle_from_file(&self, path: &Path) -> Result<(), String> {
let file = std::fs::File::open(path).map_err(|e| format!("open bundle file: {e}"))?;
let mut child = std::process::Command::new("git")
.args([
"--git-dir",
&self.repo_path.to_string_lossy(),
"bundle",
"unbundle",
"-",
])
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
.spawn()
.map_err(|e| format!("spawn git bundle unbundle: {e}"))?;
// Stream file contents to stdin in a background thread
let mut stdin = child.stdin.take().ok_or("no stdin")?;
let file_handle = file;
let writer = std::thread::spawn(move || -> Result<(), String> {
use std::io::{Read, Write};
let mut reader = std::io::BufReader::new(file_handle);
let mut buf = vec![0u8; 65536];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
stdin
.write_all(&buf[..n])
.map_err(|e| format!("write to stdin: {e}"))?;
}
Err(e) => return Err(format!("read bundle file: {e}")),
}
}
Ok(())
});
let output = child
.wait_with_output()
.map_err(|e| format!("wait bundle: {e}"))?;
// Wait for writer thread
let _ = writer.join().map_err(|_| "writer thread panicked")?;
if !output.status.success() {
return Err(String::from_utf8_lossy(&output.stderr).into_owned());
}
Ok(())
}
}
pub fn collect_local_haves(repo_path: &Path) -> Result<Vec<Oid>, String> {
@@ -92,20 +143,45 @@ pub async fn sync_from_primary(event: RefUpdateEvent, local_repo_path: PathBuf)
let relative_path = event.relative_path.clone();
let repo_for_haves = local_repo_path.clone();
// Collect haves in a blocking thread
let haves = match tokio::task::spawn_blocking(move || collect_local_haves(&repo_for_haves))
.await
{
Ok(Ok(h)) => h,
Ok(Err(e)) => {
tracing::error!(relative_path = %event.relative_path, error = %e, "collect haves failed");
return;
}
Err(e) => {
tracing::error!(relative_path = %event.relative_path, error = %e, "haves task failed");
return;
}
};
// Stream pack data to a temporary file to avoid OOM
let temp_dir = local_repo_path.join(".gitks_tmp");
if let Err(e) = std::fs::create_dir_all(&temp_dir) {
tracing::error!(relative_path = %event.relative_path, error = %e, "create temp dir failed");
return;
}
let pack_result =
sync_via_pack_service_to_file(&grpc_addr, &relative_path, &haves, &temp_dir).await;
match pack_result {
Ok(Some(pack_file)) => {
let repo = local_repo_path.clone();
let pack_path = pack_file.clone();
match tokio::task::spawn_blocking(move || {
sync_via_pack_service(&grpc_addr, &relative_path, &repo_for_haves)
let applicator = BundleApplicator::new(repo);
applicator.apply_bundle_from_file(&pack_path)
})
.await
{
Ok(Ok(pack_data)) if !pack_data.is_empty() => {
let pack_len = pack_data.len();
let repo = local_repo_path.clone();
match tokio::task::spawn_blocking(move || apply_pack_data(&repo, &pack_data)).await {
Ok(Ok(())) => {
update_local_ref(&local_repo_path, &event.ref_name, &event.new_oid);
tracing::info!(
relative_path = %event.relative_path,
bytes = pack_len,
"replica sync done"
);
}
@@ -116,32 +192,35 @@ pub async fn sync_from_primary(event: RefUpdateEvent, local_repo_path: PathBuf)
tracing::error!(relative_path = %event.relative_path, error = %e, "apply task failed")
}
}
// Cleanup temp file
let _ = std::fs::remove_file(&pack_file);
}
Ok(Ok(_)) => {
Ok(None) => {
tracing::warn!(relative_path = %event.relative_path, "empty pack data from primary")
}
Ok(Err(e)) => {
Err(e) => {
tracing::error!(relative_path = %event.relative_path, error = %e, "pack fetch failed")
}
Err(e) => {
tracing::error!(relative_path = %event.relative_path, error = %e, "sync task failed")
}
}
// Cleanup temp dir if empty
let _ = std::fs::remove_dir(&temp_dir);
}
fn sync_via_pack_service(
/// Maximum pack size before we reject (10GB)
const MAX_PACK_SIZE: u64 = 10 * 1024 * 1024 * 1024;
/// Stream pack data from primary to a temporary file.
/// Returns Ok(Some(path)) on success, Ok(None) if empty, Err on failure.
async fn sync_via_pack_service_to_file(
grpc_addr: &str,
relative_path: &str,
local_repo_path: &Path,
) -> Result<Vec<u8>, String> {
let haves = collect_local_haves(local_repo_path)?;
let rt = tokio::runtime::Handle::current();
rt.block_on(async {
haves: &[Oid],
temp_dir: &Path,
) -> Result<Option<PathBuf>, String> {
use crate::pb::pack_service_client::PackServiceClient;
use crate::pb::{
AdvertiseRefsRequest, PackObjectsOptions, PackObjectsRequest, RepositoryHeader,
};
use crate::pb::{AdvertiseRefsRequest, PackObjectsOptions, PackObjectsRequest, RepositoryHeader};
use tokio::io::AsyncWriteExt;
use tokio_stream::StreamExt;
let endpoint = crate::server::remote_endpoint(grpc_addr)
@@ -170,7 +249,7 @@ fn sync_via_pack_service(
let refs = refs_resp.into_inner().references;
if refs.is_empty() {
return Ok(Vec::new());
return Ok(None);
}
let wants: Vec<Oid> = refs.iter().filter_map(|r| r.target_oid.clone()).collect();
@@ -187,7 +266,7 @@ fn sync_via_pack_service(
let options = PackObjectsOptions {
wants,
haves,
haves: haves.to_vec(),
shallow_revisions: Vec::new(),
deepen: 0,
thin_pack: false,
@@ -208,27 +287,49 @@ fn sync_via_pack_service(
.map_err(|e| format!("PackObjects: {e}"))?;
let mut stream = resp.into_inner();
let mut pack_data = Vec::new();
// Create a temporary file for streaming
let temp_file = temp_dir.join(format!("pack_{}.bundle", std::process::id()));
let mut file = tokio::fs::File::create(&temp_file)
.await
.map_err(|e| format!("create temp file: {e}"))?;
let mut total_bytes: u64 = 0;
while let Some(chunk) = stream.next().await {
match chunk {
Ok(msg) => pack_data.extend_from_slice(&msg.data),
Err(e) => return Err(format!("pack stream: {e}")),
Ok(msg) => {
total_bytes += msg.data.len() as u64;
if total_bytes > MAX_PACK_SIZE {
let _ = tokio::fs::remove_file(&temp_file).await;
return Err(format!(
"pack data exceeds maximum size ({}GB)",
MAX_PACK_SIZE / (1024 * 1024 * 1024)
));
}
file.write_all(&msg.data)
.await
.map_err(|e| format!("write pack data: {e}"))?;
}
Err(e) => {
let _ = tokio::fs::remove_file(&temp_file).await;
return Err(format!("pack stream: {e}"));
}
}
}
// Flush and close the file
file.flush()
.await
.map_err(|e| format!("flush pack file: {e}"))?;
drop(file);
tracing::info!(
relative_path = %relative_path,
pack_bytes = pack_data.len(),
pack_bytes = total_bytes,
"received pack data from primary"
);
Ok(pack_data)
})
}
fn apply_pack_data(repo_path: &Path, pack_data: &[u8]) -> Result<(), String> {
let applicator = BundleApplicator::new(repo_path.to_path_buf());
applicator.apply_bundle(pack_data)
Ok(Some(temp_file))
}
fn update_local_ref(repo_path: &Path, ref_name: &str, new_oid: &str) {
+13 -3
View File
@@ -5,7 +5,11 @@ use crate::pb::*;
impl GitBare {
/// Count commits in a revision range or path.
pub fn count_commits(&self, request: CountCommitsRequest) -> GitResult<CountCommitsResponse> {
let revision = if request.revision.is_empty() { "HEAD" } else { &request.revision };
let revision = if request.revision.is_empty() {
"HEAD"
} else {
&request.revision
};
crate::sanitize::validate_revision(revision)?;
let mut args = vec![
@@ -48,7 +52,10 @@ impl GitBare {
}
/// Count diverging commits between two branches (left vs right).
pub fn count_diverging_commits(&self, request: CountDivergingCommitsRequest) -> GitResult<CountDivergingCommitsResponse> {
pub fn count_diverging_commits(
&self,
request: CountDivergingCommitsRequest,
) -> GitResult<CountDivergingCommitsResponse> {
crate::sanitize::validate_revision(&request.left)?;
crate::sanitize::validate_revision(&request.right)?;
@@ -75,6 +82,9 @@ impl GitBare {
let left_count = parts.first().and_then(|s| s.parse().ok()).unwrap_or(0);
let right_count = parts.get(1).and_then(|s| s.parse().ok()).unwrap_or(0);
Ok(CountDivergingCommitsResponse { left_count, right_count })
Ok(CountDivergingCommitsResponse {
left_count,
right_count,
})
}
}
+21 -6
View File
@@ -13,18 +13,27 @@ impl GitBare {
crate::sanitize::validate_revision(&revision)?;
let repo = self.gix_repo()?;
let oid = repo.rev_parse_single(revision.as_str())
let oid = repo
.rev_parse_single(revision.as_str())
.map_err(|e| GitError::Gix(e.to_string()))?;
let commit = oid.object()
let commit = oid
.object()
.map_err(|e| GitError::Gix(e.to_string()))?
.try_into_commit()
.map_err(|e| GitError::Gix(format!("not a commit: {e}")))?;
Ok(crate::commit::get_commit::commit_to_pb(self, &commit, request.include_stats))
Ok(crate::commit::get_commit::commit_to_pb(
self,
&commit,
request.include_stats,
))
}
/// Batch lookup commits by OID list.
pub fn list_commits_by_oid(&self, request: ListCommitsByOidRequest) -> GitResult<ListCommitsByOidResponse> {
pub fn list_commits_by_oid(
&self,
request: ListCommitsByOidRequest,
) -> GitResult<ListCommitsByOidResponse> {
let repo = self.gix_repo()?;
let mut commits = Vec::new();
@@ -33,11 +42,17 @@ impl GitBare {
if let Ok(oid) = gix::ObjectId::from_hex(hex.as_bytes()) {
if let Ok(obj) = repo.find_object(oid) {
if let Ok(commit) = obj.try_into_commit() {
commits.push(crate::commit::get_commit::commit_to_pb(self, &commit, request.include_stats));
commits.push(crate::commit::get_commit::commit_to_pb(
self,
&commit,
request.include_stats,
));
}
}
}
if commits.len() >= 100 { break; }
if commits.len() >= 100 {
break;
}
}
Ok(ListCommitsByOidResponse { commits })
+4 -1
View File
@@ -19,7 +19,10 @@ impl GitBare {
pub(crate) fn commit_to_pb(gb: &GitBare, commit: &gix::Commit<'_>, include_raw: bool) -> Commit {
let hex = commit.id.to_string();
let tree_hex = commit.tree_id().map(|t| t.to_string()).unwrap_or_default();
let message = commit.message_raw().map(|m| m.to_string()).unwrap_or_default();
let message = commit
.message_raw()
.map(|m| m.to_string())
.unwrap_or_default();
let (subject, body) = message
.split_once('\n')
.map(|(s, b)| (s.to_string(), b.trim_start_matches('\n').to_string()))
+46 -13
View File
@@ -4,11 +4,22 @@ use crate::pb::*;
impl GitBare {
/// Search commits by message content.
pub fn commits_by_message(&self, request: CommitsByMessageRequest) -> GitResult<CommitsByMessageResponse> {
let revision = if request.revision.is_empty() { "HEAD" } else { &request.revision };
pub fn commits_by_message(
&self,
request: CommitsByMessageRequest,
) -> GitResult<CommitsByMessageResponse> {
let revision = if request.revision.is_empty() {
"HEAD"
} else {
&request.revision
};
crate::sanitize::validate_revision(revision)?;
let limit = if request.limit == 0 { 20 } else { request.limit.min(200) };
let limit = if request.limit == 0 {
20
} else {
request.limit.min(200)
};
let mut args = vec![
"--git-dir".to_string(),
@@ -50,7 +61,9 @@ impl GitBare {
if let Ok(oid) = gix::ObjectId::from_hex(hex.as_bytes()) {
if let Ok(obj) = repo.find_object(oid) {
if let Ok(commit) = obj.try_into_commit() {
commits.push(crate::commit::get_commit::commit_to_pb(self, &commit, false));
commits.push(crate::commit::get_commit::commit_to_pb(
self, &commit, false,
));
}
}
}
@@ -60,7 +73,10 @@ impl GitBare {
}
/// Batch check if objects/revisions exist.
pub fn check_objects_exist(&self, request: CheckObjectsExistRequest) -> GitResult<CheckObjectsExistResponse> {
pub fn check_objects_exist(
&self,
request: CheckObjectsExistRequest,
) -> GitResult<CheckObjectsExistResponse> {
let repo = self.gix_repo()?;
let mut revisions = Vec::new();
@@ -119,13 +135,24 @@ impl GitBare {
}
}
Ok(CommitStats { additions, deletions, changed_files })
Ok(CommitStats {
additions,
deletions,
changed_files,
})
}
/// Get the last commit for a given path.
pub fn last_commit_for_path(&self, request: LastCommitForPathRequest) -> GitResult<LastCommitForPathResponse> {
pub fn last_commit_for_path(
&self,
request: LastCommitForPathRequest,
) -> GitResult<LastCommitForPathResponse> {
crate::sanitize::validate_file_path(&request.path)?;
let revision = if request.revision.is_empty() { "HEAD" } else { &request.revision };
let revision = if request.revision.is_empty() {
"HEAD"
} else {
&request.revision
};
crate::sanitize::validate_revision(revision)?;
let args = vec![
@@ -155,20 +182,26 @@ impl GitBare {
let hex = stdout.lines().next().unwrap_or("").trim().to_string();
if hex.is_empty() {
return Ok(LastCommitForPathResponse { commit: None, path: request.path });
return Ok(LastCommitForPathResponse {
commit: None,
path: request.path,
});
}
let repo = self.gix_repo()?;
let commit = if let Ok(oid) = gix::ObjectId::from_hex(hex.as_bytes()) {
repo.find_object(oid).ok().and_then(|obj| {
obj.try_into_commit().ok().map(|c| {
crate::commit::get_commit::commit_to_pb(self, &c, false)
})
obj.try_into_commit()
.ok()
.map(|c| crate::commit::get_commit::commit_to_pb(self, &c, false))
})
} else {
None
};
Ok(LastCommitForPathResponse { commit, path: request.path })
Ok(LastCommitForPathResponse {
commit,
path: request.path,
})
}
}
+40 -9
View File
@@ -4,7 +4,10 @@ use crate::pb::*;
impl GitBare {
/// Find changed paths between two revisions (no diff content).
pub fn find_changed_paths(&self, request: FindChangedPathsRequest) -> GitResult<FindChangedPathsResponse> {
pub fn find_changed_paths(
&self,
request: FindChangedPathsRequest,
) -> GitResult<FindChangedPathsResponse> {
crate::sanitize::validate_revision(&request.base)?;
crate::sanitize::validate_revision(&request.head)?;
@@ -41,21 +44,49 @@ impl GitBare {
for line in stdout.lines() {
let line = line.trim();
if line.is_empty() { continue; }
if line.is_empty() {
continue;
}
let parts: Vec<&str> = line.split('\t').collect();
if parts.is_empty() { continue; }
if parts.is_empty() {
continue;
}
let status_str = parts[0];
let status_letter = status_str.chars().next().unwrap_or('M');
let (status, old_path, new_path) = match status_letter {
'A' => (changed_path::Status::ChangedPathStatusAdded as i32, String::new(), parts.get(1).cloned().unwrap_or_default().to_string()),
'D' => (changed_path::Status::ChangedPathStatusDeleted as i32, parts.get(1).cloned().unwrap_or_default().to_string(), String::new()),
'R' => (changed_path::Status::ChangedPathStatusRenamed as i32, parts.get(1).cloned().unwrap_or_default().to_string(), parts.get(2).cloned().unwrap_or_default().to_string()),
'C' => (changed_path::Status::ChangedPathStatusCopied as i32, parts.get(1).cloned().unwrap_or_default().to_string(), parts.get(2).cloned().unwrap_or_default().to_string()),
'T' => (changed_path::Status::ChangedPathStatusTypeChanged as i32, String::new(), parts.get(1).cloned().unwrap_or_default().to_string()),
_ => (changed_path::Status::ChangedPathStatusModified as i32, String::new(), parts.get(1).cloned().unwrap_or_default().to_string()),
'A' => (
changed_path::Status::ChangedPathStatusAdded as i32,
String::new(),
parts.get(1).cloned().unwrap_or_default().to_string(),
),
'D' => (
changed_path::Status::ChangedPathStatusDeleted as i32,
parts.get(1).cloned().unwrap_or_default().to_string(),
String::new(),
),
'R' => (
changed_path::Status::ChangedPathStatusRenamed as i32,
parts.get(1).cloned().unwrap_or_default().to_string(),
parts.get(2).cloned().unwrap_or_default().to_string(),
),
'C' => (
changed_path::Status::ChangedPathStatusCopied as i32,
parts.get(1).cloned().unwrap_or_default().to_string(),
parts.get(2).cloned().unwrap_or_default().to_string(),
),
'T' => (
changed_path::Status::ChangedPathStatusTypeChanged as i32,
String::new(),
parts.get(1).cloned().unwrap_or_default().to_string(),
),
_ => (
changed_path::Status::ChangedPathStatusModified as i32,
String::new(),
parts.get(1).cloned().unwrap_or_default().to_string(),
),
};
paths.push(ChangedPath {
+3 -1
View File
@@ -18,7 +18,9 @@ impl GitBare {
// Apply options if present
if let Some(ref opts) = request.options {
if opts.recursive { args.push("--recursive".to_string()); }
if opts.recursive {
args.push("--recursive".to_string());
}
if opts.include_binary {
args.push("--binary".to_string());
} else {
-5
View File
@@ -94,7 +94,6 @@ impl DiskCache {
self.enabled
}
fn state_dir_for(&self, relative_path: &str) -> PathBuf {
self.repo_prefix
.join(STATE_DIR_RELATIVE)
@@ -109,7 +108,6 @@ impl DiskCache {
self.state_dir_for(relative_path).join("pending")
}
fn cache_dir(&self, namespace: &str) -> PathBuf {
self.repo_prefix.join(namespace)
}
@@ -118,7 +116,6 @@ impl DiskCache {
self.cache_dir(namespace).join(digest_to_path(digest))
}
/// Ensure the state directory for a repository exists and has a `latest` file.
/// If `latest` does not exist, create it with a random value.
pub fn ensure_state(&self, relative_path: &str) -> GitResult<String> {
@@ -230,7 +227,6 @@ impl DiskCache {
Ok(())
}
/// Compute a cache key for an info/refs request.
pub fn compute_info_refs_key(&self, relative_path: &str, protocol: &str) -> GitResult<String> {
let latest = self.ensure_state(relative_path)?;
@@ -268,7 +264,6 @@ impl DiskCache {
Ok(sha256_digest(parts))
}
/// Look up a cached response for the given namespace and digest.
/// Returns the cached bytes if found and not expired.
pub fn lookup(&self, namespace: &str, digest: &str) -> GitResult<Option<Vec<u8>>> {
+12 -8
View File
@@ -116,15 +116,15 @@ fn run_single_script(script_path: &Path, stdin_data: &[u8], timeout: Duration) -
let wait_result = c.wait_timeout(timeout);
match wait_result {
Ok(Some(status)) => {
let output = c.wait_with_output().unwrap_or_else(|_| {
// If we can't get output, at least return the status
Output {
status,
stdout: Vec::new(),
stderr: Vec::new(),
// Process exited within timeout, get its output
// Note: We already have the status, so we need to construct output differently
// Since wait_with_output would fail after try_wait, we return status-only output
HookResult {
accepted: status.success(),
exit_code: status.code().unwrap_or(-1),
stdout: String::new(), // stdout was consumed by the process
stderr: String::new(), // stderr was consumed by the process
}
});
HookResult::from_output(&output)
}
Ok(None) => {
tracing::warn!(
@@ -133,6 +133,8 @@ fn run_single_script(script_path: &Path, stdin_data: &[u8], timeout: Duration) -
"hook script timed out, killing"
);
let _ = c.kill();
// Explicitly wait to reap the zombie process
let _ = c.wait();
HookResult::rejected(format!(
"hook script timed out after {}s: {}",
timeout.as_secs(),
@@ -141,6 +143,8 @@ fn run_single_script(script_path: &Path, stdin_data: &[u8], timeout: Duration) -
}
Err(e) => {
let _ = c.kill();
// Explicitly wait to reap the zombie process
let _ = c.wait();
HookResult::rejected(format!("hook script wait error: {e}"))
}
}
+88 -8
View File
@@ -5,6 +5,7 @@
use crate::error::{GitError, GitResult};
/// Commands/patterns that are never allowed in custom hook scripts.
/// This is a blocklist approach - we also add pattern-based detection.
const FORBIDDEN_PATTERNS: &[&str] = &[
"rm -rf",
"rm -r /",
@@ -24,6 +25,34 @@ const FORBIDDEN_PATTERNS: &[&str] = &[
"init 6",
"poweroff",
"halt",
// Additional patterns to catch encoding/obfuscation attempts
"eval ", // eval can execute arbitrary strings
"exec ", // exec can replace process
"$(", // command substitution
"`", // backtick command substitution
"${", // variable expansion (can be used for obfuscation)
"|bash", // piping to bash
"|sh", // piping to sh
"|dash", // piping to dash
"|zsh", // piping to zsh
"base64", // base64 encoding/decoding (common for obfuscation)
"python -c", // inline python execution
"perl -e", // inline perl execution
"ruby -e", // inline ruby execution
"node -e", // inline node execution
"/dev/tcp", // bash reverse shell
"nc -e", // netcat reverse shell
"ncat", // netcat alternative
"socat", // socket relay
];
/// Additional regex-like patterns that indicate dangerous constructs.
/// These are checked with simple string matching for complexity reasons.
const DANGEROUS_PREFIXES: &[&str] = &[
"rm -rf /", // rm -rf with absolute path
"rm -rf ~", // rm -rf with home directory
"rm -rf .", // rm -rf with relative path (current dir)
"rm -rf *", // rm -rf with wildcard
];
/// Maximum hook script size (64KB).
@@ -43,19 +72,70 @@ pub fn validate_hook_content(content: &str) -> GitResult<()> {
content.len()
)));
}
let content_lower = content.to_lowercase();
for pattern in FORBIDDEN_PATTERNS {
if content_lower.contains(pattern) {
return Err(GitError::InvalidArgument(format!(
"hook content contains forbidden pattern: '{pattern}'"
)));
}
}
if content.contains('\0') {
return Err(GitError::InvalidArgument(
"hook content cannot contain null bytes".into(),
));
}
// Check for forbidden patterns (case-insensitive where appropriate)
let content_lower = content.to_lowercase();
for pattern in FORBIDDEN_PATTERNS {
if content_lower.contains(&pattern.to_lowercase()) {
return Err(GitError::InvalidArgument(format!(
"hook content contains forbidden pattern: '{pattern}'"
)));
}
}
// Check for dangerous prefixes (exact case)
for prefix in DANGEROUS_PREFIXES {
if content.contains(prefix) {
return Err(GitError::InvalidArgument(format!(
"hook content contains dangerous command: '{prefix}'"
)));
}
}
// Check for obfuscation techniques
check_obfuscation_attempts(content)?;
Ok(())
}
/// Check for common obfuscation attempts.
fn check_obfuscation_attempts(content: &str) -> GitResult<()> {
// Check for excessive use of special characters that might indicate obfuscation
let special_char_count = content.chars().filter(|c| matches!(c, '$' | '`' | '\\' | '|' | ';' | '&' | '(' | ')' | '{' | '}' | '[' | ']')).count();
let total_chars = content.chars().count();
// If more than 30% of content is special characters, it's suspicious
if total_chars > 0 && (special_char_count * 100 / total_chars) > 30 {
return Err(GitError::InvalidArgument(
"hook content appears obfuscated (too many special characters)".into(),
));
}
// Check for hex encoding attempts (e.g., \x41\x42)
if content.contains("\\x") {
let hex_count = content.matches("\\x").count();
if hex_count > 5 {
return Err(GitError::InvalidArgument(
"hook content contains hex encoding (potential obfuscation)".into(),
));
}
}
// Check for unicode escape sequences
if content.contains("\\u") {
let unicode_count = content.matches("\\u").count();
if unicode_count > 5 {
return Err(GitError::InvalidArgument(
"hook content contains unicode escapes (potential obfuscation)".into(),
));
}
}
Ok(())
}
+3 -3
View File
@@ -15,14 +15,14 @@ pub mod macros;
pub mod merge;
pub mod metrics;
pub mod oid;
pub mod rate_limit;
pub mod remote;
pub mod repository;
pub mod pack;
pub mod pack_cache;
pub mod paginate;
pub mod pb;
pub mod rate_limit;
pub mod refs;
pub mod remote;
pub mod repository;
pub mod sanitize;
pub mod server;
pub mod snapshot;
+10 -18
View File
@@ -15,7 +15,6 @@ use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::{Arc, OnceLock};
use std::time::{Duration, Instant};
struct MetricsInner {
/// Counter: total requests by (method, status_code)
/// Key: "method:status"
@@ -61,7 +60,6 @@ fn metrics() -> &'static Arc<MetricsInner> {
})
}
#[rustfmt::skip]
const DURATION_BUCKET_MS: &[u64] = &[
5, 10, 25, 50, 100, 250, 500, 1_000,
@@ -105,9 +103,7 @@ pub fn dec_active_requests() {
/// Set the repository count.
pub fn set_repository_count(count: u64) {
metrics()
.repository_count
.store(count, Ordering::Relaxed);
metrics().repository_count.store(count, Ordering::Relaxed);
}
/// Record a cache hit.
@@ -117,9 +113,7 @@ pub fn inc_cache_hits(count: u64) {
/// Record a cache miss.
pub fn inc_cache_misses(count: u64) {
metrics()
.cache_misses
.fetch_add(count, Ordering::Relaxed);
metrics().cache_misses.fetch_add(count, Ordering::Relaxed);
}
/// Record an error by kind (e.g., "not_found", "internal", "invalid_argument").
@@ -132,7 +126,6 @@ pub fn inc_error(kind: &str) {
.fetch_add(1, Ordering::Relaxed);
}
/// Render all metrics in Prometheus text exposition format.
pub fn render_metrics() -> String {
let m = metrics();
@@ -163,17 +156,15 @@ pub fn render_metrics() -> String {
let (method_and_status, count) = (entry.key(), entry.value());
let count = count.load(Ordering::Relaxed);
if let Some((method, status)) = method_and_status.rsplit_once(':') {
out.push_str(
&format!("gitks_requests_total{{method=\"{method}\",status=\"{status}\"}} {count}\n"),
);
out.push_str(&format!(
"gitks_requests_total{{method=\"{method}\",status=\"{status}\"}} {count}\n"
));
}
}
out.push('\n');
// Duration histogram
out.push_str(
"# HELP gitks_request_duration_milliseconds Request duration histogram in ms\n",
);
out.push_str("# HELP gitks_request_duration_milliseconds Request duration histogram in ms\n");
out.push_str("# TYPE gitks_request_duration_milliseconds histogram\n");
for entry in &m.duration_buckets {
let (method_and_bound, count) = (entry.key(), entry.value());
@@ -215,7 +206,6 @@ pub fn render_metrics() -> String {
out
}
/// Start the metrics HTTP server on the given port.
/// Runs in a background task; returns the JoinHandle.
pub fn start_metrics_server(port: u16) -> tokio::task::JoinHandle<()> {
@@ -256,12 +246,14 @@ async fn handle_metrics_connection(mut socket: tokio::net::TcpStream) {
body
);
let _ = tokio::time::timeout(Duration::from_secs(5), socket.write_all(response.as_bytes()))
let _ = tokio::time::timeout(
Duration::from_secs(5),
socket.write_all(response.as_bytes()),
)
.await;
let _ = socket.shutdown().await;
}
/// A guard that records metrics on drop.
///
/// Usage in handlers:
+30
View File
@@ -5,3 +5,33 @@ pub mod list_packfiles;
pub mod pack_objects;
pub mod receive_pack;
pub mod upload_pack;
/// A wrapper around ReceiverStream that cancels a token when dropped.
/// Used to properly clean up child processes when clients disconnect.
pub struct CancellableReceiverStream<T> {
inner: tokio_stream::wrappers::ReceiverStream<T>,
_cancel_guard: tokio_util::sync::DropGuard,
}
impl<T> CancellableReceiverStream<T> {
pub fn new(
inner: tokio_stream::wrappers::ReceiverStream<T>,
cancel_guard: tokio_util::sync::DropGuard,
) -> Self {
Self {
inner,
_cancel_guard: cancel_guard,
}
}
}
impl<T> tokio_stream::Stream for CancellableReceiverStream<T> {
type Item = T;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
std::pin::Pin::new(&mut self.inner).poll_next(cx)
}
}
+45 -7
View File
@@ -1,4 +1,5 @@
use std::process::Stdio;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
@@ -7,6 +8,10 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::bare::GitBare;
use crate::pb::ReceivePackResponse;
use super::CancellableReceiverStream;
/// Maximum time allowed for a git receive-pack process before it is killed.
const RECEIVE_PACK_TIMEOUT: Duration = Duration::from_secs(1800); // 30 minutes
impl GitBare {
/// Receive pack data using git-receive-pack with true concurrent streaming.
@@ -23,7 +28,7 @@ impl GitBare {
input: impl tokio_stream::Stream<Item = Result<crate::pb::ReceivePackRequest, tonic::Status>>
+ Send
+ 'static,
) -> Result<ReceiverStream<Result<ReceivePackResponse, tonic::Status>>, tonic::Status> {
) -> Result<CancellableReceiverStream<Result<ReceivePackResponse, tonic::Status>>, tonic::Status> {
let bare_dir = self.bare_dir.to_string_lossy().into_owned();
tracing::info!(
repo = %bare_dir,
@@ -33,6 +38,10 @@ impl GitBare {
let (tx, rx) = tokio::sync::mpsc::channel(16);
// Use a cancellation token to track client disconnect
let cancel_token = tokio_util::sync::CancellationToken::new();
let cancel_token_clone = cancel_token.clone();
let stream = Box::pin(input);
tokio::spawn(async move {
let stream = stream;
@@ -59,15 +68,20 @@ impl GitBare {
}
};
let child_id = child.id();
let mut stdin = child.stdin.take();
let mut stdout = child.stdout.take();
let mut stderr = child.stderr.take();
let stdin_task = {
let mut stream = stream;
let cancel = cancel_token.clone();
async move {
if let Some(mut stdin) = stdin.take() {
while let Some(result) = stream.next().await {
if cancel.is_cancelled() {
break;
}
match result {
Ok(req) => {
if stdin.write_all(&req.packet).await.is_err() {
@@ -87,10 +101,14 @@ impl GitBare {
let stdout_task = {
let tx = tx.clone();
let cancel = cancel_token.clone();
async move {
if let Some(mut stdout) = stdout.take() {
let mut buf = vec![0u8; 65536];
loop {
if cancel.is_cancelled() {
break;
}
match stdout.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
@@ -129,25 +147,45 @@ impl GitBare {
}
};
tokio::join!(stdin_task, stdout_task, stderr_task);
// Run all three concurrently with timeout
let _process_future = tokio::join!(stdin_task, stdout_task, stderr_task);
match child.wait().await {
Ok(status) if !status.success() => {
match tokio::time::timeout(RECEIVE_PACK_TIMEOUT, child.wait()).await {
Ok(Ok(status)) => {
if !status.success() {
let _ = tx
.send(Err(tonic::Status::internal(
"git receive-pack exited with error",
)))
.await;
}
Err(e) => {
}
Ok(Err(e)) => {
let _ = tx
.send(Err(tonic::Status::internal(format!("wait error: {e}"))))
.await;
}
_ => {}
Err(_timeout) => {
tracing::warn!(
repo = %bare_dir,
pid = ?child_id,
timeout_secs = RECEIVE_PACK_TIMEOUT.as_secs(),
"git receive-pack timed out, killing"
);
let _ = child.kill().await;
let _ = tx
.send(Err(tonic::Status::deadline_exceeded(
"git receive-pack timed out",
)))
.await;
}
}
});
Ok(ReceiverStream::new(rx))
// When the ReceiverStream is dropped (client disconnect), cancel the background task
let rx_stream = ReceiverStream::new(rx);
let cancel_guard = cancel_token_clone.clone().drop_guard();
Ok(super::CancellableReceiverStream::new(rx_stream, cancel_guard))
}
}
+45 -9
View File
@@ -1,4 +1,5 @@
use std::process::Stdio;
use std::time::Duration;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::process::Command;
@@ -7,6 +8,10 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::bare::GitBare;
use crate::pb::UploadPackResponse;
use super::CancellableReceiverStream;
/// Maximum time allowed for a git upload-pack process before it is killed.
const UPLOAD_PACK_TIMEOUT: Duration = Duration::from_secs(600); // 10 minutes
impl GitBare {
/// Upload pack data using git-upload-pack with true concurrent streaming.
@@ -23,7 +28,7 @@ impl GitBare {
input: impl tokio_stream::Stream<Item = Result<crate::pb::UploadPackRequest, tonic::Status>>
+ Send
+ 'static,
) -> Result<ReceiverStream<Result<UploadPackResponse, tonic::Status>>, tonic::Status> {
) -> Result<CancellableReceiverStream<Result<UploadPackResponse, tonic::Status>>, tonic::Status> {
let bare_dir = self.bare_dir.to_string_lossy().into_owned();
tracing::info!(
repo = %bare_dir,
@@ -33,6 +38,10 @@ impl GitBare {
let (tx, rx) = tokio::sync::mpsc::channel(16);
// Use a cancellation token to track client disconnect
let cancel_token = tokio_util::sync::CancellationToken::new();
let cancel_token_clone = cancel_token.clone();
// Move input into the spawned task to make it 'static
let stream = Box::pin(input);
tokio::spawn(async move {
@@ -60,6 +69,7 @@ impl GitBare {
}
};
let child_id = child.id();
let mut stdin = child.stdin.take();
let mut stdout = child.stdout.take();
let mut stderr = child.stderr.take();
@@ -67,9 +77,13 @@ impl GitBare {
// Concurrent: write stdin packets, read stdout chunks, read stderr
let stdin_task = {
let mut stream = stream;
let cancel = cancel_token.clone();
async move {
if let Some(mut stdin) = stdin.take() {
while let Some(result) = stream.next().await {
if cancel.is_cancelled() {
break;
}
match result {
Ok(req) => {
if stdin.write_all(&req.packet).await.is_err() {
@@ -90,10 +104,14 @@ impl GitBare {
let stdout_task = {
let tx = tx.clone();
let cancel = cancel_token.clone();
async move {
if let Some(mut stdout) = stdout.take() {
let mut buf = vec![0u8; 65536];
loop {
if cancel.is_cancelled() {
break;
}
match stdout.read(&mut buf).await {
Ok(0) => break,
Ok(n) => {
@@ -132,27 +150,45 @@ impl GitBare {
}
};
// Run all three concurrently
tokio::join!(stdin_task, stdout_task, stderr_task);
// Run all three concurrently with timeout
let _process_future = tokio::join!(stdin_task, stdout_task, stderr_task);
// Wait for child exit
match child.wait().await {
Ok(status) if !status.success() => {
match tokio::time::timeout(UPLOAD_PACK_TIMEOUT, child.wait()).await {
Ok(Ok(status)) => {
if !status.success() {
let _ = tx
.send(Err(tonic::Status::internal(
"git upload-pack exited with error",
)))
.await;
}
Err(e) => {
}
Ok(Err(e)) => {
let _ = tx
.send(Err(tonic::Status::internal(format!("wait error: {e}"))))
.await;
}
_ => {}
Err(_timeout) => {
tracing::warn!(
repo = %bare_dir,
pid = ?child_id,
timeout_secs = UPLOAD_PACK_TIMEOUT.as_secs(),
"git upload-pack timed out, killing"
);
let _ = child.kill().await;
let _ = tx
.send(Err(tonic::Status::deadline_exceeded(
"git upload-pack timed out",
)))
.await;
}
}
});
Ok(ReceiverStream::new(rx))
// When the ReceiverStream is dropped (client disconnect), cancel the background task
let rx_stream = ReceiverStream::new(rx);
let cancel_guard = cancel_token_clone.clone().drop_guard();
Ok(super::CancellableReceiverStream::new(rx_stream, cancel_guard))
}
}
+12 -4
View File
@@ -26,11 +26,15 @@ pub const INFO_REFS_NAMESPACE: &str = "+gitks-cache/info_refs";
#[derive(Debug, Clone)]
pub struct PackCache {
disk_cache: DiskCache,
backpressure_enabled: bool,
}
impl PackCache {
pub fn new(disk_cache: DiskCache, _backpressure: bool) -> Self {
Self { disk_cache }
pub fn new(disk_cache: DiskCache, backpressure: bool) -> Self {
Self {
disk_cache,
backpressure_enabled: backpressure,
}
}
pub fn is_enabled(&self) -> bool {
@@ -63,7 +67,8 @@ impl PackCache {
tracing::info!(digest = %digest, "pack-objects cache hit, streaming from disk");
let (tx, rx) = tokio::sync::mpsc::channel(16);
let channel_size = if self.backpressure_enabled { 4 } else { 256 };
let (tx, rx) = tokio::sync::mpsc::channel(channel_size);
let sender = tx.clone();
tokio::spawn(async move {
@@ -104,12 +109,15 @@ impl PackCache {
/// Stream pack-objects output while simultaneously writing to cache.
/// This is the "tee" approach: data flows to both the client and the cache file.
/// When backpressure is enabled, uses a small channel to slow the producer
/// if the consumer is slow. Otherwise uses a large channel for max throughput.
pub fn tee_pack_stream(
&self,
digest: &str,
source: ReceiverStream<Result<PackfileChunk, tonic::Status>>,
) -> ReceiverStream<Result<PackfileChunk, tonic::Status>> {
let (tx, rx) = tokio::sync::mpsc::channel(16);
let channel_size = if self.backpressure_enabled { 4 } else { 256 };
let (tx, rx) = tokio::sync::mpsc::channel(channel_size);
if !self.is_enabled() {
tokio::spawn(async move {
+51 -32
View File
@@ -9,10 +9,9 @@
//! // guard is dropped here → permit released
use dashmap::DashMap;
use std::sync::{Arc, OnceLock};
use std::sync::{Arc, OnceLock, RwLock};
use tokio::sync::Semaphore;
/// Default max concurrent operations per repository.
const DEFAULT_MAX_CONCURRENT: usize = 5;
@@ -20,8 +19,8 @@ const DEFAULT_MAX_CONCURRENT: usize = 5;
struct RateLimiter {
/// Per-repository semaphores. Key = repository relative_path.
semaphores: DashMap<String, Arc<Semaphore>>,
/// Max concurrent operations per repository.
max_concurrent: usize,
/// Max concurrent operations per repository (protected by RwLock for runtime updates).
max_concurrent: RwLock<usize>,
}
static RATE_LIMITER: OnceLock<RateLimiter> = OnceLock::new();
@@ -40,11 +39,18 @@ fn limiter() -> &'static RateLimiter {
RateLimiter {
semaphores: DashMap::new(),
max_concurrent: max,
max_concurrent: RwLock::new(max),
}
})
}
/// Get the current max_concurrent value.
fn get_max_concurrent() -> usize {
*limiter()
.max_concurrent
.read()
.unwrap_or_else(|e| e.into_inner())
}
/// A guard that holds a rate-limit permit. The permit is released on drop.
pub struct RateLimitGuard {
@@ -63,21 +69,21 @@ pub async fn acquire(repo_relative_path: Option<&str>) -> Option<RateLimitGuard>
if repo.is_empty() {
return None;
}
let l = limiter();
if l.max_concurrent == 0 {
let max_concurrent = get_max_concurrent();
if max_concurrent == 0 {
// Unlimited
return None;
}
let sem = l
let sem = limiter()
.semaphores
.entry(repo.to_string())
.or_insert_with(|| Arc::new(Semaphore::new(l.max_concurrent)))
.or_insert_with(|| Arc::new(Semaphore::new(max_concurrent)))
.value()
.clone();
// Release DashMap reference before awaiting
let _ = l;
let _ = repo;
match tokio::time::timeout(
std::time::Duration::from_secs(30),
@@ -87,7 +93,7 @@ pub async fn acquire(repo_relative_path: Option<&str>) -> Option<RateLimitGuard>
{
Ok(Ok(permit)) => {
tracing::debug!(
repo = %repo,
repo = %repo_relative_path.unwrap_or(""),
available = sem.available_permits(),
"rate limit permit acquired"
);
@@ -96,10 +102,10 @@ pub async fn acquire(repo_relative_path: Option<&str>) -> Option<RateLimitGuard>
Ok(Err(_closed)) => {
// Semaphore was closed — recreate it
tracing::warn!(
repo = %repo,
repo = %repo_relative_path.unwrap_or(""),
"rate limit semaphore closed, recreating"
);
let new_sem = Arc::new(Semaphore::new(limiter().max_concurrent));
let new_sem = Arc::new(Semaphore::new(get_max_concurrent()));
let permit = new_sem
.clone()
.acquire_owned()
@@ -107,13 +113,13 @@ pub async fn acquire(repo_relative_path: Option<&str>) -> Option<RateLimitGuard>
.expect("newly created semaphore should have permits");
limiter()
.semaphores
.insert(repo.to_string(), new_sem);
.insert(repo_relative_path.unwrap_or("").to_string(), new_sem);
Some(RateLimitGuard { _permit: permit })
}
Err(_elapsed) => {
tracing::warn!(
repo = %repo,
max_concurrent = limiter().max_concurrent,
repo = %repo_relative_path.unwrap_or(""),
max_concurrent = get_max_concurrent(),
"rate limit timeout waiting for permit"
);
None
@@ -122,7 +128,9 @@ pub async fn acquire(repo_relative_path: Option<&str>) -> Option<RateLimitGuard>
}
/// Acquire a rate-limit permit, returning a tonic error on timeout / overload.
pub async fn acquire_or_reject(repo_relative_path: Option<&str>) -> Result<Option<RateLimitGuard>, tonic::Status> {
pub async fn acquire_or_reject(
repo_relative_path: Option<&str>,
) -> Result<Option<RateLimitGuard>, tonic::Status> {
let repo = repo_relative_path.unwrap_or("");
if repo.is_empty() {
return Ok(None);
@@ -130,13 +138,13 @@ pub async fn acquire_or_reject(repo_relative_path: Option<&str>) -> Result<Optio
match acquire(Some(repo)).await {
Some(guard) => Ok(Some(guard)),
None => {
if limiter().max_concurrent == 0 {
if get_max_concurrent() == 0 {
return Ok(None);
}
// Timeout — reject with resource exhausted
Err(tonic::Status::resource_exhausted(format!(
"rate limit exceeded for repository '{repo}': max {max} concurrent operations",
max = limiter().max_concurrent
max = get_max_concurrent()
)))
}
}
@@ -149,22 +157,33 @@ pub fn remove_repository(repo_relative_path: &str) {
}
/// Update the max concurrent limit at runtime.
/// This properly updates the limit and recreates all existing semaphores.
pub fn set_max_concurrent(max: usize) {
let l = limiter();
// We can't modify the field directly through OnceLock, but we can
// update existing semaphores to add or remove permits as needed.
// For a simpler approach, just log and let new semaphores use the new value.
// Since max_concurrent is only read on insert, we use a separate atomic.
use std::sync::atomic::AtomicUsize;
static OVERRIDE: std::sync::atomic::AtomicUsize = AtomicUsize::new(0);
OVERRIDE.store(max, std::sync::atomic::Ordering::Relaxed);
// Recreate all existing semaphores
for entry in &l.semaphores {
let _old = l.semaphores.insert(
entry.key().clone(),
Arc::new(Semaphore::new(max)),
);
// Update the max_concurrent value
match l.max_concurrent.write() {
Ok(mut guard) => {
*guard = max;
}
Err(e) => {
// Poisoned lock - recover and update
let mut guard = e.into_inner();
*guard = max;
}
}
// Recreate all existing semaphores with the new limit
let keys: Vec<String> = l
.semaphores
.iter()
.map(|entry| entry.key().clone())
.collect();
for key in keys {
l.semaphores
.insert(key, Arc::new(Semaphore::new(max)));
}
tracing::info!(max_concurrent = max, "rate limit max_concurrent updated");
}
+6 -2
View File
@@ -5,7 +5,10 @@ use crate::pb::*;
impl GitBare {
/// Find all refs pointing to a given OID.
pub fn find_refs_by_oid(&self, request: FindRefsByOidRequest) -> GitResult<FindRefsByOidResponse> {
pub fn find_refs_by_oid(
&self,
request: FindRefsByOidRequest,
) -> GitResult<FindRefsByOidResponse> {
crate::sanitize::validate_revision(&request.oid)?;
let mut args = vec![
@@ -138,7 +141,8 @@ fn simple_glob_match(pattern: &str, name: &str) -> bool {
star_pi = Some(pi);
star_ni = ni;
pi += 1;
} else if pi < pat_bytes.len() && ni < name_bytes.len()
} else if pi < pat_bytes.len()
&& ni < name_bytes.len()
&& (pat_bytes[pi] == b'?' || pat_bytes[pi] == name_bytes[ni])
{
pi += 1;
+27 -9
View File
@@ -4,7 +4,10 @@ use crate::pb::*;
impl GitBare {
/// Update multiple refs atomically using `git update-ref --stdin`.
pub fn update_references(&self, request: UpdateReferencesRequest) -> GitResult<UpdateReferencesResponse> {
pub fn update_references(
&self,
request: UpdateReferencesRequest,
) -> GitResult<UpdateReferencesResponse> {
let mut stdin_input = String::new();
for update in &request.updates {
crate::sanitize::validate_ref_name(&update.ref_name)?;
@@ -16,10 +19,7 @@ impl GitBare {
update.ref_name, update.new_oid, update.old_oid
));
} else {
stdin_input.push_str(&format!(
"update {} {}\n",
update.ref_name, update.new_oid
));
stdin_input.push_str(&format!("update {} {}\n", update.ref_name, update.new_oid));
}
}
if stdin_input.is_empty() {
@@ -27,7 +27,13 @@ impl GitBare {
}
let output = std::process::Command::new("git")
.args(["--git-dir", &self.bare_dir.to_string_lossy(), "update-ref", "--stdin", "-z"])
.args([
"--git-dir",
&self.bare_dir.to_string_lossy(),
"update-ref",
"--stdin",
"-z",
])
.stdin(std::process::Stdio::piped())
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
@@ -119,21 +125,33 @@ impl GitBare {
});
}
Ok(WriteRefResponse { ok: true, error: String::new() })
Ok(WriteRefResponse {
ok: true,
error: String::new(),
})
}
/// Check if a ref exists.
pub fn ref_exists(&self, request: RefExistsRequest) -> GitResult<RefExistsResponse> {
crate::sanitize::validate_ref_name(&request.ref_name)?;
let repo = self.gix_repo()?;
let exists = repo.try_find_reference(&request.ref_name).ok().flatten().is_some();
let exists = repo
.try_find_reference(&request.ref_name)
.ok()
.flatten()
.is_some();
Ok(RefExistsResponse { exists })
}
/// Find the default branch name.
pub fn find_default_branch_name(&self) -> GitResult<FindDefaultBranchNameResponse> {
let result = std::process::Command::new("git")
.args(["--git-dir", &self.bare_dir.to_string_lossy(), "symbolic-ref", "HEAD"])
.args([
"--git-dir",
&self.bare_dir.to_string_lossy(),
"symbolic-ref",
"HEAD",
])
.output()
.map_err(|e| crate::error::GitError::CommandFailed {
status_code: None,
+28 -7
View File
@@ -2,9 +2,14 @@ use crate::error::GitResult;
use crate::pb::*;
/// Discover remote refs via `git ls-remote`.
pub fn find_remote_repository(request: FindRemoteRepositoryRequest) -> GitResult<FindRemoteRepositoryResponse> {
pub fn find_remote_repository(
request: FindRemoteRepositoryRequest,
) -> GitResult<FindRemoteRepositoryResponse> {
if request.remote_url.is_empty() {
return Ok(FindRemoteRepositoryResponse { refs: vec![], exists: false });
return Ok(FindRemoteRepositoryResponse {
refs: vec![],
exists: false,
});
}
let output = std::process::Command::new("git")
@@ -20,9 +25,15 @@ pub fn find_remote_repository(request: FindRemoteRepositoryRequest) -> GitResult
if !output.status.success() {
let stderr = String::from_utf8_lossy(&output.stderr);
if stderr.contains("Could not resolve host") || stderr.contains("Repository not found") {
return Ok(FindRemoteRepositoryResponse { refs: vec![], exists: false });
return Ok(FindRemoteRepositoryResponse {
refs: vec![],
exists: false,
});
}
return Ok(FindRemoteRepositoryResponse { refs: vec![], exists: false });
return Ok(FindRemoteRepositoryResponse {
refs: vec![],
exists: false,
});
}
let stdout = String::from_utf8_lossy(&output.stdout);
@@ -43,7 +54,11 @@ pub fn find_remote_repository(request: FindRemoteRepositoryRequest) -> GitResult
ref_name: name.to_string(),
target_oid: String::new(),
symbolic: true,
symbolic_target: target.strip_prefix("ref:").unwrap_or(target).trim().to_string(),
symbolic_target: target
.strip_prefix("ref:")
.unwrap_or(target)
.trim()
.to_string(),
});
}
} else if let Some((oid, name)) = line.split_once('\t') {
@@ -60,7 +75,9 @@ pub fn find_remote_repository(request: FindRemoteRepositoryRequest) -> GitResult
}
/// Find the root ref (HEAD) of a remote repository.
pub fn find_remote_root_ref(request: FindRemoteRootRefRequest) -> GitResult<FindRemoteRootRefResponse> {
pub fn find_remote_root_ref(
request: FindRemoteRootRefRequest,
) -> GitResult<FindRemoteRootRefResponse> {
let output = std::process::Command::new("git")
.args(["ls-remote", "--symref", &request.remote_url, "HEAD"])
.stdout(std::process::Stdio::piped())
@@ -76,7 +93,11 @@ pub fn find_remote_root_ref(request: FindRemoteRootRefRequest) -> GitResult<Find
let line = line.trim();
if line.starts_with("ref:") {
if let Some((target, _name)) = line.split_once('\t') {
let ref_name = target.strip_prefix("ref:").unwrap_or(target).trim().to_string();
let ref_name = target
.strip_prefix("ref:")
.unwrap_or(target)
.trim()
.to_string();
return Ok(FindRemoteRootRefResponse {
ref_name,
target_oid: String::new(),
+28 -7
View File
@@ -4,8 +4,15 @@ use crate::pb::*;
impl GitBare {
/// Update mirror from a remote URL (fetch + update all refs).
pub fn update_remote_mirror(&self, request: UpdateRemoteMirrorRequest) -> GitResult<UpdateRemoteMirrorResponse> {
let remote_name = if request.remote_name.is_empty() { "origin" } else { &request.remote_name };
pub fn update_remote_mirror(
&self,
request: UpdateRemoteMirrorRequest,
) -> GitResult<UpdateRemoteMirrorResponse> {
let remote_name = if request.remote_name.is_empty() {
"origin"
} else {
&request.remote_name
};
// Add or update remote
let remote_check = std::process::Command::new("git")
@@ -114,12 +121,19 @@ impl GitBare {
}
}
Ok(UpdateRemoteMirrorResponse { ok: true, error: String::new() })
Ok(UpdateRemoteMirrorResponse {
ok: true,
error: String::new(),
})
}
/// Fetch from a remote URL without mirroring.
pub fn fetch_remote(&self, request: FetchRemoteRequest) -> GitResult<FetchRemoteResponse> {
let remote_name = if request.remote_name.is_empty() { "origin" } else { &request.remote_name };
let remote_name = if request.remote_name.is_empty() {
"origin"
} else {
&request.remote_name
};
// Ensure remote exists
let exists = std::process::Command::new("git")
@@ -158,8 +172,12 @@ impl GitBare {
remote_name.to_string(),
];
if request.prune { args.push("--prune".to_string()); }
if request.force { args.push("--force".to_string()); }
if request.prune {
args.push("--prune".to_string());
}
if request.force {
args.push("--force".to_string());
}
if request.refspecs.is_empty() {
args.push("+refs/heads/*:refs/heads/*".to_string());
@@ -187,7 +205,10 @@ impl GitBare {
});
}
Ok(FetchRemoteResponse { ok: true, error: String::new() })
Ok(FetchRemoteResponse {
ok: true,
error: String::new(),
})
}
/// Clone a repository from a remote URL (bare + mirror).
+10 -5
View File
@@ -6,9 +6,15 @@ impl GitBare {
/// Detect license by reading LICENSE/COPYING files and doing basic matching.
pub fn find_license(&self) -> GitResult<FindLicenseResponse> {
let possible_paths = [
"LICENSE", "LICENSE.md", "LICENSE.txt",
"LICENCE", "LICENCE.md", "LICENCE.txt",
"COPYING", "COPYING.md", "COPYING.txt",
"LICENSE",
"LICENSE.md",
"LICENSE.txt",
"LICENCE",
"LICENCE.md",
"LICENCE.txt",
"COPYING",
"COPYING.md",
"COPYING.txt",
"UNLICENSE",
];
@@ -102,8 +108,7 @@ fn detect_license(content: &str) -> (&'static str, &'static str, f64) {
}
// ISC
if lower.contains("permission to use, copy, modify, and/or distribute")
&& lower.contains("isc")
if lower.contains("permission to use, copy, modify, and/or distribute") && lower.contains("isc")
{
return ("ISC", "ISC License", 0.80);
}
+11 -3
View File
@@ -4,7 +4,10 @@ use crate::pb::*;
impl GitBare {
/// Find the best merge base for a set of revisions (OIDs).
pub fn find_merge_base(&self, request: FindMergeBaseRequest) -> GitResult<FindMergeBaseResponse> {
pub fn find_merge_base(
&self,
request: FindMergeBaseRequest,
) -> GitResult<FindMergeBaseResponse> {
if request.revisions.is_empty() {
return Ok(FindMergeBaseResponse::default());
}
@@ -49,7 +52,10 @@ impl GitBare {
}
/// Check if one commit is an ancestor of another.
pub fn commit_is_ancestor(&self, request: CommitIsAncestorRequest) -> GitResult<CommitIsAncestorResponse> {
pub fn commit_is_ancestor(
&self,
request: CommitIsAncestorRequest,
) -> GitResult<CommitIsAncestorResponse> {
crate::sanitize::validate_revision(&request.ancestor_oid)?;
crate::sanitize::validate_revision(&request.descendant_oid)?;
@@ -68,6 +74,8 @@ impl GitBare {
.map(|s| s.success())
.unwrap_or(false);
Ok(CommitIsAncestorResponse { is_ancestor: result })
Ok(CommitIsAncestorResponse {
is_ancestor: result,
})
}
}
+4 -3
View File
@@ -42,11 +42,12 @@ impl GitBare {
})?;
}
let output = child.wait_with_output().map_err(|e| {
crate::error::GitError::CommandFailed {
let output =
child
.wait_with_output()
.map_err(|e| crate::error::GitError::CommandFailed {
status_code: None,
stderr: e.to_string(),
}
})?;
let stdout = String::from_utf8_lossy(&output.stdout);
+56 -17
View File
@@ -4,8 +4,12 @@ use crate::pb::*;
impl GitBare {
/// Run heuristic optimization based on repo state.
pub fn optimize_repository(&self, request: OptimizeRepositoryRequest) -> GitResult<OptimizeRepositoryResponse> {
let strategy = OptimizeStrategy::try_from(request.strategy).unwrap_or(OptimizeStrategy::Heuristic);
pub fn optimize_repository(
&self,
request: OptimizeRepositoryRequest,
) -> GitResult<OptimizeRepositoryResponse> {
let strategy =
OptimizeStrategy::try_from(request.strategy).unwrap_or(OptimizeStrategy::Heuristic);
let mut stdout_all = String::new();
let mut stderr_all = String::new();
@@ -17,7 +21,9 @@ impl GitBare {
// Run commit-graph write if needed
if stats.commit_graph_size_bytes == 0 || strategy == OptimizeStrategy::Aggressive {
if let Ok(resp) = write_commit_graph(self, false, false) {
if !resp.ok { stderr_all.push_str(&resp.stderr); }
if !resp.ok {
stderr_all.push_str(&resp.stderr);
}
stdout_all.push_str(&resp.stdout);
}
}
@@ -28,7 +34,9 @@ impl GitBare {
if repack_needed || strategy == OptimizeStrategy::Aggressive {
let full = strategy == OptimizeStrategy::Aggressive;
if let Ok(resp) = run_repack(self, full, true, true) {
if !resp.ok { stderr_all.push_str(&resp.stderr); }
if !resp.ok {
stderr_all.push_str(&resp.stderr);
}
stdout_all.push_str(&resp.stdout);
}
}
@@ -36,7 +44,9 @@ impl GitBare {
// Prune if aggressive
if strategy == OptimizeStrategy::Aggressive {
if let Ok(resp) = run_gc(self, true, true) {
if !resp.ok { stderr_all.push_str(&resp.stderr); }
if !resp.ok {
stderr_all.push_str(&resp.stderr);
}
stdout_all.push_str(&resp.stdout);
}
}
@@ -44,7 +54,9 @@ impl GitBare {
OptimizeStrategy::Incremental => {
// Just run commit-graph write incrementally
if let Ok(resp) = write_commit_graph(self, false, false) {
if !resp.ok { stderr_all.push_str(&resp.stderr); }
if !resp.ok {
stderr_all.push_str(&resp.stderr);
}
stdout_all.push_str(&resp.stdout);
}
}
@@ -79,7 +91,10 @@ impl GitBare {
// Check commit-graph
let cg_size = std::fs::metadata(
self.bare_dir.join("objects").join("info").join("commit-graph")
self.bare_dir
.join("objects")
.join("info")
.join("commit-graph"),
)
.map(|m| m.len())
.unwrap_or(0);
@@ -96,11 +111,18 @@ impl GitBare {
}
}
fn write_commit_graph(gb: &GitBare, _split: bool, _replace: bool) -> GitResult<RepositoryMaintenanceResponse> {
fn write_commit_graph(
gb: &GitBare,
_split: bool,
_replace: bool,
) -> GitResult<RepositoryMaintenanceResponse> {
let out = std::process::Command::new("git")
.args([
"--git-dir", &gb.bare_dir.to_string_lossy(),
"commit-graph", "write", "--reachable",
"--git-dir",
&gb.bare_dir.to_string_lossy(),
"commit-graph",
"write",
"--reachable",
])
.stdout(std::process::Stdio::piped())
.stderr(std::process::Stdio::piped())
@@ -117,13 +139,25 @@ fn write_commit_graph(gb: &GitBare, _split: bool, _replace: bool) -> GitResult<R
})
}
fn run_repack(gb: &GitBare, full: bool, bitmaps: bool, _midx: bool) -> GitResult<RepositoryMaintenanceResponse> {
fn run_repack(
gb: &GitBare,
full: bool,
bitmaps: bool,
_midx: bool,
) -> GitResult<RepositoryMaintenanceResponse> {
let mut args = vec![
"--git-dir".to_string(), gb.bare_dir.to_string_lossy().into_owned(),
"--git-dir".to_string(),
gb.bare_dir.to_string_lossy().into_owned(),
"repack".to_string(),
];
if full { args.push("-ad".to_string()); } else { args.push("-d".to_string()); }
if bitmaps { args.push("--write-bitmap-index".to_string()); }
if full {
args.push("-ad".to_string());
} else {
args.push("-d".to_string());
}
if bitmaps {
args.push("--write-bitmap-index".to_string());
}
let out = std::process::Command::new("git")
.args(&args)
@@ -144,11 +178,16 @@ fn run_repack(gb: &GitBare, full: bool, bitmaps: bool, _midx: bool) -> GitResult
fn run_gc(gb: &GitBare, prune: bool, aggressive: bool) -> GitResult<RepositoryMaintenanceResponse> {
let mut args = vec![
"--git-dir".to_string(), gb.bare_dir.to_string_lossy().into_owned(),
"--git-dir".to_string(),
gb.bare_dir.to_string_lossy().into_owned(),
"gc".to_string(),
];
if prune { args.push("--prune=now".to_string()); }
if aggressive { args.push("--aggressive".to_string()); }
if prune {
args.push("--prune=now".to_string());
}
if aggressive {
args.push("--aggressive".to_string());
}
let out = std::process::Command::new("git")
.args(&args)
+15 -5
View File
@@ -4,7 +4,10 @@ use crate::pb::*;
impl GitBare {
/// Get raw changes between two revisions (file-level changes only, no diff content).
pub fn get_raw_changes(&self, request: GetRawChangesRequest) -> GitResult<GetRawChangesResponse> {
pub fn get_raw_changes(
&self,
request: GetRawChangesRequest,
) -> GitResult<GetRawChangesResponse> {
crate::sanitize::validate_revision(&request.base)?;
crate::sanitize::validate_revision(&request.head)?;
@@ -32,11 +35,15 @@ impl GitBare {
for line in stdout.lines() {
let line = line.trim();
if !line.starts_with(':') { continue; }
if !line.starts_with(':') {
continue;
}
let line = &line[1..];
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 5 { continue; }
if parts.len() < 5 {
continue;
}
let old_mode = u32::from_str_radix(parts[0], 8).unwrap_or(0);
let new_mode = u32::from_str_radix(parts[1], 8).unwrap_or(0);
@@ -55,11 +62,14 @@ impl GitBare {
};
let (old_path, new_path) = if parts.len() >= 6 {
(parts[5].to_string(), if status_letter == 'R' || status_letter == 'C' {
(
parts[5].to_string(),
if status_letter == 'R' || status_letter == 'C' {
parts.get(6).map(|s| s.to_string()).unwrap_or_default()
} else {
String::new()
})
},
)
} else {
(String::new(), String::new())
};
+28 -6
View File
@@ -4,11 +4,22 @@ use crate::pb::*;
impl GitBare {
/// Search file contents with a regex pattern.
pub fn search_files_by_content(&self, request: SearchFilesByContentRequest) -> GitResult<SearchFilesByContentResponse> {
pub fn search_files_by_content(
&self,
request: SearchFilesByContentRequest,
) -> GitResult<SearchFilesByContentResponse> {
crate::sanitize::validate_revision(&request.revision)?;
let revision = if request.revision.is_empty() { "HEAD" } else { &request.revision };
let max_results = if request.max_results == 0 { 100 } else { request.max_results };
let revision = if request.revision.is_empty() {
"HEAD"
} else {
&request.revision
};
let max_results = if request.max_results == 0 {
100
} else {
request.max_results
};
let mut args = vec![
"--git-dir".to_string(),
@@ -62,11 +73,22 @@ impl GitBare {
}
/// Search file names matching a pattern.
pub fn search_files_by_name(&self, request: SearchFilesByNameRequest) -> GitResult<SearchFilesByNameResponse> {
let revision = if request.revision.is_empty() { "HEAD" } else { &request.revision };
pub fn search_files_by_name(
&self,
request: SearchFilesByNameRequest,
) -> GitResult<SearchFilesByNameResponse> {
let revision = if request.revision.is_empty() {
"HEAD"
} else {
&request.revision
};
crate::sanitize::validate_revision(revision)?;
let max_results = if request.max_results == 0 { 100 } else { request.max_results };
let max_results = if request.max_results == 0 {
100
} else {
request.max_results
};
let mut args = vec![
"--git-dir".to_string(),
-2
View File
@@ -276,7 +276,6 @@ impl commit_service_server::CommitService for GitksService {
Ok(tonic::Response::new(resp))
}
async fn find_commit(
&self,
request: tonic::Request<FindCommitRequest>,
@@ -368,7 +367,6 @@ impl commit_service_server::CommitService for GitksService {
Ok(tonic::Response::new(resp))
}
async fn count_commits(
&self,
request: tonic::Request<CountCommitsRequest>,
+4 -4
View File
@@ -170,9 +170,10 @@ impl diff_service_server::DiffService for GitksService {
Ok(tonic::Response::new(resp))
}
type RawDiffStream = tokio_stream::wrappers::ReceiverStream<Result<RawDiffResponse, tonic::Status>>;
type RawPatchStream = tokio_stream::wrappers::ReceiverStream<Result<RawPatchResponse, tonic::Status>>;
type RawDiffStream =
tokio_stream::wrappers::ReceiverStream<Result<RawDiffResponse, tonic::Status>>;
type RawPatchStream =
tokio_stream::wrappers::ReceiverStream<Result<RawPatchResponse, tonic::Status>>;
async fn raw_diff(
&self,
@@ -200,7 +201,6 @@ impl diff_service_server::DiffService for GitksService {
Ok(tonic::Response::new(into_stream(chunks)))
}
async fn find_changed_paths(
&self,
request: tonic::Request<FindChangedPathsRequest>,
+81 -13
View File
@@ -45,9 +45,11 @@ mod repository_maint;
mod tag;
mod tree;
use dashmap::DashMap;
use gix::discover::is_git;
use ractor::{ActorCell, ActorRef};
use std::path::{Path, PathBuf};
use std::time::{Duration, Instant};
use tokio_stream::wrappers::ReceiverStream;
use crate::actor::message::{GitNodeMessage, RouteDecision};
@@ -59,6 +61,16 @@ use crate::pb::{
remote_service_server, repository_service_server, tag_service_server, tree_service_server,
};
/// TTL for route cache entries.
const ROUTE_CACHE_TTL: Duration = Duration::from_secs(60); // 1 minute
/// A cached route entry with creation time.
#[derive(Clone)]
pub struct CachedRoute {
pub decision: RouteDecision,
pub created_at: Instant,
}
#[derive(Clone)]
pub struct GitksService {
pub repo_prefix: PathBuf,
@@ -67,6 +79,7 @@ pub struct GitksService {
pub disk_cache: Option<crate::disk_cache::DiskCache>,
pub pack_cache: Option<crate::pack_cache::PackCache>,
pub hook_manager: Option<crate::hooks::HookManager>,
pub route_cache: DashMap<String, CachedRoute>,
}
impl GitksService {
@@ -78,6 +91,7 @@ impl GitksService {
disk_cache: None,
pack_cache: None,
hook_manager: None,
route_cache: DashMap::new(),
}
}
@@ -125,6 +139,22 @@ impl GitksService {
is_write: bool,
) -> Result<Option<RouteDecision>, tonic::Status> {
use crate::actor::message::{ROLE_PRIMARY, ROLE_REPLICA};
// Check route cache for read requests
if !is_write
&& let Some(cached) = self.route_cache.get(&header.relative_path)
&& !cached.decision.grpc_addr.is_empty()
&& cached.decision.found
&& cached.created_at.elapsed() < ROUTE_CACHE_TTL
{
tracing::debug!(
relative_path = %header.relative_path,
grpc_addr = %cached.decision.grpc_addr,
"route cache hit"
);
return Ok(Some(cached.decision.clone()));
}
let members = ractor::pg::get_members(&"gitks_nodes".to_string());
let local = self.node_actor.as_ref().map(|actor| actor.get_cell());
let mut primary: Option<RouteDecision> = None;
@@ -152,19 +182,31 @@ impl GitksService {
replica = Some(decision);
}
}
if let Some(p) = primary {
return Ok(Some(p));
}
if let Some(r) = replica {
let result = if let Some(p) = primary {
Some(p)
} else if let Some(r) = replica {
tracing::info!(
storage_name = %r.storage_name,
relative_path = %r.relative_path,
"read request routed to replica"
);
return Ok(Some(r));
}
Some(r)
} else {
let _ = ROLE_PRIMARY;
Ok(None)
None
};
// Cache result for read requests
if let Some(ref decision) = result {
self.route_cache.insert(
header.relative_path.clone(),
CachedRoute {
decision: decision.clone(),
created_at: Instant::now(),
},
);
}
Ok(result)
}
fn repo_label(&self, header: Option<&crate::pb::RepositoryHeader>) -> String {
@@ -180,7 +222,10 @@ impl GitksService {
}
/// Get the relative path from a repository header, if any.
pub(crate) fn repo_relative_path<'a>(&self, header: Option<&'a crate::pb::RepositoryHeader>) -> Option<&'a str> {
pub(crate) fn repo_relative_path<'a>(
&self,
header: Option<&'a crate::pb::RepositoryHeader>,
) -> Option<&'a str> {
header.and_then(|h| {
if h.relative_path.is_empty() {
None
@@ -287,6 +332,9 @@ impl GitksService {
// Invalidate moka caches
crate::server::cache::invalidate_repo(relative_path);
// Invalidate route cache
self.route_cache.remove(relative_path);
// Invalidate disk cache
if let Some(ref pc) = self.pack_cache {
pc.invalidate_repo(relative_path);
@@ -421,7 +469,7 @@ pub(crate) fn into_stream<T: Send + 'static>(
ReceiverStream::new(rx)
}
pub(crate) fn git_cmd(gb: &GitBare, args: &[&str]) -> Result<std::process::Output, tonic::Status> {
pub(crate) fn git_cmd(gb: &GitBare, args: &[&str]) -> GitResult<std::process::Output> {
let mut full_args: Vec<String> = vec![
"--git-dir".into(),
gb.bare_dir.to_string_lossy().into_owned(),
@@ -441,21 +489,41 @@ pub(crate) fn git_cmd(gb: &GitBare, args: &[&str]) -> Result<std::process::Outpu
error = %e,
"failed to spawn git subprocess"
);
tonic::Status::internal(e.to_string())
GitError::Internal(format!("failed to spawn git: {e}"))
})?;
if !result.status.success() {
let stderr = String::from_utf8_lossy(&result.stderr);
let stderr_str = String::from_utf8_lossy(&result.stderr);
tracing::warn!(
repo = %gb.bare_dir.display(),
status = ?result.status.code(),
stderr = %stderr.trim(),
stderr = %stderr_str.trim(),
"git subprocess exited with non-zero status"
);
return Err(tonic::Status::internal(stderr.trim().to_string()));
return Err(structured_git_error(&stderr_str, result.status.code()));
}
Ok(result)
}
/// Map git subprocess stderr to a structured GitError variant.
fn structured_git_error(stderr: &str, code: Option<i32>) -> GitError {
let stderr_trimmed = stderr.trim();
if stderr_trimmed.contains("not a git repository") || stderr_trimmed.contains("does not exist")
{
GitError::RepoNotFound
} else if stderr_trimmed.contains("Permission denied") || stderr_trimmed.contains("denied") {
GitError::PermissionDenied(stderr_trimmed.to_string())
} else if stderr_trimmed.contains("is locked") || stderr_trimmed.contains("Could not acquire") {
GitError::Locked(stderr_trimmed.to_string())
} else if stderr_trimmed.contains("not found") || stderr_trimmed.contains("do not have") {
GitError::NotFound(stderr_trimmed.to_string())
} else {
GitError::CommandFailed {
status_code: code,
stderr: stderr_trimmed.to_string(),
}
}
}
pub async fn serve(
addr: std::net::SocketAddr,
svc: GitksService,
+20 -5
View File
@@ -3,6 +3,7 @@ use tokio_stream::wrappers::ReceiverStream;
use crate::pb::pack_service_client::PackServiceClient;
use crate::pb::*;
use crate::pack::CancellableReceiverStream;
use super::{GitksService, into_status};
@@ -14,8 +15,8 @@ remote_client!(
#[tonic::async_trait]
impl pack_service_server::PackService for GitksService {
type UploadPackStream = ReceiverStream<Result<UploadPackResponse, tonic::Status>>;
type ReceivePackStream = ReceiverStream<Result<ReceivePackResponse, tonic::Status>>;
type UploadPackStream = CancellableReceiverStream<Result<UploadPackResponse, tonic::Status>>;
type ReceivePackStream = CancellableReceiverStream<Result<ReceivePackResponse, tonic::Status>>;
type PackObjectsStream = ReceiverStream<Result<PackfileChunk, tonic::Status>>;
async fn advertise_refs(
@@ -112,7 +113,12 @@ impl pack_service_server::PackService for GitksService {
.upload_pack(tokio_stream::wrappers::ReceiverStream::new(rx))
.await?;
let out = super::bridge_server_stream(resp.into_inner());
return Ok(tonic::Response::new(out));
// Create a dummy cancel token for the forwarded stream
let cancel_token = tokio_util::sync::CancellationToken::new();
let cancel_guard = cancel_token.drop_guard();
return Ok(tonic::Response::new(
crate::pack::CancellableReceiverStream::new(out, cancel_guard),
));
}
crate::metrics::record_rpc_error(&m, &err);
return Err(err);
@@ -182,7 +188,12 @@ impl pack_service_server::PackService for GitksService {
.receive_pack(tokio_stream::wrappers::ReceiverStream::new(rx))
.await?;
let out = super::bridge_server_stream(resp.into_inner());
return Ok(tonic::Response::new(out));
// Create a dummy cancel token for the forwarded stream
let cancel_token = tokio_util::sync::CancellationToken::new();
let cancel_guard = cancel_token.drop_guard();
return Ok(tonic::Response::new(
crate::pack::CancellableReceiverStream::new(out, cancel_guard),
));
}
crate::metrics::record_rpc_error(&m, &err);
return Err(err);
@@ -333,7 +344,11 @@ impl pack_service_server::PackService for GitksService {
inputs.push(msg?);
}
let _rate = self
.acquire_rate_limit(inputs.first().and_then(|r: &IndexPackRequest| r.repository.as_ref()))
.acquire_rate_limit(
inputs
.first()
.and_then(|r: &IndexPackRequest| r.repository.as_ref()),
)
.await?;
let repo = self.repo_label(inputs.first().and_then(|r| r.repository.as_ref()));
let span = tracing::info_span!("pack.index_pack", %repo);
+1 -1
View File
@@ -1,5 +1,5 @@
use crate::pb::*;
use crate::pb::ref_service_server::RefService;
use crate::pb::*;
use super::GitksService;
+1 -1
View File
@@ -1,5 +1,5 @@
use crate::pb::*;
use crate::pb::remote_service_server::RemoteService;
use crate::pb::*;
use crate::remote::find_remote::{find_remote_repository, find_remote_root_ref};
use super::GitksService;
+2 -7
View File
@@ -434,7 +434,6 @@ impl repository_service_server::RepositoryService for GitksService {
Ok(tonic::Response::new(resp))
}
async fn list_hooks(
&self,
request: tonic::Request<ListHooksRequest>,
@@ -495,7 +494,6 @@ impl repository_service_server::RepositoryService for GitksService {
Ok(tonic::Response::new(()))
}
async fn create_snapshot(
&self,
request: tonic::Request<CreateSnapshotRequest>,
@@ -600,7 +598,6 @@ impl repository_service_server::RepositoryService for GitksService {
Ok(tonic::Response::new(()))
}
type FetchRepositoryDataStream =
ReceiverStream<Result<FetchRepositoryDataResponse, tonic::Status>>;
@@ -698,7 +695,6 @@ impl repository_service_server::RepositoryService for GitksService {
Ok(tonic::Response::new(ReceiverStream::new(rx)))
}
async fn find_merge_base(
&self,
request: tonic::Request<FindMergeBaseRequest>,
@@ -751,7 +747,6 @@ impl repository_service_server::RepositoryService for GitksService {
Ok(tonic::Response::new(resp))
}
async fn objects_size(
&self,
request: tonic::Request<ObjectsSizeRequest>,
@@ -795,7 +790,8 @@ impl repository_service_server::RepositoryService for GitksService {
&self,
request: tonic::Request<CreateRepositoryFromUrlRequest>,
) -> Result<tonic::Response<CreateRepositoryFromUrlResponse>, tonic::Status> {
let m = crate::metrics::RequestMetrics::new("gitks.RepositoryService/CreateRepositoryFromURL");
let m =
crate::metrics::RequestMetrics::new("gitks.RepositoryService/CreateRepositoryFromURL");
let inner = request.into_inner();
let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?;
let bare_dir = self.resolve_for_init(inner.repository.as_ref())?;
@@ -816,7 +812,6 @@ impl repository_service_server::RepositoryService for GitksService {
}))
}
async fn find_license(
&self,
request: tonic::Request<FindLicenseRequest>,
+42 -17
View File
@@ -1,11 +1,9 @@
#[cfg(test)]
mod cluster_test {
use gitks::pb::{
repository_service_client::RepositoryServiceClient,
branch_service_client::BranchServiceClient,
RepositoryHeader, InitRepositoryRequest, CreateBranchRequest,
GetRepositoryRequest,
ObjectSelector, ObjectName, object_selector,
CreateBranchRequest, GetRepositoryRequest, InitRepositoryRequest, ObjectName,
ObjectSelector, RepositoryHeader, branch_service_client::BranchServiceClient,
object_selector, repository_service_client::RepositoryServiceClient,
};
const N1: &str = "http://localhost:50051";
@@ -13,38 +11,61 @@ mod cluster_test {
const N3: &str = "http://localhost:50053";
fn hdr(path: &str) -> RepositoryHeader {
RepositoryHeader { storage_name: String::new(), relative_path: path.into(), storage_path: String::new() }
RepositoryHeader {
storage_name: String::new(),
relative_path: path.into(),
storage_path: String::new(),
}
}
#[tokio::test]
async fn test_cluster_routing() {
let ts = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_secs();
let ts = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs();
let repo = format!("cluster-test-{ts}");
// ── Init via node1 ──
let mut n1 = RepositoryServiceClient::connect(N1).await.unwrap();
let r = n1.init_repository(tonic::Request::new(InitRepositoryRequest {
repository: Some(hdr(&repo)), bare: true, object_format: 0, initial_branch: "main".into(),
})).await.unwrap().into_inner();
let r = n1
.init_repository(tonic::Request::new(InitRepositoryRequest {
repository: Some(hdr(&repo)),
bare: true,
object_format: 0,
initial_branch: "main".into(),
}))
.await
.unwrap()
.into_inner();
println!("✅ n1 init: bare={}", r.bare);
// ── Read via node2 (should forward to PRIMARY n1) ──
let mut n2 = RepositoryServiceClient::connect(N2).await.unwrap();
let r2 = n2.get_repository(tonic::Request::new(GetRepositoryRequest {
let r2 = n2
.get_repository(tonic::Request::new(GetRepositoryRequest {
repository: Some(hdr(&repo)),
})).await.unwrap().into_inner();
}))
.await
.unwrap()
.into_inner();
println!("✅ n2 get routed→primary: bare={}", r2.bare);
// ── Read via node3 ──
let mut n3 = RepositoryServiceClient::connect(N3).await.unwrap();
let r3 = n3.get_repository(tonic::Request::new(GetRepositoryRequest {
let r3 = n3
.get_repository(tonic::Request::new(GetRepositoryRequest {
repository: Some(hdr(&repo)),
})).await.unwrap().into_inner();
}))
.await
.unwrap()
.into_inner();
println!("✅ n3 get routed→primary: bare={}", r3.bare);
// ── Write (create branch) via node2 → primary ──
let mut n2b = BranchServiceClient::connect(N2).await.unwrap();
let b = n2b.create_branch(tonic::Request::new(CreateBranchRequest {
let b = n2b
.create_branch(tonic::Request::new(CreateBranchRequest {
repository: Some(hdr(&repo)),
name: "feature/x".into(),
start_point: Some(ObjectSelector {
@@ -53,9 +74,13 @@ mod cluster_test {
})),
}),
force: false,
})).await;
}))
.await;
match b {
Ok(branch) => println!("✅ n2 create-branch routed→primary: name={}", branch.into_inner().name),
Ok(branch) => println!(
"✅ n2 create-branch routed→primary: name={}",
branch.into_inner().name
),
Err(e) => println!("⚠️ create-branch: {e} (expected — empty repo has no commits)"),
}