use std::process::Stdio; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::Command; use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; use crate::bare::GitBare; use crate::pb::ReceivePackResponse; impl GitBare { /// Receive pack data using git-receive-pack with true concurrent streaming. /// /// Client-streaming input → server-streaming output. /// Stdin packets are forwarded to the child process as they arrive from the client, /// while stdout is concurrently read and streamed back via a channel. pub async fn receive_pack( &self, input: impl tokio_stream::Stream> + Send + 'static, ) -> Result>, tonic::Status> { let bare_dir = self.bare_dir.to_string_lossy().into_owned(); let (tx, rx) = tokio::sync::mpsc::channel(16); let stream = Box::pin(input); tokio::spawn(async move { let stream = stream; let mut child = match Command::new("git") .arg("--git-dir") .arg(&bare_dir) .arg("receive-pack") .arg(&bare_dir) .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() { Ok(c) => c, Err(e) => { let _ = tx .send(Err(tonic::Status::internal(format!( "failed to spawn git receive-pack: {e}" )))) .await; return; } }; let mut stdin = child.stdin.take(); let mut stdout = child.stdout.take(); let mut stderr = child.stderr.take(); let stdin_task = { let mut stream = stream; async move { if let Some(mut stdin) = stdin.take() { while let Some(result) = stream.next().await { match result { Ok(req) => { if stdin.write_all(&req.packet).await.is_err() { break; } if req.done { break; } } Err(_) => break, } } drop(stdin); } } }; let stdout_task = { let tx = tx.clone(); async move { if let Some(mut stdout) = stdout.take() { let mut buf = vec![0u8; 65536]; loop { match stdout.read(&mut buf).await { Ok(0) => break, Ok(n) => { if tx .send(Ok(ReceivePackResponse { packet: buf[..n].to_vec(), stderr: String::new(), })) .await .is_err() { break; } } Err(_) => break, } } } } }; let stderr_task = { let tx = tx.clone(); async move { if let Some(mut stderr) = stderr.take() { let mut s = String::new(); if stderr.read_to_string(&mut s).await.is_ok() && !s.is_empty() { let _ = tx .send(Ok(ReceivePackResponse { packet: Vec::new(), stderr: s, })) .await; } } } }; tokio::join!(stdin_task, stdout_task, stderr_task); match child.wait().await { Ok(status) if !status.success() => { let _ = tx .send(Err(tonic::Status::internal( "git receive-pack exited with error", ))) .await; } Err(e) => { let _ = tx .send(Err(tonic::Status::internal(format!("wait error: {e}")))) .await; } _ => {} } }); Ok(ReceiverStream::new(rx)) } }