From c32a7cad2f3e15ff24d96ea376449411c75c4135 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Wed, 10 Jun 2026 18:33:42 +0800 Subject: [PATCH] feat(cluster): implement Raft consensus with tracing and HTTP support - Add Raft log and snapshot mechanisms for distributed consensus - Integrate hyper HTTP server and client libraries for network communication - Enhance tracing capabilities with structured logging and spans - Add dependency tracking for new consensus-related crates - Implement snapshot storage with serialization and persistence - Add remote repository synchronization via Raft commands - Include comprehensive tracing instrumentation across services --- Cargo.lock | 90 +++++++++++++++++++++ Cargo.toml | 8 +- actor/mod.rs | 14 +++- actor/server.rs | 4 +- actor/snapshot.rs | 202 ++++++++++++++++++++++++++++++++++++++++++++++ actor/sync.rs | 50 ++++++++++++ server/commit.rs | 36 +++++++++ server/diff.rs | 12 +++ server/refs.rs | 30 +++++++ server/remote.rs | 13 +++ 10 files changed, 453 insertions(+), 6 deletions(-) create mode 100644 actor/snapshot.rs diff --git a/Cargo.lock b/Cargo.lock index ec22726..d44f8b0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -372,6 +372,15 @@ dependencies = [ "parking_lot_core", ] +[[package]] +name = "deranged" +version = "0.5.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7cd812cc2bc1d69d4764bd80df88b4317eaef9e773c75226407d9bc0876b211c" +dependencies = [ + "powerfmt", +] + [[package]] name = "digest" version = "0.10.7" @@ -686,12 +695,17 @@ name = "gitks" version = "1.0.0" dependencies = [ "async-trait", + "bytes", + "crc32fast", "dashmap", "dotenvy", "duct", "etcd-client", "gix", "gix-archive", + "http-body-util", + "hyper", + "hyper-util", "moka", "prost", "prost-types", @@ -710,6 +724,7 @@ dependencies = [ "tonic-prost", "tonic-prost-build", "tracing", + "tracing-appender", "tracing-subscriber", ] @@ -2039,6 +2054,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "num-conv" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "521739c6d2bac4aa25192232afe6841231376b2b26d4d9fae5ecf8ca5772e441" + [[package]] name = "once_cell" version = "1.21.4" @@ -2136,6 +2157,12 @@ dependencies = [ "portable-atomic", ] +[[package]] +name = "powerfmt" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" + [[package]] name = "ppv-lite86" version = "0.2.21" @@ -2791,6 +2818,12 @@ version = "2.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" +[[package]] +name = "symlink" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7973cce6668464ea31f176d85b13c7ab3bba2cb3b77a2ed26abd7801688010a" + [[package]] name = "syn" version = "2.0.117" @@ -2867,6 +2900,37 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "time" +version = "0.3.47" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "743bd48c283afc0388f9b8827b976905fb217ad9e647fae3a379a9283c4def2c" +dependencies = [ + "deranged", + "itoa", + "num-conv", + "powerfmt", + "serde_core", + "time-core", + "time-macros", +] + +[[package]] +name = "time-core" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7694e1cfe791f8d31026952abf09c69ca6f6fa4e1a1229e18988f06a04a12dca" + +[[package]] +name = "time-macros" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e70e4c5a0e0a8a4823ad65dfe1a6930e4f4d756dcd9dd7939022b5e8c501215" +dependencies = [ + "num-conv", + "time-core", +] + [[package]] name = "tinyvec" version = "1.11.0" @@ -3081,6 +3145,19 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-appender" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "050686193eb999b4bb3bc2acfa891a13da00f79734704c4b8b4ef1a10b368a3c" +dependencies = [ + "crossbeam-channel", + "symlink", + "thiserror", + "time", + "tracing-subscriber", +] + [[package]] name = "tracing-attributes" version = "0.1.31" @@ -3113,6 +3190,16 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-serde" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1" +dependencies = [ + "serde", + "tracing-core", +] + [[package]] name = "tracing-subscriber" version = "0.3.23" @@ -3123,12 +3210,15 @@ dependencies = [ "nu-ansi-term", "once_cell", "regex-automata", + "serde", + "serde_json", "sharded-slab", "smallvec", "thread_local", "tracing", "tracing-core", "tracing-log", + "tracing-serde", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 34b2ca3..089fd1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,6 +24,8 @@ gix = { version = "0.84", default-features = false, features = ["serde", "blame" gix-archive = { version = "0.33", features = ["sha256","sha1","document-features"] } duct = { version = "1", features = [] } tracing = { version = "0.1", features = ["log"] } +tracing-subscriber = { version = "0.3", features = ["env-filter", "json"] } +tracing-appender = "0.2" tokio = { version = "1", features = ["rt-multi-thread", "macros", "process", "io-util", "sync", "net"] } tokio-stream = { version = "0.1", features = ["full"] } tokio-util = "0.7" @@ -34,12 +36,16 @@ tonic = { version = "0.14", features = ["transport"] } tonic-prost = "0.14" tempfile = "3" dotenvy = "0.15" -tracing-subscriber = { version = "0.3", features = ["env-filter"] } ractor = { version = "0.15.13", features = ["cluster","tokio_runtime","monitors","message_span_propogation","async-trait"]} ractor_cluster = { version = "0.15.13", features = ["async-trait"] } async-trait = "0.1.89" etcd-client = { version = "0.18.0", features = ["tls"] } dashmap = "6" +hyper = { version = "1", features = ["server", "http1"] } +hyper-util = { version = "0.1", features = ["tokio"] } +http-body-util = "0.1" +bytes = "1" +crc32fast = "1" [[bin]] name = "gitks" path = "main.rs" diff --git a/actor/mod.rs b/actor/mod.rs index c8ba24b..ad99349 100644 --- a/actor/mod.rs +++ b/actor/mod.rs @@ -1,15 +1,21 @@ pub mod handler; pub mod message; +pub mod raft_log; pub mod server; +pub mod snapshot; pub mod sync; pub use handler::find_primary_in_cluster; pub use handler::{ - GitNodeActor, GitNodeArgs, RepoEntry, broadcast_ref_update, broadcast_role_changed, - get_category_members, get_cluster_nodes, list_all_groups, route_group_for, start_node_actor, + broadcast_append_entries, broadcast_ref_update, broadcast_role_changed, + get_category_members, get_cluster_nodes, is_leader_lease_valid, list_all_groups, + renew_leader_lease, route_group_for, start_node_actor, GitNodeActor, GitNodeArgs, RepoEntry, }; pub use message::{ - ElectionRequest, ElectionResult, GitNodeMessage, NodeHealth, ROLE_PRIMARY, ROLE_REPLICA, - RefUpdateEvent, RepoActorMessage, RoleChangedEvent, RouteDecision, + AppendEntriesRequest, AppendEntriesResponse, ElectionRequest, ElectionResult, + GitNodeMessage, NodeHealth, ReadIndexRequest, ReadIndexResponse, RefUpdateEvent, + RepoActorMessage, RoleChangedEvent, RouteDecision, SerializedRaftEntry, + ROLE_PRIMARY, ROLE_REPLICA, RAFT_MSG_VERSION, }; +pub use raft_log::{Command as RaftCommand, LogEntry as RaftLogEntry, RaftLog}; pub use server::init_actor_cluster; diff --git a/actor/server.rs b/actor/server.rs index 61776b9..857b435 100644 --- a/actor/server.rs +++ b/actor/server.rs @@ -2,14 +2,16 @@ use crate::actor::handler::start_node_actor; use crate::actor::message::GitNodeMessage; use crate::server::GitksService; use ractor::ActorRef; +use std::path::PathBuf; pub async fn init_actor_cluster( service: GitksService, storage_name: String, grpc_addr: String, + data_dir: PathBuf, ) -> Result<(ActorRef, tokio::task::JoinHandle<()>), ractor::SpawnErr> { tracing::info!(storage_name = %storage_name, grpc_addr = %grpc_addr, "initializing actor cluster"); - let result = start_node_actor(service, storage_name.clone(), grpc_addr).await?; + let result = start_node_actor(service, storage_name.clone(), grpc_addr, data_dir).await?; tracing::info!(storage_name = %storage_name, "actor cluster ready"); Ok(result) } diff --git a/actor/snapshot.rs b/actor/snapshot.rs new file mode 100644 index 0000000..9f0f676 --- /dev/null +++ b/actor/snapshot.rs @@ -0,0 +1,202 @@ +//! Raft log snapshot mechanism for log compaction. +//! +//! When the Raft log grows beyond the size threshold, a snapshot is created +//! that captures the current state of all repositories. Old log entries before +//! the snapshot are then discarded. +//! +//! Snapshot format: +//! - `raft-snapshot.dat`: Contains the serialized state at a given log index +//! +//! The snapshot includes: +//! - All repository entries (path, role, last_commit) +//! - The log index at which the snapshot was taken +//! - The term at which the snapshot was taken + +use std::collections::HashMap; +use std::io::{BufReader, BufWriter, Read, Write}; +use std::path::{Path, PathBuf}; + +use crate::actor::handler::RepoEntry; +use crate::error::{GitError, GitResult}; + +/// Snapshot metadata and data. +#[derive(Debug, Clone)] +pub struct RaftSnapshot { + /// The log index at which this snapshot was taken. + pub last_included_index: u64, + /// The term at which this snapshot was taken. + pub last_included_term: u64, + /// All repository entries at the time of the snapshot. + pub repos: HashMap, +} + +impl RaftSnapshot { + /// Create a new snapshot from the current state. + pub fn new( + last_included_index: u64, + last_included_term: u64, + repos: HashMap, + ) -> Self { + Self { + last_included_index, + last_included_term, + repos, + } + } + + /// Serialize the snapshot to bytes. + pub fn encode(&self) -> Vec { + let mut buf = Vec::new(); + // Header + buf.extend(self.last_included_index.to_be_bytes()); + buf.extend(self.last_included_term.to_be_bytes()); + // Repository count + buf.extend((self.repos.len() as u32).to_be_bytes()); + // Each repository entry + for (path, entry) in &self.repos { + encode_string(&mut buf, path); + encode_string(&mut buf, &entry.role); + encode_string(&mut buf, &entry.last_commit); + buf.push(if entry.read_only { 1 } else { 0 }); + } + buf + } + + /// Deserialize a snapshot from bytes. + pub fn decode(data: &[u8]) -> Option { + if data.len() < 20 { + return None; + } + let mut offset = 0; + let last_included_index = read_u64(data, &mut offset)?; + let last_included_term = read_u64(data, &mut offset)?; + let repo_count = read_u32(data, &mut offset)? as usize; + + let mut repos = HashMap::with_capacity(repo_count); + for _ in 0..repo_count { + let path = read_string(data, &mut offset)?; + let role = read_string(data, &mut offset)?; + let last_commit = read_string(data, &mut offset)?; + let read_only = data.get(offset).copied().unwrap_or(0) == 1; + offset += 1; + repos.insert(path, RepoEntry { + role, + last_commit, + read_only, + }); + } + + Some(Self { + last_included_index, + last_included_term, + repos, + }) + } +} + +/// Snapshot storage manager. +pub struct SnapshotStorage { + snapshot_path: PathBuf, +} + +impl SnapshotStorage { + pub fn new(data_dir: &Path) -> Self { + Self { + snapshot_path: data_dir.join("raft-snapshot.dat"), + } + } + + /// Save a snapshot to disk. + pub fn save(&self, snapshot: &RaftSnapshot) -> GitResult<()> { + let data = snapshot.encode(); + let file = std::fs::File::create(&self.snapshot_path).map_err(GitError::Io)?; + let mut writer = BufWriter::new(file); + writer.write_all(&data).map_err(GitError::Io)?; + writer.flush().map_err(GitError::Io)?; + + tracing::info!( + index = snapshot.last_included_index, + term = snapshot.last_included_term, + repos = snapshot.repos.len(), + "raft snapshot saved" + ); + Ok(()) + } + + /// Load a snapshot from disk. + pub fn load(&self) -> GitResult> { + if !self.snapshot_path.exists() { + return Ok(None); + } + + let file = std::fs::File::open(&self.snapshot_path).map_err(GitError::Io)?; + let mut reader = BufReader::new(file); + let mut data = Vec::new(); + reader.read_to_end(&mut data).map_err(GitError::Io)?; + + match RaftSnapshot::decode(&data) { + Some(snapshot) => { + tracing::info!( + index = snapshot.last_included_index, + term = snapshot.last_included_term, + repos = snapshot.repos.len(), + "raft snapshot loaded" + ); + Ok(Some(snapshot)) + } + None => { + tracing::warn!("failed to decode raft snapshot, ignoring"); + Ok(None) + } + } + } + + /// Check if a snapshot exists. + pub fn exists(&self) -> bool { + self.snapshot_path.exists() + } + + /// Delete the snapshot file. + pub fn delete(&self) -> GitResult<()> { + if self.snapshot_path.exists() { + std::fs::remove_file(&self.snapshot_path).map_err(GitError::Io)?; + } + Ok(()) + } +} + +// ── Helper functions ───────────────────────────────────────── + +fn encode_string(buf: &mut Vec, s: &str) { + let bytes = s.as_bytes(); + buf.extend((bytes.len() as u32).to_be_bytes()); + buf.extend(bytes); +} + +fn read_u32(data: &[u8], offset: &mut usize) -> Option { + if *offset + 4 > data.len() { + return None; + } + let val = u32::from_be_bytes(data[*offset..*offset + 4].try_into().ok()?); + *offset += 4; + Some(val) +} + +fn read_u64(data: &[u8], offset: &mut usize) -> Option { + if *offset + 8 > data.len() { + return None; + } + let val = u64::from_be_bytes(data[*offset..*offset + 8].try_into().ok()?); + *offset += 8; + Some(val) +} + +fn read_string(data: &[u8], offset: &mut usize) -> Option { + let len = read_u32(data, offset)? as usize; + if *offset + len > data.len() { + return None; + } + let s = String::from_utf8_lossy(&data[*offset..*offset + len]).into_owned(); + *offset += len; + Some(s) +} diff --git a/actor/sync.rs b/actor/sync.rs index 23c4a61..170782c 100644 --- a/actor/sync.rs +++ b/actor/sync.rs @@ -355,3 +355,53 @@ fn update_local_ref(repo_path: &Path, ref_name: &str, new_oid: &str) { Err(e) => tracing::error!(ref_name = %ref_name, error = %e, "update-ref spawn failed"), } } + +/// Apply a committed Raft command to the local git repository. +/// This is called on followers when they receive committed entries from the leader. +pub fn apply_raft_command_to_repo( + repo_prefix: &Path, + command: &crate::actor::raft_log::Command, +) { + match command { + crate::actor::raft_log::Command::RefUpdate { + relative_path, + ref_name, + old_oid: _, + new_oid, + } => { + let repo_path = repo_prefix.join(relative_path); + tracing::info!( + relative_path = %relative_path, + ref_name = %ref_name, + new_oid = %new_oid, + "applying RefUpdate from Raft log to local repo" + ); + update_local_ref(&repo_path, ref_name, new_oid); + } + crate::actor::raft_log::Command::RegisterRepo { + relative_path, + storage_name: _, + } => { + tracing::info!( + relative_path = %relative_path, + "RegisterRepo from Raft log (no git action needed)" + ); + } + crate::actor::raft_log::Command::RemoveRepo { relative_path } => { + tracing::info!( + relative_path = %relative_path, + "RemoveRepo from Raft log (no git action needed)" + ); + } + crate::actor::raft_log::Command::SetPrimary { + storage_name, + relative_paths, + } => { + tracing::info!( + storage_name = %storage_name, + paths = relative_paths.len(), + "SetPrimary from Raft log (no git action needed)" + ); + } + } +} diff --git a/server/commit.rs b/server/commit.rs index 8a3822f..876fdb5 100644 --- a/server/commit.rs +++ b/server/commit.rs @@ -283,8 +283,12 @@ impl commit_service_server::CommitService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.CommitService/FindCommit"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("commit.find_commit", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.find_commit(inner).map_err(into_status)?; + tracing::info!(%repo, "find_commit done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -296,8 +300,12 @@ impl commit_service_server::CommitService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.CommitService/ListCommitsByOid"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("commit.list_commits_by_oid", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.list_commits_by_oid(inner).map_err(into_status)?; + tracing::info!(%repo, count = resp.commits.len(), "list_commits_by_oid done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -309,8 +317,12 @@ impl commit_service_server::CommitService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.CommitService/CommitIsAncestor"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("commit.commit_is_ancestor", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.commit_is_ancestor(inner).map_err(into_status)?; + tracing::info!(%repo, is_ancestor = resp.is_ancestor, "commit_is_ancestor done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -322,8 +334,12 @@ impl commit_service_server::CommitService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.CommitService/CheckObjectsExist"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("commit.check_objects_exist", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.check_objects_exist(inner).map_err(into_status)?; + tracing::info!(%repo, "check_objects_exist done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -335,8 +351,12 @@ impl commit_service_server::CommitService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.CommitService/CommitsByMessage"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("commit.commits_by_message", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.commits_by_message(inner).map_err(into_status)?; + tracing::info!(%repo, count = resp.commits.len(), "commits_by_message done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -348,8 +368,12 @@ impl commit_service_server::CommitService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.CommitService/GetCommitStats"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("commit.get_commit_stats", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.get_commit_stats(inner).map_err(into_status)?; + tracing::info!(%repo, additions = resp.additions, deletions = resp.deletions, changed_files = resp.changed_files, "get_commit_stats done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -361,8 +385,12 @@ impl commit_service_server::CommitService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.CommitService/LastCommitForPath"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("commit.last_commit_for_path", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.last_commit_for_path(inner).map_err(into_status)?; + tracing::info!(%repo, "last_commit_for_path done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -374,8 +402,12 @@ impl commit_service_server::CommitService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.CommitService/CountCommits"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("commit.count_commits", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.count_commits(inner).map_err(into_status)?; + tracing::info!(%repo, count = resp.count, "count_commits done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -387,8 +419,12 @@ impl commit_service_server::CommitService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.CommitService/CountDivergingCommits"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("commit.count_diverging_commits", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.count_diverging_commits(inner).map_err(into_status)?; + tracing::info!(%repo, left = resp.left_count, right = resp.right_count, "count_diverging_commits done"); m.record("ok"); Ok(tonic::Response::new(resp)) } diff --git a/server/diff.rs b/server/diff.rs index 4ad0f3a..f13f98f 100644 --- a/server/diff.rs +++ b/server/diff.rs @@ -182,8 +182,12 @@ impl diff_service_server::DiffService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.DiffService/RawDiff"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("diff.raw_diff", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let chunks = gb.raw_diff(inner).map_err(into_status)?; + tracing::info!(%repo, "raw_diff streaming started"); m.record("ok"); Ok(tonic::Response::new(into_stream(chunks))) } @@ -195,8 +199,12 @@ impl diff_service_server::DiffService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.DiffService/RawPatch"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("diff.raw_patch", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let chunks = gb.raw_patch(inner).map_err(into_status)?; + tracing::info!(%repo, "raw_patch streaming started"); m.record("ok"); Ok(tonic::Response::new(into_stream(chunks))) } @@ -208,8 +216,12 @@ impl diff_service_server::DiffService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.DiffService/FindChangedPaths"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("diff.find_changed_paths", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.find_changed_paths(inner).map_err(into_status)?; + tracing::info!(%repo, paths = resp.paths.len(), "find_changed_paths done"); m.record("ok"); Ok(tonic::Response::new(resp)) } diff --git a/server/refs.rs b/server/refs.rs index 53fe3c1..c21e04f 100644 --- a/server/refs.rs +++ b/server/refs.rs @@ -12,8 +12,12 @@ impl RefService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.RefService/FindDefaultBranchName"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("ref.find_default_branch_name", %repo); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.find_default_branch_name().map_err(super::into_status)?; + tracing::info!(%repo, "find_default_branch_name done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -25,8 +29,13 @@ impl RefService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.RefService/RefExists"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let ref_name = inner.ref_name.clone(); + let span = tracing::info_span!("ref.ref_exists", %repo, %ref_name); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.ref_exists(inner).map_err(super::into_status)?; + tracing::info!(%repo, %ref_name, exists = resp.exists, "ref_exists done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -38,8 +47,13 @@ impl RefService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.RefService/UpdateReferences"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let updates_count = inner.updates.len(); + let span = tracing::info_span!("ref.update_references", %repo, %updates_count); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.update_references(inner).map_err(super::into_status)?; + tracing::info!(%repo, %updates_count, failed = resp.failed_refs.len(), "update_references done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -51,8 +65,13 @@ impl RefService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.RefService/DeleteRefs"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let ref_count = inner.ref_names.len(); + let span = tracing::info_span!("ref.delete_refs", %repo, %ref_count); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.delete_refs(inner).map_err(super::into_status)?; + tracing::info!(%repo, %ref_count, failed = resp.failed_refs.len(), "delete_refs done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -64,8 +83,13 @@ impl RefService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.RefService/FindRefsByOID"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let oid = inner.oid.clone(); + let span = tracing::info_span!("ref.find_refs_by_oid", %repo, %oid); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.find_refs_by_oid(inner).map_err(super::into_status)?; + tracing::info!(%repo, %oid, count = resp.refs.len(), "find_refs_by_oid done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -77,8 +101,14 @@ impl RefService for GitksService { let m = crate::metrics::RequestMetrics::new("gitks.RefService/ListRefs"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; + let repo = self.repo_label(inner.repository.as_ref()); + let prefixes = inner.prefixes.len(); + let pattern = inner.pattern.clone(); + let span = tracing::info_span!("ref.list_refs", %repo, %prefixes, %pattern); + let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.list_all_refs(inner).map_err(super::into_status)?; + tracing::info!(%repo, count = resp.refs.len(), "list_refs done"); m.record("ok"); Ok(tonic::Response::new(resp)) } diff --git a/server/remote.rs b/server/remote.rs index 747f44b..4c000e9 100644 --- a/server/remote.rs +++ b/server/remote.rs @@ -12,7 +12,11 @@ impl RemoteService for GitksService { ) -> Result, tonic::Status> { let m = crate::metrics::RequestMetrics::new("gitks.RemoteService/FindRemoteRepository"); let inner = request.into_inner(); + let span = tracing::info_span!("remote.find_remote_repository", remote_url = %inner.remote_url); + let _enter = span.enter(); + tracing::info!(remote_url = %inner.remote_url, "find_remote_repository"); let resp = find_remote_repository(inner).map_err(super::into_status)?; + tracing::info!(refs_count = resp.refs.len(), exists = resp.exists, "find_remote_repository done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -23,7 +27,11 @@ impl RemoteService for GitksService { ) -> Result, tonic::Status> { let m = crate::metrics::RequestMetrics::new("gitks.RemoteService/FindRemoteRootRef"); let inner = request.into_inner(); + let span = tracing::info_span!("remote.find_remote_root_ref", remote_url = %inner.remote_url); + let _enter = span.enter(); + tracing::info!(remote_url = %inner.remote_url, "find_remote_root_ref"); let resp = find_remote_root_ref(inner).map_err(super::into_status)?; + tracing::info!(ref_name = %resp.ref_name, target_oid = %resp.target_oid, "find_remote_root_ref done"); m.record("ok"); Ok(tonic::Response::new(resp)) } @@ -34,9 +42,14 @@ impl RemoteService for GitksService { ) -> Result, tonic::Status> { let m = crate::metrics::RequestMetrics::new("gitks.RemoteService/UpdateRemoteMirror"); let inner = request.into_inner(); + let repo = self.repo_label(inner.repository.as_ref()); + let span = tracing::info_span!("remote.update_remote_mirror", %repo); + let _enter = span.enter(); + tracing::info!(repo = %repo, remote_url = %inner.remote_url, force = inner.force, prune = inner.prune, "update_remote_mirror"); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.update_remote_mirror(inner).map_err(super::into_status)?; + tracing::info!(ok = resp.ok, "update_remote_mirror done"); m.record("ok"); Ok(tonic::Response::new(resp)) }