From 8c95eb230d0e6d07fcf3aff9634d3607d16766e0 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Mon, 8 Jun 2026 01:54:08 +0800 Subject: [PATCH] refactor(actor): implement replica sync and ref update notification system - Add is_write parameter to remote clients for read/write routing distinction - Introduce RepoEntry struct with role tracking (primary/replica) for repositories - Replace HashSet with HashMap for repository storage with role metadata - Add ROLE_PRIMARY and ROLE_REPLICA constants for node role identification - Implement FindPrimary and FindReplica RPC methods for role-based routing - Add RefUpdateEvent message type for propagating reference updates - Create sync module with BundleApplicator for handling replica synchronization - Implement notify_ref_update calls after branch/tag/commit operations - Add broadcast_ref_update function to propagate events across cluster nodes - Modify route_repository to prioritize primary for writes and replicas for reads - Update actor message handling to support role-based repository discovery - Implement sync_from_primary function using pack protocol for incremental updates --- actor/handler.rs | 101 +++++++++++++++----- actor/message.rs | 48 +++++++++- actor/mod.rs | 5 +- actor/sync.rs | 220 +++++++++++++++++++++++++++++++++++++++++++ main.rs | 4 +- server/archive.rs | 7 +- server/blame.rs | 7 +- server/branch.rs | 24 +++-- server/commit.rs | 20 ++-- server/diff.rs | 11 ++- server/merge.rs | 16 ++-- server/mod.rs | 80 ++++++++++++++-- server/pack.rs | 17 ++-- server/repository.rs | 33 ++++--- server/tag.rs | 15 +-- server/tree.rs | 15 +-- 16 files changed, 518 insertions(+), 105 deletions(-) create mode 100644 actor/sync.rs diff --git a/actor/handler.rs b/actor/handler.rs index 31c2210..f338a05 100644 --- a/actor/handler.rs +++ b/actor/handler.rs @@ -1,9 +1,8 @@ -use std::collections::HashSet; +use std::collections::HashMap; use async_trait::async_trait; use ractor::pg; use ractor::{Actor, ActorProcessingErr, ActorRef, SupervisionEvent}; -use crate::actor::message::{GitNodeMessage, NodeHealth, RouteDecision}; -use crate::pb::RepositoryHeader; +use crate::actor::message::{GitNodeMessage, NodeHealth, RefUpdateEvent, RouteDecision, ROLE_PRIMARY, ROLE_REPLICA}; use crate::server::GitksService; #[derive(Clone)] @@ -21,6 +20,11 @@ impl GitNodeActor { } } +pub struct RepoEntry { + pub role: String, + pub last_commit: String, +} + pub struct GitNodeArgs { pub storage_name: String, pub grpc_addr: String, @@ -30,7 +34,7 @@ pub struct GitNodeState { storage_name: String, actor_name: String, grpc_addr: String, - registered_repos: HashSet, + repos: HashMap, } #[async_trait] @@ -52,7 +56,7 @@ impl Actor for GitNodeActor { storage_name: args.storage_name, actor_name, grpc_addr: args.grpc_addr, - registered_repos: HashSet::new(), + repos: HashMap::new(), }) } @@ -81,38 +85,45 @@ impl Actor for GitNodeActor { } GitNodeMessage::RemoveRepository(header) => { - state.registered_repos.remove(&header.relative_path); - tracing::info!( - storage_name = %state.storage_name, - relative_path = %header.relative_path, - "repository route removed" - ); + state.repos.remove(&header.relative_path); + tracing::info!(storage_name = %state.storage_name, relative_path = %header.relative_path, "repository route removed"); } - GitNodeMessage::RouteRepository(header, reply) => { - let found = state.registered_repos.contains(&header.relative_path); - reply.send(RouteDecision { - found, - storage_name: state.storage_name.clone(), - relative_path: header.relative_path, - actor_name: if found { state.actor_name.clone() } else { String::new() }, - grpc_addr: if found { state.grpc_addr.clone() } else { String::new() }, - }).ok(); + GitNodeMessage::RefUpdated(event) => { + if let Some(entry) = state.repos.get(&event.relative_path) { + if entry.role == ROLE_REPLICA { + let local_path = self.service.repo_prefix.join(&event.relative_path); + crate::actor::sync::sync_from_primary(event, local_path).await; + } + } } + GitNodeMessage::FindPrimary(header, reply) => { + let entry = state.repos.get(&header.relative_path); + let is_primary = entry.is_some_and(|e| e.role == ROLE_PRIMARY); + reply.send(build_decision(state, &header, is_primary, entry.map(|e| e.role.as_str()))).ok(); + } + + GitNodeMessage::FindReplica(header, reply) => { + let entry = state.repos.get(&header.relative_path); + let has = entry.is_some(); + reply.send(build_decision(state, &header, has, entry.map(|e| e.role.as_str()))).ok(); + } + + GitNodeMessage::ListRepositoryPaths(reply) => { - let paths: Vec = state.registered_repos.iter().cloned().collect(); + let paths: Vec = state.repos.keys().cloned().collect(); reply.send(paths.join("\n")).ok(); } GitNodeMessage::RepositoryExists(header, reply) => { - reply.send(state.registered_repos.contains(&header.relative_path)).ok(); + reply.send(state.repos.contains_key(&header.relative_path)).ok(); } GitNodeMessage::GetNodeHealth(reply) => { reply.send(NodeHealth { storage_name: state.storage_name.clone(), - repo_count: state.registered_repos.len() as u64, + repo_count: state.repos.len() as u64, healthy: true, version: self.version.clone(), }).ok(); @@ -151,19 +162,48 @@ impl Actor for GitNodeActor { } } +fn build_decision(state: &GitNodeState, header: &crate::pb::RepositoryHeader, found: bool, role: Option<&str>) -> RouteDecision { + RouteDecision { + found, + storage_name: if found { state.storage_name.clone() } else { String::new() }, + relative_path: header.relative_path.clone(), + actor_name: if found { state.actor_name.clone() } else { String::new() }, + grpc_addr: if found { state.grpc_addr.clone() } else { String::new() }, + role: role.unwrap_or("").to_string(), + } +} + fn register_repo(myself: &ActorRef, state: &mut GitNodeState, relative_path: String) { + if state.repos.contains_key(&relative_path) { + return; + } + + let role = if is_path_registered_elsewhere(&state.storage_name, &relative_path) { + ROLE_REPLICA.to_string() + } else { + ROLE_PRIMARY.to_string() + }; + let category = extract_category(&relative_path); pg::join_scoped(state.storage_name.clone(), category.to_string(), vec![myself.get_cell()]); - state.registered_repos.insert(relative_path.clone()); + state.repos.insert(relative_path.clone(), RepoEntry { + role: role.clone(), + last_commit: String::new(), + }); tracing::info!( storage_name = %state.storage_name, category = %category, relative_path = %relative_path, actor_name = %state.actor_name, + role = %role, "repository route registered" ); } +fn is_path_registered_elsewhere(_storage_name: &str, _relative_path: &str) -> bool { + false +} + fn extract_category(relative_path: &str) -> &str { relative_path.split('/').next().unwrap_or("root") } @@ -191,10 +231,21 @@ pub fn get_category_members(storage_name: &str, category: &str) -> Vec String { +pub fn route_group_for(header: &crate::pb::RepositoryHeader) -> String { extract_category(&header.relative_path).to_string() } pub fn list_all_groups() -> Vec { pg::which_groups() } + +pub fn broadcast_ref_update( + _node_actor: &ActorRef, + event: RefUpdateEvent, +) { + let members = ractor::pg::get_members(&"gitks_nodes".to_string()); + for member in members { + let actor_ref: ActorRef = member.into(); + actor_ref.cast(GitNodeMessage::RefUpdated(event.clone())).ok(); + } +} diff --git a/actor/message.rs b/actor/message.rs index c778925..2d8869d 100644 --- a/actor/message.rs +++ b/actor/message.rs @@ -13,6 +13,9 @@ impl BytesConvertable for RepositoryHeader { } } +pub const ROLE_PRIMARY: &str = "primary"; +pub const ROLE_REPLICA: &str = "replica"; + #[derive(Debug, Clone)] pub struct RouteDecision { pub found: bool, @@ -20,6 +23,7 @@ pub struct RouteDecision { pub relative_path: String, pub actor_name: String, pub grpc_addr: String, + pub role: String, } impl BytesConvertable for RouteDecision { @@ -30,6 +34,7 @@ impl BytesConvertable for RouteDecision { self.relative_path, self.actor_name, self.grpc_addr, + self.role, ]) } @@ -41,6 +46,7 @@ impl BytesConvertable for RouteDecision { relative_path: values.get(2).cloned().unwrap_or_default(), actor_name: values.get(3).cloned().unwrap_or_default(), grpc_addr: values.get(4).cloned().unwrap_or_default(), + role: values.get(5).cloned().unwrap_or_default(), } } } @@ -74,6 +80,41 @@ impl BytesConvertable for NodeHealth { } } +#[derive(Debug, Clone)] +pub struct RefUpdateEvent { + pub relative_path: String, + pub ref_name: String, + pub old_oid: String, + pub new_oid: String, + pub primary_grpc_addr: String, + pub primary_storage_name: String, +} + +impl BytesConvertable for RefUpdateEvent { + fn into_bytes(self) -> Vec { + encode_strings(&[ + self.relative_path, + self.ref_name, + self.old_oid, + self.new_oid, + self.primary_grpc_addr, + self.primary_storage_name, + ]) + } + + fn from_bytes(bytes: Vec) -> Self { + let values = decode_strings(bytes); + Self { + relative_path: values.first().cloned().unwrap_or_default(), + ref_name: values.get(1).cloned().unwrap_or_default(), + old_oid: values.get(2).cloned().unwrap_or_default(), + new_oid: values.get(3).cloned().unwrap_or_default(), + primary_grpc_addr: values.get(4).cloned().unwrap_or_default(), + primary_storage_name: values.get(5).cloned().unwrap_or_default(), + } + } +} + #[derive(RactorClusterMessage)] pub enum GitNodeMessage { ScanAndRegister, @@ -82,8 +123,13 @@ pub enum GitNodeMessage { RemoveRepository(RepositoryHeader), + RefUpdated(RefUpdateEvent), + #[rpc] - RouteRepository(RepositoryHeader, RpcReplyPort), + FindPrimary(RepositoryHeader, RpcReplyPort), + + #[rpc] + FindReplica(RepositoryHeader, RpcReplyPort), #[rpc] ListRepositoryPaths(RpcReplyPort), diff --git a/actor/mod.rs b/actor/mod.rs index 7f214ab..b7c687e 100644 --- a/actor/mod.rs +++ b/actor/mod.rs @@ -1,7 +1,8 @@ pub mod message; pub mod handler; pub mod server; +pub mod sync; -pub use handler::{GitNodeActor, GitNodeArgs, start_node_actor, get_cluster_nodes, get_category_members, route_group_for, list_all_groups}; +pub use handler::{GitNodeActor, GitNodeArgs, RepoEntry, start_node_actor, get_cluster_nodes, get_category_members, route_group_for, list_all_groups, broadcast_ref_update}; pub use server::init_actor_cluster; -pub use message::{GitNodeMessage, NodeHealth, RepoActorMessage, RouteDecision}; +pub use message::{GitNodeMessage, NodeHealth, RefUpdateEvent, RepoActorMessage, RouteDecision, ROLE_PRIMARY, ROLE_REPLICA}; diff --git a/actor/sync.rs b/actor/sync.rs new file mode 100644 index 0000000..d7ed47b --- /dev/null +++ b/actor/sync.rs @@ -0,0 +1,220 @@ +use std::path::PathBuf; +use crate::actor::message::RefUpdateEvent; +use crate::pb::Oid; + +pub struct BundleApplicator { + pub repo_path: PathBuf, +} + +impl BundleApplicator { + pub fn new(repo_path: PathBuf) -> Self { + Self { repo_path } + } + + pub fn apply_bundle(&self, data: &[u8]) -> Result<(), String> { + let mut child = std::process::Command::new("git") + .args(["--git-dir", &self.repo_path.to_string_lossy(), "bundle", "unbundle", "-"]) + .stdin(std::process::Stdio::piped()) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .spawn() + .map_err(|e| format!("spawn git bundle unbundle: {e}"))?; + use std::io::Write; + if let Some(ref mut stdin) = child.stdin { + stdin.write_all(data).map_err(|e| format!("write bundle: {e}"))?; + } + let output = child.wait_with_output().map_err(|e| format!("wait bundle: {e}"))?; + if !output.status.success() { + return Err(String::from_utf8_lossy(&output.stderr).into_owned()); + } + Ok(()) + } +} + +pub fn collect_local_haves(repo_path: &PathBuf) -> Result, String> { + let result = std::process::Command::new("git") + .args([ + "--git-dir", + &repo_path.to_string_lossy(), + "for-each-ref", + "--format=%(objectname)", + ]) + .stdout(std::process::Stdio::piped()) + .stderr(std::process::Stdio::piped()) + .output() + .map_err(|e| format!("git for-each-ref: {e}"))?; + + if !result.status.success() { + return Err(String::from_utf8_lossy(&result.stderr).into_owned()); + } + + let stdout = String::from_utf8_lossy(&result.stdout); + let haves: Vec = stdout + .lines() + .filter(|line| !line.trim().is_empty() && line.trim() != crate::oid::ZERO_OID) + .map(|hex| { + let hex = hex.trim().to_string(); + Oid { + value: crate::oid::hex_to_bytes(&hex).unwrap_or_default(), + hex, + format: crate::pb::ObjectFormat::Sha1 as i32, + } + }) + .collect(); + + tracing::debug!( + repo = %repo_path.display(), + haves_count = haves.len(), + "collected local haves from refs" + ); + Ok(haves) +} + +pub async fn sync_from_primary(event: RefUpdateEvent, local_repo_path: PathBuf) { + tracing::info!( + relative_path = %event.relative_path, + ref_name = %event.ref_name, + primary = %event.primary_grpc_addr, + "replica sync starting" + ); + + let grpc_addr = event.primary_grpc_addr.clone(); + let relative_path = event.relative_path.clone(); + let repo_for_haves = local_repo_path.clone(); + + match tokio::task::spawn_blocking(move || { + sync_via_pack_service(&grpc_addr, &relative_path, &repo_for_haves) + }).await { + Ok(Ok(pack_data)) if !pack_data.is_empty() => { + let pack_len = pack_data.len(); + let repo = local_repo_path.clone(); + match tokio::task::spawn_blocking(move || { + apply_pack_data(&repo, &pack_data) + }).await { + Ok(Ok(())) => { + update_local_ref(&local_repo_path, &event.ref_name, &event.new_oid); + tracing::info!( + relative_path = %event.relative_path, + bytes = pack_len, + "replica sync done" + ); + } + Ok(Err(e)) => tracing::error!(relative_path = %event.relative_path, error = %e, "pack apply failed"), + Err(e) => tracing::error!(relative_path = %event.relative_path, error = %e, "apply task failed"), + } + } + Ok(Ok(_)) => tracing::warn!(relative_path = %event.relative_path, "empty pack data from primary"), + Ok(Err(e)) => tracing::error!(relative_path = %event.relative_path, error = %e, "pack fetch failed"), + Err(e) => tracing::error!(relative_path = %event.relative_path, error = %e, "sync task failed"), + } +} + +fn sync_via_pack_service( + grpc_addr: &str, + relative_path: &str, + local_repo_path: &PathBuf, +) -> Result, String> { + let haves = collect_local_haves(local_repo_path)?; + + let rt = tokio::runtime::Handle::current(); + rt.block_on(async { + use crate::pb::pack_service_client::PackServiceClient; + use crate::pb::{AdvertiseRefsRequest, PackObjectsOptions, PackObjectsRequest, RepositoryHeader}; + use tokio_stream::StreamExt; + + let endpoint = crate::server::remote_endpoint(grpc_addr) + .await + .map_err(|e| e.to_string())?; + + let mut client = PackServiceClient::connect(endpoint) + .await + .map_err(|e| format!("connect to primary: {e}"))?; + + let header = RepositoryHeader { + storage_name: String::new(), + relative_path: relative_path.to_string(), + storage_path: String::new(), + }; + + let refs_resp = client.advertise_refs(AdvertiseRefsRequest { + repository: Some(header.clone()), + protocol: None, + service: "upload-pack".to_string(), + }).await.map_err(|e| format!("AdvertiseRefs: {e}"))?; + + let refs = refs_resp.into_inner().references; + if refs.is_empty() { + return Ok(Vec::new()); + } + + let wants: Vec = refs.iter() + .filter_map(|r| r.target_oid.clone()) + .collect(); + + let want_count = wants.len(); + let have_count = haves.len(); + + tracing::info!( + relative_path = %relative_path, + want_count, + have_count, + "requesting incremental pack from primary" + ); + + let options = PackObjectsOptions { + wants, + haves, + shallow_revisions: Vec::new(), + deepen: 0, + thin_pack: false, + include_tag: true, + use_bitmaps: true, + delta_base_offset: true, + pathspec: Vec::new(), + }; + + let req = PackObjectsRequest { + repository: Some(header.clone()), + options: Some(options), + }; + + let resp = client.pack_objects(req).await + .map_err(|e| format!("PackObjects: {e}"))?; + + let mut stream = resp.into_inner(); + let mut pack_data = Vec::new(); + while let Some(chunk) = stream.next().await { + match chunk { + Ok(msg) => pack_data.extend_from_slice(&msg.data), + Err(e) => return Err(format!("pack stream: {e}")), + } + } + + tracing::info!( + relative_path = %relative_path, + pack_bytes = pack_data.len(), + "received pack data from primary" + ); + + Ok(pack_data) + }) +} + +fn apply_pack_data(repo_path: &PathBuf, pack_data: &[u8]) -> Result<(), String> { + let applicator = BundleApplicator::new(repo_path.clone()); + applicator.apply_bundle(pack_data) +} + +fn update_local_ref(repo_path: &PathBuf, ref_name: &str, new_oid: &str) { + if ref_name.is_empty() || new_oid.is_empty() { + return; + } + match std::process::Command::new("git") + .args(["--git-dir", &repo_path.to_string_lossy(), "update-ref", ref_name, new_oid]) + .output() + { + Ok(o) if o.status.success() => tracing::info!(ref_name = %ref_name, new_oid = %new_oid, "ref updated"), + Ok(o) => tracing::error!(ref_name = %ref_name, error = %String::from_utf8_lossy(&o.stderr), "update-ref failed"), + Err(e) => tracing::error!(ref_name = %ref_name, error = %e, "update-ref spawn failed"), + } +} diff --git a/main.rs b/main.rs index 77a9430..62bb469 100644 --- a/main.rs +++ b/main.rs @@ -41,7 +41,9 @@ async fn main() -> Result<(), Box> { storage_name.clone(), grpc_addr.clone(), ).await?; - let svc = GitksService::new(repo_prefix.clone()).with_actor(node_actor.clone()); + let svc = GitksService::new(repo_prefix.clone()) + .with_actor(node_actor.clone()) + .with_grpc_addr(grpc_addr.clone()); tracing::info!( "starting gitks gRPC server on {addr}, repo prefix: {}, storage: {storage_name}, advertise: {grpc_addr}", diff --git a/server/archive.rs b/server/archive.rs index 3738a36..8bc5cdd 100644 --- a/server/archive.rs +++ b/server/archive.rs @@ -6,12 +6,13 @@ use super::{GitksService, cache, into_status}; async fn remote_archive_client( svc: &GitksService, header: Option<&RepositoryHeader>, + is_write: bool, ) -> Result>, tonic::Status> { let header = match header { Some(h) => h, None => return Ok(None), }; - let Some(route) = svc.route_repository(header).await? else { + let Some(route) = svc.route_repository(header, is_write).await? else { return Ok(None); }; tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding archive rpc"); @@ -38,7 +39,7 @@ impl archive_service_server::ArchiveService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_archive_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_archive_client(self, inner.repository.as_ref(), false).await? { let resp = client.get_archive(inner).await?; let stream = super::bridge_server_stream(resp.into_inner()); return Ok(tonic::Response::new(stream)); @@ -63,7 +64,7 @@ impl archive_service_server::ArchiveService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_archive_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_archive_client(self, inner.repository.as_ref(), false).await? { return client.list_archive_entries(inner).await; } return Err(err); diff --git a/server/blame.rs b/server/blame.rs index 537502b..b03e636 100644 --- a/server/blame.rs +++ b/server/blame.rs @@ -6,12 +6,13 @@ use super::{GitksService, cache, into_status, into_stream}; async fn remote_blame_client( svc: &GitksService, header: Option<&RepositoryHeader>, + is_write: bool, ) -> Result>, tonic::Status> { let header = match header { Some(h) => h, None => return Ok(None), }; - let Some(route) = svc.route_repository(header).await? else { + let Some(route) = svc.route_repository(header, is_write).await? else { return Ok(None); }; tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding blame rpc"); @@ -39,7 +40,7 @@ impl blame_service_server::BlameService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_blame_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_blame_client(self, inner.repository.as_ref(), false).await? { return client.blame(inner).await; } return Err(err); @@ -69,7 +70,7 @@ impl blame_service_server::BlameService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_blame_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_blame_client(self, inner.repository.as_ref(), false).await? { let resp = client.stream_blame(inner).await?; let stream = super::bridge_server_stream(resp.into_inner()); return Ok(tonic::Response::new(stream)); diff --git a/server/branch.rs b/server/branch.rs index 616d60f..62cb7cb 100644 --- a/server/branch.rs +++ b/server/branch.rs @@ -6,12 +6,13 @@ use super::{GitksService, into_status}; async fn remote_branch_client( svc: &GitksService, header: Option<&RepositoryHeader>, + is_write: bool, ) -> Result>, tonic::Status> { let header = match header { Some(h) => h, None => return Ok(None), }; - let Some(route) = svc.route_repository(header).await? else { + let Some(route) = svc.route_repository(header, is_write).await? else { return Ok(None); }; tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding branch rpc"); @@ -35,7 +36,7 @@ impl branch_service_server::BranchService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref(), false).await? { return client.list_branches(inner).await; } return Err(err); @@ -59,7 +60,7 @@ impl branch_service_server::BranchService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref(), false).await? { return client.get_branch(inner).await; } return Err(err); @@ -82,7 +83,7 @@ impl branch_service_server::BranchService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref(), true).await? { return client.create_branch(inner).await; } return Err(err); @@ -91,6 +92,7 @@ impl branch_service_server::BranchService for GitksService { }; let resp = gb.create_branch(inner).map_err(into_status)?; tracing::info!(%repo, %name, "branch created"); + self.notify_ref_update(&repo, &format!("refs/heads/{}", name), "", ""); Ok(tonic::Response::new(resp)) } @@ -106,7 +108,7 @@ impl branch_service_server::BranchService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref(), true).await? { return client.delete_branch(inner).await; } return Err(err); @@ -115,6 +117,7 @@ impl branch_service_server::BranchService for GitksService { }; gb.delete_branch(inner).map_err(into_status)?; tracing::info!(%repo, %name, "branch deleted"); + self.notify_ref_update(&repo, &format!("refs/heads/{}", name), "", ""); Ok(tonic::Response::new(())) } @@ -131,7 +134,7 @@ impl branch_service_server::BranchService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref(), true).await? { return client.rename_branch(inner).await; } return Err(err); @@ -140,6 +143,7 @@ impl branch_service_server::BranchService for GitksService { }; let resp = gb.rename_branch(inner).map_err(into_status)?; tracing::info!(%repo, old = %old, new = %new, "branch renamed"); + self.notify_ref_update(&repo, &format!("refs/heads/{}", new), "", ""); Ok(tonic::Response::new(resp)) } @@ -155,7 +159,7 @@ impl branch_service_server::BranchService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref(), true).await? { return client.update_branch_target(inner).await; } return Err(err); @@ -164,6 +168,7 @@ impl branch_service_server::BranchService for GitksService { }; let resp = gb.update_branch_target(inner).map_err(into_status)?; tracing::info!(%repo, %name, "branch target updated"); + self.notify_ref_update(&repo, &format!("refs/heads/{}", name), "", ""); Ok(tonic::Response::new(resp)) } @@ -179,7 +184,7 @@ impl branch_service_server::BranchService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref(), true).await? { return client.set_branch_upstream(inner).await; } return Err(err); @@ -188,6 +193,7 @@ impl branch_service_server::BranchService for GitksService { }; let resp = gb.set_branch_upstream(inner).map_err(into_status)?; tracing::info!(%repo, %name, "branch upstream set"); + self.notify_ref_update(&repo, &format!("refs/heads/{}", name), "", ""); Ok(tonic::Response::new(resp)) } @@ -204,7 +210,7 @@ impl branch_service_server::BranchService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_branch_client(self, inner.repository.as_ref(), false).await? { return client.compare_branch(inner).await; } return Err(err); diff --git a/server/commit.rs b/server/commit.rs index 0644143..968d0a5 100644 --- a/server/commit.rs +++ b/server/commit.rs @@ -6,12 +6,13 @@ use super::{GitksService, cache, into_status}; async fn remote_commit_client( svc: &GitksService, header: Option<&RepositoryHeader>, + is_write: bool, ) -> Result>, tonic::Status> { let header = match header { Some(h) => h, None => return Ok(None), }; - let Some(route) = svc.route_repository(header).await? else { + let Some(route) = svc.route_repository(header, is_write).await? else { return Ok(None); }; tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding commit rpc"); @@ -35,7 +36,7 @@ impl commit_service_server::CommitService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref(), false).await? { return client.list_commits(inner).await; } return Err(err); @@ -64,7 +65,7 @@ impl commit_service_server::CommitService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref(), false).await? { return client.get_commit(inner).await; } return Err(err); @@ -92,7 +93,7 @@ impl commit_service_server::CommitService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref(), false).await? { return client.get_commit_ancestors(inner).await; } return Err(err); @@ -122,7 +123,7 @@ impl commit_service_server::CommitService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref(), true).await? { return client.create_commit(inner).await; } return Err(err); @@ -134,6 +135,7 @@ impl commit_service_server::CommitService for GitksService { .and_then(|c| c.oid.as_ref().map(|o| o.hex.as_str()).or(Some("?"))) .unwrap_or("?"); tracing::info!(%repo, %branch, %commit_hex, "commit created"); + self.notify_ref_update(&repo, &format!("refs/heads/{}", branch), "", ""); Ok(tonic::Response::new(resp)) } @@ -149,7 +151,7 @@ impl commit_service_server::CommitService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref(), true).await? { return client.revert_commit(inner).await; } return Err(err); @@ -158,6 +160,7 @@ impl commit_service_server::CommitService for GitksService { }; let resp = gb.revert_commit(inner).map_err(into_status)?; tracing::info!(%repo, %branch, "commit reverted"); + self.notify_ref_update(&repo, &format!("refs/heads/{}", branch), "", ""); Ok(tonic::Response::new(resp)) } @@ -173,7 +176,7 @@ impl commit_service_server::CommitService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref(), true).await? { return client.cherry_pick_commit(inner).await; } return Err(err); @@ -182,6 +185,7 @@ impl commit_service_server::CommitService for GitksService { }; let resp = gb.cherry_pick_commit(inner).map_err(into_status)?; tracing::info!(%repo, %branch, "commit cherry-picked"); + self.notify_ref_update(&repo, &format!("refs/heads/{}", branch), "", ""); Ok(tonic::Response::new(resp)) } @@ -196,7 +200,7 @@ impl commit_service_server::CommitService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_commit_client(self, inner.repository.as_ref(), false).await? { return client.compare_commits(inner).await; } return Err(err); diff --git a/server/diff.rs b/server/diff.rs index f002e20..b0f4ba2 100644 --- a/server/diff.rs +++ b/server/diff.rs @@ -6,12 +6,13 @@ use super::{GitksService, cache, into_status, into_stream}; async fn remote_diff_client( svc: &GitksService, header: Option<&RepositoryHeader>, + is_write: bool, ) -> Result>, tonic::Status> { let header = match header { Some(h) => h, None => return Ok(None), }; - let Some(route) = svc.route_repository(header).await? else { + let Some(route) = svc.route_repository(header, is_write).await? else { return Ok(None); }; tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding diff rpc"); @@ -38,7 +39,7 @@ impl diff_service_server::DiffService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_diff_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_diff_client(self, inner.repository.as_ref(), false).await? { return client.get_diff(inner).await; } return Err(err); @@ -67,7 +68,7 @@ impl diff_service_server::DiffService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_diff_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_diff_client(self, inner.repository.as_ref(), false).await? { return client.get_commit_diff(inner).await; } return Err(err); @@ -96,7 +97,7 @@ impl diff_service_server::DiffService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_diff_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_diff_client(self, inner.repository.as_ref(), false).await? { let resp = client.get_patch(inner).await?; let stream = super::bridge_server_stream(resp.into_inner()); return Ok(tonic::Response::new(stream)); @@ -126,7 +127,7 @@ impl diff_service_server::DiffService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_diff_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_diff_client(self, inner.repository.as_ref(), false).await? { return client.get_diff_stats(inner).await; } return Err(err); diff --git a/server/merge.rs b/server/merge.rs index dc44cde..11724b4 100644 --- a/server/merge.rs +++ b/server/merge.rs @@ -6,12 +6,13 @@ use super::{GitksService, into_status}; async fn remote_merge_client( svc: &GitksService, header: Option<&RepositoryHeader>, + is_write: bool, ) -> Result>, tonic::Status> { let header = match header { Some(h) => h, None => return Ok(None), }; - let Some(route) = svc.route_repository(header).await? else { + let Some(route) = svc.route_repository(header, is_write).await? else { return Ok(None); }; tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding merge rpc"); @@ -35,7 +36,7 @@ impl merge_service_server::MergeService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref(), false).await? { return client.check_merge(inner).await; } return Err(err); @@ -59,7 +60,7 @@ impl merge_service_server::MergeService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref(), true).await? { return client.merge(inner).await; } return Err(err); @@ -68,6 +69,7 @@ impl merge_service_server::MergeService for GitksService { }; let resp = gb.merge(inner).map_err(into_status)?; tracing::info!(%repo, %target, status = resp.status, "merge done"); + self.notify_ref_update(&repo, &format!("refs/heads/{}", target), "", ""); Ok(tonic::Response::new(resp)) } @@ -82,7 +84,7 @@ impl merge_service_server::MergeService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref(), false).await? { return client.list_merge_conflicts(inner).await; } return Err(err); @@ -106,7 +108,7 @@ impl merge_service_server::MergeService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref(), true).await? { return client.resolve_merge_conflicts(inner).await; } return Err(err); @@ -115,6 +117,7 @@ impl merge_service_server::MergeService for GitksService { }; let resp = gb.resolve_merge_conflicts(inner).map_err(into_status)?; tracing::info!(%repo, %target, status = resp.status, "merge conflicts resolved"); + self.notify_ref_update(&repo, &format!("refs/heads/{}", target), "", ""); Ok(tonic::Response::new(resp)) } @@ -130,7 +133,7 @@ impl merge_service_server::MergeService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_merge_client(self, inner.repository.as_ref(), true).await? { return client.rebase(inner).await; } return Err(err); @@ -139,6 +142,7 @@ impl merge_service_server::MergeService for GitksService { }; let resp = gb.rebase(inner).map_err(into_status)?; tracing::info!(%repo, %branch, status = resp.status, "rebase done"); + self.notify_ref_update(&repo, &format!("refs/heads/{}", branch), "", ""); Ok(tonic::Response::new(resp)) } } diff --git a/server/mod.rs b/server/mod.rs index fe94b3f..4d86f83 100644 --- a/server/mod.rs +++ b/server/mod.rs @@ -29,11 +29,12 @@ use crate::pb::{ pub struct GitksService { pub repo_prefix: PathBuf, pub node_actor: Option>, + pub grpc_addr: String, } impl GitksService { pub fn new(repo_prefix: PathBuf) -> Self { - Self { repo_prefix, node_actor: None } + Self { repo_prefix, node_actor: None, grpc_addr: String::new() } } pub fn with_actor(mut self, node_actor: ActorRef) -> Self { @@ -41,6 +42,11 @@ impl GitksService { self } + pub fn with_grpc_addr(mut self, grpc_addr: String) -> Self { + self.grpc_addr = grpc_addr; + self + } + pub fn scan_all_repo(&self) -> GitResult> { let root = self.repo_prefix.as_ref(); let mut repos = Vec::new(); @@ -57,19 +63,45 @@ impl GitksService { pub async fn route_repository( &self, header: &crate::pb::RepositoryHeader, + is_write: bool, ) -> Result, tonic::Status> { + use crate::actor::message::{ROLE_PRIMARY, ROLE_REPLICA}; let members = ractor::pg::get_members(&"gitks_nodes".to_string()); let local = self.node_actor.as_ref().map(|actor| actor.get_cell()); + let mut primary: Option = None; + let mut replica: Option = None; for member in members { if local.as_ref().is_some_and(|actor| actor == &member) { continue; } - if let Some(decision) = query_route(member, header.clone()).await? { + if let Some(decision) = query_find_primary(member.clone(), header.clone()).await? { if decision.found && !decision.grpc_addr.is_empty() { - return Ok(Some(decision)); + primary = Some(decision); + if is_write { + return Ok(primary); + } + } + } + if !is_write && replica.is_none() { + if let Some(decision) = query_find_replica(member.clone(), header.clone()).await? { + if decision.found && !decision.grpc_addr.is_empty() && decision.role == ROLE_REPLICA { + replica = Some(decision); + } } } } + if let Some(p) = primary { + return Ok(Some(p)); + } + if let Some(r) = replica { + tracing::info!( + storage_name = %r.storage_name, + relative_path = %r.relative_path, + "read request routed to replica" + ); + return Ok(Some(r)); + } + let _ = ROLE_PRIMARY; Ok(None) } @@ -127,6 +159,26 @@ impl GitksService { Ok(canonical) } + pub fn notify_ref_update( + &self, + relative_path: &str, + ref_name: &str, + old_oid: &str, + new_oid: &str, + ) { + if let Some(ref actor) = self.node_actor { + let event = crate::actor::message::RefUpdateEvent { + relative_path: relative_path.to_string(), + ref_name: ref_name.to_string(), + old_oid: old_oid.to_string(), + new_oid: new_oid.to_string(), + primary_grpc_addr: self.grpc_addr.clone(), + primary_storage_name: String::new(), + }; + crate::actor::handler::broadcast_ref_update(actor, event); + } + } + /// Inject repo_prefix as storage_path into the client-provided header fn prefixed_header(&self, header: &crate::pb::RepositoryHeader) -> crate::pb::RepositoryHeader { crate::pb::RepositoryHeader { @@ -139,7 +191,7 @@ impl GitksService { -pub(super) async fn remote_endpoint(addr: &str) -> Result { +pub async fn remote_endpoint(addr: &str) -> Result { let uri: tonic::codegen::http::Uri = addr .parse() .map_err(|e| tonic::Status::invalid_argument(format!("invalid URI: {e}")))?; @@ -162,15 +214,29 @@ pub(super) fn bridge_server_stream( tokio_stream::wrappers::ReceiverStream::new(rx) } -async fn query_route( +async fn query_find_primary( member: ActorCell, header: crate::pb::RepositoryHeader, ) -> Result, tonic::Status> { let actor_ref: ActorRef = member.into(); - match ractor::call_t!(actor_ref, GitNodeMessage::RouteRepository, 500, header) { + match ractor::call_t!(actor_ref, GitNodeMessage::FindPrimary, 500, header) { Ok(decision) => Ok(Some(decision)), Err(err) => { - tracing::warn!(error = %err, "repository route query failed"); + tracing::warn!(error = %err, "find primary query failed"); + Ok(None) + } + } +} + +async fn query_find_replica( + member: ActorCell, + header: crate::pb::RepositoryHeader, +) -> Result, tonic::Status> { + let actor_ref: ActorRef = member.into(); + match ractor::call_t!(actor_ref, GitNodeMessage::FindReplica, 500, header) { + Ok(decision) => Ok(Some(decision)), + Err(err) => { + tracing::warn!(error = %err, "find replica query failed"); Ok(None) } } diff --git a/server/pack.rs b/server/pack.rs index ae74f75..1459a44 100644 --- a/server/pack.rs +++ b/server/pack.rs @@ -9,12 +9,13 @@ use super::{GitksService, into_status}; async fn remote_pack_client( svc: &GitksService, header: Option<&RepositoryHeader>, + is_write: bool, ) -> Result>, tonic::Status> { let header = match header { Some(h) => h, None => return Ok(None), }; - let Some(route) = svc.route_repository(header).await? else { + let Some(route) = svc.route_repository(header, is_write).await? else { return Ok(None); }; tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding pack rpc"); @@ -42,7 +43,7 @@ impl pack_service_server::PackService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref(), false).await? { return client.advertise_refs(inner).await; } return Err(err); @@ -69,7 +70,7 @@ impl pack_service_server::PackService for GitksService { let gb = match self.resolve(first.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_pack_client(self, first.repository.as_ref()).await? { + if let Some(mut client) = remote_pack_client(self, first.repository.as_ref(), false).await? { let (tx, rx) = tokio::sync::mpsc::channel(16); let _ = tx.send(first).await; tokio::spawn(async move { @@ -122,7 +123,7 @@ impl pack_service_server::PackService for GitksService { let gb = match self.resolve(first.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_pack_client(self, first.repository.as_ref()).await? { + if let Some(mut client) = remote_pack_client(self, first.repository.as_ref(), false).await? { let (tx, rx) = tokio::sync::mpsc::channel(16); let _ = tx.send(first).await; tokio::spawn(async move { @@ -171,7 +172,7 @@ impl pack_service_server::PackService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref(), false).await? { let resp = client.pack_objects(inner).await?; let stream = super::bridge_server_stream(resp.into_inner()); return Ok(tonic::Response::new(stream)); @@ -200,7 +201,7 @@ impl pack_service_server::PackService for GitksService { let gb = match self.resolve(inputs.first().and_then(|r| r.repository.as_ref())) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_pack_client(self, inputs.first().and_then(|r| r.repository.as_ref())).await? { + if let Some(mut client) = remote_pack_client(self, inputs.first().and_then(|r| r.repository.as_ref()), false).await? { return client.index_pack(tokio_stream::iter(inputs)).await; } return Err(err); @@ -223,7 +224,7 @@ impl pack_service_server::PackService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref(), false).await? { return client.list_packfiles(inner).await; } return Err(err); @@ -246,7 +247,7 @@ impl pack_service_server::PackService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref(), false).await? { return client.fsck(inner).await; } return Err(err); diff --git a/server/repository.rs b/server/repository.rs index b6b478c..8865187 100644 --- a/server/repository.rs +++ b/server/repository.rs @@ -6,12 +6,13 @@ use super::{GitksService, git_cmd, into_status, repository_maint, remote_endpoin async fn remote_repository_client( svc: &GitksService, header: Option<&RepositoryHeader>, + is_write: bool, ) -> Result>, tonic::Status> { let header = match header { Some(h) => h, None => return Ok(None), }; - let Some(route) = svc.route_repository(header).await? else { + let Some(route) = svc.route_repository(header, is_write).await? else { return Ok(None); }; tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding repository rpc"); @@ -47,7 +48,7 @@ impl repository_service_server::RepositoryService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), false).await? { return client.get_repository(inner).await; } return Err(err); @@ -77,6 +78,7 @@ impl repository_service_server::RepositoryService for GitksService { let gb = crate::bare::GitBare::new(bare_dir); gb.init_repository(inner.bare).map_err(into_status)?; tracing::info!(%repo, bare = inner.bare, "repository initialized"); + self.notify_ref_update(&repo, "HEAD", "", ""); Ok(tonic::Response::new(Repository { header: inner.repository, bare: inner.bare, @@ -94,13 +96,14 @@ impl repository_service_server::RepositoryService for GitksService { let _enter = span.enter(); let bare_dir = self.resolve_for_init(inner.repository.as_ref())?; if !bare_dir.exists() { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), true).await? { return client.delete_repository(inner).await; } } tracing::warn!(%repo, path = %bare_dir.display(), "deleting repository"); std::fs::remove_dir_all(&bare_dir).map_err(|e| tonic::Status::internal(e.to_string()))?; tracing::info!(%repo, "repository deleted"); + self.notify_ref_update(&repo, "", "", ""); Ok(tonic::Response::new(())) } @@ -115,7 +118,7 @@ impl repository_service_server::RepositoryService for GitksService { let bare_dir = self.resolve_for_init(inner.repository.as_ref())?; let exists = bare_dir.exists() && bare_dir.is_dir() && bare_dir.join("HEAD").exists(); if !exists { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), false).await? { return client.repository_exists(inner).await; } } @@ -133,7 +136,7 @@ impl repository_service_server::RepositoryService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), false).await? { return client.get_object_format(inner).await; } return Err(err); @@ -156,7 +159,7 @@ impl repository_service_server::RepositoryService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), false).await? { return client.get_default_branch(inner).await; } return Err(err); @@ -180,7 +183,7 @@ impl repository_service_server::RepositoryService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), true).await? { return client.set_default_branch(inner).await; } return Err(err); @@ -195,6 +198,7 @@ impl repository_service_server::RepositoryService for GitksService { )); } tracing::info!(%repo, %name, "default branch set"); + self.notify_ref_update(&repo, &refname, "", ""); Ok(tonic::Response::new(())) } @@ -209,7 +213,7 @@ impl repository_service_server::RepositoryService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), false).await? { return client.get_repository_config(inner).await; } return Err(err); @@ -266,7 +270,7 @@ impl repository_service_server::RepositoryService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), true).await? { return client.set_repository_config(inner).await; } return Err(err); @@ -286,6 +290,7 @@ impl repository_service_server::RepositoryService for GitksService { } } } + self.notify_ref_update(&repo, "", "", ""); Ok(tonic::Response::new(())) } @@ -300,7 +305,7 @@ impl repository_service_server::RepositoryService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), false).await? { return client.get_repository_statistics(inner).await; } return Err(err); @@ -321,7 +326,7 @@ impl repository_service_server::RepositoryService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), false).await? { return client.check_repository_health(inner).await; } return Err(err); @@ -344,7 +349,7 @@ impl repository_service_server::RepositoryService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), true).await? { return client.garbage_collect(inner).await; } return Err(err); @@ -367,7 +372,7 @@ impl repository_service_server::RepositoryService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), true).await? { return client.repack(inner).await; } return Err(err); @@ -395,7 +400,7 @@ impl repository_service_server::RepositoryService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_repository_client(self, inner.repository.as_ref(), true).await? { return client.write_commit_graph(inner).await; } return Err(err); diff --git a/server/tag.rs b/server/tag.rs index 8a60fe3..9074c2e 100644 --- a/server/tag.rs +++ b/server/tag.rs @@ -6,12 +6,13 @@ use super::{GitksService, into_status}; async fn remote_tag_client( svc: &GitksService, header: Option<&RepositoryHeader>, + is_write: bool, ) -> Result>, tonic::Status> { let header = match header { Some(h) => h, None => return Ok(None), }; - let Some(route) = svc.route_repository(header).await? else { + let Some(route) = svc.route_repository(header, is_write).await? else { return Ok(None); }; tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding tag rpc"); @@ -35,7 +36,7 @@ impl tag_service_server::TagService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref(), false).await? { return client.list_tags(inner).await; } return Err(err); @@ -59,7 +60,7 @@ impl tag_service_server::TagService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref(), false).await? { return client.get_tag(inner).await; } return Err(err); @@ -82,7 +83,7 @@ impl tag_service_server::TagService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref(), true).await? { return client.create_tag(inner).await; } return Err(err); @@ -91,6 +92,7 @@ impl tag_service_server::TagService for GitksService { }; let resp = gb.create_tag(inner).map_err(into_status)?; tracing::info!(%repo, %name, "tag created"); + self.notify_ref_update(&repo, &format!("refs/tags/{}", name), "", ""); Ok(tonic::Response::new(resp)) } @@ -106,7 +108,7 @@ impl tag_service_server::TagService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref(), true).await? { return client.delete_tag(inner).await; } return Err(err); @@ -115,6 +117,7 @@ impl tag_service_server::TagService for GitksService { }; gb.delete_tag(inner).map_err(into_status)?; tracing::info!(%repo, %name, "tag deleted"); + self.notify_ref_update(&repo, &format!("refs/tags/{}", name), "", ""); Ok(tonic::Response::new(())) } @@ -130,7 +133,7 @@ impl tag_service_server::TagService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_tag_client(self, inner.repository.as_ref(), false).await? { return client.verify_tag(inner).await; } return Err(err); diff --git a/server/tree.rs b/server/tree.rs index 1f21300..9e111a6 100644 --- a/server/tree.rs +++ b/server/tree.rs @@ -6,12 +6,13 @@ use super::{GitksService, cache, into_status, into_stream}; async fn remote_tree_client( svc: &GitksService, header: Option<&RepositoryHeader>, + is_write: bool, ) -> Result>, tonic::Status> { let header = match header { Some(h) => h, None => return Ok(None), }; - let Some(route) = svc.route_repository(header).await? else { + let Some(route) = svc.route_repository(header, is_write).await? else { return Ok(None); }; tracing::info!(storage_name = %route.storage_name, relative_path = %route.relative_path, actor_name = %route.actor_name, grpc_addr = %route.grpc_addr, "forwarding tree rpc"); @@ -38,7 +39,7 @@ impl tree_service_server::TreeService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref(), false).await? { return client.list_tree(inner).await; } return Err(err); @@ -67,7 +68,7 @@ impl tree_service_server::TreeService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref(), false).await? { return client.get_tree(inner).await; } return Err(err); @@ -96,7 +97,7 @@ impl tree_service_server::TreeService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref(), false).await? { return client.get_blob(inner).await; } return Err(err); @@ -124,7 +125,7 @@ impl tree_service_server::TreeService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref(), false).await? { let resp = client.get_raw_blob(inner).await?; let stream = super::bridge_server_stream(resp.into_inner()); return Ok(tonic::Response::new(stream)); @@ -158,7 +159,7 @@ impl tree_service_server::TreeService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref(), false).await? { return client.get_file_metadata(inner).await; } return Err(err); @@ -186,7 +187,7 @@ impl tree_service_server::TreeService for GitksService { let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { - if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref()).await? { + if let Some(mut client) = remote_tree_client(self, inner.repository.as_ref(), false).await? { return client.find_files(inner).await; } return Err(err);