Files
gitks/server/mod.rs
T
zhenyi 66afd932ed feat(api): extend commit and diff services with new functionality
- Add FindCommit, ListCommitsByOid, CommitIsAncestor RPCs to CommitService
- Add CheckObjectsExist, CommitsByMessage, GetCommitStats RPCs to CommitService
- Add LastCommitForPath, CountCommits, CountDivergingCommits RPCs to CommitService
- Add RawDiff, RawPatch, FindChangedPaths RPCs to DiffService
- Add FindMergeBase, WriteRef, SearchFilesByContent RPCs to RepositoryService
- Add SearchFilesByName, ObjectsSize, RepositorySize RPCs to RepositoryService
- Add FindLicense, OptimizeRepository, GetRawChanges RPCs to RepositoryService
- Add FetchRemote, CreateRepositoryFromURL RPCs to RepositoryService
- Implement server handlers for all new RPC methods
- Add new modules for commit counting, finding, and querying features
- Add new modules for diff changed paths and raw operations
- Add new modules for refs and remote operations
- Remove unnecessary comments from various source files
- Update proto definitions with new message types and service methods
2026-06-08 15:37:08 +08:00

485 lines
17 KiB
Rust

/// Generate a `remote_<service>_client` helper function that resolves a repository
/// route and returns a connected gRPC client for the given service.
macro_rules! remote_client {
($fn_name:ident, $client:ty, $svc_label:literal) => {
async fn $fn_name(
svc: &super::GitksService,
header: Option<&crate::pb::RepositoryHeader>,
is_write: bool,
) -> Result<Option<$client>, tonic::Status> {
let header = match header {
Some(h) => h,
None => return Ok(None),
};
let Some(route) = svc.route_repository(header, is_write).await? else {
return Ok(None);
};
tracing::info!(
storage_name = %route.storage_name,
relative_path = %route.relative_path,
actor_name = %route.actor_name,
grpc_addr = %route.grpc_addr,
concat!("forwarding ", $svc_label, " rpc")
);
let endpoint = super::remote_endpoint(&route.grpc_addr).await?;
let client = <$client>::connect(endpoint)
.await
.map_err(|e| tonic::Status::unavailable(e.to_string()))?;
Ok(Some(client))
}
};
}
mod archive;
mod blame;
mod branch;
mod cache;
mod commit;
mod diff;
mod merge;
mod pack;
mod refs;
mod remote;
mod repository;
mod repository_maint;
mod tag;
mod tree;
use gix::discover::is_git;
use ractor::{ActorCell, ActorRef};
use std::path::{Path, PathBuf};
use tokio_stream::wrappers::ReceiverStream;
use crate::actor::message::{GitNodeMessage, RouteDecision};
use crate::bare::GitBare;
use crate::error::{GitError, GitResult};
use crate::pb::{
archive_service_server, blame_service_server, branch_service_server, commit_service_server,
diff_service_server, merge_service_server, pack_service_server, ref_service_server,
remote_service_server, repository_service_server, tag_service_server, tree_service_server,
};
#[derive(Clone)]
pub struct GitksService {
pub repo_prefix: PathBuf,
pub node_actor: Option<ActorRef<GitNodeMessage>>,
pub grpc_addr: String,
pub disk_cache: Option<crate::disk_cache::DiskCache>,
pub pack_cache: Option<crate::pack_cache::PackCache>,
pub hook_manager: Option<crate::hooks::HookManager>,
}
impl GitksService {
pub fn new(repo_prefix: PathBuf) -> Self {
Self {
repo_prefix,
node_actor: None,
grpc_addr: String::new(),
disk_cache: None,
pack_cache: None,
hook_manager: None,
}
}
pub fn with_actor(mut self, node_actor: ActorRef<GitNodeMessage>) -> Self {
self.node_actor = Some(node_actor);
self
}
pub fn with_disk_cache(mut self, dc: crate::disk_cache::DiskCache) -> Self {
self.disk_cache = Some(dc);
self
}
pub fn with_pack_cache(mut self, pc: crate::pack_cache::PackCache) -> Self {
self.pack_cache = Some(pc);
self
}
pub fn with_hook_manager(mut self, hm: crate::hooks::HookManager) -> Self {
self.hook_manager = Some(hm);
self
}
pub fn with_grpc_addr(mut self, grpc_addr: String) -> Self {
self.grpc_addr = grpc_addr;
self
}
pub fn scan_all_repo(&self) -> GitResult<Vec<String>> {
let root = self.repo_prefix.as_ref();
let mut repos = Vec::new();
if is_bare_git_repo(root) {
repos.push(root.to_path_buf());
} else {
scan_bare_repos_recursively(root, &mut repos)?;
}
Ok(repos
.into_iter()
.filter_map(|path| path.to_str().map(str::to_owned))
.collect())
}
pub async fn route_repository(
&self,
header: &crate::pb::RepositoryHeader,
is_write: bool,
) -> Result<Option<RouteDecision>, tonic::Status> {
use crate::actor::message::{ROLE_PRIMARY, ROLE_REPLICA};
let members = ractor::pg::get_members(&"gitks_nodes".to_string());
let local = self.node_actor.as_ref().map(|actor| actor.get_cell());
let mut primary: Option<RouteDecision> = None;
let mut replica: Option<RouteDecision> = None;
for member in members {
if local.as_ref().is_some_and(|actor| actor == &member) {
continue;
}
if let Some(decision) = query_find_primary(member.clone(), header.clone()).await?
&& decision.found
&& !decision.grpc_addr.is_empty()
{
primary = Some(decision);
if is_write {
return Ok(primary);
}
}
if !is_write
&& replica.is_none()
&& let Some(decision) = query_find_replica(member.clone(), header.clone()).await?
&& decision.found
&& !decision.grpc_addr.is_empty()
&& decision.role == ROLE_REPLICA
{
replica = Some(decision);
}
}
if let Some(p) = primary {
return Ok(Some(p));
}
if let Some(r) = replica {
tracing::info!(
storage_name = %r.storage_name,
relative_path = %r.relative_path,
"read request routed to replica"
);
return Ok(Some(r));
}
let _ = ROLE_PRIMARY;
Ok(None)
}
fn repo_label(&self, header: Option<&crate::pb::RepositoryHeader>) -> String {
header
.and_then(|h| {
if h.relative_path.is_empty() {
None
} else {
Some(h.relative_path.clone())
}
})
.unwrap_or_else(|| "unknown".into())
}
/// Get the relative path from a repository header, if any.
pub(crate) fn repo_relative_path<'a>(&self, header: Option<&'a crate::pb::RepositoryHeader>) -> Option<&'a str> {
header.and_then(|h| {
if h.relative_path.is_empty() {
None
} else {
Some(h.relative_path.as_str())
}
})
}
/// Acquire a rate-limit permit for the repository in this request.
/// Returns a guard that releases the permit on drop.
pub(crate) async fn acquire_rate_limit(
&self,
header: Option<&crate::pb::RepositoryHeader>,
) -> Result<Option<crate::rate_limit::RateLimitGuard>, tonic::Status> {
crate::rate_limit::acquire_or_reject(self.repo_relative_path(header)).await
}
pub(crate) fn resolve(
&self,
header: Option<&crate::pb::RepositoryHeader>,
) -> Result<GitBare, tonic::Status> {
let header =
header.ok_or_else(|| tonic::Status::invalid_argument("repository is required"))?;
let header = self.prefixed_header(header);
let gb = GitBare::from_repository_header(&header).map_err(into_status)?;
tracing::debug!(
repo = %gb.bare_dir.display(),
"resolved repository"
);
Ok(gb)
}
pub(crate) fn resolve_for_init(
&self,
header: Option<&crate::pb::RepositoryHeader>,
) -> Result<PathBuf, tonic::Status> {
let header =
header.ok_or_else(|| tonic::Status::invalid_argument("repository is required"))?;
let relative_path = header.relative_path.trim();
if relative_path.is_empty() {
return Err(tonic::Status::invalid_argument("relative_path is required"));
}
// Validate early to reject '..' and other traversal patterns
crate::sanitize::validate_relative_path(relative_path)
.map_err(|e| tonic::Status::invalid_argument(e.to_string()))?;
let candidate = self.repo_prefix.join(relative_path);
// Canonicalize repo_prefix (which should exist) for a reliable check
let prefix_canon = self
.repo_prefix
.canonicalize()
.unwrap_or_else(|_| self.repo_prefix.clone());
// Unified path validation to avoid TOCTOU
let canonical = match candidate.canonicalize() {
Ok(canon) => {
// Path exists and was canonicalized
canon
}
Err(_) => {
// Path doesn't exist yet — validate via parent
let parent = candidate.parent().unwrap_or(&self.repo_prefix);
let filename = candidate.file_name().ok_or_else(|| {
tonic::Status::invalid_argument("invalid path: missing filename")
})?;
let parent_canon = parent
.canonicalize()
.unwrap_or_else(|_| parent.to_path_buf());
let constructed = parent_canon.join(filename);
// String-level verification for non-existent paths
let constructed_str = constructed.to_string_lossy();
let prefix_str = prefix_canon.to_string_lossy();
if !constructed_str.starts_with(&*prefix_str) {
return Err(tonic::Status::invalid_argument(
"path traversal detected: relative_path escapes repo prefix",
));
}
constructed
}
};
// Final check: canonical must be under prefix
if !canonical.starts_with(&prefix_canon) {
return Err(tonic::Status::invalid_argument(
"path traversal detected: relative_path escapes repo prefix",
));
}
Ok(canonical)
}
pub fn notify_ref_update(
&self,
relative_path: &str,
ref_name: &str,
old_oid: &str,
new_oid: &str,
) {
// Invalidate moka caches
crate::server::cache::invalidate_repo(relative_path);
// Invalidate disk cache
if let Some(ref pc) = self.pack_cache {
pc.invalidate_repo(relative_path);
}
if let Some(ref actor) = self.node_actor {
let event = crate::actor::message::RefUpdateEvent {
relative_path: relative_path.to_string(),
ref_name: ref_name.to_string(),
old_oid: old_oid.to_string(),
new_oid: new_oid.to_string(),
primary_grpc_addr: self.grpc_addr.clone(),
primary_storage_name: String::new(),
};
crate::actor::handler::broadcast_ref_update(actor, event);
}
}
/// Inject repo_prefix as storage_path into the client-provided header
fn prefixed_header(&self, header: &crate::pb::RepositoryHeader) -> crate::pb::RepositoryHeader {
crate::pb::RepositoryHeader {
storage_path: self.repo_prefix.to_string_lossy().into_owned(),
relative_path: header.relative_path.clone(),
storage_name: header.storage_name.clone(),
}
}
}
pub async fn remote_endpoint(addr: &str) -> Result<tonic::transport::Endpoint, tonic::Status> {
let uri: tonic::codegen::http::Uri = addr
.parse()
.map_err(|e| tonic::Status::invalid_argument(format!("invalid URI: {e}")))?;
tonic::transport::Endpoint::new(uri).map_err(|e| tonic::Status::internal(e.to_string()))
}
pub(super) fn bridge_server_stream<T: Send + 'static>(
mut remote: tonic::Streaming<T>,
) -> tokio_stream::wrappers::ReceiverStream<Result<T, tonic::Status>> {
let (tx, rx) = tokio::sync::mpsc::channel(16);
tokio::spawn(async move {
use tokio_stream::StreamExt;
while let Some(item) = remote.next().await {
if tx.send(item).await.is_err() {
break;
}
}
});
tokio_stream::wrappers::ReceiverStream::new(rx)
}
async fn query_find_primary(
member: ActorCell,
header: crate::pb::RepositoryHeader,
) -> Result<Option<RouteDecision>, tonic::Status> {
let actor_ref: ActorRef<GitNodeMessage> = member.into();
match ractor::call_t!(actor_ref, GitNodeMessage::FindPrimary, 500, header) {
Ok(decision) => Ok(Some(decision)),
Err(err) => {
tracing::warn!(error = %err, "find primary query failed");
Ok(None)
}
}
}
async fn query_find_replica(
member: ActorCell,
header: crate::pb::RepositoryHeader,
) -> Result<Option<RouteDecision>, tonic::Status> {
let actor_ref: ActorRef<GitNodeMessage> = member.into();
match ractor::call_t!(actor_ref, GitNodeMessage::FindReplica, 500, header) {
Ok(decision) => Ok(Some(decision)),
Err(err) => {
tracing::warn!(error = %err, "find replica query failed");
Ok(None)
}
}
}
fn scan_bare_repos_recursively(dir: &Path, repos: &mut Vec<PathBuf>) -> GitResult<()> {
for entry in std::fs::read_dir(dir)? {
let entry = entry?;
let path = entry.path();
if is_bare_git_repo(&path) {
repos.push(path);
continue;
}
if path.is_dir() {
scan_bare_repos_recursively(&path, repos)?;
}
}
Ok(())
}
fn is_bare_git_repo(path: &Path) -> bool {
match is_git(path) {
Ok(repo) => repo.is_bare(),
Err(_) => false,
}
}
pub(crate) fn into_status(e: GitError) -> tonic::Status {
match &e {
GitError::NotFound(_)
| GitError::ObjectNotFound(_)
| GitError::RefNotFound(_)
| GitError::RepoNotFound => tonic::Status::not_found(e.to_string()),
GitError::InvalidArgument(_) => tonic::Status::invalid_argument(e.to_string()),
GitError::PermissionDenied(_) => tonic::Status::permission_denied(e.to_string()),
GitError::Locked(_) => tonic::Status::failed_precondition(e.to_string()),
GitError::AuthFailed(_) => tonic::Status::unauthenticated(e.to_string()),
GitError::NotBareRepository => tonic::Status::failed_precondition(e.to_string()),
_ => tonic::Status::internal(e.to_string()),
}
}
impl From<GitError> for tonic::Status {
fn from(e: GitError) -> Self {
into_status(e)
}
}
pub(crate) fn into_stream<T: Send + 'static>(
items: Vec<T>,
) -> ReceiverStream<Result<T, tonic::Status>> {
let (tx, rx) = tokio::sync::mpsc::channel(items.len().max(1));
for item in items {
let _ = tx.try_send(Ok(item));
}
ReceiverStream::new(rx)
}
pub(crate) fn git_cmd(gb: &GitBare, args: &[&str]) -> Result<std::process::Output, tonic::Status> {
let mut full_args: Vec<String> = vec![
"--git-dir".into(),
gb.bare_dir.to_string_lossy().into_owned(),
];
full_args.extend(args.iter().map(|s| s.to_string()));
tracing::debug!(
repo = %gb.bare_dir.display(),
args = %full_args.iter().skip(2).cloned().collect::<Vec<_>>().join(" "),
"spawning git subprocess"
);
let result = std::process::Command::new("git")
.args(&full_args)
.output()
.map_err(|e| {
tracing::error!(
repo = %gb.bare_dir.display(),
error = %e,
"failed to spawn git subprocess"
);
tonic::Status::internal(e.to_string())
})?;
if !result.status.success() {
let stderr = String::from_utf8_lossy(&result.stderr);
tracing::warn!(
repo = %gb.bare_dir.display(),
status = ?result.status.code(),
stderr = %stderr.trim(),
"git subprocess exited with non-zero status"
);
}
Ok(result)
}
pub async fn serve(
addr: std::net::SocketAddr,
svc: GitksService,
) -> Result<(), tonic::transport::Error> {
let span = tracing::info_span!("gitks.server", %addr);
let _enter = span.enter();
tracing::info!("registering gRPC services");
let server = tonic::transport::Server::builder()
.add_service(repository_service_server::RepositoryServiceServer::new(
svc.clone(),
))
.add_service(archive_service_server::ArchiveServiceServer::new(
svc.clone(),
))
.add_service(blame_service_server::BlameServiceServer::new(svc.clone()))
.add_service(branch_service_server::BranchServiceServer::new(svc.clone()))
.add_service(commit_service_server::CommitServiceServer::new(svc.clone()))
.add_service(diff_service_server::DiffServiceServer::new(svc.clone()))
.add_service(merge_service_server::MergeServiceServer::new(svc.clone()))
.add_service(pack_service_server::PackServiceServer::new(svc.clone()))
.add_service(ref_service_server::RefServiceServer::new(svc.clone()))
.add_service(remote_service_server::RemoteServiceServer::new(svc.clone()))
.add_service(tag_service_server::TagServiceServer::new(svc.clone()))
.add_service(tree_service_server::TreeServiceServer::new(svc));
tracing::info!("server ready, starting to accept connections");
server.serve(addr).await
}