use std::process::Stdio; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::Command; use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; use crate::bare::GitBare; use crate::pb::UploadPackResponse; impl GitBare { /// Upload pack data using git-upload-pack with true concurrent streaming. /// /// Client-streaming input → server-streaming output. /// Stdin packets are forwarded to the child process as they arrive from the client, /// while stdout is concurrently read and streamed back via a channel. pub async fn upload_pack( &self, input: impl tokio_stream::Stream> + Send + 'static, ) -> Result>, tonic::Status> { let bare_dir = self.bare_dir.to_string_lossy().into_owned(); let (tx, rx) = tokio::sync::mpsc::channel(16); // Move input into the spawned task to make it 'static let stream = Box::pin(input); tokio::spawn(async move { let stream = stream; let mut child = match Command::new("git") .arg("--git-dir") .arg(&bare_dir) .arg("upload-pack") .arg(&bare_dir) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() { Ok(c) => c, Err(e) => { let _ = tx .send(Err(tonic::Status::internal(format!( "failed to spawn git upload-pack: {e}" )))) .await; return; } }; let mut stdin = child.stdin.take(); let mut stdout = child.stdout.take(); let mut stderr = child.stderr.take(); // Concurrent: write stdin packets, read stdout chunks, read stderr let stdin_task = { let mut stream = stream; async move { if let Some(mut stdin) = stdin.take() { while let Some(result) = stream.next().await { match result { Ok(req) => { if stdin.write_all(&req.packet).await.is_err() { break; } if req.done { break; } } Err(_) => break, } } // Close stdin to signal end-of-input drop(stdin); } } }; let stdout_task = { let tx = tx.clone(); async move { if let Some(mut stdout) = stdout.take() { let mut buf = vec![0u8; 65536]; loop { match stdout.read(&mut buf).await { Ok(0) => break, Ok(n) => { if tx .send(Ok(UploadPackResponse { packet: buf[..n].to_vec(), stderr: String::new(), })) .await .is_err() { break; } } Err(_) => break, } } } } }; let stderr_task = { let tx = tx.clone(); async move { if let Some(mut stderr) = stderr.take() { let mut s = String::new(); if stderr.read_to_string(&mut s).await.is_ok() && !s.is_empty() { let _ = tx .send(Ok(UploadPackResponse { packet: Vec::new(), stderr: s, })) .await; } } } }; // Run all three concurrently tokio::join!(stdin_task, stdout_task, stderr_task); // Wait for child exit match child.wait().await { Ok(status) if !status.success() => { let _ = tx .send(Err(tonic::Status::internal( "git upload-pack exited with error", ))) .await; } Err(e) => { let _ = tx .send(Err(tonic::Status::internal(format!("wait error: {e}")))) .await; } _ => {} } }); Ok(ReceiverStream::new(rx)) } }