feat(core): implement Git repository operations with gRPC services
- Add advertise_refs functionality for Git protocol communication - Implement archive service with TAR/ZIP format support and streaming - Create blame service for Git file annotation with line tracking - Add branch management including create, delete, rename and compare operations - Implement merge checking with conflict detection and fast-forward handling - Add cherry-pick functionality for applying commits between branches - Integrate gix library for Git repository operations and object handling - Add comprehensive test suite covering all Git operations - Implement proper error handling and repository validation - Add pagination support for large result sets - Create protobuf definitions for all Git operations and data structures - Add build system for gRPC code generation and dependency management
This commit is contained in:
@@ -0,0 +1,57 @@
|
||||
use crate::bare::GitBare;
|
||||
use crate::error::GitResult;
|
||||
use crate::pb::{AdvertiseRefsRequest, AdvertiseRefsResponse, ReferenceAdvertisement};
|
||||
|
||||
impl GitBare {
|
||||
pub fn advertise_refs(
|
||||
&self,
|
||||
_request: AdvertiseRefsRequest,
|
||||
) -> GitResult<AdvertiseRefsResponse> {
|
||||
let repo = self.gix_repo()?;
|
||||
let mut references = Vec::new();
|
||||
for r in repo.references()?.all()? {
|
||||
let mut r = match r {
|
||||
Ok(r) => r,
|
||||
Err(_) => continue,
|
||||
};
|
||||
let name = r.name().to_string();
|
||||
let target_oid = r.peel_to_id().ok().map(|id| self.oid_to_pb(id.to_string()));
|
||||
let is_symbolic = r.target().try_id().is_none();
|
||||
let symbolic_target = if is_symbolic {
|
||||
match r.target() {
|
||||
gix::refs::TargetRef::Symbolic(name) => name.to_string(),
|
||||
_ => String::new(),
|
||||
}
|
||||
} else {
|
||||
String::new()
|
||||
};
|
||||
// Peel past tags to get the commit OID if this is a tag ref
|
||||
let peeled_oid = if name.starts_with("refs/tags/") {
|
||||
r.peel_to_id().ok().map(|id| self.oid_to_pb(id.to_string()))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
references.push(ReferenceAdvertisement {
|
||||
name,
|
||||
target_oid,
|
||||
peeled_oid,
|
||||
symbolic: is_symbolic,
|
||||
symbolic_target,
|
||||
});
|
||||
}
|
||||
// Sort by name for deterministic output
|
||||
references.sort_by(|a, b| a.name.cmp(&b.name));
|
||||
Ok(AdvertiseRefsResponse {
|
||||
references,
|
||||
capabilities: vec![
|
||||
"report-status".into(),
|
||||
"delete-refs".into(),
|
||||
"side-band-64k".into(),
|
||||
"ofs-delta".into(),
|
||||
"multi_ack_detailed".into(),
|
||||
"multi_ack".into(),
|
||||
"symref=HEAD".into(),
|
||||
],
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,45 @@
|
||||
use crate::bare::GitBare;
|
||||
use crate::error::GitResult;
|
||||
use crate::pb::{FsckRequest, FsckResponse};
|
||||
|
||||
impl GitBare {
|
||||
pub fn fsck(&self, request: FsckRequest) -> GitResult<FsckResponse> {
|
||||
let mut args = vec![
|
||||
"--git-dir".to_string(),
|
||||
self.bare_dir.to_string_lossy().into_owned(),
|
||||
"fsck".to_string(),
|
||||
];
|
||||
if request.strict {
|
||||
args.push("--strict".into());
|
||||
}
|
||||
if request.connectivity_only {
|
||||
args.push("--connectivity-only".into());
|
||||
}
|
||||
let result = duct::cmd("git", &args)
|
||||
.stdout_capture()
|
||||
.stderr_capture()
|
||||
.unchecked()
|
||||
.run()?;
|
||||
let stdout = String::from_utf8_lossy(&result.stdout);
|
||||
let stderr = String::from_utf8_lossy(&result.stderr);
|
||||
let ok = result.status.success();
|
||||
let mut errors = Vec::new();
|
||||
let mut warnings = Vec::new();
|
||||
for line in stdout.lines().chain(stderr.lines()) {
|
||||
if line.contains("error:") || line.contains("fatal:") {
|
||||
errors.push(
|
||||
line.trim_start_matches("error: ")
|
||||
.trim_start_matches("fatal: ")
|
||||
.to_string(),
|
||||
);
|
||||
} else if line.contains("warning:") {
|
||||
warnings.push(line.trim_start_matches("warning: ").to_string());
|
||||
}
|
||||
}
|
||||
Ok(FsckResponse {
|
||||
ok,
|
||||
errors,
|
||||
warnings,
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,138 @@
|
||||
use std::io::Write;
|
||||
|
||||
use crate::bare::GitBare;
|
||||
use crate::error::{GitError, GitResult};
|
||||
use crate::pb::{IndexPackRequest, IndexPackResponse};
|
||||
|
||||
impl GitBare {
|
||||
/// Index a pack file from streamed input.
|
||||
///
|
||||
/// Client-streaming → unary response.
|
||||
/// Collects all input chunks into a single pack, then runs `git index-pack`.
|
||||
pub fn index_pack(&self, inputs: Vec<IndexPackRequest>) -> GitResult<IndexPackResponse> {
|
||||
// Reassemble all chunks into a single pack data buffer
|
||||
let mut pack_data = Vec::new();
|
||||
let mut strict = false;
|
||||
let mut keep = false;
|
||||
|
||||
for input in &inputs {
|
||||
pack_data.extend_from_slice(&input.data);
|
||||
if input.strict {
|
||||
strict = true;
|
||||
}
|
||||
if input.keep {
|
||||
keep = true;
|
||||
}
|
||||
}
|
||||
|
||||
if pack_data.is_empty() {
|
||||
return Err(GitError::InvalidArgument("empty pack data".into()));
|
||||
}
|
||||
|
||||
let pack_dir = self.bare_dir.join("objects").join("pack");
|
||||
std::fs::create_dir_all(&pack_dir).map_err(GitError::Io)?;
|
||||
|
||||
// Write pack data to a unique temp file in the pack directory.
|
||||
let mut tmp_file = tempfile::Builder::new()
|
||||
.prefix("tmp_index_pack_")
|
||||
.tempfile_in(&pack_dir)
|
||||
.map_err(GitError::Io)?;
|
||||
tmp_file.write_all(&pack_data).map_err(GitError::Io)?;
|
||||
let tmp_path = tmp_file.path().to_path_buf();
|
||||
|
||||
let mut args = vec![
|
||||
"--git-dir".to_string(),
|
||||
self.bare_dir.to_string_lossy().into_owned(),
|
||||
"index-pack".to_string(),
|
||||
];
|
||||
if strict {
|
||||
args.push("--strict".into());
|
||||
}
|
||||
if keep {
|
||||
args.push("--keep".into());
|
||||
}
|
||||
args.push(tmp_path.to_string_lossy().into_owned());
|
||||
|
||||
let result = duct::cmd("git", &args)
|
||||
.stdout_capture()
|
||||
.stderr_capture()
|
||||
.unchecked()
|
||||
.run()?;
|
||||
|
||||
drop(tmp_file);
|
||||
|
||||
if !result.status.success() {
|
||||
return Err(GitError::CommandFailed {
|
||||
status_code: result.status.code(),
|
||||
stderr: String::from_utf8_lossy(&result.stderr).into_owned(),
|
||||
});
|
||||
}
|
||||
|
||||
// Parse the output to extract the pack hash
|
||||
let output = String::from_utf8_lossy(&result.stdout);
|
||||
let stderr = String::from_utf8_lossy(&result.stderr);
|
||||
let all_output = format!("{output}\n{stderr}");
|
||||
|
||||
// git index-pack outputs the .idx and .pack filenames
|
||||
// e.g. "... pack-<hex>.pack ... pack-<hex>.idx"
|
||||
let pack_hash = all_output
|
||||
.lines()
|
||||
.filter_map(|line| {
|
||||
// Look for the hash after "pack-" and before ".idx" or ".pack"
|
||||
let trimmed = line.trim();
|
||||
if let Some(idx) = trimmed.find("pack-") {
|
||||
let rest = &trimmed[idx + 5..];
|
||||
if let Some(end) = rest.find('.') {
|
||||
let hex = &rest[..end];
|
||||
if hex.len() == 40 || hex.len() == 64 {
|
||||
return Some(hex.to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
None
|
||||
})
|
||||
.next();
|
||||
|
||||
// Try to get object count from .idx if it exists
|
||||
let mut object_count = 0u64;
|
||||
if let Some(ref hash) = pack_hash {
|
||||
let idx_path = pack_dir.join(format!("pack-{}.idx", hash));
|
||||
if idx_path.exists() {
|
||||
let verify = duct::cmd(
|
||||
"git",
|
||||
[
|
||||
"--git-dir",
|
||||
self.bare_dir.to_string_lossy().as_ref(),
|
||||
"verify-pack",
|
||||
"-v",
|
||||
idx_path.to_string_lossy().as_ref(),
|
||||
],
|
||||
)
|
||||
.stdout_capture()
|
||||
.stderr_capture()
|
||||
.unchecked()
|
||||
.run();
|
||||
if let Ok(v) = verify {
|
||||
let out = String::from_utf8_lossy(&v.stdout);
|
||||
object_count = out
|
||||
.lines()
|
||||
.filter(|l| {
|
||||
let parts: Vec<&str> = l.split_whitespace().collect();
|
||||
parts.len() >= 3
|
||||
&& parts
|
||||
.first()
|
||||
.map(|s| s.len() == 40 || s.len() == 64)
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.count() as u64;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(IndexPackResponse {
|
||||
pack_hash: pack_hash.map(|h| self.oid_to_pb(h)),
|
||||
object_count,
|
||||
stderr: stderr.into_owned(),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,92 @@
|
||||
use crate::bare::GitBare;
|
||||
use crate::error::GitError;
|
||||
use crate::paginate;
|
||||
use crate::pb::{ListPackfilesRequest, ListPackfilesResponse, PackfileInfo};
|
||||
|
||||
impl GitBare {
|
||||
pub fn list_packfiles(
|
||||
&self,
|
||||
request: ListPackfilesRequest,
|
||||
) -> crate::error::GitResult<ListPackfilesResponse> {
|
||||
let pack_dir = self.bare_dir.join("objects").join("pack");
|
||||
let mut packfiles = Vec::new();
|
||||
|
||||
if pack_dir.exists() {
|
||||
for entry in std::fs::read_dir(&pack_dir).map_err(GitError::Io)? {
|
||||
let entry = entry.map_err(GitError::Io)?;
|
||||
let name = entry.file_name().to_string_lossy().into_owned();
|
||||
if !name.ends_with(".pack") {
|
||||
continue;
|
||||
}
|
||||
let metadata = entry.metadata().map_err(GitError::Io)?;
|
||||
let base_name = name.trim_end_matches(".pack");
|
||||
let idx_name = format!("{base_name}.idx");
|
||||
let bmp_name = format!("{base_name}.bitmap");
|
||||
let rev_name = format!("{base_name}.rev");
|
||||
let keep_name = format!("{base_name}.keep");
|
||||
|
||||
let pack_hash = base_name
|
||||
.strip_prefix("pack-")
|
||||
.filter(|hex| !hex.is_empty())
|
||||
.map(|hex| self.oid_to_pb(hex));
|
||||
|
||||
// Count objects
|
||||
let mut object_count = 0u64;
|
||||
if let Some(hash_str) = base_name.strip_prefix("pack-") {
|
||||
let idx_path = pack_dir.join(format!("pack-{hash_str}.idx"));
|
||||
if idx_path.exists() {
|
||||
let verify = duct::cmd(
|
||||
"git",
|
||||
[
|
||||
"--git-dir",
|
||||
self.bare_dir.to_string_lossy().as_ref(),
|
||||
"verify-pack",
|
||||
"-v",
|
||||
idx_path.to_string_lossy().as_ref(),
|
||||
],
|
||||
)
|
||||
.stdout_capture()
|
||||
.stderr_capture()
|
||||
.unchecked()
|
||||
.run();
|
||||
if let Ok(v) = verify {
|
||||
let out = String::from_utf8_lossy(&v.stdout);
|
||||
object_count = out
|
||||
.lines()
|
||||
.filter(|l| {
|
||||
let parts: Vec<&str> = l.split_whitespace().collect();
|
||||
parts.len() >= 3
|
||||
&& parts
|
||||
.first()
|
||||
.map(|s| s.len() == 40 || s.len() == 64)
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.count() as u64;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
packfiles.push(PackfileInfo {
|
||||
name,
|
||||
pack_hash,
|
||||
size_bytes: metadata.len(),
|
||||
index_size_bytes: pack_dir
|
||||
.join(&idx_name)
|
||||
.metadata()
|
||||
.map(|m| m.len())
|
||||
.unwrap_or(0),
|
||||
object_count,
|
||||
has_bitmap: pack_dir.join(&bmp_name).exists(),
|
||||
has_rev_index: pack_dir.join(&rev_name).exists(),
|
||||
kept: pack_dir.join(&keep_name).exists(),
|
||||
});
|
||||
}
|
||||
}
|
||||
packfiles.sort_by(|a, b| a.name.cmp(&b.name));
|
||||
let (packfiles, page_info) = paginate::paginate(&packfiles, request.pagination.as_ref());
|
||||
Ok(ListPackfilesResponse {
|
||||
packfiles,
|
||||
page_info: Some(page_info),
|
||||
})
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,7 @@
|
||||
pub mod advertise_refs;
|
||||
pub mod fsck;
|
||||
pub mod index_pack;
|
||||
pub mod list_packfiles;
|
||||
pub mod pack_objects;
|
||||
pub mod receive_pack;
|
||||
pub mod upload_pack;
|
||||
@@ -0,0 +1,160 @@
|
||||
use std::process::Stdio;
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::process::Command;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
use crate::bare::GitBare;
|
||||
use crate::pb::PackfileChunk;
|
||||
|
||||
impl GitBare {
|
||||
/// Pack objects using git-pack-objects --stdout.
|
||||
///
|
||||
/// Unary request → server-streaming response.
|
||||
/// The returned stream yields `PackfileChunk` chunks as pack data is produced.
|
||||
pub async fn pack_objects(
|
||||
&self,
|
||||
request: crate::pb::PackObjectsRequest,
|
||||
) -> Result<ReceiverStream<Result<PackfileChunk, tonic::Status>>, tonic::Status> {
|
||||
let bare_dir = self.bare_dir.clone();
|
||||
let bare_dir_str = bare_dir.to_string_lossy().into_owned();
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(8);
|
||||
|
||||
tokio::spawn(async move {
|
||||
let opts = request.options.as_ref();
|
||||
let has_wants = opts.is_some_and(|o| !o.wants.is_empty());
|
||||
|
||||
let mut args = vec![
|
||||
"--git-dir".to_string(),
|
||||
bare_dir_str,
|
||||
"pack-objects".to_string(),
|
||||
"--stdout".to_string(),
|
||||
];
|
||||
|
||||
// --all is mutually exclusive with explicit revision selection.
|
||||
if !has_wants {
|
||||
args.push("--all".into());
|
||||
} else {
|
||||
args.push("--revs".into());
|
||||
}
|
||||
|
||||
if opts.is_some_and(|o| o.thin_pack) {
|
||||
args.push("--thin".into());
|
||||
}
|
||||
if opts.is_some_and(|o| !o.use_bitmaps) {
|
||||
args.push("--no-use-bitmaps".into());
|
||||
}
|
||||
if opts.is_some_and(|o| o.delta_base_offset) {
|
||||
args.push("--delta-base-offset".into());
|
||||
}
|
||||
let stdin_data = generate_pack_input(&request);
|
||||
|
||||
let mut child = match Command::new("git")
|
||||
.args(&args)
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
let _ = tx
|
||||
.send(Err(tonic::Status::internal(format!(
|
||||
"failed to spawn git pack-objects: {e}"
|
||||
))))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut stdin = child.stdin.take();
|
||||
let mut stdout = child.stdout.take();
|
||||
let mut stderr = child.stderr.take();
|
||||
|
||||
let stdin_task = async move {
|
||||
if let Some(mut stdin) = stdin.take() {
|
||||
let _ = stdin.write_all(&stdin_data).await;
|
||||
}
|
||||
};
|
||||
|
||||
let stdout_task = {
|
||||
let tx = tx.clone();
|
||||
async move {
|
||||
if let Some(mut stdout) = stdout.take() {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
loop {
|
||||
match stdout.read(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
if tx
|
||||
.send(Ok(PackfileChunk {
|
||||
data: buf[..n].to_vec(),
|
||||
}))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = tx
|
||||
.send(Err(tonic::Status::internal(format!(
|
||||
"read error: {e}"
|
||||
))))
|
||||
.await;
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let stderr_task = {
|
||||
let tx = tx.clone();
|
||||
async move {
|
||||
if let Some(mut stderr) = stderr.take() {
|
||||
let mut s = String::new();
|
||||
if stderr.read_to_string(&mut s).await.is_ok() && !s.is_empty() {
|
||||
let _ = tx.send(Err(tonic::Status::internal(s))).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
tokio::join!(stdin_task, stdout_task, stderr_task);
|
||||
|
||||
match child.wait().await {
|
||||
Ok(status) if !status.success() => {
|
||||
let _ = tx
|
||||
.send(Err(tonic::Status::internal(
|
||||
"git pack-objects exited with error",
|
||||
)))
|
||||
.await;
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = tx
|
||||
.send(Err(tonic::Status::internal(format!("wait error: {e}"))))
|
||||
.await;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(ReceiverStream::new(rx))
|
||||
}
|
||||
}
|
||||
|
||||
fn generate_pack_input(req: &crate::pb::PackObjectsRequest) -> Vec<u8> {
|
||||
let mut input = String::new();
|
||||
if let Some(opts) = req.options.as_ref() {
|
||||
for want in &opts.wants {
|
||||
input.push_str(&format!("{}\n", want.hex));
|
||||
}
|
||||
for have in &opts.haves {
|
||||
input.push_str(&format!("^{}\n", have.hex));
|
||||
}
|
||||
}
|
||||
input.into_bytes()
|
||||
}
|
||||
@@ -0,0 +1,142 @@
|
||||
use std::process::Stdio;
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::process::Command;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
use crate::bare::GitBare;
|
||||
use crate::pb::ReceivePackResponse;
|
||||
|
||||
impl GitBare {
|
||||
/// Receive pack data using git-receive-pack with true concurrent streaming.
|
||||
///
|
||||
/// Client-streaming input → server-streaming output.
|
||||
/// Stdin packets are forwarded to the child process as they arrive from the client,
|
||||
/// while stdout is concurrently read and streamed back via a channel.
|
||||
pub async fn receive_pack(
|
||||
&self,
|
||||
input: impl tokio_stream::Stream<Item = Result<crate::pb::ReceivePackRequest, tonic::Status>>
|
||||
+ Send
|
||||
+ 'static,
|
||||
) -> Result<ReceiverStream<Result<ReceivePackResponse, tonic::Status>>, tonic::Status> {
|
||||
let bare_dir = self.bare_dir.to_string_lossy().into_owned();
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(16);
|
||||
|
||||
let stream = Box::pin(input);
|
||||
tokio::spawn(async move {
|
||||
let stream = stream;
|
||||
let mut child = match Command::new("git")
|
||||
.arg("--git-dir")
|
||||
.arg(&bare_dir)
|
||||
.arg("receive-pack")
|
||||
.arg(&bare_dir)
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
let _ = tx
|
||||
.send(Err(tonic::Status::internal(format!(
|
||||
"failed to spawn git receive-pack: {e}"
|
||||
))))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut stdin = child.stdin.take();
|
||||
let mut stdout = child.stdout.take();
|
||||
let mut stderr = child.stderr.take();
|
||||
|
||||
let stdin_task = {
|
||||
let mut stream = stream;
|
||||
async move {
|
||||
if let Some(mut stdin) = stdin.take() {
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(req) => {
|
||||
if stdin.write_all(&req.packet).await.is_err() {
|
||||
break;
|
||||
}
|
||||
if req.done {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
drop(stdin);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let stdout_task = {
|
||||
let tx = tx.clone();
|
||||
async move {
|
||||
if let Some(mut stdout) = stdout.take() {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
loop {
|
||||
match stdout.read(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
if tx
|
||||
.send(Ok(ReceivePackResponse {
|
||||
packet: buf[..n].to_vec(),
|
||||
stderr: String::new(),
|
||||
}))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let stderr_task = {
|
||||
let tx = tx.clone();
|
||||
async move {
|
||||
if let Some(mut stderr) = stderr.take() {
|
||||
let mut s = String::new();
|
||||
if stderr.read_to_string(&mut s).await.is_ok() && !s.is_empty() {
|
||||
let _ = tx
|
||||
.send(Ok(ReceivePackResponse {
|
||||
packet: Vec::new(),
|
||||
stderr: s,
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
tokio::join!(stdin_task, stdout_task, stderr_task);
|
||||
|
||||
match child.wait().await {
|
||||
Ok(status) if !status.success() => {
|
||||
let _ = tx
|
||||
.send(Err(tonic::Status::internal(
|
||||
"git receive-pack exited with error",
|
||||
)))
|
||||
.await;
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = tx
|
||||
.send(Err(tonic::Status::internal(format!("wait error: {e}"))))
|
||||
.await;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(ReceiverStream::new(rx))
|
||||
}
|
||||
}
|
||||
@@ -0,0 +1,147 @@
|
||||
use std::process::Stdio;
|
||||
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::process::Command;
|
||||
use tokio_stream::StreamExt;
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
use crate::bare::GitBare;
|
||||
use crate::pb::UploadPackResponse;
|
||||
|
||||
impl GitBare {
|
||||
/// Upload pack data using git-upload-pack with true concurrent streaming.
|
||||
///
|
||||
/// Client-streaming input → server-streaming output.
|
||||
/// Stdin packets are forwarded to the child process as they arrive from the client,
|
||||
/// while stdout is concurrently read and streamed back via a channel.
|
||||
pub async fn upload_pack(
|
||||
&self,
|
||||
input: impl tokio_stream::Stream<Item = Result<crate::pb::UploadPackRequest, tonic::Status>>
|
||||
+ Send
|
||||
+ 'static,
|
||||
) -> Result<ReceiverStream<Result<UploadPackResponse, tonic::Status>>, tonic::Status> {
|
||||
let bare_dir = self.bare_dir.to_string_lossy().into_owned();
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(16);
|
||||
|
||||
// Move input into the spawned task to make it 'static
|
||||
let stream = Box::pin(input);
|
||||
tokio::spawn(async move {
|
||||
let stream = stream;
|
||||
let mut child = match Command::new("git")
|
||||
.arg("--git-dir")
|
||||
.arg(&bare_dir)
|
||||
.arg("upload-pack")
|
||||
.arg(&bare_dir)
|
||||
.stdin(Stdio::piped())
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
let _ = tx
|
||||
.send(Err(tonic::Status::internal(format!(
|
||||
"failed to spawn git upload-pack: {e}"
|
||||
))))
|
||||
.await;
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let mut stdin = child.stdin.take();
|
||||
let mut stdout = child.stdout.take();
|
||||
let mut stderr = child.stderr.take();
|
||||
|
||||
// Concurrent: write stdin packets, read stdout chunks, read stderr
|
||||
let stdin_task = {
|
||||
let mut stream = stream;
|
||||
async move {
|
||||
if let Some(mut stdin) = stdin.take() {
|
||||
while let Some(result) = stream.next().await {
|
||||
match result {
|
||||
Ok(req) => {
|
||||
if stdin.write_all(&req.packet).await.is_err() {
|
||||
break;
|
||||
}
|
||||
if req.done {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
// Close stdin to signal end-of-input
|
||||
drop(stdin);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let stdout_task = {
|
||||
let tx = tx.clone();
|
||||
async move {
|
||||
if let Some(mut stdout) = stdout.take() {
|
||||
let mut buf = vec![0u8; 65536];
|
||||
loop {
|
||||
match stdout.read(&mut buf).await {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
if tx
|
||||
.send(Ok(UploadPackResponse {
|
||||
packet: buf[..n].to_vec(),
|
||||
stderr: String::new(),
|
||||
}))
|
||||
.await
|
||||
.is_err()
|
||||
{
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let stderr_task = {
|
||||
let tx = tx.clone();
|
||||
async move {
|
||||
if let Some(mut stderr) = stderr.take() {
|
||||
let mut s = String::new();
|
||||
if stderr.read_to_string(&mut s).await.is_ok() && !s.is_empty() {
|
||||
let _ = tx
|
||||
.send(Ok(UploadPackResponse {
|
||||
packet: Vec::new(),
|
||||
stderr: s,
|
||||
}))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
// Run all three concurrently
|
||||
tokio::join!(stdin_task, stdout_task, stderr_task);
|
||||
|
||||
// Wait for child exit
|
||||
match child.wait().await {
|
||||
Ok(status) if !status.success() => {
|
||||
let _ = tx
|
||||
.send(Err(tonic::Status::internal(
|
||||
"git upload-pack exited with error",
|
||||
)))
|
||||
.await;
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = tx
|
||||
.send(Err(tonic::Status::internal(format!("wait error: {e}"))))
|
||||
.await;
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(ReceiverStream::new(rx))
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user