use std::path::PathBuf; use crate::actor::message::RefUpdateEvent; use crate::pb::Oid; pub struct BundleApplicator { pub repo_path: PathBuf, } impl BundleApplicator { pub fn new(repo_path: PathBuf) -> Self { Self { repo_path } } pub fn apply_bundle(&self, data: &[u8]) -> Result<(), String> { let mut child = std::process::Command::new("git") .args(["--git-dir", &self.repo_path.to_string_lossy(), "bundle", "unbundle", "-"]) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .spawn() .map_err(|e| format!("spawn git bundle unbundle: {e}"))?; use std::io::Write; if let Some(ref mut stdin) = child.stdin { stdin.write_all(data).map_err(|e| format!("write bundle: {e}"))?; } let output = child.wait_with_output().map_err(|e| format!("wait bundle: {e}"))?; if !output.status.success() { return Err(String::from_utf8_lossy(&output.stderr).into_owned()); } Ok(()) } } pub fn collect_local_haves(repo_path: &PathBuf) -> Result, String> { let result = std::process::Command::new("git") .args([ "--git-dir", &repo_path.to_string_lossy(), "for-each-ref", "--format=%(objectname)", ]) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .output() .map_err(|e| format!("git for-each-ref: {e}"))?; if !result.status.success() { return Err(String::from_utf8_lossy(&result.stderr).into_owned()); } let stdout = String::from_utf8_lossy(&result.stdout); let haves: Vec = stdout .lines() .filter(|line| !line.trim().is_empty() && line.trim() != crate::oid::ZERO_OID) .map(|hex| { let hex = hex.trim().to_string(); Oid { value: crate::oid::hex_to_bytes(&hex).unwrap_or_default(), hex, format: crate::pb::ObjectFormat::Sha1 as i32, } }) .collect(); tracing::debug!( repo = %repo_path.display(), haves_count = haves.len(), "collected local haves from refs" ); Ok(haves) } pub async fn sync_from_primary(event: RefUpdateEvent, local_repo_path: PathBuf) { tracing::info!( relative_path = %event.relative_path, ref_name = %event.ref_name, primary = %event.primary_grpc_addr, "replica sync starting" ); let grpc_addr = event.primary_grpc_addr.clone(); let relative_path = event.relative_path.clone(); let repo_for_haves = local_repo_path.clone(); match tokio::task::spawn_blocking(move || { sync_via_pack_service(&grpc_addr, &relative_path, &repo_for_haves) }).await { Ok(Ok(pack_data)) if !pack_data.is_empty() => { let pack_len = pack_data.len(); let repo = local_repo_path.clone(); match tokio::task::spawn_blocking(move || { apply_pack_data(&repo, &pack_data) }).await { Ok(Ok(())) => { update_local_ref(&local_repo_path, &event.ref_name, &event.new_oid); tracing::info!( relative_path = %event.relative_path, bytes = pack_len, "replica sync done" ); } Ok(Err(e)) => tracing::error!(relative_path = %event.relative_path, error = %e, "pack apply failed"), Err(e) => tracing::error!(relative_path = %event.relative_path, error = %e, "apply task failed"), } } Ok(Ok(_)) => tracing::warn!(relative_path = %event.relative_path, "empty pack data from primary"), Ok(Err(e)) => tracing::error!(relative_path = %event.relative_path, error = %e, "pack fetch failed"), Err(e) => tracing::error!(relative_path = %event.relative_path, error = %e, "sync task failed"), } } fn sync_via_pack_service( grpc_addr: &str, relative_path: &str, local_repo_path: &PathBuf, ) -> Result, String> { let haves = collect_local_haves(local_repo_path)?; let rt = tokio::runtime::Handle::current(); rt.block_on(async { use crate::pb::pack_service_client::PackServiceClient; use crate::pb::{AdvertiseRefsRequest, PackObjectsOptions, PackObjectsRequest, RepositoryHeader}; use tokio_stream::StreamExt; let endpoint = crate::server::remote_endpoint(grpc_addr) .await .map_err(|e| e.to_string())?; let mut client = PackServiceClient::connect(endpoint) .await .map_err(|e| format!("connect to primary: {e}"))?; let header = RepositoryHeader { storage_name: String::new(), relative_path: relative_path.to_string(), storage_path: String::new(), }; let refs_resp = client.advertise_refs(AdvertiseRefsRequest { repository: Some(header.clone()), protocol: None, service: "upload-pack".to_string(), }).await.map_err(|e| format!("AdvertiseRefs: {e}"))?; let refs = refs_resp.into_inner().references; if refs.is_empty() { return Ok(Vec::new()); } let wants: Vec = refs.iter() .filter_map(|r| r.target_oid.clone()) .collect(); let want_count = wants.len(); let have_count = haves.len(); tracing::info!( relative_path = %relative_path, want_count, have_count, "requesting incremental pack from primary" ); let options = PackObjectsOptions { wants, haves, shallow_revisions: Vec::new(), deepen: 0, thin_pack: false, include_tag: true, use_bitmaps: true, delta_base_offset: true, pathspec: Vec::new(), }; let req = PackObjectsRequest { repository: Some(header.clone()), options: Some(options), }; let resp = client.pack_objects(req).await .map_err(|e| format!("PackObjects: {e}"))?; let mut stream = resp.into_inner(); let mut pack_data = Vec::new(); while let Some(chunk) = stream.next().await { match chunk { Ok(msg) => pack_data.extend_from_slice(&msg.data), Err(e) => return Err(format!("pack stream: {e}")), } } tracing::info!( relative_path = %relative_path, pack_bytes = pack_data.len(), "received pack data from primary" ); Ok(pack_data) }) } fn apply_pack_data(repo_path: &PathBuf, pack_data: &[u8]) -> Result<(), String> { let applicator = BundleApplicator::new(repo_path.clone()); applicator.apply_bundle(pack_data) } fn update_local_ref(repo_path: &PathBuf, ref_name: &str, new_oid: &str) { if ref_name.is_empty() || new_oid.is_empty() { return; } match std::process::Command::new("git") .args(["--git-dir", &repo_path.to_string_lossy(), "update-ref", ref_name, new_oid]) .output() { Ok(o) if o.status.success() => tracing::info!(ref_name = %ref_name, new_oid = %new_oid, "ref updated"), Ok(o) => tracing::error!(ref_name = %ref_name, error = %String::from_utf8_lossy(&o.stderr), "update-ref failed"), Err(e) => tracing::error!(ref_name = %ref_name, error = %e, "update-ref spawn failed"), } }