use crate::actor::message::RefUpdateEvent; use crate::pb::Oid; use std::path::{Path, PathBuf}; pub struct BundleApplicator { pub repo_path: PathBuf, } impl BundleApplicator { pub fn new(repo_path: PathBuf) -> Self { Self { repo_path } } pub fn apply_bundle(&self, data: &[u8]) -> Result<(), String> { let mut child = std::process::Command::new("git") .args([ "--git-dir", &self.repo_path.to_string_lossy(), "bundle", "unbundle", "-", ]) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .spawn() .map_err(|e| format!("spawn git bundle unbundle: {e}"))?; use std::io::Write; if let Some(ref mut stdin) = child.stdin { stdin .write_all(data) .map_err(|e| format!("write bundle: {e}"))?; } let output = child .wait_with_output() .map_err(|e| format!("wait bundle: {e}"))?; if !output.status.success() { return Err(String::from_utf8_lossy(&output.stderr).into_owned()); } Ok(()) } /// Apply bundle from a file path (for streaming writes). pub fn apply_bundle_from_file(&self, path: &Path) -> Result<(), String> { let file = std::fs::File::open(path).map_err(|e| format!("open bundle file: {e}"))?; let mut child = std::process::Command::new("git") .args([ "--git-dir", &self.repo_path.to_string_lossy(), "bundle", "unbundle", "-", ]) .stdin(std::process::Stdio::piped()) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .spawn() .map_err(|e| format!("spawn git bundle unbundle: {e}"))?; // Stream file contents to stdin in a background thread let mut stdin = child.stdin.take().ok_or("no stdin")?; let file_handle = file; let writer = std::thread::spawn(move || -> Result<(), String> { use std::io::{Read, Write}; let mut reader = std::io::BufReader::new(file_handle); let mut buf = vec![0u8; 65536]; loop { match reader.read(&mut buf) { Ok(0) => break, Ok(n) => { stdin .write_all(&buf[..n]) .map_err(|e| format!("write to stdin: {e}"))?; } Err(e) => return Err(format!("read bundle file: {e}")), } } Ok(()) }); let output = child .wait_with_output() .map_err(|e| format!("wait bundle: {e}"))?; // Wait for writer thread let _ = writer.join().map_err(|_| "writer thread panicked")?; if !output.status.success() { return Err(String::from_utf8_lossy(&output.stderr).into_owned()); } Ok(()) } } pub fn collect_local_haves(repo_path: &Path) -> Result, String> { let result = std::process::Command::new("git") .args([ "--git-dir", &repo_path.to_string_lossy(), "for-each-ref", "--format=%(objectname)", ]) .stdout(std::process::Stdio::piped()) .stderr(std::process::Stdio::piped()) .output() .map_err(|e| format!("git for-each-ref: {e}"))?; if !result.status.success() { return Err(String::from_utf8_lossy(&result.stderr).into_owned()); } let stdout = String::from_utf8_lossy(&result.stdout); let haves: Vec = stdout .lines() .filter(|line| !line.trim().is_empty() && line.trim() != crate::oid::ZERO_OID) .map(|hex| { let hex = hex.trim().to_string(); Oid { value: crate::oid::hex_to_bytes(&hex).unwrap_or_default(), hex, format: crate::pb::ObjectFormat::Sha1 as i32, } }) .collect(); tracing::debug!( repo = %repo_path.display(), haves_count = haves.len(), "collected local haves from refs" ); Ok(haves) } pub async fn sync_from_primary(event: RefUpdateEvent, local_repo_path: PathBuf) { tracing::info!( relative_path = %event.relative_path, ref_name = %event.ref_name, primary = %event.primary_grpc_addr, "replica sync starting" ); let grpc_addr = event.primary_grpc_addr.clone(); let relative_path = event.relative_path.clone(); let repo_for_haves = local_repo_path.clone(); // Collect haves in a blocking thread let haves = match tokio::task::spawn_blocking(move || collect_local_haves(&repo_for_haves)) .await { Ok(Ok(h)) => h, Ok(Err(e)) => { tracing::error!(relative_path = %event.relative_path, error = %e, "collect haves failed"); return; } Err(e) => { tracing::error!(relative_path = %event.relative_path, error = %e, "haves task failed"); return; } }; // Stream pack data to a temporary file to avoid OOM let temp_dir = local_repo_path.join(".gitks_tmp"); if let Err(e) = std::fs::create_dir_all(&temp_dir) { tracing::error!(relative_path = %event.relative_path, error = %e, "create temp dir failed"); return; } let pack_result = sync_via_pack_service_to_file(&grpc_addr, &relative_path, &haves, &temp_dir).await; match pack_result { Ok(Some(pack_file)) => { let repo = local_repo_path.clone(); let pack_path = pack_file.clone(); match tokio::task::spawn_blocking(move || { let applicator = BundleApplicator::new(repo); applicator.apply_bundle_from_file(&pack_path) }) .await { Ok(Ok(())) => { update_local_ref(&local_repo_path, &event.ref_name, &event.new_oid); tracing::info!( relative_path = %event.relative_path, "replica sync done" ); } Ok(Err(e)) => { tracing::error!(relative_path = %event.relative_path, error = %e, "pack apply failed") } Err(e) => { tracing::error!(relative_path = %event.relative_path, error = %e, "apply task failed") } } // Cleanup temp file let _ = std::fs::remove_file(&pack_file); } Ok(None) => { tracing::warn!(relative_path = %event.relative_path, "empty pack data from primary") } Err(e) => { tracing::error!(relative_path = %event.relative_path, error = %e, "pack fetch failed") } } // Cleanup temp dir if empty let _ = std::fs::remove_dir(&temp_dir); } /// Maximum pack size before we reject (10GB) const MAX_PACK_SIZE: u64 = 10 * 1024 * 1024 * 1024; /// Stream pack data from primary to a temporary file. /// Returns Ok(Some(path)) on success, Ok(None) if empty, Err on failure. async fn sync_via_pack_service_to_file( grpc_addr: &str, relative_path: &str, haves: &[Oid], temp_dir: &Path, ) -> Result, String> { use crate::pb::pack_service_client::PackServiceClient; use crate::pb::{AdvertiseRefsRequest, PackObjectsOptions, PackObjectsRequest, RepositoryHeader}; use tokio::io::AsyncWriteExt; use tokio_stream::StreamExt; let endpoint = crate::server::remote_endpoint(grpc_addr) .await .map_err(|e| e.to_string())?; let mut client = PackServiceClient::connect(endpoint) .await .map_err(|e| format!("connect to primary: {e}"))?; let header = RepositoryHeader { storage_name: String::new(), relative_path: relative_path.to_string(), storage_path: String::new(), }; let refs_resp = client .advertise_refs(AdvertiseRefsRequest { repository: Some(header.clone()), protocol: None, service: "upload-pack".to_string(), raw: false, }) .await .map_err(|e| format!("AdvertiseRefs: {e}"))?; let refs = refs_resp.into_inner().references; if refs.is_empty() { return Ok(None); } let wants: Vec = refs.iter().filter_map(|r| r.target_oid.clone()).collect(); let want_count = wants.len(); let have_count = haves.len(); tracing::info!( relative_path = %relative_path, want_count, have_count, "requesting incremental pack from primary" ); let options = PackObjectsOptions { wants, haves: haves.to_vec(), shallow_revisions: Vec::new(), deepen: 0, thin_pack: false, include_tag: true, use_bitmaps: true, delta_base_offset: true, pathspec: Vec::new(), }; let req = PackObjectsRequest { repository: Some(header.clone()), options: Some(options), }; let resp = client .pack_objects(req) .await .map_err(|e| format!("PackObjects: {e}"))?; let mut stream = resp.into_inner(); // Create a temporary file for streaming let temp_file = temp_dir.join(format!("pack_{}.bundle", std::process::id())); let mut file = tokio::fs::File::create(&temp_file) .await .map_err(|e| format!("create temp file: {e}"))?; let mut total_bytes: u64 = 0; while let Some(chunk) = stream.next().await { match chunk { Ok(msg) => { total_bytes += msg.data.len() as u64; if total_bytes > MAX_PACK_SIZE { let _ = tokio::fs::remove_file(&temp_file).await; return Err(format!( "pack data exceeds maximum size ({}GB)", MAX_PACK_SIZE / (1024 * 1024 * 1024) )); } file.write_all(&msg.data) .await .map_err(|e| format!("write pack data: {e}"))?; } Err(e) => { let _ = tokio::fs::remove_file(&temp_file).await; return Err(format!("pack stream: {e}")); } } } // Flush and close the file file.flush() .await .map_err(|e| format!("flush pack file: {e}"))?; drop(file); tracing::info!( relative_path = %relative_path, pack_bytes = total_bytes, "received pack data from primary" ); Ok(Some(temp_file)) } fn update_local_ref(repo_path: &Path, ref_name: &str, new_oid: &str) { if ref_name.is_empty() || new_oid.is_empty() { return; } match std::process::Command::new("git") .args([ "--git-dir", &repo_path.to_string_lossy(), "update-ref", ref_name, new_oid, ]) .output() { Ok(o) if o.status.success() => { tracing::info!(ref_name = %ref_name, new_oid = %new_oid, "ref updated") } Ok(o) => { tracing::error!(ref_name = %ref_name, error = %String::from_utf8_lossy(&o.stderr), "update-ref failed") } Err(e) => tracing::error!(ref_name = %ref_name, error = %e, "update-ref spawn failed"), } }