use std::process::{Command, Stdio}; use tokio_stream::wrappers::ReceiverStream; use crate::bare::GitBare; use crate::pb::{ArchiveChunk, ArchiveRequest, archive_options, object_selector}; impl GitBare { /// Stream archive data via a channel to avoid loading the entire archive into memory. /// Returns a ReceiverStream that yields ArchiveChunk as the subprocess produces output. pub fn get_archive_stream( &self, request: ArchiveRequest, ) -> Result>, tonic::Status> { let bare_dir = self.bare_dir.clone(); tracing::info!( repo = %bare_dir.display(), "spawning git archive subprocess" ); let (tx, rx) = tokio::sync::mpsc::channel(16); let revision = match request.treeish.and_then(|s| s.selector) { Some(object_selector::Selector::Oid(oid)) => { crate::sanitize::validate_oid_hex(&oid.hex) .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; oid.hex } Some(object_selector::Selector::Revision(name)) => { crate::sanitize::validate_revision(&name.revision) .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; name.revision } None => "HEAD".into(), }; let options = request.options.unwrap_or_default(); if !options.prefix.is_empty() { crate::sanitize::validate_file_path(&options.prefix) .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; } for path in &options.pathspec { crate::sanitize::validate_file_path(path) .map_err(|e| tonic::Status::invalid_argument(e.to_string()))?; } tokio::task::spawn_blocking(move || { let format = archive_options::Format::try_from(options.format) .unwrap_or(archive_options::Format::ArchiveFormatTar); let mut args = vec!["archive".to_string()]; args.push(match format { archive_options::Format::ArchiveFormatZip => "--format=zip".into(), _ => "--format=tar".into(), }); if !options.prefix.is_empty() { args.push(format!("--prefix={}", options.prefix)); } args.push(revision); if !options.pathspec.is_empty() { args.push("--".into()); args.extend(options.pathspec); } let mut child = match Command::new("git") .arg("--git-dir") .arg(&bare_dir) .args(&args) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() { Ok(c) => c, Err(e) => { let _ = tx.blocking_send(Err(tonic::Status::internal(format!( "failed to spawn git archive: {e}" )))); return; } }; let stdout = match child.stdout.take() { Some(s) => s, None => { let _ = tx.blocking_send(Err(tonic::Status::internal("failed to capture stdout"))); return; } }; use std::io::Read; let mut reader = std::io::BufReader::new(stdout); let mut buf = vec![0u8; 65536]; loop { match reader.read(&mut buf) { Ok(0) => break, Ok(n) => { let chunk = ArchiveChunk { data: buf[..n].to_vec(), }; if tx.blocking_send(Ok(chunk)).is_err() { break; } } Err(e) => { let _ = tx.blocking_send(Err(tonic::Status::internal(format!( "read error: {e}" )))); break; } } } match child.wait() { Ok(status) if !status.success() => { let _ = tx.blocking_send(Err(tonic::Status::internal( "git archive exited with error", ))); } Err(e) => { let _ = tx.blocking_send(Err(tonic::Status::internal(format!("wait error: {e}")))); } _ => {} } }); Ok(ReceiverStream::new(rx)) } }