Files
gitks/archive/get_archive.rs
T
zhenyi cc202d6d1f feat(server): add tracing spans and caching to archive and blame services
- Add tracing spans with repo labels for archive and blame operations
- Implement caching for archive list entries when using OID selectors
- Implement caching for blame operations when using OID selectors
- Add detailed
2026-06-04 15:33:16 +08:00

114 lines
4.1 KiB
Rust

use std::process::{Command, Stdio};
use tokio_stream::wrappers::ReceiverStream;
use crate::bare::GitBare;
use crate::pb::{ArchiveChunk, ArchiveRequest, archive_options, object_selector};
impl GitBare {
/// Stream archive data via a channel to avoid loading the entire archive into memory.
/// Returns a ReceiverStream that yields ArchiveChunk as the subprocess produces output.
pub fn get_archive_stream(
&self,
request: ArchiveRequest,
) -> Result<ReceiverStream<Result<ArchiveChunk, tonic::Status>>, tonic::Status> {
let bare_dir = self.bare_dir.clone();
tracing::info!(
repo = %bare_dir.display(),
"spawning git archive subprocess"
);
let (tx, rx) = tokio::sync::mpsc::channel(16);
// Spawn the blocking git subprocess in a dedicated thread
tokio::task::spawn_blocking(move || {
let revision = match request.treeish.and_then(|s| s.selector) {
Some(object_selector::Selector::Oid(oid)) => oid.hex,
Some(object_selector::Selector::Revision(name)) => name.revision,
None => "HEAD".into(),
};
let options = request.options.unwrap_or_default();
let format = archive_options::Format::try_from(options.format)
.unwrap_or(archive_options::Format::ArchiveFormatTar);
let mut args = vec!["archive".to_string()];
args.push(match format {
archive_options::Format::ArchiveFormatZip => "--format=zip".into(),
_ => "--format=tar".into(),
});
if !options.prefix.is_empty() {
args.push(format!("--prefix={}", options.prefix));
}
args.push(revision);
if !options.pathspec.is_empty() {
args.push("--".into());
args.extend(options.pathspec);
}
let mut child = match Command::new("git")
.arg("--git-dir")
.arg(&bare_dir)
.args(&args)
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.spawn()
{
Ok(c) => c,
Err(e) => {
let _ = tx.blocking_send(Err(tonic::Status::internal(format!(
"failed to spawn git archive: {e}"
))));
return;
}
};
let stdout = match child.stdout.take() {
Some(s) => s,
None => {
let _ =
tx.blocking_send(Err(tonic::Status::internal("failed to capture stdout")));
return;
}
};
// Read stdout in 64KB chunks and stream them
use std::io::Read;
let mut reader = std::io::BufReader::new(stdout);
let mut buf = vec![0u8; 65536];
loop {
match reader.read(&mut buf) {
Ok(0) => break,
Ok(n) => {
let chunk = ArchiveChunk {
data: buf[..n].to_vec(),
};
if tx.blocking_send(Ok(chunk)).is_err() {
break;
}
}
Err(e) => {
let _ = tx.blocking_send(Err(tonic::Status::internal(format!(
"read error: {e}"
))));
break;
}
}
}
match child.wait() {
Ok(status) if !status.success() => {
let _ = tx.blocking_send(Err(tonic::Status::internal(
"git archive exited with error",
)));
}
Err(e) => {
let _ =
tx.blocking_send(Err(tonic::Status::internal(format!("wait error: {e}"))));
}
_ => {}
}
});
Ok(ReceiverStream::new(rx))
}
}