use tokio_stream::StreamExt; use tokio_stream::wrappers::ReceiverStream; use crate::pb::*; use super::{GitksService, into_status}; #[tonic::async_trait] impl pack_service_server::PackService for GitksService { type UploadPackStream = ReceiverStream>; type ReceivePackStream = ReceiverStream>; type PackObjectsStream = ReceiverStream>; async fn advertise_refs( &self, request: tonic::Request, ) -> Result, tonic::Status> { let inner = request.into_inner(); let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("pack.advertise_refs", %repo); let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.advertise_refs(inner).map_err(into_status)?; tracing::info!(%repo, refs = resp.references.len(), "advertise_refs done"); Ok(tonic::Response::new(resp)) } async fn upload_pack( &self, request: tonic::Request>, ) -> Result, tonic::Status> { let mut stream = request.into_inner(); let first = stream .next() .await .ok_or_else(|| tonic::Status::invalid_argument("empty upload-pack stream"))??; let repo = self.repo_label(first.repository.as_ref()); let span = tracing::info_span!("pack.upload_pack", %repo); let _enter = span.enter(); tracing::info!(%repo, "upload-pack streaming started"); let gb = self.resolve(first.repository.as_ref())?; let (tx, rx) = tokio::sync::mpsc::channel(16); tx.send(Ok(first)) .await .map_err(|_| tonic::Status::internal("channel closed"))?; tokio::spawn(async move { while let Some(msg) = stream.next().await { if tx.send(msg).await.is_err() { break; } } }); let result = gb.upload_pack(ReceiverStream::new(rx)).await?; Ok(tonic::Response::new(result)) } async fn receive_pack( &self, request: tonic::Request>, ) -> Result, tonic::Status> { let mut stream = request.into_inner(); let first = stream .next() .await .ok_or_else(|| tonic::Status::invalid_argument("empty receive-pack stream"))??; let repo = self.repo_label(first.repository.as_ref()); let span = tracing::info_span!("pack.receive_pack", %repo); let _enter = span.enter(); tracing::info!(%repo, "receive-pack streaming started"); let gb = self.resolve(first.repository.as_ref())?; let (tx, rx) = tokio::sync::mpsc::channel(16); tx.send(Ok(first)) .await .map_err(|_| tonic::Status::internal("channel closed"))?; tokio::spawn(async move { while let Some(msg) = stream.next().await { if tx.send(msg).await.is_err() { break; } } }); let result = gb.receive_pack(ReceiverStream::new(rx)).await?; Ok(tonic::Response::new(result)) } async fn pack_objects( &self, request: tonic::Request, ) -> Result, tonic::Status> { let inner = request.into_inner(); let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("pack.pack_objects", %repo); let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let stream = gb.pack_objects(inner).await?; tracing::info!(%repo, "pack-objects streaming started"); Ok(tonic::Response::new(stream)) } async fn index_pack( &self, request: tonic::Request>, ) -> Result, tonic::Status> { let mut stream = request.into_inner(); let mut inputs = Vec::new(); while let Some(msg) = stream.next().await { inputs.push(msg?); } let repo = self.repo_label(inputs.first().and_then(|r| r.repository.as_ref())); let span = tracing::info_span!("pack.index_pack", %repo); let _enter = span.enter(); let gb = self.resolve(inputs.first().and_then(|r| r.repository.as_ref()))?; let resp = gb.index_pack(inputs).map_err(into_status)?; tracing::info!(%repo, objects = resp.object_count, "index_pack done"); Ok(tonic::Response::new(resp)) } async fn list_packfiles( &self, request: tonic::Request, ) -> Result, tonic::Status> { let inner = request.into_inner(); let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("pack.list_packfiles", %repo); let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.list_packfiles(inner).map_err(into_status)?; tracing::info!(%repo, count = resp.packfiles.len(), "list_packfiles done"); Ok(tonic::Response::new(resp)) } async fn fsck( &self, request: tonic::Request, ) -> Result, tonic::Status> { let inner = request.into_inner(); let repo = self.repo_label(inner.repository.as_ref()); let span = tracing::info_span!("pack.fsck", %repo); let _enter = span.enter(); let gb = self.resolve(inner.repository.as_ref())?; let resp = gb.fsck(inner).map_err(into_status)?; tracing::info!(%repo, ok = resp.ok, errors = resp.errors.len(), warnings = resp.warnings.len(), "fsck done"); Ok(tonic::Response::new(resp)) } }