use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; use crate::pb::pack_service_client::PackServiceClient; use crate::pb::*; use crate::pack::CancellableReceiverStream; use super::{GitksService, into_status}; remote_client!( remote_pack_client, PackServiceClient, "pack" ); #[tonic::async_trait] impl pack_service_server::PackService for GitksService { type UploadPackStream = CancellableReceiverStream>; type ReceivePackStream = CancellableReceiverStream>; type PackObjectsStream = ReceiverStream>; async fn advertise_refs( &self, request: tonic::Request, ) -> Result, tonic::Status> { let m = crate::metrics::RequestMetrics::new("gitks.PackService/AdvertiseRefs"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("pack.advertise_refs", %repo); let _enter = span.enter(); let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref(), false).await? { m.record("ok"); return client.advertise_refs(inner).await; } crate::metrics::record_rpc_error(&m, &err); return Err(err); } Err(err) => { crate::metrics::record_rpc_error(&m, &err); return Err(err); } }; if let Some(ref pc) = self.pack_cache { let protocol_key = if inner.raw { format!("{}:raw", inner.service) } else { inner.service.clone() }; if let Ok(digest) = pc.disk_cache().compute_info_refs_key(&repo, &protocol_key) { if let Some(cached) = pc.lookup_info_refs::(&digest) { tracing::info!(%repo, refs = cached.references.len(), "advertise_refs done (cached)"); m.record("ok"); return Ok(tonic::Response::new(cached)); } let resp = gb.advertise_refs(inner).map_err(into_status)?; pc.store_info_refs(&digest, &resp); tracing::info!(%repo, refs = resp.references.len(), "advertise_refs done (written to cache)"); m.record("ok"); return Ok(tonic::Response::new(resp)); } } let resp = gb.advertise_refs(inner).map_err(into_status)?; tracing::info!(%repo, refs = resp.references.len(), "advertise_refs done"); m.record("ok"); Ok(tonic::Response::new(resp)) } async fn upload_pack( &self, request: tonic::Request>, ) -> Result, tonic::Status> { let m = crate::metrics::RequestMetrics::new("gitks.PackService/UploadPack"); let mut stream = request.into_inner(); let first = stream .next() .await .ok_or_else(|| tonic::Status::invalid_argument("empty upload-pack stream"))??; let _rate = self.acquire_rate_limit(first.repository.as_ref()).await?; let repo = self.repo_label(first.repository.as_ref()); let span = tracing::info_span!("pack.upload_pack", %repo); let _enter = span.enter(); let gb = match self.resolve(first.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { if let Some(mut client) = remote_pack_client(self, first.repository.as_ref(), false).await? { m.record("ok"); let (tx, rx) = tokio::sync::mpsc::channel(16); let _ = tx.send(first).await; tokio::spawn(async move { use tokio_stream::StreamExt; while let Some(msg) = stream.next().await { match msg { Ok(m) => { if tx.send(m).await.is_err() { break; } } Err(_) => break, } } }); let resp = client .upload_pack(tokio_stream::wrappers::ReceiverStream::new(rx)) .await?; let out = super::bridge_server_stream(resp.into_inner()); // Create a dummy cancel token for the forwarded stream let cancel_token = tokio_util::sync::CancellationToken::new(); let cancel_guard = cancel_token.drop_guard(); return Ok(tonic::Response::new( crate::pack::CancellableReceiverStream::new(out, cancel_guard), )); } crate::metrics::record_rpc_error(&m, &err); return Err(err); } Err(err) => { crate::metrics::record_rpc_error(&m, &err); return Err(err); } }; tracing::info!(%repo, "upload-pack streaming started"); let stateless = first.protocol.as_ref().is_some_and(|p| p.stateless); let (tx, rx) = tokio::sync::mpsc::channel(16); tx.send(Ok(first)) .await .map_err(|_| tonic::Status::internal("channel closed"))?; tokio::spawn(async move { while let Some(msg) = stream.next().await { if tx.send(msg).await.is_err() { break; } } }); let result = gb.upload_pack(stateless, ReceiverStream::new(rx)).await?; m.record("ok"); Ok(tonic::Response::new(result)) } async fn receive_pack( &self, request: tonic::Request>, ) -> Result, tonic::Status> { let m = crate::metrics::RequestMetrics::new("gitks.PackService/ReceivePack"); let mut stream = request.into_inner(); let first = stream .next() .await .ok_or_else(|| tonic::Status::invalid_argument("empty receive-pack stream"))??; let _rate = self.acquire_rate_limit(first.repository.as_ref()).await?; let repo = self.repo_label(first.repository.as_ref()); let span = tracing::info_span!("pack.receive_pack", %repo); let _enter = span.enter(); let gb = match self.resolve(first.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { if let Some(mut client) = remote_pack_client(self, first.repository.as_ref(), true).await? { m.record("ok"); let (tx, rx) = tokio::sync::mpsc::channel(16); let _ = tx.send(first).await; tokio::spawn(async move { use tokio_stream::StreamExt; while let Some(msg) = stream.next().await { match msg { Ok(m) => { if tx.send(m).await.is_err() { break; } } Err(_) => break, } } }); let resp = client .receive_pack(tokio_stream::wrappers::ReceiverStream::new(rx)) .await?; let out = super::bridge_server_stream(resp.into_inner()); // Create a dummy cancel token for the forwarded stream let cancel_token = tokio_util::sync::CancellationToken::new(); let cancel_guard = cancel_token.drop_guard(); return Ok(tonic::Response::new( crate::pack::CancellableReceiverStream::new(out, cancel_guard), )); } crate::metrics::record_rpc_error(&m, &err); return Err(err); } Err(err) => { crate::metrics::record_rpc_error(&m, &err); return Err(err); } }; tracing::info!(%repo, "receive-pack streaming started"); let stateless = first.protocol.as_ref().is_some_and(|p| p.stateless); let (tx, rx) = tokio::sync::mpsc::channel(16); tx.send(Ok(first)) .await .map_err(|_| tonic::Status::internal("channel closed"))?; tokio::spawn(async move { while let Some(msg) = stream.next().await { if tx.send(msg).await.is_err() { break; } } }); let result = gb.receive_pack(stateless, ReceiverStream::new(rx)).await?; m.record("ok"); Ok(tonic::Response::new(result)) } async fn pack_objects( &self, request: tonic::Request, ) -> Result, tonic::Status> { let m = crate::metrics::RequestMetrics::new("gitks.PackService/PackObjects"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("pack.pack_objects", %repo); let _enter = span.enter(); let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref(), false).await? { m.record("ok"); let resp = client.pack_objects(inner).await?; let stream = super::bridge_server_stream(resp.into_inner()); return Ok(tonic::Response::new(stream)); } crate::metrics::record_rpc_error(&m, &err); return Err(err); } Err(err) => { crate::metrics::record_rpc_error(&m, &err); return Err(err); } }; if let Some(ref pc) = self.pack_cache && let Some(opts) = inner.options.as_ref() { let wants_hex: Vec = opts.wants.iter().map(|w| w.hex.clone()).collect(); let haves_hex: Vec = opts.haves.iter().map(|h| h.hex.clone()).collect(); if let Ok(digest) = pc.disk_cache().compute_pack_objects_key( &repo, &wants_hex, &haves_hex, opts.thin_pack, opts.use_bitmaps, opts.delta_base_offset, ) { if let Some(file) = pc .disk_cache() .open_stream_read(crate::pack_cache::PACK_CACHE_NAMESPACE, &digest) .ok() .flatten() { tracing::info!(%repo, digest = %digest, "pack-objects cache hit, streaming from disk"); m.record("ok"); let (tx, rx) = tokio::sync::mpsc::channel(16); tokio::spawn(async move { let result = tokio::task::spawn_blocking(move || { use std::io::Read; let mut file = file; let mut buf = vec![0u8; 65536]; let mut chunks = Vec::new(); loop { match file.read(&mut buf) { Ok(0) => break, Ok(n) => chunks.push(Ok(PackfileChunk { data: buf[..n].to_vec(), })), Err(e) => { chunks.push(Err(tonic::Status::internal(format!( "cache read error: {e}" )))); break; } } } chunks }) .await; match result { Ok(chunks) => { for chunk in chunks { if tx.send(chunk).await.is_err() { break; } } } Err(e) => { let _ = tx .send(Err(tonic::Status::internal(format!( "cache read task failed: {e}" )))) .await; } } }); return Ok(tonic::Response::new(ReceiverStream::new(rx))); } // Cache miss: execute pack-objects and tee to cache tracing::info!(%repo, digest = %digest, "pack-objects cache miss"); let stream = gb.pack_objects(inner).await?; let tee_stream = pc.tee_pack_stream(&digest, stream); m.record("ok"); return Ok(tonic::Response::new(tee_stream)); } } let stream = gb.pack_objects(inner).await?; tracing::info!(%repo, "pack-objects streaming started"); m.record("ok"); Ok(tonic::Response::new(stream)) } async fn index_pack( &self, request: tonic::Request>, ) -> Result, tonic::Status> { let m = crate::metrics::RequestMetrics::new("gitks.PackService/IndexPack"); let mut stream = request.into_inner(); let mut inputs = Vec::new(); while let Some(msg) = stream.next().await { inputs.push(msg?); } let _rate = self .acquire_rate_limit( inputs .first() .and_then(|r: &IndexPackRequest| r.repository.as_ref()), ) .await?; let repo = self.repo_label(inputs.first().and_then(|r| r.repository.as_ref())); let span = tracing::info_span!("pack.index_pack", %repo); let _enter = span.enter(); let gb = match self.resolve(inputs.first().and_then(|r| r.repository.as_ref())) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { if let Some(mut client) = remote_pack_client( self, inputs.first().and_then(|r| r.repository.as_ref()), false, ) .await? { m.record("ok"); return client.index_pack(tokio_stream::iter(inputs)).await; } crate::metrics::record_rpc_error(&m, &err); return Err(err); } Err(err) => { crate::metrics::record_rpc_error(&m, &err); return Err(err); } }; let resp = gb.index_pack(inputs).map_err(into_status)?; tracing::info!(%repo, objects = resp.object_count, "index_pack done"); m.record("ok"); Ok(tonic::Response::new(resp)) } async fn list_packfiles( &self, request: tonic::Request, ) -> Result, tonic::Status> { let m = crate::metrics::RequestMetrics::new("gitks.PackService/ListPackfiles"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("pack.list_packfiles", %repo); let _enter = span.enter(); let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref(), false).await? { m.record("ok"); return client.list_packfiles(inner).await; } crate::metrics::record_rpc_error(&m, &err); return Err(err); } Err(err) => { crate::metrics::record_rpc_error(&m, &err); return Err(err); } }; let resp = gb.list_packfiles(inner).map_err(into_status)?; tracing::info!(%repo, count = resp.packfiles.len(), "list_packfiles done"); m.record("ok"); Ok(tonic::Response::new(resp)) } async fn fsck( &self, request: tonic::Request, ) -> Result, tonic::Status> { let m = crate::metrics::RequestMetrics::new("gitks.PackService/Fsck"); let inner = request.into_inner(); let _rate = self.acquire_rate_limit(inner.repository.as_ref()).await?; let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("pack.fsck", %repo); let _enter = span.enter(); let gb = match self.resolve(inner.repository.as_ref()) { Ok(gb) => gb, Err(err) if err.code() == tonic::Code::NotFound => { if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref(), false).await? { m.record("ok"); return client.fsck(inner).await; } crate::metrics::record_rpc_error(&m, &err); return Err(err); } Err(err) => { crate::metrics::record_rpc_error(&m, &err); return Err(err); } }; let resp = gb.fsck(inner).map_err(into_status)?; tracing::info!(%repo, ok = resp.ok, errors = resp.errors.len(), warnings = resp.warnings.len(), "fsck done"); m.record("ok"); Ok(tonic::Response::new(resp)) } }