//! Repository-level rate limiting via per-repo semaphores. //! //! Prevents any single repository from consuming all server resources. //! Uses `tokio::sync::Semaphore` with configurable max concurrent operations. //! //! Integration pattern: //! let _guard = rate_limit::acquire(svc, header).await?; //! // ... handle request ... //! // guard is dropped here → permit released use dashmap::DashMap; use std::sync::{Arc, OnceLock, RwLock}; use tokio::sync::Semaphore; /// Default max concurrent operations per repository. const DEFAULT_MAX_CONCURRENT: usize = 5; /// Global rate limiter state. struct RateLimiter { /// Per-repository semaphores. Key = repository relative_path. semaphores: DashMap>, /// Max concurrent operations per repository (protected by RwLock for runtime updates). max_concurrent: RwLock, } static RATE_LIMITER: OnceLock = OnceLock::new(); fn limiter() -> &'static RateLimiter { RATE_LIMITER.get_or_init(|| { let max = std::env::var("GITKS_RATE_LIMIT_MAX_CONCURRENT") .ok() .and_then(|v| v.parse().ok()) .unwrap_or(DEFAULT_MAX_CONCURRENT); tracing::info!( max_concurrent = max, "repository-level rate limiter initialized" ); RateLimiter { semaphores: DashMap::new(), max_concurrent: RwLock::new(max), } }) } /// Get the current max_concurrent value. fn get_max_concurrent() -> usize { *limiter() .max_concurrent .read() .unwrap_or_else(|e| e.into_inner()) } /// A guard that holds a rate-limit permit. The permit is released on drop. pub struct RateLimitGuard { /// The semaphore permit. Dropping this releases the permit. _permit: tokio::sync::OwnedSemaphorePermit, } /// Acquire a rate-limit permit for the given repository. /// /// If the repository has `max_concurrent` operations already in flight, /// this will wait asynchronously until a permit becomes available. /// /// Returns `None` if no repository header is provided (e.g., global health checks). pub async fn acquire(repo_relative_path: Option<&str>) -> Option { let repo = repo_relative_path?; if repo.is_empty() { return None; } let max_concurrent = get_max_concurrent(); if max_concurrent == 0 { // Unlimited return None; } let sem = limiter() .semaphores .entry(repo.to_string()) .or_insert_with(|| Arc::new(Semaphore::new(max_concurrent))) .value() .clone(); // Release DashMap reference before awaiting let _ = repo; match tokio::time::timeout( std::time::Duration::from_secs(30), sem.clone().acquire_owned(), ) .await { Ok(Ok(permit)) => { tracing::debug!( repo = %repo_relative_path.unwrap_or(""), available = sem.available_permits(), "rate limit permit acquired" ); Some(RateLimitGuard { _permit: permit }) } Ok(Err(_closed)) => { // Semaphore was closed — recreate it tracing::warn!( repo = %repo_relative_path.unwrap_or(""), "rate limit semaphore closed, recreating" ); let new_sem = Arc::new(Semaphore::new(get_max_concurrent())); let permit = new_sem .clone() .acquire_owned() .await .expect("newly created semaphore should have permits"); limiter() .semaphores .insert(repo_relative_path.unwrap_or("").to_string(), new_sem); Some(RateLimitGuard { _permit: permit }) } Err(_elapsed) => { tracing::warn!( repo = %repo_relative_path.unwrap_or(""), max_concurrent = get_max_concurrent(), "rate limit timeout waiting for permit" ); None } } } /// Acquire a rate-limit permit, returning a tonic error on timeout / overload. pub async fn acquire_or_reject( repo_relative_path: Option<&str>, ) -> Result, tonic::Status> { let repo = repo_relative_path.unwrap_or(""); if repo.is_empty() { return Ok(None); } match acquire(Some(repo)).await { Some(guard) => Ok(Some(guard)), None => { if get_max_concurrent() == 0 { return Ok(None); } // Timeout — reject with resource exhausted Err(tonic::Status::resource_exhausted(format!( "rate limit exceeded for repository '{repo}': max {max} concurrent operations", max = get_max_concurrent() ))) } } } /// Remove the semaphore for a repository (called on repo deletion). pub fn remove_repository(repo_relative_path: &str) { limiter().semaphores.remove(repo_relative_path); tracing::debug!(repo = %repo_relative_path, "rate limit semaphore removed"); } /// Update the max concurrent limit at runtime. /// This properly updates the limit and recreates all existing semaphores. pub fn set_max_concurrent(max: usize) { let l = limiter(); // Update the max_concurrent value match l.max_concurrent.write() { Ok(mut guard) => { *guard = max; } Err(e) => { // Poisoned lock - recover and update let mut guard = e.into_inner(); *guard = max; } } // Recreate all existing semaphores with the new limit let keys: Vec = l .semaphores .iter() .map(|entry| entry.key().clone()) .collect(); for key in keys { l.semaphores .insert(key, Arc::new(Semaphore::new(max))); } tracing::info!(max_concurrent = max, "rate limit max_concurrent updated"); }