// gRPC `PackService` handlers for `GitksService`.
//
// Every handler follows the same shape: pull the repository reference out of
// the request (or out of the first message of a client stream), pass it to
// `resolve` to obtain a backend handle (`gb`), call the same-named method on
// that handle, and wrap the result in a `tonic::Response`.
//
// NOTE(review): the generic type arguments on `ReceiverStream`,
// `tonic::Request` and `Result` appear to be missing in this view of the file
// (e.g. `ReceiverStream>`). The code tokens below are left exactly as found;
// confirm against the real source before building.
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;
use crate::pb::*;
use super::{GitksService, into_status, resolve};

#[tonic::async_trait]
impl pack_service_server::PackService for GitksService {
    // Associated stream types for the trait; each one is a `ReceiverStream`
    // (its item type is not visible here).
    type UploadPackStream = ReceiverStream>;
    type ReceivePackStream = ReceiverStream>;
    type PackObjectsStream = ReceiverStream>;

    /// Unary call: resolves the repository named in the request and forwards
    /// the whole request to the backend's `advertise_refs`.
    ///
    /// Errors from `resolve` are propagated unchanged; backend errors are
    /// converted with `into_status`.
    async fn advertise_refs(
        &self,
        request: tonic::Request,
    ) -> Result, tonic::Status> {
        let inner = request.into_inner();
        let gb = resolve(inner.repository.as_ref())?;
        let resp = gb.advertise_refs(inner).map_err(into_status)?;
        Ok(tonic::Response::new(resp))
    }

    /// Client-streaming call: reads the first message to find the repository,
    /// then hands the backend a stream containing that first message followed
    /// by the rest of the client stream.
    ///
    /// Returns `invalid_argument("empty upload-pack stream")` when the client
    /// sends no messages, and `internal("channel closed")` if re-queuing the
    /// first message fails.
    async fn upload_pack(
        &self,
        request: tonic::Request>,
    ) -> Result, tonic::Status> {
        let mut stream = request.into_inner();
        // Double `?`: the first rejects an empty stream (`None`), the second
        // propagates an error carried by the first stream item.
        let first = stream
            .next()
            .await
            .ok_or_else(|| tonic::Status::invalid_argument("empty upload-pack stream"))??;
        let gb = resolve(first.repository.as_ref())?;
        // Bounded channel (capacity 16) used to rebuild a stream that starts
        // with the message already consumed above.
        let (tx, rx) = tokio::sync::mpsc::channel(16);
        tx.send(Ok(first))
            .await
            .map_err(|_| tonic::Status::internal("channel closed"))?;
        // Detached task: forwards the remaining client messages (including any
        // error items) and stops once sending fails, i.e. the receiving side
        // has gone away. The join handle is intentionally not kept.
        tokio::spawn(async move {
            while let Some(msg) = stream.next().await {
                if tx.send(msg).await.is_err() {
                    break;
                }
            }
        });
        // No `into_status` here, unlike the unary handlers — presumably the
        // backend's error type converts to `tonic::Status` via `?`; verify.
        let result = gb.upload_pack(ReceiverStream::new(rx)).await?;
        Ok(tonic::Response::new(result))
    }

    /// Client-streaming call with the same structure as `upload_pack`: peek at
    /// the first message for the repository, then re-stream everything to the
    /// backend's `receive_pack`.
    ///
    /// Returns `invalid_argument("empty receive-pack stream")` when the client
    /// sends no messages, and `internal("channel closed")` if re-queuing the
    /// first message fails.
    ///
    /// NOTE(review): the body duplicates `upload_pack` apart from the error
    /// text and the backend method; a shared helper may be worth extracting.
    async fn receive_pack(
        &self,
        request: tonic::Request>,
    ) -> Result, tonic::Status> {
        let mut stream = request.into_inner();
        // Double `?`: empty stream first, then an error in the first item.
        let first = stream
            .next()
            .await
            .ok_or_else(|| tonic::Status::invalid_argument("empty receive-pack stream"))??;
        let gb = resolve(first.repository.as_ref())?;
        // Bounded channel (capacity 16) carrying the first message plus the
        // remainder of the client stream.
        let (tx, rx) = tokio::sync::mpsc::channel(16);
        tx.send(Ok(first))
            .await
            .map_err(|_| tonic::Status::internal("channel closed"))?;
        // Detached forwarder; exits when the stream ends or sending fails.
        tokio::spawn(async move {
            while let Some(msg) = stream.next().await {
                if tx.send(msg).await.is_err() {
                    break;
                }
            }
        });
        let result = gb.receive_pack(ReceiverStream::new(rx)).await?;
        Ok(tonic::Response::new(result))
    }

    /// Resolves the repository named in the request and returns whatever the
    /// backend's `pack_objects` produces as the response payload.
    async fn pack_objects(
        &self,
        request: tonic::Request,
    ) -> Result, tonic::Status> {
        let inner = request.into_inner();
        let gb = resolve(inner.repository.as_ref())?;
        let stream = gb.pack_objects(inner).await?;
        Ok(tonic::Response::new(stream))
    }

    /// Client-streaming call: collects every incoming message into a `Vec`,
    /// resolves the repository from the first collected message, and passes
    /// the whole batch to the backend's `index_pack`.
    ///
    /// The first error item in the stream aborts the call. Backend errors are
    /// converted with `into_status`.
    async fn index_pack(
        &self,
        request: tonic::Request>,
    ) -> Result, tonic::Status> {
        let mut stream = request.into_inner();
        // NOTE(review): the entire client stream is buffered in memory before
        // the repository is even resolved — confirm this is acceptable for
        // large packs.
        let mut inputs = Vec::new();
        while let Some(msg) = stream.next().await {
            inputs.push(msg?);
        }
        // With an empty stream `inputs.first()` is `None`, so `resolve` is
        // called with `None`; how it reacts is defined outside this file.
        let gb = resolve(inputs.first().and_then(|r| r.repository.as_ref()))?;
        let resp = gb.index_pack(inputs).map_err(into_status)?;
        Ok(tonic::Response::new(resp))
    }

    /// Unary call: resolves the repository named in the request and forwards
    /// the request to the backend's `list_packfiles`, converting backend
    /// errors with `into_status`.
    async fn list_packfiles(
        &self,
        request: tonic::Request,
    ) -> Result, tonic::Status> {
        let inner = request.into_inner();
        let gb = resolve(inner.repository.as_ref())?;
        let resp = gb.list_packfiles(inner).map_err(into_status)?;
        Ok(tonic::Response::new(resp))
    }

    /// Unary call: resolves the repository named in the request and forwards
    /// the request to the backend's `fsck`, converting backend errors with
    /// `into_status`.
    async fn fsck(
        &self,
        request: tonic::Request,
    ) -> Result, tonic::Status> {
        let inner = request.into_inner();
        let gb = resolve(inner.repository.as_ref())?;
        let resp = gb.fsck(inner).map_err(into_status)?;
        Ok(tonic::Response::new(resp))
    }
}