feat(server): add tracing spans and caching to archive and blame services
- Add tracing spans with repo labels for archive and blame operations - Implement caching for archive list entries when using OID selectors - Implement caching for blame operations when using OID selectors - Add detailed
This commit is contained in:
+105
-37
@@ -1,45 +1,113 @@
|
||||
use std::process::Command;
|
||||
use std::process::{Command, Stdio};
|
||||
|
||||
use tokio_stream::wrappers::ReceiverStream;
|
||||
|
||||
use crate::bare::GitBare;
|
||||
use crate::error::{GitError, GitResult};
|
||||
use crate::pb::{ArchiveChunk, ArchiveRequest, archive_options, object_selector};
|
||||
|
||||
impl GitBare {
|
||||
pub fn get_archive(&self, request: ArchiveRequest) -> GitResult<Vec<ArchiveChunk>> {
|
||||
let revision = match request.treeish.and_then(|s| s.selector) {
|
||||
Some(object_selector::Selector::Oid(oid)) => oid.hex,
|
||||
Some(object_selector::Selector::Revision(name)) => name.revision,
|
||||
None => "HEAD".into(),
|
||||
};
|
||||
let options = request.options.unwrap_or_default();
|
||||
let format = archive_options::Format::try_from(options.format)
|
||||
.unwrap_or(archive_options::Format::ArchiveFormatTar);
|
||||
let mut args = vec!["archive".to_string()];
|
||||
args.push(match format {
|
||||
archive_options::Format::ArchiveFormatZip => "--format=zip".into(),
|
||||
_ => "--format=tar".into(),
|
||||
});
|
||||
if !options.prefix.is_empty() {
|
||||
args.push(format!("--prefix={}", options.prefix));
|
||||
}
|
||||
args.push(revision);
|
||||
if !options.pathspec.is_empty() {
|
||||
args.push("--".into());
|
||||
args.extend(options.pathspec);
|
||||
}
|
||||
let output = Command::new("git")
|
||||
.arg("--git-dir")
|
||||
.arg(&self.bare_dir)
|
||||
.args(&args)
|
||||
.output()?;
|
||||
if !output.status.success() {
|
||||
return Err(GitError::CommandFailed {
|
||||
status_code: output.status.code(),
|
||||
stderr: String::from_utf8_lossy(&output.stderr).into_owned(),
|
||||
/// Stream archive data via a channel to avoid loading the entire archive into memory.
|
||||
/// Returns a ReceiverStream that yields ArchiveChunk as the subprocess produces output.
|
||||
pub fn get_archive_stream(
|
||||
&self,
|
||||
request: ArchiveRequest,
|
||||
) -> Result<ReceiverStream<Result<ArchiveChunk, tonic::Status>>, tonic::Status> {
|
||||
let bare_dir = self.bare_dir.clone();
|
||||
tracing::info!(
|
||||
repo = %bare_dir.display(),
|
||||
"spawning git archive subprocess"
|
||||
);
|
||||
|
||||
let (tx, rx) = tokio::sync::mpsc::channel(16);
|
||||
|
||||
// Spawn the blocking git subprocess in a dedicated thread
|
||||
tokio::task::spawn_blocking(move || {
|
||||
let revision = match request.treeish.and_then(|s| s.selector) {
|
||||
Some(object_selector::Selector::Oid(oid)) => oid.hex,
|
||||
Some(object_selector::Selector::Revision(name)) => name.revision,
|
||||
None => "HEAD".into(),
|
||||
};
|
||||
let options = request.options.unwrap_or_default();
|
||||
let format = archive_options::Format::try_from(options.format)
|
||||
.unwrap_or(archive_options::Format::ArchiveFormatTar);
|
||||
let mut args = vec!["archive".to_string()];
|
||||
args.push(match format {
|
||||
archive_options::Format::ArchiveFormatZip => "--format=zip".into(),
|
||||
_ => "--format=tar".into(),
|
||||
});
|
||||
}
|
||||
Ok(vec![ArchiveChunk {
|
||||
data: output.stdout,
|
||||
}])
|
||||
if !options.prefix.is_empty() {
|
||||
args.push(format!("--prefix={}", options.prefix));
|
||||
}
|
||||
args.push(revision);
|
||||
if !options.pathspec.is_empty() {
|
||||
args.push("--".into());
|
||||
args.extend(options.pathspec);
|
||||
}
|
||||
|
||||
let mut child = match Command::new("git")
|
||||
.arg("--git-dir")
|
||||
.arg(&bare_dir)
|
||||
.args(&args)
|
||||
.stdout(Stdio::piped())
|
||||
.stderr(Stdio::piped())
|
||||
.spawn()
|
||||
{
|
||||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
let _ = tx.blocking_send(Err(tonic::Status::internal(format!(
|
||||
"failed to spawn git archive: {e}"
|
||||
))));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
let stdout = match child.stdout.take() {
|
||||
Some(s) => s,
|
||||
None => {
|
||||
let _ =
|
||||
tx.blocking_send(Err(tonic::Status::internal("failed to capture stdout")));
|
||||
return;
|
||||
}
|
||||
};
|
||||
|
||||
// Read stdout in 64KB chunks and stream them
|
||||
use std::io::Read;
|
||||
let mut reader = std::io::BufReader::new(stdout);
|
||||
let mut buf = vec![0u8; 65536];
|
||||
loop {
|
||||
match reader.read(&mut buf) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => {
|
||||
let chunk = ArchiveChunk {
|
||||
data: buf[..n].to_vec(),
|
||||
};
|
||||
if tx.blocking_send(Ok(chunk)).is_err() {
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
let _ = tx.blocking_send(Err(tonic::Status::internal(format!(
|
||||
"read error: {e}"
|
||||
))));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match child.wait() {
|
||||
Ok(status) if !status.success() => {
|
||||
let _ = tx.blocking_send(Err(tonic::Status::internal(
|
||||
"git archive exited with error",
|
||||
)));
|
||||
}
|
||||
Err(e) => {
|
||||
let _ =
|
||||
tx.blocking_send(Err(tonic::Status::internal(format!("wait error: {e}"))));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
});
|
||||
|
||||
Ok(ReceiverStream::new(rx))
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user