// Pack-related gRPC handlers for `GitksService`.
//
// Every handler follows the same shape: try `self.resolve(..)` for the
// repository named in the request; if that fails with `tonic::Code::NotFound`,
// try to forward the call to another node via `remote_pack_client`; any other
// error (or no forwarding route) is returned to the caller unchanged.
//
// NOTE(review): several signatures below have unbalanced angle brackets
// (e.g. `Result>, tonic::Status>`, `ReceiverStream>;`, `tonic::Request,`).
// The generic arguments look like they were lost before this text reached
// review. Code tokens are left exactly as found -- restore the type arguments
// from the real file before compiling.
use tokio_stream::StreamExt;
use tokio_stream::wrappers::ReceiverStream;

use crate::pb::*;
use crate::pb::pack_service_client::PackServiceClient;

use super::{GitksService, into_status};

/// Builds a `PackServiceClient` connected to the node that `svc` routes
/// `header` to, for forwarding a pack RPC.
///
/// Returns `Ok(None)` when there is nothing to forward to: either `header` is
/// `None`, or `svc.route_repository(header)` yields `None`.
///
/// # Errors
///
/// * Any error from `svc.route_repository` or `super::remote_endpoint` is
///   propagated as-is.
/// * A failed `PackServiceClient::connect` is reported as
///   `tonic::Status::unavailable` carrying the connect error's text.
async fn remote_pack_client(
    svc: &GitksService,
    header: Option<&RepositoryHeader>,
) -> Result>, tonic::Status> {
    // No repository header means there is nothing to route on.
    let header = match header {
        Some(h) => h,
        None => return Ok(None),
    };
    // `None` from the router means "no remote route"; callers then fall back
    // to their original error.
    let Some(route) = svc.route_repository(header).await? else {
        return Ok(None);
    };
    tracing::info!(storage_name = %route.storage_name,
        relative_path = %route.relative_path,
        actor_name = %route.actor_name,
        grpc_addr = %route.grpc_addr,
        "forwarding pack rpc");
    let endpoint = super::remote_endpoint(&route.grpc_addr).await?;
    // NOTE(review): a new connection is established on every call; nothing
    // visible here caches or reuses clients.
    let client = PackServiceClient::connect(endpoint)
        .await
        .map_err(|e| tonic::Status::unavailable(e.to_string()))?;
    Ok(Some(client))
}

// NOTE(review): every handler below binds `let _enter = span.enter();` and
// then awaits while the guard is alive. The `tracing` documentation warns that
// holding this guard across an `.await` can produce incorrect traces in async
// code; consider instrumenting the future instead. Left unchanged here.
#[tonic::async_trait]
impl pack_service_server::PackService for GitksService {
    // Response stream types for the three server-streaming RPCs; all are
    // `ReceiverStream`s (item types are missing from this view, see top note).
    type UploadPackStream = ReceiverStream>;
    type ReceivePackStream = ReceiverStream>;
    type PackObjectsStream = ReceiverStream>;

    /// Unary RPC: answers `advertise_refs` for the repository in the request.
    ///
    /// Served from the value returned by `self.resolve` when that succeeds
    /// (errors converted with `into_status`); forwarded to a remote node when
    /// `resolve` reports `NotFound` and a route exists; otherwise the
    /// `resolve` error is returned.
    async fn advertise_refs(
        &self,
        request: tonic::Request,
    ) -> Result, tonic::Status> {
        let inner = request.into_inner();
        // `repo` is only used as a field on the span and the log line below.
        let repo = self.repo_label(inner.repository.as_ref());
        let span = tracing::info_span!("pack.advertise_refs", %repo);
        let _enter = span.enter();
        let gb = match self.resolve(inner.repository.as_ref()) {
            Ok(gb) => gb,
            // Not known locally: try to forward, else surface the NotFound.
            Err(err) if err.code() == tonic::Code::NotFound => {
                if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref()).await?
                {
                    // The remote response (or error) is returned verbatim.
                    return client.advertise_refs(inner).await;
                }
                return Err(err);
            }
            Err(err) => return Err(err),
        };
        let resp = gb.advertise_refs(inner).map_err(into_status)?;
        tracing::info!(%repo, refs = resp.references.len(), "advertise_refs done");
        Ok(tonic::Response::new(resp))
    }

    /// Bidirectional-streaming RPC for upload-pack.
    ///
    /// The first request message is read eagerly because it carries the
    /// repository used for resolution/routing. An empty request stream yields
    /// `invalid_argument("empty upload-pack stream")`; an error on the first
    /// message is propagated (the `??`).
    ///
    /// The first message is then re-queued onto a bounded channel (capacity
    /// 16) and a spawned task pumps the rest of the client stream into it, so
    /// the consumer sees the full original sequence.
    async fn upload_pack(
        &self,
        request: tonic::Request>,
    ) -> Result, tonic::Status> {
        let mut stream = request.into_inner();
        let first = stream
            .next()
            .await
            .ok_or_else(|| tonic::Status::invalid_argument("empty upload-pack stream"))??;
        let repo = self.repo_label(first.repository.as_ref());
        let span = tracing::info_span!("pack.upload_pack", %repo);
        let _enter = span.enter();
        let gb = match self.resolve(first.repository.as_ref()) {
            Ok(gb) => gb,
            Err(err) if err.code() == tonic::Code::NotFound => {
                if let Some(mut client) = remote_pack_client(self, first.repository.as_ref()).await? {
                    // Forwarding path: the channel carries bare messages
                    // (not `Result`s) for the outgoing client call.
                    let (tx, rx) = tokio::sync::mpsc::channel(16);
                    // A failed send is deliberately ignored here.
                    let _ = tx.send(first).await;
                    tokio::spawn(async move {
                        use tokio_stream::StreamExt;
                        while let Some(msg) = stream.next().await {
                            match msg {
                                Ok(m) => {
                                    // Receiver gone: stop pumping.
                                    if tx.send(m).await.is_err() {
                                        break;
                                    }
                                }
                                // NOTE(review): an inbound stream error is
                                // dropped here; the forwarded request stream
                                // simply ends. Confirm this is intended.
                                Err(_) => break,
                            }
                        }
                    });
                    let resp = client.upload_pack(tokio_stream::wrappers::ReceiverStream::new(rx)).await?;
                    // Adapts the remote response stream to this server's
                    // stream type via `super::bridge_server_stream`.
                    let out = super::bridge_server_stream(resp.into_inner());
                    return Ok(tonic::Response::new(out));
                }
                return Err(err);
            }
            Err(err) => return Err(err),
        };
        tracing::info!(%repo, "upload-pack streaming started");
        // Local path: the channel carries the stream's `Result` items, so
        // inbound errors are passed through to `gb.upload_pack`.
        let (tx, rx) = tokio::sync::mpsc::channel(16);
        tx.send(Ok(first))
            .await
            .map_err(|_| tonic::Status::internal("channel closed"))?;
        tokio::spawn(async move {
            while let Some(msg) = stream.next().await {
                if tx.send(msg).await.is_err() {
                    break;
                }
            }
        });
        let result = gb.upload_pack(ReceiverStream::new(rx)).await?;
        Ok(tonic::Response::new(result))
    }

    /// Bidirectional-streaming RPC for receive-pack.
    ///
    /// Same structure as `upload_pack`: read the first message to learn the
    /// repository (empty stream -> `invalid_argument("empty receive-pack
    /// stream")`), then either forward the whole stream to a remote node or
    /// hand it to the locally resolved value, re-queuing the first message
    /// ahead of the rest through a bounded channel (capacity 16).
    async fn receive_pack(
        &self,
        request: tonic::Request>,
    ) -> Result, tonic::Status> {
        let mut stream = request.into_inner();
        let first = stream
            .next()
            .await
            .ok_or_else(|| tonic::Status::invalid_argument("empty receive-pack stream"))??;
        let repo = self.repo_label(first.repository.as_ref());
        let span = tracing::info_span!("pack.receive_pack", %repo);
        let _enter = span.enter();
        let gb = match self.resolve(first.repository.as_ref()) {
            Ok(gb) => gb,
            Err(err) if err.code() == tonic::Code::NotFound => {
                if let Some(mut client) = remote_pack_client(self, first.repository.as_ref()).await? {
                    // Forwarding path: bare messages, first one re-queued.
                    let (tx, rx) = tokio::sync::mpsc::channel(16);
                    // A failed send is deliberately ignored here.
                    let _ = tx.send(first).await;
                    tokio::spawn(async move {
                        use tokio_stream::StreamExt;
                        while let Some(msg) = stream.next().await {
                            match msg {
                                Ok(m) => {
                                    // Receiver gone: stop pumping.
                                    if tx.send(m).await.is_err() {
                                        break;
                                    }
                                }
                                // NOTE(review): inbound stream errors are
                                // dropped; the forwarded stream just ends.
                                Err(_) => break,
                            }
                        }
                    });
                    let resp = client.receive_pack(tokio_stream::wrappers::ReceiverStream::new(rx)).await?;
                    let out = super::bridge_server_stream(resp.into_inner());
                    return Ok(tonic::Response::new(out));
                }
                return Err(err);
            }
            Err(err) => return Err(err),
        };
        tracing::info!(%repo, "receive-pack streaming started");
        // Local path: `Result` items are forwarded unchanged, errors included.
        let (tx, rx) = tokio::sync::mpsc::channel(16);
        tx.send(Ok(first))
            .await
            .map_err(|_| tonic::Status::internal("channel closed"))?;
        tokio::spawn(async move {
            while let Some(msg) = stream.next().await {
                if tx.send(msg).await.is_err() {
                    break;
                }
            }
        });
        let result = gb.receive_pack(ReceiverStream::new(rx)).await?;
        Ok(tonic::Response::new(result))
    }

    /// Server-streaming RPC: single request, streamed response.
    ///
    /// Locally, the stream returned by `gb.pack_objects(inner)` is sent back
    /// directly. When forwarded, the remote response stream is adapted with
    /// `super::bridge_server_stream`.
    async fn pack_objects(
        &self,
        request: tonic::Request,
    ) -> Result, tonic::Status> {
        let inner = request.into_inner();
        let repo = self.repo_label(inner.repository.as_ref());
        let span = tracing::info_span!("pack.pack_objects", %repo);
        let _enter = span.enter();
        let gb = match self.resolve(inner.repository.as_ref()) {
            Ok(gb) => gb,
            Err(err) if err.code() == tonic::Code::NotFound => {
                if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref()).await?
                {
                    let resp = client.pack_objects(inner).await?;
                    let stream = super::bridge_server_stream(resp.into_inner());
                    return Ok(tonic::Response::new(stream));
                }
                return Err(err);
            }
            Err(err) => return Err(err),
        };
        let stream = gb.pack_objects(inner).await?;
        // Logged after the local call has returned its stream.
        tracing::info!(%repo, "pack-objects streaming started");
        Ok(tonic::Response::new(stream))
    }

    /// Client-streaming RPC: consumes the whole request stream, then indexes.
    ///
    /// Unlike `upload_pack`/`receive_pack`, every request message is collected
    /// into a `Vec` before the repository is resolved; the first stream error
    /// aborts the call. The repository is taken from the first message; with
    /// an empty stream, `None` is passed to `repo_label` and `resolve`.
    async fn index_pack(
        &self,
        request: tonic::Request>,
    ) -> Result, tonic::Status> {
        let mut stream = request.into_inner();
        // NOTE(review): the full request is buffered in memory with no
        // visible size limit -- confirm that is acceptable for pack data.
        let mut inputs = Vec::new();
        while let Some(msg) = stream.next().await {
            inputs.push(msg?);
        }
        let repo = self.repo_label(inputs.first().and_then(|r| r.repository.as_ref()));
        let span = tracing::info_span!("pack.index_pack", %repo);
        let _enter = span.enter();
        let gb = match self.resolve(inputs.first().and_then(|r| r.repository.as_ref())) {
            Ok(gb) => gb,
            Err(err) if err.code() == tonic::Code::NotFound => {
                if let Some(mut client) = remote_pack_client(self, inputs.first().and_then(|r| r.repository.as_ref())).await? {
                    // Replays the buffered messages as the outgoing stream.
                    return client.index_pack(tokio_stream::iter(inputs)).await;
                }
                return Err(err);
            }
            Err(err) => return Err(err),
        };
        let resp = gb.index_pack(inputs).map_err(into_status)?;
        tracing::info!(%repo, objects = resp.object_count, "index_pack done");
        Ok(tonic::Response::new(resp))
    }

    /// Unary RPC: lists packfiles for the repository in the request.
    ///
    /// Local result errors are converted with `into_status`; on `NotFound`
    /// the call is forwarded when a remote route exists and the remote
    /// response is returned verbatim.
    async fn list_packfiles(
        &self,
        request: tonic::Request,
    ) -> Result, tonic::Status> {
        let inner = request.into_inner();
        let repo = self.repo_label(inner.repository.as_ref());
        let span = tracing::info_span!("pack.list_packfiles", %repo);
        let _enter = span.enter();
        let gb = match self.resolve(inner.repository.as_ref()) {
            Ok(gb) => gb,
            Err(err) if err.code() == tonic::Code::NotFound => {
                if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref()).await?
                {
                    return client.list_packfiles(inner).await;
                }
                return Err(err);
            }
            Err(err) => return Err(err),
        };
        let resp = gb.list_packfiles(inner).map_err(into_status)?;
        tracing::info!(%repo, count = resp.packfiles.len(), "list_packfiles done");
        Ok(tonic::Response::new(resp))
    }

    /// Unary RPC: runs `fsck` for the repository in the request.
    ///
    /// Logs the response's `ok` flag and the number of errors and warnings.
    /// On `NotFound` the call is forwarded when a remote route exists.
    async fn fsck(
        &self,
        request: tonic::Request,
    ) -> Result, tonic::Status> {
        let inner = request.into_inner();
        let repo = self.repo_label(inner.repository.as_ref());
        let span = tracing::info_span!("pack.fsck", %repo);
        let _enter = span.enter();
        let gb = match self.resolve(inner.repository.as_ref()) {
            Ok(gb) => gb,
            Err(err) if err.code() == tonic::Code::NotFound => {
                if let Some(mut client) = remote_pack_client(self, inner.repository.as_ref()).await? {
                    return client.fsck(inner).await;
                }
                return Err(err);
            }
            Err(err) => return Err(err),
        };
        let resp = gb.fsck(inner).map_err(into_status)?;
        tracing::info!(%repo, ok = resp.ok, errors = resp.errors.len(), warnings = resp.warnings.len(), "fsck done");
        Ok(tonic::Response::new(resp))
    }
}