//! Copyright (c) 2022-2026 GitDataAi All rights reserved. use std::process::Stdio; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::process::Command; use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; use super::CancellableReceiverStream; use crate::bare::GitBare; use crate::config::{MAX_RECEIVE_PACKET_BYTES, MAX_RECEIVE_STDERR_BYTES, RECEIVE_PACK_TIMEOUT}; use crate::pb::ReceivePackResponse; impl GitBare { /// Receive pack data using git-receive-pack with true concurrent streaming. /// /// Client-streaming input → server-streaming output. /// Stdin packets are forwarded to the child process as they arrive from the client, /// while stdout is concurrently read and streamed back via a channel. /// /// `stateless` enables `--stateless-rpc` for HTTP smart protocol. /// Leave `false` for SSH (persistent connection). pub async fn receive_pack( &self, stateless: bool, input: impl tokio_stream::Stream> + Send + 'static, ) -> Result>, tonic::Status> { let bare_dir = self.bare_dir.to_string_lossy().into_owned(); tracing::info!( repo = %bare_dir, stateless = stateless, "spawning git receive-pack subprocess" ); let (tx, rx) = tokio::sync::mpsc::channel(16); let cancel_token = tokio_util::sync::CancellationToken::new(); let cancel_token_clone = cancel_token.clone(); let stream = Box::pin(input); tokio::spawn(async move { let stream = stream; let mut cmd = Command::new("git"); cmd.arg("--git-dir").arg(&bare_dir); if stateless { cmd.arg("--stateless-rpc"); } cmd.arg("receive-pack").arg(&bare_dir); let mut child = match cmd .stdin(Stdio::piped()) .stdout(Stdio::piped()) .stderr(Stdio::piped()) .spawn() { Ok(c) => c, Err(e) => { let _ = tx .send(Err(tonic::Status::internal(format!( "failed to spawn git receive-pack: {e}" )))) .await; return; } }; let child_id = child.id(); let mut stdin = child.stdin.take(); let mut stdout = child.stdout.take(); let mut stderr = child.stderr.take(); let stdin_task = { let mut stream = stream; let cancel = cancel_token.clone(); async move { if let Some(mut stdin) = stdin.take() { while let Some(result) = stream.next().await { if cancel.is_cancelled() { break; } match result { Ok(req) => { if req.packet.len() > MAX_RECEIVE_PACKET_BYTES { break; } if stdin.write_all(&req.packet).await.is_err() { break; } if req.done { break; } } Err(_) => break, } } drop(stdin); } } }; let stdout_task = { let tx = tx.clone(); let cancel = cancel_token.clone(); async move { if let Some(mut stdout) = stdout.take() { let mut buf = vec![0u8; 65536]; loop { if cancel.is_cancelled() { break; } match stdout.read(&mut buf).await { Ok(0) => break, Ok(n) => { if tx .send(Ok(ReceivePackResponse { packet: buf[..n].to_vec(), stderr: String::new(), })) .await .is_err() { break; } } Err(_) => break, } } } } }; let stderr_task = { let tx = tx.clone(); async move { if let Some(stderr) = stderr.take() { let mut stderr = stderr.take(MAX_RECEIVE_STDERR_BYTES); let mut s = String::new(); if stderr.read_to_string(&mut s).await.is_ok() && !s.is_empty() { let _ = tx .send(Ok(ReceivePackResponse { packet: Vec::new(), stderr: s, })) .await; } } } }; let _process_future = tokio::join!(stdin_task, stdout_task, stderr_task); match tokio::time::timeout(RECEIVE_PACK_TIMEOUT, child.wait()).await { Ok(Ok(status)) => { if !status.success() { let _ = tx .send(Err(tonic::Status::internal( "git receive-pack exited with error", ))) .await; } } Ok(Err(e)) => { let _ = tx .send(Err(tonic::Status::internal(format!("wait error: {e}")))) .await; } Err(_timeout) => { tracing::warn!( repo = %bare_dir, pid = ?child_id, timeout_secs = RECEIVE_PACK_TIMEOUT.as_secs(), "git receive-pack timed out, killing" ); let _ = child.kill().await; let _ = tx .send(Err(tonic::Status::deadline_exceeded( "git receive-pack timed out", ))) .await; } } }); let rx_stream = ReceiverStream::new(rx); let cancel_guard = cancel_token_clone.clone().drop_guard(); Ok(super::CancellableReceiverStream::new( rx_stream, cancel_guard, )) } }