diff --git a/Cargo.lock b/Cargo.lock index d44f8b0..a324e7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -721,6 +721,7 @@ dependencies = [ "tokio-stream", "tokio-util", "tonic", + "tonic-health", "tonic-prost", "tonic-prost-build", "tracing", @@ -3075,6 +3076,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tonic-health" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fcfab99db777fba2802f0dfa861d1628d1ae916fb199d29819941f139ae85082" +dependencies = [ + "prost", + "tokio", + "tokio-stream", + "tonic", + "tonic-prost", +] + [[package]] name = "tonic-prost" version = "0.14.6" diff --git a/Cargo.toml b/Cargo.toml index 089fd1c..a0a8918 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -33,6 +33,7 @@ thiserror = { version = "2", features = [] } prost = "0.14" prost-types = "0.14" tonic = { version = "0.14", features = ["transport"] } +tonic-health = "0.14.6" tonic-prost = "0.14" tempfile = "3" dotenvy = "0.15" diff --git a/actor/handler.rs b/actor/handler.rs index 9af2d39..09ce4bb 100644 --- a/actor/handler.rs +++ b/actor/handler.rs @@ -1,7 +1,7 @@ use crate::actor::message::{ AppendEntriesRequest, AppendEntriesResponse, ElectionRequest, ElectionResult, GitNodeMessage, - NodeHealth, ReadIndexResponse, RefUpdateEvent, RoleChangedEvent, RouteDecision, - ROLE_PRIMARY, ROLE_REPLICA, RAFT_MSG_VERSION, + NodeHealth, RAFT_MSG_VERSION, ROLE_PRIMARY, ROLE_REPLICA, ReadIndexResponse, RefUpdateEvent, + RoleChangedEvent, RouteDecision, }; use crate::actor::raft_log::RaftLog; use crate::pb::RepositoryHeader; @@ -94,9 +94,8 @@ impl Actor for GitNodeActor { // Initialize Raft log with disk persistence let raft_data_dir = args.data_dir.join("raft"); - let raft_log = RaftLog::new(&raft_data_dir).map_err(|e| { - ActorProcessingErr::from(format!("failed to init raft log: {e}")) - })?; + let raft_log = RaftLog::new(&raft_data_dir) + .map_err(|e| ActorProcessingErr::from(format!("failed to init raft log: {e}")))?; tracing::info!( storage_name = %args.storage_name, entries = raft_log.len(), @@ -451,9 +450,7 @@ fn should_accept_election(request: &ElectionRequest, state: &GitNodeState) -> bo ); return false; } - if request.last_log_term == my_last_term - && request.last_log_index < my_last_index - { + if request.last_log_term == my_last_term && request.last_log_index < my_last_index { tracing::warn!( candidate_index = request.last_log_index, my_index = my_last_index, @@ -796,20 +793,19 @@ fn handle_append_entries( }; } } - if state.raft_log.term_at(entry.index) == 0 { - if let Some(raft_entry) = entry.to_entry() - && let Err(e) = state.raft_log.append_reserved(raft_entry) - { - tracing::error!(error = %e, "failed to append raft entry"); - return AppendEntriesResponse { - version: RAFT_MSG_VERSION, - term: state.current_term, - success: false, - match_index: state.raft_log.last_index(), - conflict_index: 0, - conflict_term: 0, - }; - } + if state.raft_log.term_at(entry.index) == 0 + && let Some(raft_entry) = entry.to_entry() + && let Err(e) = state.raft_log.append_reserved(raft_entry) + { + tracing::error!(error = %e, "failed to append raft entry"); + return AppendEntriesResponse { + version: RAFT_MSG_VERSION, + term: state.current_term, + success: false, + match_index: state.raft_log.last_index(), + conflict_index: 0, + conflict_term: 0, + }; } } @@ -858,7 +854,10 @@ fn handle_read_index(state: &GitNodeState) -> ReadIndexResponse { ReadIndexResponse { commit_index: state.raft_log.commit_index(), leader_term: state.current_term, - is_leader: state.is_primary && state.leader_lease_deadline.is_some_and(|d| d > Instant::now()), + is_leader: state.is_primary + && state + .leader_lease_deadline + .is_some_and(|d| d > Instant::now()), } } @@ -902,8 +901,12 @@ pub async fn broadcast_append_entries( match ractor::call_t!(actor_ref, GitNodeMessage::AppendEntries, 5000, request) { Ok(response) if response.success => { success_count += 1; - state.match_index.insert(follower_id.clone(), response.match_index); - state.next_index.insert(follower_id, response.match_index + 1); + state + .match_index + .insert(follower_id.clone(), response.match_index); + state + .next_index + .insert(follower_id, response.match_index + 1); } Ok(response) => { // Follower rejected — update next_index for retry @@ -916,7 +919,9 @@ pub async fn broadcast_append_entries( // Decrement next_index (optimization: use conflict info) let next = state.next_index.get(&follower_id).copied().unwrap_or(1); if response.conflict_index > 0 && response.conflict_index < next { - state.next_index.insert(follower_id, response.conflict_index); + state + .next_index + .insert(follower_id, response.conflict_index); } else if next > 1 { state.next_index.insert(follower_id, next - 1); } @@ -933,7 +938,9 @@ pub async fn broadcast_append_entries( /// Check if Leader lease is still valid. pub fn is_leader_lease_valid(state: &GitNodeState) -> bool { state.is_primary - && state.leader_lease_deadline.is_some_and(|d| d > Instant::now()) + && state + .leader_lease_deadline + .is_some_and(|d| d > Instant::now()) } /// Update Leader lease after successful majority replication. @@ -1006,8 +1013,12 @@ async fn handle_raft_write( match ractor::call_t!(actor_ref, GitNodeMessage::AppendEntries, 5000, request) { Ok(response) if response.success => { success_count += 1; - state.match_index.insert(follower_id.clone(), response.match_index); - state.next_index.insert(follower_id, response.match_index + 1); + state + .match_index + .insert(follower_id.clone(), response.match_index); + state + .next_index + .insert(follower_id, response.match_index + 1); } Ok(response) => { tracing::debug!( @@ -1083,11 +1094,14 @@ fn apply_raft_command(state: &mut GitNodeState, command: &crate::actor::raft_log storage_name = %storage_name, "applying RegisterRepo from Raft log" ); - state.repos.entry(relative_path.clone()).or_insert_with(|| RepoEntry { - role: ROLE_REPLICA.to_string(), - last_commit: String::new(), - read_only: false, - }); + state + .repos + .entry(relative_path.clone()) + .or_insert_with(|| RepoEntry { + role: ROLE_REPLICA.to_string(), + last_commit: String::new(), + read_only: false, + }); } crate::actor::raft_log::Command::RemoveRepo { relative_path } => { tracing::info!( @@ -1121,5 +1135,7 @@ fn apply_raft_command(state: &mut GitNodeState, command: &crate::actor::raft_log } // Advance last_applied - state.raft_log.advance_last_applied(state.raft_log.commit_index()); + state + .raft_log + .advance_last_applied(state.raft_log.commit_index()); } diff --git a/actor/message.rs b/actor/message.rs index 3c4fc1f..02041cb 100644 --- a/actor/message.rs +++ b/actor/message.rs @@ -159,7 +159,6 @@ pub enum GitNodeMessage { TriggerElection, // ── Raft consensus messages ────────────────────────────── - /// AppendEntries RPC: Leader → Follower log replication. #[rpc] AppendEntries(AppendEntriesRequest, RpcReplyPort), @@ -418,7 +417,16 @@ impl BytesConvertable for AppendEntriesRequest { }); } let leader_commit = read_u64(&bytes, &mut offset); - Self { version, term, leader_id, leader_grpc_addr, prev_log_index, prev_log_term, entries, leader_commit } + Self { + version, + term, + leader_id, + leader_grpc_addr, + prev_log_index, + prev_log_term, + entries, + leader_commit, + } } } @@ -457,7 +465,14 @@ impl BytesConvertable for AppendEntriesResponse { let match_index = read_u64(&bytes, &mut offset); let conflict_index = read_u64(&bytes, &mut offset); let conflict_term = read_u64(&bytes, &mut offset); - Self { version, term, success, match_index, conflict_index, conflict_term } + Self { + version, + term, + success, + match_index, + conflict_index, + conflict_term, + } } } @@ -502,7 +517,11 @@ impl BytesConvertable for ReadIndexResponse { let commit_index = read_u64(&bytes, &mut offset); let leader_term = read_u64(&bytes, &mut offset); let is_leader = bytes.get(offset).copied().unwrap_or(0) == 1; - Self { commit_index, leader_term, is_leader } + Self { + commit_index, + leader_term, + is_leader, + } } } diff --git a/actor/mod.rs b/actor/mod.rs index ad99349..730f7f9 100644 --- a/actor/mod.rs +++ b/actor/mod.rs @@ -7,15 +7,14 @@ pub mod sync; pub use handler::find_primary_in_cluster; pub use handler::{ - broadcast_append_entries, broadcast_ref_update, broadcast_role_changed, - get_category_members, get_cluster_nodes, is_leader_lease_valid, list_all_groups, - renew_leader_lease, route_group_for, start_node_actor, GitNodeActor, GitNodeArgs, RepoEntry, + GitNodeActor, GitNodeArgs, RepoEntry, broadcast_append_entries, broadcast_ref_update, + broadcast_role_changed, get_category_members, get_cluster_nodes, is_leader_lease_valid, + list_all_groups, renew_leader_lease, route_group_for, start_node_actor, }; pub use message::{ - AppendEntriesRequest, AppendEntriesResponse, ElectionRequest, ElectionResult, - GitNodeMessage, NodeHealth, ReadIndexRequest, ReadIndexResponse, RefUpdateEvent, - RepoActorMessage, RoleChangedEvent, RouteDecision, SerializedRaftEntry, - ROLE_PRIMARY, ROLE_REPLICA, RAFT_MSG_VERSION, + AppendEntriesRequest, AppendEntriesResponse, ElectionRequest, ElectionResult, GitNodeMessage, + NodeHealth, RAFT_MSG_VERSION, ROLE_PRIMARY, ROLE_REPLICA, ReadIndexRequest, ReadIndexResponse, + RefUpdateEvent, RepoActorMessage, RoleChangedEvent, RouteDecision, SerializedRaftEntry, }; pub use raft_log::{Command as RaftCommand, LogEntry as RaftLogEntry, RaftLog}; pub use server::init_actor_cluster; diff --git a/actor/raft_log.rs b/actor/raft_log.rs index 80cbfe3..3258165 100644 --- a/actor/raft_log.rs +++ b/actor/raft_log.rs @@ -13,9 +13,9 @@ use std::sync::atomic::{AtomicU64, Ordering}; use ractor_cluster::BytesConvertable; -use crate::error::{GitError, GitResult}; -use crate::actor::snapshot::{RaftSnapshot, SnapshotStorage}; use crate::actor::handler::RepoEntry; +use crate::actor::snapshot::{RaftSnapshot, SnapshotStorage}; +use crate::error::{GitError, GitResult}; use std::collections::HashMap; /// Protocol version for forward/backward compatibility. @@ -56,11 +56,19 @@ impl Command { pub fn encode(&self) -> Vec { let mut buf = Vec::new(); match self { - Command::RefUpdate { relative_path, ref_name, old_oid, new_oid } => { + Command::RefUpdate { + relative_path, + ref_name, + old_oid, + new_oid, + } => { buf.push(0); // tag encode_strings(&mut buf, &[relative_path, ref_name, old_oid, new_oid]); } - Command::RegisterRepo { relative_path, storage_name } => { + Command::RegisterRepo { + relative_path, + storage_name, + } => { buf.push(1); encode_strings(&mut buf, &[relative_path, storage_name]); } @@ -68,7 +76,10 @@ impl Command { buf.push(2); encode_strings(&mut buf, &[relative_path]); } - Command::SetPrimary { storage_name, relative_paths } => { + Command::SetPrimary { + storage_name, + relative_paths, + } => { buf.push(3); encode_string(&mut buf, storage_name); buf.extend((relative_paths.len() as u32).to_be_bytes()); @@ -192,7 +203,12 @@ pub struct LogEntry { impl LogEntry { pub fn new(term: u64, index: u64, command: Command) -> Self { let checksum = Self::compute_checksum(term, index, &command); - Self { term, index, command, checksum } + Self { + term, + index, + command, + checksum, + } } fn compute_checksum(term: u64, index: u64, command: &Command) -> u32 { @@ -243,7 +259,12 @@ impl LogEntry { return None; } - Some(LogEntry { term, index, command, checksum }) + Some(LogEntry { + term, + index, + command, + checksum, + }) } } @@ -320,7 +341,9 @@ impl RaftStorage { .append(true) .open(&self.index_path) .map_err(GitError::Io)?; - index_file.write_all(&index_entry.encode()).map_err(GitError::Io)?; + index_file + .write_all(&index_entry.encode()) + .map_err(GitError::Io)?; index_file.flush().map_err(GitError::Io)?; Ok(entry.index) @@ -383,7 +406,12 @@ impl RaftStorage { ); break; } - entries.push(LogEntry { term, index, command, checksum }); + entries.push(LogEntry { + term, + index, + command, + checksum, + }); } None => { tracing::warn!(index, "failed to decode command during recovery, stopping"); @@ -478,7 +506,10 @@ impl RaftLog { let entries = storage.load_all()?; - let next_index = entries.last().map(|e| e.index + 1).unwrap_or(snapshot_index + 1); + let next_index = entries + .last() + .map(|e| e.index + 1) + .unwrap_or(snapshot_index + 1); let last_applied = entries.last().map(|e| e.index).unwrap_or(snapshot_index); Ok(Self { @@ -610,7 +641,9 @@ impl RaftLog { return Ok(()); // Nothing to compact } - let keep: Vec = self.entries.iter() + let keep: Vec = self + .entries + .iter() .filter(|e| e.index >= from_index) .cloned() .collect(); @@ -622,11 +655,7 @@ impl RaftLog { self.storage.truncate_and_rebuild(&keep)?; self.entries = keep; - tracing::info!( - from_index, - kept = self.entries.len(), - "raft log compacted" - ); + tracing::info!(from_index, kept = self.entries.len(), "raft log compacted"); Ok(()) } @@ -635,7 +664,9 @@ impl RaftLog { /// when a follower detects a term mismatch, it must delete the conflicting entry /// and all entries that follow. pub fn truncate_from(&mut self, from_index: u64) -> GitResult<()> { - let keep: Vec = self.entries.iter() + let keep: Vec = self + .entries + .iter() .filter(|e| e.index < from_index) .cloned() .collect(); @@ -677,11 +708,7 @@ impl RaftLog { /// Create a snapshot of the current state and compact the log. pub fn create_snapshot(&mut self, repos: HashMap) -> GitResult<()> { - let snapshot = RaftSnapshot::new( - self.last_applied, - self.term_at(self.last_applied), - repos, - ); + let snapshot = RaftSnapshot::new(self.last_applied, self.term_at(self.last_applied), repos); self.snapshot_storage.save(&snapshot)?; self.snapshot_index = snapshot.last_included_index; @@ -701,12 +728,16 @@ impl RaftLog { } /// Restore state from a snapshot. - pub fn restore_snapshot(&mut self, snapshot: RaftSnapshot) -> GitResult> { + pub fn restore_snapshot( + &mut self, + snapshot: RaftSnapshot, + ) -> GitResult> { self.snapshot_index = snapshot.last_included_index; self.snapshot_term = snapshot.last_included_term; self.commit_index = snapshot.last_included_index; self.last_applied = snapshot.last_included_index; - self.next_index.store(snapshot.last_included_index + 1, Ordering::SeqCst); + self.next_index + .store(snapshot.last_included_index + 1, Ordering::SeqCst); // Clear all entries (they're covered by the snapshot) self.entries.clear(); diff --git a/actor/snapshot.rs b/actor/snapshot.rs index 9f0f676..bae428a 100644 --- a/actor/snapshot.rs +++ b/actor/snapshot.rs @@ -79,11 +79,14 @@ impl RaftSnapshot { let last_commit = read_string(data, &mut offset)?; let read_only = data.get(offset).copied().unwrap_or(0) == 1; offset += 1; - repos.insert(path, RepoEntry { - role, - last_commit, - read_only, - }); + repos.insert( + path, + RepoEntry { + role, + last_commit, + read_only, + }, + ); } Some(Self { diff --git a/actor/sync.rs b/actor/sync.rs index 170782c..b0a4a0d 100644 --- a/actor/sync.rs +++ b/actor/sync.rs @@ -219,7 +219,9 @@ async fn sync_via_pack_service_to_file( temp_dir: &Path, ) -> Result, String> { use crate::pb::pack_service_client::PackServiceClient; - use crate::pb::{AdvertiseRefsRequest, PackObjectsOptions, PackObjectsRequest, RepositoryHeader}; + use crate::pb::{ + AdvertiseRefsRequest, PackObjectsOptions, PackObjectsRequest, RepositoryHeader, + }; use tokio::io::AsyncWriteExt; use tokio_stream::StreamExt; @@ -358,10 +360,7 @@ fn update_local_ref(repo_path: &Path, ref_name: &str, new_oid: &str) { /// Apply a committed Raft command to the local git repository. /// This is called on followers when they receive committed entries from the leader. -pub fn apply_raft_command_to_repo( - repo_prefix: &Path, - command: &crate::actor::raft_log::Command, -) { +pub fn apply_raft_command_to_repo(repo_prefix: &Path, command: &crate::actor::raft_log::Command) { match command { crate::actor::raft_log::Command::RefUpdate { relative_path, diff --git a/build.rs b/build.rs index ec46737..1dbc93d 100644 --- a/build.rs +++ b/build.rs @@ -111,16 +111,19 @@ fn generate_linguist( // Merge: primary wins over secondary, with explicit priority for known conflicts // These are common extensions where linguist has multiple primary claims let priority_overrides: HashMap<&str, &str> = [ - (".rs", "Rust"), // RenderScript also claims .rs - (".md", "Markdown"), // GCC Machine Description also claims .md - (".r", "R"), // Rebol also claims .r - (".s", "Assembly"), // Multiple assemblers claim .s - (".ms", "MAXScript"), // Unix Assembly also claims .ms - (".g", "G-code"), // GAP also claims .g - (".m", "Objective-C"), // Mercury, MUF, etc. also claim .m - (".w", "CWeb"), // OpenSCAD also claims .w - (".q", "Q"), // KBD also claims .q - ].iter().cloned().collect(); + (".rs", "Rust"), // RenderScript also claims .rs + (".md", "Markdown"), // GCC Machine Description also claims .md + (".r", "R"), // Rebol also claims .r + (".s", "Assembly"), // Multiple assemblers claim .s + (".ms", "MAXScript"), // Unix Assembly also claims .ms + (".g", "G-code"), // GAP also claims .g + (".m", "Objective-C"), // Mercury, MUF, etc. also claim .m + (".w", "CWeb"), // OpenSCAD also claims .w + (".q", "Q"), // KBD also claims .q + ] + .iter() + .cloned() + .collect(); for (ext, (lang, ltype)) in ext_primary { if let Some(&preferred) = priority_overrides.get(ext.as_str()) { @@ -140,7 +143,11 @@ fn generate_linguist( if let Some(entry) = languages.get(preferred) && entry.extensions.iter().any(|e| e.to_lowercase() == ext) { - ext_map.push((ext.to_string(), preferred.to_string(), entry.lang_type.clone())); + ext_map.push(( + ext.to_string(), + preferred.to_string(), + entry.lang_type.clone(), + )); } } } @@ -172,8 +179,12 @@ fn generate_linguist( code.push_str("/// Key is lowercase extension including the dot, e.g. \".rs\".\n"); code.push_str("pub static EXTENSION_MAP: &[(&str, &str, &str)] = &[\n"); for (ext, lang, ltype) in &ext_map { - code.push_str(&format!(" (\"{}\", \"{}\", \"{}\"),\n", - escape_str(ext), escape_str(lang), escape_str(ltype))); + code.push_str(&format!( + " (\"{}\", \"{}\", \"{}\"),\n", + escape_str(ext), + escape_str(lang), + escape_str(ltype) + )); } code.push_str("];\n\n"); @@ -182,8 +193,12 @@ fn generate_linguist( code.push_str("/// Key is exact filename, e.g. \"Makefile\", \"Dockerfile\".\n"); code.push_str("pub static FILENAME_MAP: &[(&str, &str, &str)] = &[\n"); for (fname, lang, ltype) in &fname_map { - code.push_str(&format!(" (\"{}\", \"{}\", \"{}\"),\n", - escape_str(fname), escape_str(lang), escape_str(ltype))); + code.push_str(&format!( + " (\"{}\", \"{}\", \"{}\"),\n", + escape_str(fname), + escape_str(lang), + escape_str(ltype) + )); } code.push_str("];\n\n"); @@ -191,8 +206,11 @@ fn generate_linguist( code.push_str("/// Language name to type mapping.\n"); code.push_str("pub static LANG_TYPE_MAP: &[(&str, &str)] = &[\n"); for (lang, ltype) in &lang_type_map { - code.push_str(&format!(" (\"{}\", \"{}\"),\n", - escape_str(lang), escape_str(ltype))); + code.push_str(&format!( + " (\"{}\", \"{}\"),\n", + escape_str(lang), + escape_str(ltype) + )); } code.push_str("];\n\n"); @@ -202,8 +220,11 @@ fn generate_linguist( let mut group_vec: Vec<_> = lang_group_map.iter().collect(); group_vec.sort_by(|a, b| a.0.cmp(b.0)); for (lang, group) in group_vec { - code.push_str(&format!(" (\"{}\", \"{}\"),\n", - escape_str(lang), escape_str(group))); + code.push_str(&format!( + " (\"{}\", \"{}\"),\n", + escape_str(lang), + escape_str(group) + )); } code.push_str("];\n\n"); @@ -213,23 +234,28 @@ fn generate_linguist( code.push_str(" match ext {\n"); // Image extensions - let image_exts = [".png", ".jpg", ".jpeg", ".gif", ".bmp", ".ico", ".svg", - ".webp", ".tiff", ".tif", ".psd", ".raw", ".heic", ".heif", ".avif", - ".apng", ".jfif", ".pjpeg", ".pjp"]; + let image_exts = [ + ".png", ".jpg", ".jpeg", ".gif", ".bmp", ".ico", ".svg", ".webp", ".tiff", ".tif", ".psd", + ".raw", ".heic", ".heif", ".avif", ".apng", ".jfif", ".pjpeg", ".pjp", + ]; for ext in &image_exts { code.push_str(&format!(" \"{}\" => \"Image\",\n", ext)); } // Video extensions - let video_exts = [".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".webm", - ".m4v", ".mpg", ".mpeg", ".3gp", ".3g2", ".ogv", ".vob"]; + let video_exts = [ + ".mp4", ".avi", ".mkv", ".mov", ".wmv", ".flv", ".webm", ".m4v", ".mpg", ".mpeg", ".3gp", + ".3g2", ".ogv", ".vob", + ]; for ext in &video_exts { code.push_str(&format!(" \"{}\" => \"Video\",\n", ext)); } // Audio extensions - let audio_exts = [".mp3", ".wav", ".flac", ".aac", ".ogg", ".wma", ".m4a", - ".opus", ".aiff", ".ape", ".alac", ".mid", ".midi"]; + let audio_exts = [ + ".mp3", ".wav", ".flac", ".aac", ".ogg", ".wma", ".m4a", ".opus", ".aiff", ".ape", ".alac", + ".mid", ".midi", + ]; for ext in &audio_exts { code.push_str(&format!(" \"{}\" => \"Audio\",\n", ext)); } @@ -241,11 +267,12 @@ fn generate_linguist( } // Other binary - let binary_exts = [".exe", ".dll", ".so", ".dylib", ".a", ".lib", ".o", - ".obj", ".bin", ".dat", ".db", ".sqlite", ".sqlite3", ".pyc", ".pyo", - ".class", ".jar", ".war", ".ear", ".zip", ".tar", ".gz", - ".bz2", ".xz", ".7z", ".rar", ".pdf", ".doc", ".docx", ".xls", - ".xlsx", ".ppt", ".pptx", ".odt", ".ods", ".odp", ".wasm", ".node"]; + let binary_exts = [ + ".exe", ".dll", ".so", ".dylib", ".a", ".lib", ".o", ".obj", ".bin", ".dat", ".db", + ".sqlite", ".sqlite3", ".pyc", ".pyo", ".class", ".jar", ".war", ".ear", ".zip", ".tar", + ".gz", ".bz2", ".xz", ".7z", ".rar", ".pdf", ".doc", ".docx", ".xls", ".xlsx", ".ppt", + ".pptx", ".odt", ".ods", ".odp", ".wasm", ".node", + ]; for ext in &binary_exts { code.push_str(&format!(" \"{}\" => \"Binary\",\n", ext)); } @@ -259,8 +286,7 @@ fn generate_linguist( } fn escape_str(s: &str) -> String { - s.replace('\\', "\\\\") - .replace('"', "\\\"") + s.replace('\\', "\\\\").replace('"', "\\\"") } fn proto_files(proto_dir: &Path) -> Result, Box> { diff --git a/commit/find_commit.rs b/commit/find_commit.rs index 5d6b975..b2dc316 100644 --- a/commit/find_commit.rs +++ b/commit/find_commit.rs @@ -39,16 +39,15 @@ impl GitBare { for oid_bytes in &request.oids { let hex: String = oid_bytes.iter().map(|b| format!("{b:02x}")).collect(); - if let Ok(oid) = gix::ObjectId::from_hex(hex.as_bytes()) { - if let Ok(obj) = repo.find_object(oid) { - if let Ok(commit) = obj.try_into_commit() { - commits.push(crate::commit::get_commit::commit_to_pb( - self, - &commit, - request.include_stats, - )); - } - } + if let Ok(oid) = gix::ObjectId::from_hex(hex.as_bytes()) + && let Ok(obj) = repo.find_object(oid) + && let Ok(commit) = obj.try_into_commit() + { + commits.push(crate::commit::get_commit::commit_to_pb( + self, + &commit, + request.include_stats, + )); } if commits.len() >= 100 { break; diff --git a/commit/query.rs b/commit/query.rs index 2e39385..24798a1 100644 --- a/commit/query.rs +++ b/commit/query.rs @@ -58,14 +58,13 @@ impl GitBare { for line in stdout.lines().skip(request.offset as usize) { let hex = line.trim(); - if let Ok(oid) = gix::ObjectId::from_hex(hex.as_bytes()) { - if let Ok(obj) = repo.find_object(oid) { - if let Ok(commit) = obj.try_into_commit() { - commits.push(crate::commit::get_commit::commit_to_pb( - self, &commit, false, - )); - } - } + if let Ok(oid) = gix::ObjectId::from_hex(hex.as_bytes()) + && let Ok(obj) = repo.find_object(oid) + && let Ok(commit) = obj.try_into_commit() + { + commits.push(crate::commit::get_commit::commit_to_pb( + self, &commit, false, + )); } } diff --git a/disk_cache.rs b/disk_cache.rs index c32d001..82b543f 100644 --- a/disk_cache.rs +++ b/disk_cache.rs @@ -271,10 +271,8 @@ impl DiskCache { } let start = std::time::Instant::now(); let path = self.cache_file_path(namespace, digest); - if !path.exists() { - crate::metrics::record_cache_op("disk", "miss", start.elapsed()); - return Ok(None); - } + + // Check expiry before reading, but handle concurrent deletion gracefully. if let Ok(metadata) = std::fs::metadata(&path) && let Ok(modified) = metadata.modified() && let Ok(age) = SystemTime::now().duration_since(modified) @@ -292,16 +290,26 @@ impl DiskCache { crate::metrics::record_cache_op("disk", "expired", start.elapsed()); return Ok(None); } - let data = std::fs::read(&path).map_err(GitError::Io)?; - tracing::debug!( - namespace = %namespace, - digest = %digest, - size = data.len(), - elapsed_ms = start.elapsed().as_millis() as u64, - "cache hit" - ); - crate::metrics::record_cache_op("disk", "hit", start.elapsed()); - Ok(Some(data)) + + match std::fs::read(&path) { + Ok(data) => { + tracing::debug!( + namespace = %namespace, + digest = %digest, + size = data.len(), + elapsed_ms = start.elapsed().as_millis() as u64, + "cache hit" + ); + crate::metrics::record_cache_op("disk", "hit", start.elapsed()); + Ok(Some(data)) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => { + // File was deleted between metadata check and read — treat as miss. + crate::metrics::record_cache_op("disk", "miss", start.elapsed()); + Ok(None) + } + Err(e) => Err(GitError::Io(e)), + } } /// Insert a cached response for the given namespace and digest. @@ -338,9 +346,8 @@ impl DiskCache { return Ok(None); } let path = self.cache_file_path(namespace, digest); - if !path.exists() { - return Ok(None); - } + + // Check expiry; handle concurrent deletion gracefully. if let Ok(metadata) = std::fs::metadata(&path) && let Ok(modified) = metadata.modified() && let Ok(age) = SystemTime::now().duration_since(modified) @@ -349,8 +356,12 @@ impl DiskCache { std::fs::remove_file(&path).ok(); return Ok(None); } - let file = std::fs::File::open(&path).map_err(GitError::Io)?; - Ok(Some(file)) + + match std::fs::File::open(&path) { + Ok(file) => Ok(Some(file)), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(GitError::Io(e)), + } } /// Open a cache file for streaming write. diff --git a/hooks/runner.rs b/hooks/runner.rs index 86432f6..259067f 100644 --- a/hooks/runner.rs +++ b/hooks/runner.rs @@ -128,10 +128,14 @@ pub fn run_hook_dir( /// Run a single hook script with stdin data and timeout. fn run_single_script(script_path: &Path, stdin_data: &[u8], timeout: Duration) -> HookResult { + // Use Stdio::null() for stdout/stderr to prevent pipe-buffer deadlock. + // With Stdio::piped() + never reading, a hook that writes >64KB of output + // would block the child on write(), and the parent's try_wait() would + // loop until timeout before killing it. let child = std::process::Command::new(script_path) .stdin(Stdio::piped()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) + .stdout(Stdio::null()) + .stderr(Stdio::null()) .spawn(); match child { @@ -143,17 +147,12 @@ fn run_single_script(script_path: &Path, stdin_data: &[u8], timeout: Duration) - let wait_result = c.wait_timeout(timeout); match wait_result { - Ok(Some(status)) => { - // Process exited within timeout, get its output - // Note: We already have the status, so we need to construct output differently - // Since wait_with_output would fail after try_wait, we return status-only output - HookResult { - accepted: status.success(), - exit_code: status.code().unwrap_or(-1), - stdout: String::new(), // stdout was consumed by the process - stderr: String::new(), // stderr was consumed by the process - } - } + Ok(Some(status)) => HookResult { + accepted: status.success(), + exit_code: status.code().unwrap_or(-1), + stdout: String::new(), + stderr: String::new(), + }, Ok(None) => { tracing::warn!( script = %script_path.display(), @@ -161,7 +160,6 @@ fn run_single_script(script_path: &Path, stdin_data: &[u8], timeout: Duration) - "hook script timed out, killing" ); let _ = c.kill(); - // Explicitly wait to reap the zombie process let _ = c.wait(); HookResult::rejected(format!( "hook script timed out after {}s: {}", @@ -171,7 +169,6 @@ fn run_single_script(script_path: &Path, stdin_data: &[u8], timeout: Duration) - } Err(e) => { let _ = c.kill(); - // Explicitly wait to reap the zombie process let _ = c.wait(); HookResult::rejected(format!("hook script wait error: {e}")) } @@ -183,7 +180,6 @@ fn run_single_script(script_path: &Path, stdin_data: &[u8], timeout: Duration) - error = %e, "failed to spawn hook script" ); - // If the script can't be executed, treat as rejection HookResult::rejected(format!("failed to spawn hook script: {e}")) } } diff --git a/hooks/sanitize.rs b/hooks/sanitize.rs index 15fe8d4..50dde1c 100644 --- a/hooks/sanitize.rs +++ b/hooks/sanitize.rs @@ -26,33 +26,33 @@ const FORBIDDEN_PATTERNS: &[&str] = &[ "poweroff", "halt", // Additional patterns to catch encoding/obfuscation attempts - "eval ", // eval can execute arbitrary strings - "exec ", // exec can replace process - "$(", // command substitution - "`", // backtick command substitution - "${", // variable expansion (can be used for obfuscation) - "|bash", // piping to bash - "|sh", // piping to sh - "|dash", // piping to dash - "|zsh", // piping to zsh - "base64", // base64 encoding/decoding (common for obfuscation) - "python -c", // inline python execution - "perl -e", // inline perl execution - "ruby -e", // inline ruby execution - "node -e", // inline node execution - "/dev/tcp", // bash reverse shell - "nc -e", // netcat reverse shell - "ncat", // netcat alternative - "socat", // socket relay + "eval ", // eval can execute arbitrary strings + "exec ", // exec can replace process + "$(", // command substitution + "`", // backtick command substitution + "${", // variable expansion (can be used for obfuscation) + "|bash", // piping to bash + "|sh", // piping to sh + "|dash", // piping to dash + "|zsh", // piping to zsh + "base64", // base64 encoding/decoding (common for obfuscation) + "python -c", // inline python execution + "perl -e", // inline perl execution + "ruby -e", // inline ruby execution + "node -e", // inline node execution + "/dev/tcp", // bash reverse shell + "nc -e", // netcat reverse shell + "ncat", // netcat alternative + "socat", // socket relay ]; /// Additional regex-like patterns that indicate dangerous constructs. /// These are checked with simple string matching for complexity reasons. const DANGEROUS_PREFIXES: &[&str] = &[ - "rm -rf /", // rm -rf with absolute path - "rm -rf ~", // rm -rf with home directory - "rm -rf .", // rm -rf with relative path (current dir) - "rm -rf *", // rm -rf with wildcard + "rm -rf /", // rm -rf with absolute path + "rm -rf ~", // rm -rf with home directory + "rm -rf .", // rm -rf with relative path (current dir) + "rm -rf *", // rm -rf with wildcard ]; /// Maximum hook script size (64KB). @@ -106,7 +106,15 @@ pub fn validate_hook_content(content: &str) -> GitResult<()> { /// Check for common obfuscation attempts. fn check_obfuscation_attempts(content: &str) -> GitResult<()> { // Check for excessive use of special characters that might indicate obfuscation - let special_char_count = content.chars().filter(|c| matches!(c, '$' | '`' | '\\' | '|' | ';' | '&' | '(' | ')' | '{' | '}' | '[' | ']')).count(); + let special_char_count = content + .chars() + .filter(|c| { + matches!( + c, + '$' | '`' | '\\' | '|' | ';' | '&' | '(' | ')' | '{' | '}' | '[' | ']' + ) + }) + .count(); let total_chars = content.chars().count(); // If more than 30% of content is special characters, it's suspicious diff --git a/main.rs b/main.rs index 0da469e..f27b0ce 100644 --- a/main.rs +++ b/main.rs @@ -59,48 +59,50 @@ fn init_tracing() -> Option { // Optional file output with rotation if let Ok(log_dir) = std::env::var("GITKS_LOG_DIR") { - let rotation = match env_or("GITKS_LOG_ROTATION", "daily").as_str() { - "hourly" => tracing_appender::rolling::Rotation::HOURLY, - "never" => tracing_appender::rolling::Rotation::NEVER, - _ => tracing_appender::rolling::Rotation::DAILY, - }; - let retention = env_u64("GITKS_LOG_RETENTION", 7) as usize; + let rotation = match env_or("GITKS_LOG_ROTATION", "daily").as_str() { + "hourly" => tracing_appender::rolling::Rotation::HOURLY, + "never" => tracing_appender::rolling::Rotation::NEVER, + _ => tracing_appender::rolling::Rotation::DAILY, + }; + let retention = env_u64("GITKS_LOG_RETENTION", 7) as usize; - let mut builder = tracing_appender::rolling::Builder::new() - .rotation(rotation) - .filename_prefix("gitks") - .filename_suffix("log"); + let mut builder = tracing_appender::rolling::Builder::new() + .rotation(rotation) + .filename_prefix("gitks") + .filename_suffix("log"); - if retention > 0 { - builder = builder.max_log_files(retention); - } - - let file_appender = builder.build(&log_dir).expect("failed to create log directory"); - let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); - - let file_layer = fmt::layer() - .json() - .with_target(true) - .with_file(true) - .with_line_number(true) - .with_writer(non_blocking) - .with_filter(EnvFilter::new("info")) - .boxed(); - - tracing_subscriber::registry() - .with(env_filter) - .with(fmt_layer) - .with(file_layer) - .init(); - - Some(guard) - } else { - tracing_subscriber::registry() - .with(env_filter) - .with(fmt_layer) - .init(); - None + if retention > 0 { + builder = builder.max_log_files(retention); } + + let file_appender = builder + .build(&log_dir) + .expect("failed to create log directory"); + let (non_blocking, guard) = tracing_appender::non_blocking(file_appender); + + let file_layer = fmt::layer() + .json() + .with_target(true) + .with_file(true) + .with_line_number(true) + .with_writer(non_blocking) + .with_filter(EnvFilter::new("info")) + .boxed(); + + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .with(file_layer) + .init(); + + Some(guard) + } else { + tracing_subscriber::registry() + .with(env_filter) + .with(fmt_layer) + .init(); + None + } } #[tokio::main] @@ -143,10 +145,7 @@ async fn main() -> Result<(), Box> { ); if disk_cache_enabled { - tracing::info!( - max_age_secs = disk_cache_max_age, - "disk cache enabled" - ); + tracing::info!(max_age_secs = disk_cache_max_age, "disk cache enabled"); disk_cache.cleanup_on_startup()?; gitks::disk_cache::start_cache_cleanup_task(disk_cache.clone(), Duration::from_secs(300)); } else { @@ -290,8 +289,13 @@ async fn main() -> Result<(), Box> { } let raft_data_dir = repo_prefix.join(".gitks_raft"); - let (node_actor, node_handle) = - init_actor_cluster(svc.clone(), storage_name.clone(), grpc_addr.clone(), raft_data_dir).await?; + let (node_actor, node_handle) = init_actor_cluster( + svc.clone(), + storage_name.clone(), + grpc_addr.clone(), + raft_data_dir, + ) + .await?; let svc = svc .with_actor(node_actor.clone()) .with_grpc_addr(grpc_addr.clone()); diff --git a/metrics.rs b/metrics.rs index 619e957..71700af 100644 --- a/metrics.rs +++ b/metrics.rs @@ -20,7 +20,7 @@ //! - GET /debug/config — Runtime configuration use dashmap::DashMap; -use std::sync::atomic::{AtomicU64, AtomicBool, Ordering}; +use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, OnceLock}; use std::time::{Duration, Instant}; @@ -82,7 +82,9 @@ struct MetricsInner { static METRICS: OnceLock> = OnceLock::new(); /// Handle for dynamic log level reload. -static LOG_RELOAD_HANDLE: OnceLock>> = OnceLock::new(); +static LOG_RELOAD_HANDLE: OnceLock< + Option>, +> = OnceLock::new(); use tracing_subscriber::EnvFilter; @@ -141,7 +143,9 @@ fn record_duration_bucket(map: &DashMap, key_prefix: &str, du } pub fn set_slow_request_threshold(ms: u64) { - metrics().slow_request_threshold_ms.store(ms, Ordering::Relaxed); + metrics() + .slow_request_threshold_ms + .store(ms, Ordering::Relaxed); } pub fn set_ready(ready: bool) { @@ -254,12 +258,19 @@ pub fn record_hook_execution(hook_type: &str, result: &str, duration: Duration) record_duration_bucket(&m.hook_duration_buckets, hook_type, duration_ms); } -pub fn set_raft_state(term: u64, commit_index: u64, last_applied: u64, is_leader: bool, log_entries: u64) { +pub fn set_raft_state( + term: u64, + commit_index: u64, + last_applied: u64, + is_leader: bool, + log_entries: u64, +) { let m = metrics(); m.raft_term.store(term, Ordering::Relaxed); m.raft_commit_index.store(commit_index, Ordering::Relaxed); m.raft_last_applied.store(last_applied, Ordering::Relaxed); - m.raft_is_leader.store(if is_leader { 1 } else { 0 }, Ordering::Relaxed); + m.raft_is_leader + .store(if is_leader { 1 } else { 0 }, Ordering::Relaxed); m.raft_log_entries.store(log_entries, Ordering::Relaxed); } @@ -268,7 +279,8 @@ pub fn inc_raft_append_entries(success: bool) { let m = metrics(); m.raft_append_entries_total.fetch_add(1, Ordering::Relaxed); if success { - m.raft_append_entries_success.fetch_add(1, Ordering::Relaxed); + m.raft_append_entries_success + .fetch_add(1, Ordering::Relaxed); } } @@ -296,14 +308,22 @@ fn prom_escape(value: &str) -> String { out } -fn render_counter_map(out: &mut String, name: &str, help: &str, map: &DashMap, labels: &[&str]) { +fn render_counter_map( + out: &mut String, + name: &str, + help: &str, + map: &DashMap, + labels: &[&str], +) { out.push_str(&format!("# HELP {name} {help}\n")); out.push_str(&format!("# TYPE {name} counter\n")); for entry in map { let (key, count) = (entry.key(), entry.value().load(Ordering::Relaxed)); let parts: Vec<&str> = key.split(':').collect(); if parts.len() == labels.len() { - let label_str: String = labels.iter().zip(parts.iter()) + let label_str: String = labels + .iter() + .zip(parts.iter()) .map(|(l, v)| format!("{l}=\"{}\"", prom_escape(v))) .collect::>() .join(","); @@ -324,7 +344,10 @@ fn render_histogram(out: &mut String, name: &str, help: &str, map: &DashMap String { out.push_str(&format!("gitks_repository_count {repos}\n\n")); // gRPC requests - render_counter_map(&mut out, "gitks_requests_total", - "Total gRPC requests by method and status", &m.request_count, &["method", "status"]); + render_counter_map( + &mut out, + "gitks_requests_total", + "Total gRPC requests by method and status", + &m.request_count, + &["method", "status"], + ); // gRPC duration - render_histogram(&mut out, "gitks_request_duration_milliseconds", - "Request duration histogram in ms", &m.duration_buckets); + render_histogram( + &mut out, + "gitks_request_duration_milliseconds", + "Request duration histogram in ms", + &m.duration_buckets, + ); // Slow requests - render_counter_map(&mut out, "gitks_slow_requests_total", - "Slow gRPC requests by method", &m.slow_request_count, &["method"]); + render_counter_map( + &mut out, + "gitks_slow_requests_total", + "Slow gRPC requests by method", + &m.slow_request_count, + &["method"], + ); // Cache let hits = m.cache_hits.load(Ordering::Relaxed); @@ -372,26 +409,58 @@ pub fn render_metrics() -> String { out.push_str(&format!("gitks_cache_misses_total {misses}\n\n")); // Errors - render_counter_map(&mut out, "gitks_errors_total", - "Total errors by kind", &m.error_count, &["kind"]); + render_counter_map( + &mut out, + "gitks_errors_total", + "Total errors by kind", + &m.error_count, + &["kind"], + ); // Git subprocess - render_counter_map(&mut out, "gitks_git_cmd_total", - "Git subprocess calls by command", &m.git_cmd_count, &["command"]); - render_histogram(&mut out, "gitks_git_cmd_duration_milliseconds", - "Git subprocess duration in ms", &m.git_cmd_duration_buckets); + render_counter_map( + &mut out, + "gitks_git_cmd_total", + "Git subprocess calls by command", + &m.git_cmd_count, + &["command"], + ); + render_histogram( + &mut out, + "gitks_git_cmd_duration_milliseconds", + "Git subprocess duration in ms", + &m.git_cmd_duration_buckets, + ); // Cache operations - render_counter_map(&mut out, "gitks_cache_ops_total", - "Cache operations by cache and result", &m.cache_op_count, &["cache", "result"]); - render_histogram(&mut out, "gitks_cache_op_duration_milliseconds", - "Cache operation duration in ms", &m.cache_op_duration_buckets); + render_counter_map( + &mut out, + "gitks_cache_ops_total", + "Cache operations by cache and result", + &m.cache_op_count, + &["cache", "result"], + ); + render_histogram( + &mut out, + "gitks_cache_op_duration_milliseconds", + "Cache operation duration in ms", + &m.cache_op_duration_buckets, + ); // Hook execution - render_counter_map(&mut out, "gitks_hook_executions_total", - "Hook executions by type and result", &m.hook_count, &["hook_type", "result"]); - render_histogram(&mut out, "gitks_hook_duration_milliseconds", - "Hook execution duration in ms", &m.hook_duration_buckets); + render_counter_map( + &mut out, + "gitks_hook_executions_total", + "Hook executions by type and result", + &m.hook_count, + &["hook_type", "result"], + ); + render_histogram( + &mut out, + "gitks_hook_duration_milliseconds", + "Hook execution duration in ms", + &m.hook_duration_buckets, + ); // Raft consensus metrics let raft_term = m.raft_term.load(Ordering::Relaxed); @@ -426,11 +495,15 @@ pub fn render_metrics() -> String { out.push_str("# HELP gitks_raft_append_entries_total Total AppendEntries RPCs sent\n"); out.push_str("# TYPE gitks_raft_append_entries_total counter\n"); - out.push_str(&format!("gitks_raft_append_entries_total {raft_ae_total}\n\n")); + out.push_str(&format!( + "gitks_raft_append_entries_total {raft_ae_total}\n\n" + )); out.push_str("# HELP gitks_raft_append_entries_success Successful AppendEntries RPCs\n"); out.push_str("# TYPE gitks_raft_append_entries_success counter\n"); - out.push_str(&format!("gitks_raft_append_entries_success {raft_ae_success}\n\n")); + out.push_str(&format!( + "gitks_raft_append_entries_success {raft_ae_success}\n\n" + )); out.push_str("# HELP gitks_raft_elections_total Total elections triggered\n"); out.push_str("# TYPE gitks_raft_elections_total counter\n"); @@ -438,7 +511,9 @@ pub fn render_metrics() -> String { out.push_str("# HELP gitks_raft_elections_won Elections won by this node\n"); out.push_str("# TYPE gitks_raft_elections_won counter\n"); - out.push_str(&format!("gitks_raft_elections_won {raft_elections_won}\n\n")); + out.push_str(&format!( + "gitks_raft_elections_won {raft_elections_won}\n\n" + )); out } @@ -446,12 +521,12 @@ pub fn render_metrics() -> String { use bytes::Bytes; use http_body_util::Full; use hyper::body::Incoming; -use hyper::{Request, Response, Method}; use hyper::service::Service; +use hyper::{Method, Request, Response}; use std::convert::Infallible; use std::future::Future; -use std::pin::Pin; use std::net::SocketAddr; +use std::pin::Pin; /// Global cancel token for the HTTP server, set from main. static HTTP_CANCEL: OnceLock = OnceLock::new(); @@ -501,9 +576,7 @@ async fn handle_request(req: Request) -> Result>, let body = render_metrics(); text_response(200, "text/plain; version=0.0.4; charset=utf-8", body) } - (Method::GET, "/health") => { - json_response(200, r#"{"status":"healthy"}"#) - } + (Method::GET, "/health") => json_response(200, r#"{"status":"healthy"}"#), (Method::GET, "/ready") => { if metrics().ready.load(Ordering::Relaxed) { json_response(200, r#"{"status":"ready"}"#) @@ -519,30 +592,28 @@ async fn handle_request(req: Request) -> Result>, }; json_response(200, &format!(r#"{{"log_level":"{msg}"}}"#)) } - (Method::PUT, "/debug/log-level") => { - match handle_log_level_update(req).await { - Ok(resp) => resp, - Err(e) => json_response(400, &format!(r#"{{"error":"{e}"}}"#)), - } - } + (Method::PUT, "/debug/log-level") => match handle_log_level_update(req).await { + Ok(resp) => resp, + Err(e) => json_response(400, &format!(r#"{{"error":"{e}"}}"#)), + }, (Method::GET, "/debug/config") => { let threshold = metrics().slow_request_threshold_ms.load(Ordering::Relaxed); let ready = metrics().ready.load(Ordering::Relaxed); - json_response(200, &format!( - r#"{{"slow_request_threshold_ms":{},"ready":{}}}"#, threshold, ready - )) - } - _ => { - json_response(404, r#"{"error":"not found"}"#) + json_response( + 200, + &format!( + r#"{{"slow_request_threshold_ms":{},"ready":{}}}"#, + threshold, ready + ), + ) } + _ => json_response(404, r#"{"error":"not found"}"#), }; Ok(response) } -async fn handle_log_level_update( - req: Request, -) -> Result>, String> { +async fn handle_log_level_update(req: Request) -> Result>, String> { use http_body_util::BodyExt; let body_bytes = req @@ -551,8 +622,8 @@ async fn handle_log_level_update( .map_err(|e| format!("failed to read body: {e}"))? .to_bytes(); - let new_filter = String::from_utf8(body_bytes.to_vec()) - .map_err(|e| format!("invalid UTF-8: {e}"))?; + let new_filter = + String::from_utf8(body_bytes.to_vec()).map_err(|e| format!("invalid UTF-8: {e}"))?; let new_filter = new_filter.trim().to_string(); if new_filter.is_empty() { @@ -566,31 +637,35 @@ async fn handle_log_level_update( Ok(json_response(500, &format!(r#"{{"error":"{e}"}}"#))) } else { tracing::info!(new_filter = %new_filter, "log level updated via HTTP"); - Ok(json_response(200, &format!( - r#"{{"status":"ok","filter":"{}"}}"#, new_filter - ))) + Ok(json_response( + 200, + &format!(r#"{{"status":"ok","filter":"{}"}}"#, new_filter), + )) } } - Err(e) => Ok(json_response(400, &format!( - r#"{{"error":"invalid filter: {e}"}}"# - ))), + Err(e) => Ok(json_response( + 400, + &format!(r#"{{"error":"invalid filter: {e}"}}"#), + )), }, - _ => Ok(json_response(501, r#"{"error":"dynamic log level not configured"}"#)), + _ => Ok(json_response( + 501, + r#"{"error":"dynamic log level not configured"}"#, + )), } } /// Start the HTTP server (metrics + health + debug) using hyper 1.x. pub fn start_metrics_server(port: u16) -> tokio::task::JoinHandle<()> { tokio::spawn(async move { - let listener = match tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))) - .await - { - Ok(l) => l, - Err(e) => { - tracing::error!(port, error = %e, "failed to bind HTTP server"); - return; - } - }; + let listener = + match tokio::net::TcpListener::bind(SocketAddr::from(([0, 0, 0, 0], port))).await { + Ok(l) => l, + Err(e) => { + tracing::error!(port, error = %e, "failed to bind HTTP server"); + return; + } + }; tracing::info!(port, "HTTP server started (metrics + health + debug)"); let cancel = HTTP_CANCEL diff --git a/pack/receive_pack.rs b/pack/receive_pack.rs index 32eb133..3bdf0f4 100644 --- a/pack/receive_pack.rs +++ b/pack/receive_pack.rs @@ -6,9 +6,9 @@ use tokio::process::Command; use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; +use super::CancellableReceiverStream; use crate::bare::GitBare; use crate::pb::ReceivePackResponse; -use super::CancellableReceiverStream; /// Maximum time allowed for a git receive-pack process before it is killed. const RECEIVE_PACK_TIMEOUT: Duration = Duration::from_secs(1800); // 30 minutes @@ -28,7 +28,8 @@ impl GitBare { input: impl tokio_stream::Stream> + Send + 'static, - ) -> Result>, tonic::Status> { + ) -> Result>, tonic::Status> + { let bare_dir = self.bare_dir.to_string_lossy().into_owned(); tracing::info!( repo = %bare_dir, @@ -186,6 +187,9 @@ impl GitBare { let rx_stream = ReceiverStream::new(rx); let cancel_guard = cancel_token_clone.clone().drop_guard(); - Ok(super::CancellableReceiverStream::new(rx_stream, cancel_guard)) + Ok(super::CancellableReceiverStream::new( + rx_stream, + cancel_guard, + )) } } diff --git a/pack/upload_pack.rs b/pack/upload_pack.rs index bea68d3..84d5bd7 100644 --- a/pack/upload_pack.rs +++ b/pack/upload_pack.rs @@ -6,9 +6,9 @@ use tokio::process::Command; use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; +use super::CancellableReceiverStream; use crate::bare::GitBare; use crate::pb::UploadPackResponse; -use super::CancellableReceiverStream; /// Maximum time allowed for a git upload-pack process before it is killed. const UPLOAD_PACK_TIMEOUT: Duration = Duration::from_secs(600); // 10 minutes @@ -28,7 +28,8 @@ impl GitBare { input: impl tokio_stream::Stream> + Send + 'static, - ) -> Result>, tonic::Status> { + ) -> Result>, tonic::Status> + { let bare_dir = self.bare_dir.to_string_lossy().into_owned(); tracing::info!( repo = %bare_dir, @@ -189,6 +190,9 @@ impl GitBare { let rx_stream = ReceiverStream::new(rx); let cancel_guard = cancel_token_clone.clone().drop_guard(); - Ok(super::CancellableReceiverStream::new(rx_stream, cancel_guard)) + Ok(super::CancellableReceiverStream::new( + rx_stream, + cancel_guard, + )) } } diff --git a/rate_limit.rs b/rate_limit.rs index 04591ad..34a7950 100644 --- a/rate_limit.rs +++ b/rate_limit.rs @@ -192,8 +192,7 @@ pub fn set_max_concurrent(max: usize) { .collect(); for key in keys { - l.semaphores - .insert(key, Arc::new(Semaphore::new(max))); + l.semaphores.insert(key, Arc::new(Semaphore::new(max))); } tracing::info!(max_concurrent = max, "rate limit max_concurrent updated"); diff --git a/refs/find_refs.rs b/refs/find_refs.rs index 155db93..6d9e807 100644 --- a/refs/find_refs.rs +++ b/refs/find_refs.rs @@ -69,7 +69,7 @@ impl GitBare { // Sort direction let sort_prefix = match SortDirection::try_from(request.sort_direction) { Ok(SortDirection::Asc) => "", - Ok(SortDirection::Desc) | _ => "-", + _ => "-", }; args.push(format!("--sort={sort_prefix}refname")); diff --git a/refs/update_refs.rs b/refs/update_refs.rs index 7de7ad8..0f6e206 100644 --- a/refs/update_refs.rs +++ b/refs/update_refs.rs @@ -53,12 +53,13 @@ impl GitBare { })?; } drop(child.stdin.take()); - let output = child.wait_with_output().map_err(|e| { - crate::error::GitError::CommandFailed { - status_code: None, - stderr: e.to_string(), - } - })?; + let output = + child + .wait_with_output() + .map_err(|e| crate::error::GitError::CommandFailed { + status_code: None, + stderr: e.to_string(), + })?; let stderr = String::from_utf8_lossy(&output.stderr).into_owned(); if !output.status.success() { diff --git a/remote/mirror.rs b/remote/mirror.rs index bb95c7d..56b2bdf 100644 --- a/remote/mirror.rs +++ b/remote/mirror.rs @@ -113,14 +113,14 @@ impl GitBare { ]) .output(); - if let Ok(out) = head_output { - if !out.status.success() { - tracing::warn!( - repo = %self.bare_dir.display(), - stderr = %String::from_utf8_lossy(&out.stderr).trim(), - "failed to auto-set remote HEAD" - ); - } + if let Ok(out) = head_output + && !out.status.success() + { + tracing::warn!( + repo = %self.bare_dir.display(), + stderr = %String::from_utf8_lossy(&out.stderr).trim(), + "failed to auto-set remote HEAD" + ); } Ok(UpdateRemoteMirrorResponse { diff --git a/repository/lang_stats.rs b/repository/lang_stats.rs index 9c6e828..a887b28 100644 --- a/repository/lang_stats.rs +++ b/repository/lang_stats.rs @@ -5,9 +5,7 @@ use gix::object::tree::EntryKind; use crate::bare::GitBare; use crate::error::{GitError, GitResult}; -use crate::pb::{ - GetLanguageStatsRequest, GetLanguageStatsResponse, LanguageStat, object_selector, -}; +use crate::pb::{GetLanguageStatsRequest, GetLanguageStatsResponse, LanguageStat, object_selector}; // Include the generated linguist rules include!(concat!(env!("OUT_DIR"), "/linguist_generated.rs")); @@ -181,10 +179,12 @@ impl GitBare { let mut resolved: HashMap = HashMap::new(); for (lang, s) in stats { let target = resolve_group(&lang).unwrap_or(&lang); - let entry = resolved.entry(target.to_string()).or_insert_with(|| LangStats { - lang_type: s.lang_type.clone(), - ..Default::default() - }); + let entry = resolved + .entry(target.to_string()) + .or_insert_with(|| LangStats { + lang_type: s.lang_type.clone(), + ..Default::default() + }); entry.file_count += s.file_count; entry.bytes += s.bytes; entry.lines += s.lines; @@ -214,7 +214,11 @@ impl GitBare { }) .collect(); - languages.sort_by(|a, b| b.bytes.cmp(&a.bytes).then_with(|| a.language.cmp(&b.language))); + languages.sort_by(|a, b| { + b.bytes + .cmp(&a.bytes) + .then_with(|| a.language.cmp(&b.language)) + }); Ok(GetLanguageStatsResponse { languages, @@ -283,10 +287,13 @@ impl GitBare { *ctx.total_bytes += size; *ctx.total_lines += lines; - let s = ctx.stats.entry(lang_key.clone()).or_insert_with(|| LangStats { - lang_type: lang_type.to_string(), - ..Default::default() - }); + let s = ctx + .stats + .entry(lang_key.clone()) + .or_insert_with(|| LangStats { + lang_type: lang_type.to_string(), + ..Default::default() + }); s.file_count += 1; s.bytes += size; s.lines += lines; diff --git a/repository/objects_size.rs b/repository/objects_size.rs index 9ce7eb2..0906c2f 100644 --- a/repository/objects_size.rs +++ b/repository/objects_size.rs @@ -57,7 +57,7 @@ impl GitBare { let parts: Vec<&str> = line.split_whitespace().collect(); if parts.len() >= 2 { let oid = parts[0]; - let found = parts.get(1).map_or(true, |&s| s != "missing"); + let found = parts.get(1).is_none_or(|&s| s != "missing"); let size = parts.get(1).and_then(|s| s.parse().ok()).unwrap_or(0); sizes.push(ObjectSize { oid: oid.to_string(), diff --git a/repository/optimize.rs b/repository/optimize.rs index cc1c591..fe54837 100644 --- a/repository/optimize.rs +++ b/repository/optimize.rs @@ -19,13 +19,13 @@ impl GitBare { let stats = self.get_repository_statistics()?; // Run commit-graph write if needed - if stats.commit_graph_size_bytes == 0 || strategy == OptimizeStrategy::Aggressive { - if let Ok(resp) = write_commit_graph(self, false, false) { - if !resp.ok { - stderr_all.push_str(&resp.stderr); - } - stdout_all.push_str(&resp.stdout); + if (stats.commit_graph_size_bytes == 0 || strategy == OptimizeStrategy::Aggressive) + && let Ok(resp) = write_commit_graph(self, false, false) + { + if !resp.ok { + stderr_all.push_str(&resp.stderr); } + stdout_all.push_str(&resp.stdout); } // Repack if many loose objects or packfiles @@ -42,13 +42,13 @@ impl GitBare { } // Prune if aggressive - if strategy == OptimizeStrategy::Aggressive { - if let Ok(resp) = run_gc(self, true, true) { - if !resp.ok { - stderr_all.push_str(&resp.stderr); - } - stdout_all.push_str(&resp.stdout); + if strategy == OptimizeStrategy::Aggressive + && let Ok(resp) = run_gc(self, true, true) + { + if !resp.ok { + stderr_all.push_str(&resp.stderr); } + stdout_all.push_str(&resp.stdout); } } OptimizeStrategy::Incremental => { diff --git a/repository/search_files.rs b/repository/search_files.rs index b3dd55a..62319dc 100644 --- a/repository/search_files.rs +++ b/repository/search_files.rs @@ -58,14 +58,14 @@ impl GitBare { // Format: path:line:col:matched_text if let Some((path_and_rest, matched)) = line.rsplit_once(':') { let prefix_parts: Vec<&str> = path_and_rest.rsplitn(3, ':').collect(); - if prefix_parts.len() >= 3 { - if let Ok(line_num) = prefix_parts[0].parse::() { - results.push(SearchResult { - path: prefix_parts[2].to_string(), - line: line_num, - matched_text: matched.to_string(), - }); - } + if prefix_parts.len() >= 3 + && let Ok(line_num) = prefix_parts[0].parse::() + { + results.push(SearchResult { + path: prefix_parts[2].to_string(), + line: line_num, + matched_text: matched.to_string(), + }); } } } diff --git a/sanitize.rs b/sanitize.rs index 41c40c1..2546a19 100644 --- a/sanitize.rs +++ b/sanitize.rs @@ -7,10 +7,20 @@ use crate::error::GitError; use crate::error::GitResult; /// Characters that are never allowed in git ref names / revision strings. +/// +/// Git disallows: space, `~`, `^`, `:`, `?`, `*`, `[`, `\`, and all ASCII +/// control characters (bytes 0–31 and 127). The control characters are +/// checked separately via `is_ascii_control()`. const FORBIDDEN_REF_CHARS: &[char] = &[ - '~', '^', ':', '?', '*', '[', '\\', ' ', '\n', '\r', '\t', '\0', + '~', '^', ':', '?', '*', '[', '\\', ' ', ]; +/// Returns true if `c` is an ASCII control character (bytes 0–31, 127). +fn is_ascii_control(c: char) -> bool { + let b = c as u32; + b <= 31 || b == 127 +} + /// Validate a git reference name (branch, tag, etc.). /// /// Git ref rules (from `git check-ref-format`): @@ -44,7 +54,7 @@ pub fn validate_ref_name(name: &str) -> GitResult<()> { "ref name cannot contain '@{{': {name}" ))); } - if name.contains(|c: char| FORBIDDEN_REF_CHARS.contains(&c)) { + if name.contains(|c: char| FORBIDDEN_REF_CHARS.contains(&c) || is_ascii_control(c)) { return Err(GitError::InvalidArgument(format!( "ref name contains forbidden character: {name}" ))); @@ -267,13 +277,7 @@ pub fn validate_config_key(key: &str) -> GitResult<()> { } /// Allowed URL schemes for git remotes. -const ALLOWED_REMOTE_SCHEMES: &[&str] = &[ - "http://", - "https://", - "ssh://", - "git://", - "git+ssh://", -]; +const ALLOWED_REMOTE_SCHEMES: &[&str] = &["http://", "https://", "ssh://", "git://", "git+ssh://"]; /// Validate a remote URL for git operations. /// @@ -309,16 +313,14 @@ pub fn validate_remote_url(url: &str) -> GitResult<()> { /// Refspecs must not contain null bytes, newlines, or shell metacharacters. pub fn validate_refspec(refspec: &str) -> GitResult<()> { if refspec.is_empty() { - return Err(GitError::InvalidArgument( - "refspec cannot be empty".into(), - )); + return Err(GitError::InvalidArgument("refspec cannot be empty".into())); } if refspec.contains('\0') || refspec.contains('\n') || refspec.contains('\r') { return Err(GitError::InvalidArgument( "refspec contains invalid characters".into(), )); } - if refspec.contains(|c: char| matches!(c, '$' | '`' | '(' | ')' | '{' | '}' | '|' | ';' | '&' | '<' | '>')) { + if refspec.contains(['$', '`', '(', ')', '{', '}', '|', ';', '&', '<', '>']) { return Err(GitError::InvalidArgument(format!( "refspec contains shell metacharacter: {refspec}" ))); diff --git a/server/mod.rs b/server/mod.rs index 91a4131..f3515a4 100644 --- a/server/mod.rs +++ b/server/mod.rs @@ -122,12 +122,15 @@ impl GitksService { pub fn cleanup_route_cache(&self) { let before = self.route_cache.len(); - self.route_cache.retain(|_key, cached| { - cached.created_at.elapsed() < ROUTE_CACHE_TTL - }); + self.route_cache + .retain(|_key, cached| cached.created_at.elapsed() < ROUTE_CACHE_TTL); let removed = before - self.route_cache.len(); if removed > 0 { - tracing::debug!(removed, remaining = self.route_cache.len(), "route cache cleaned"); + tracing::debug!( + removed, + remaining = self.route_cache.len(), + "route cache cleaned" + ); } } @@ -388,9 +391,10 @@ impl GitksService { &self, command: crate::actor::raft_log::Command, ) -> Result<(), tonic::Status> { - let actor = self.node_actor.as_ref().ok_or_else(|| { - tonic::Status::failed_precondition("node actor not initialized") - })?; + let actor = self + .node_actor + .as_ref() + .ok_or_else(|| tonic::Status::failed_precondition("node actor not initialized"))?; // Send the command to the actor for Raft processing let result = ractor::call_t!( @@ -405,7 +409,9 @@ impl GitksService { if success { Ok(()) } else { - Err(tonic::Status::aborted("Raft consensus failed: not leader or timeout")) + Err(tonic::Status::aborted( + "Raft consensus failed: not leader or timeout", + )) } } Err(e) => Err(tonic::Status::internal(format!("Raft write error: {e}"))), @@ -415,20 +421,16 @@ impl GitksService { /// Perform a ReadIndex check to ensure this node can serve consistent reads. /// This confirms the Leader is still valid before reading from local state. pub async fn raft_read_index(&self) -> Result<(), tonic::Status> { - let actor = self.node_actor.as_ref().ok_or_else(|| { - tonic::Status::failed_precondition("node actor not initialized") - })?; + let actor = self + .node_actor + .as_ref() + .ok_or_else(|| tonic::Status::failed_precondition("node actor not initialized"))?; let request = crate::actor::message::ReadIndexRequest { relative_path: String::new(), }; - let result = ractor::call_t!( - actor, - GitNodeMessage::ReadIndex, - 5000, - request - ); + let result = ractor::call_t!(actor, GitNodeMessage::ReadIndex, 5000, request); match result { Ok(response) => { @@ -436,7 +438,7 @@ impl GitksService { Ok(()) } else { Err(tonic::Status::failed_precondition( - "not leader, cannot serve consistent read" + "not leader, cannot serve consistent read", )) } } @@ -649,23 +651,73 @@ pub async fn serve( let span = tracing::info_span!("gitks.server", %addr); let _enter = span.enter(); tracing::info!("registering gRPC services"); + + let (health_reporter, health_service) = tonic_health::server::health_reporter(); + + let repo_svc = repository_service_server::RepositoryServiceServer::new(svc.clone()); + let archive_svc = archive_service_server::ArchiveServiceServer::new(svc.clone()); + let blame_svc = blame_service_server::BlameServiceServer::new(svc.clone()); + let branch_svc = branch_service_server::BranchServiceServer::new(svc.clone()); + let commit_svc = commit_service_server::CommitServiceServer::new(svc.clone()); + let diff_svc = diff_service_server::DiffServiceServer::new(svc.clone()); + let merge_svc = merge_service_server::MergeServiceServer::new(svc.clone()); + let pack_svc = pack_service_server::PackServiceServer::new(svc.clone()); + let ref_svc = ref_service_server::RefServiceServer::new(svc.clone()); + let remote_svc = remote_service_server::RemoteServiceServer::new(svc.clone()); + let tag_svc = tag_service_server::TagServiceServer::new(svc.clone()); + let tree_svc = tree_service_server::TreeServiceServer::new(svc); + + health_reporter + .set_serving::>() + .await; + health_reporter + .set_serving::>() + .await; + health_reporter + .set_serving::>() + .await; + health_reporter + .set_serving::>() + .await; + health_reporter + .set_serving::>() + .await; + health_reporter + .set_serving::>() + .await; + health_reporter + .set_serving::>() + .await; + health_reporter + .set_serving::>() + .await; + health_reporter + .set_serving::>() + .await; + health_reporter + .set_serving::>() + .await; + health_reporter + .set_serving::>() + .await; + health_reporter + .set_serving::>() + .await; + let server = tonic::transport::Server::builder() - .add_service(repository_service_server::RepositoryServiceServer::new( - svc.clone(), - )) - .add_service(archive_service_server::ArchiveServiceServer::new( - svc.clone(), - )) - .add_service(blame_service_server::BlameServiceServer::new(svc.clone())) - .add_service(branch_service_server::BranchServiceServer::new(svc.clone())) - .add_service(commit_service_server::CommitServiceServer::new(svc.clone())) - .add_service(diff_service_server::DiffServiceServer::new(svc.clone())) - .add_service(merge_service_server::MergeServiceServer::new(svc.clone())) - .add_service(pack_service_server::PackServiceServer::new(svc.clone())) - .add_service(ref_service_server::RefServiceServer::new(svc.clone())) - .add_service(remote_service_server::RemoteServiceServer::new(svc.clone())) - .add_service(tag_service_server::TagServiceServer::new(svc.clone())) - .add_service(tree_service_server::TreeServiceServer::new(svc)); + .add_service(health_service) + .add_service(repo_svc) + .add_service(archive_svc) + .add_service(blame_svc) + .add_service(branch_svc) + .add_service(commit_svc) + .add_service(diff_svc) + .add_service(merge_svc) + .add_service(pack_svc) + .add_service(ref_svc) + .add_service(remote_svc) + .add_service(tag_svc) + .add_service(tree_svc); tracing::info!("server ready, starting to accept connections"); server.serve(addr).await } diff --git a/server/pack.rs b/server/pack.rs index d7cc9f6..9db0d3d 100644 --- a/server/pack.rs +++ b/server/pack.rs @@ -1,9 +1,9 @@ use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; +use crate::pack::CancellableReceiverStream; use crate::pb::pack_service_client::PackServiceClient; use crate::pb::*; -use crate::pack::CancellableReceiverStream; use super::{GitksService, into_status}; diff --git a/server/remote.rs b/server/remote.rs index 4c000e9..69f18cd 100644 --- a/server/remote.rs +++ b/server/remote.rs @@ -12,11 +12,16 @@ impl RemoteService for GitksService { ) -> Result, tonic::Status> { let m = crate::metrics::RequestMetrics::new("gitks.RemoteService/FindRemoteRepository"); let inner = request.into_inner(); - let span = tracing::info_span!("remote.find_remote_repository", remote_url = %inner.remote_url); + let span = + tracing::info_span!("remote.find_remote_repository", remote_url = %inner.remote_url); let _enter = span.enter(); tracing::info!(remote_url = %inner.remote_url, "find_remote_repository"); let resp = find_remote_repository(inner).map_err(super::into_status)?; - tracing::info!(refs_count = resp.refs.len(), exists = resp.exists, "find_remote_repository done"); + tracing::info!( + refs_count = resp.refs.len(), + exists = resp.exists, + "find_remote_repository done" + ); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -27,7 +32,8 @@ impl RemoteService for GitksService { ) -> Result, tonic::Status> { let m = crate::metrics::RequestMetrics::new("gitks.RemoteService/FindRemoteRootRef"); let inner = request.into_inner(); - let span = tracing::info_span!("remote.find_remote_root_ref", remote_url = %inner.remote_url); + let span = + tracing::info_span!("remote.find_remote_root_ref", remote_url = %inner.remote_url); let _enter = span.enter(); tracing::info!(remote_url = %inner.remote_url, "find_remote_root_ref"); let resp = find_remote_root_ref(inner).map_err(super::into_status)?; diff --git a/tests/lang_stats_test.rs b/tests/lang_stats_test.rs index cbcb34f..8ea6305 100644 --- a/tests/lang_stats_test.rs +++ b/tests/lang_stats_test.rs @@ -7,7 +7,11 @@ use gitks::repository::lang_stats::{EXTENSION_MAP, FILENAME_MAP}; fn test_extension_map_lookup() { // Verify .md is in the map let result = EXTENSION_MAP.binary_search_by(|&(e, _, _)| e.cmp(".md")); - assert!(result.is_ok(), ".md should be in EXTENSION_MAP, got {:?}", result); + assert!( + result.is_ok(), + ".md should be in EXTENSION_MAP, got {:?}", + result + ); let idx = result.unwrap(); assert_eq!(EXTENSION_MAP[idx].1, "Markdown"); assert_eq!(EXTENSION_MAP[idx].2, "prose"); @@ -122,10 +126,7 @@ fn test_language_stats_with_path() { // Should NOT find README.md (it's at root level) let md = resp.languages.iter().find(|l| l.language == "Markdown"); - assert!( - md.is_none(), - "should not find Markdown in src/ directory" - ); + assert!(md.is_none(), "should not find Markdown in src/ directory"); } #[test] @@ -145,6 +146,9 @@ fn test_language_stats_line_count_excludes_blank_lines() { let md = resp.languages.iter().find(|l| l.language == "Markdown"); if let Some(md) = md { // README.md: "# Test" and "Updated." are non-blank = 2 lines - assert!(md.lines >= 2, "should count at least 2 code lines for README.md"); + assert!( + md.lines >= 2, + "should count at least 2 code lines for README.md" + ); } }