//! Copyright (c) 2022-2026 GitDataAi All rights reserved. //! Repository-level rate limiting via per-repo semaphores. //! //! Prevents any single repository from consuming all server resources. //! Uses `tokio::sync::Semaphore` with configurable max concurrent operations. //! //! Integration pattern: //! let _guard = rate_limit::acquire(svc, header).await?; //! // ... handle request ... //! // guard is dropped here → permit released use dashmap::DashMap; use std::sync::{Arc, OnceLock, RwLock}; use std::time::Instant; use tokio::sync::Semaphore; use crate::config::{DEFAULT_MAX_CONCURRENT_OPS, SEMAPHORE_IDLE_THRESHOLD_SECS}; /// Per-repository rate limiter entry with usage tracking. struct SemaphoreEntry { sem: Arc, max_permits: usize, last_accessed: RwLock, } /// Global rate limiter state. struct RateLimiter { /// Per-repository semaphores. Key = repository relative_path. semaphores: DashMap, /// Max concurrent operations per repository (protected by RwLock for runtime updates). max_concurrent: RwLock, } static RATE_LIMITER: OnceLock = OnceLock::new(); fn limiter() -> &'static RateLimiter { RATE_LIMITER.get_or_init(|| { let max = std::env::var("GITKS_RATE_LIMIT_MAX_CONCURRENT") .ok() .and_then(|v| v.parse().ok()) .unwrap_or(DEFAULT_MAX_CONCURRENT_OPS); tracing::info!( max_concurrent = max, "repository-level rate limiter initialized" ); RateLimiter { semaphores: DashMap::new(), max_concurrent: RwLock::new(max), } }) } /// Get the current max_concurrent value. fn get_max_concurrent() -> usize { *limiter() .max_concurrent .read() .unwrap_or_else(|e| e.into_inner()) } /// A guard that holds a rate-limit permit. The permit is released on drop. pub struct RateLimitGuard { /// The semaphore permit. Dropping this releases the permit. _permit: tokio::sync::OwnedSemaphorePermit, } /// Acquire a rate-limit permit for the given repository. /// /// If the repository has `max_concurrent` operations already in flight, /// this will wait asynchronously until a permit becomes available. /// /// Returns `None` if no repository header is provided (e.g., global health checks). pub async fn acquire(repo_relative_path: Option<&str>) -> Option { let repo = repo_relative_path?; if repo.is_empty() { return None; } let max_concurrent = get_max_concurrent(); if max_concurrent == 0 { return None; } let sem = { let entry = limiter() .semaphores .entry(repo.to_string()) .or_insert_with(|| SemaphoreEntry { sem: Arc::new(Semaphore::new(max_concurrent)), max_permits: max_concurrent, last_accessed: RwLock::new(Instant::now()), }); if let Ok(mut last) = entry.last_accessed.write() { *last = Instant::now(); } entry.sem.clone() }; let _ = repo; match tokio::time::timeout( std::time::Duration::from_secs(30), sem.clone().acquire_owned(), ) .await { Ok(Ok(permit)) => { tracing::debug!( repo = %repo_relative_path.unwrap_or(""), available = sem.available_permits(), "rate limit permit acquired" ); crate::metrics::record_rate_limit_acquire(repo_relative_path.unwrap_or("")); Some(RateLimitGuard { _permit: permit }) } Ok(Err(_closed)) => { // Semaphore was closed — recreate it tracing::warn!( repo = %repo_relative_path.unwrap_or(""), "rate limit semaphore closed, recreating" ); let max = get_max_concurrent(); let new_sem = Arc::new(Semaphore::new(max)); let permit = match new_sem.clone().acquire_owned().await { Ok(permit) => permit, Err(_closed) => { tracing::warn!( repo = %repo_relative_path.unwrap_or(""), "new rate limit semaphore closed unexpectedly" ); return None; } }; limiter().semaphores.insert( repo_relative_path.unwrap_or("").to_string(), SemaphoreEntry { sem: new_sem, max_permits: get_max_concurrent(), last_accessed: RwLock::new(Instant::now()), }, ); Some(RateLimitGuard { _permit: permit }) } Err(_elapsed) => { tracing::warn!( repo = %repo_relative_path.unwrap_or(""), max_concurrent = get_max_concurrent(), "rate limit timeout waiting for permit" ); None } } } /// Acquire a rate-limit permit, returning a tonic error on timeout / overload. pub async fn acquire_or_reject( repo_relative_path: Option<&str>, ) -> Result, tonic::Status> { let repo = repo_relative_path.unwrap_or(""); if repo.is_empty() { return Ok(None); } match acquire(Some(repo)).await { Some(guard) => Ok(Some(guard)), None => { if get_max_concurrent() == 0 { return Ok(None); } crate::metrics::record_rate_limit_reject(repo); Err(tonic::Status::resource_exhausted(format!( "rate limit exceeded for repository '{repo}': max {max} concurrent operations", max = get_max_concurrent() ))) } } } /// Remove the semaphore for a repository (called on repo deletion). pub fn remove_repository(repo_relative_path: &str) { limiter().semaphores.remove(repo_relative_path); tracing::debug!(repo = %repo_relative_path, "rate limit semaphore removed"); } /// Clean up idle semaphores that have no active permits and haven't been /// accessed within the idle threshold. /// /// Call this periodically (e.g., from a background task) to prevent /// unbounded growth of the semaphore map. pub fn cleanup_idle_semaphores() { let threshold = std::time::Duration::from_secs(SEMAPHORE_IDLE_THRESHOLD_SECS); let now = Instant::now(); let max_concurrent = get_max_concurrent(); let mut removed = 0u64; limiter().semaphores.retain(|_key, entry| { let is_idle = entry.sem.available_permits() == max_concurrent; let is_stale = entry .last_accessed .read() .map(|last| now.duration_since(*last) > threshold) .unwrap_or(false); let keep = !(is_idle && is_stale); if !keep { removed += 1; } keep }); if removed > 0 { tracing::info!(removed = removed, "cleaned up idle rate-limit semaphores"); } } /// Start a background task to periodically clean up idle semaphores. pub fn start_semaphore_cleanup_task() -> tokio::task::JoinHandle<()> { let interval = std::time::Duration::from_secs(60); tokio::spawn(async move { let mut ticker = tokio::time::interval(interval); loop { ticker.tick().await; cleanup_idle_semaphores(); } }) } /// Update the max concurrent limit at runtime. /// /// Only replaces semaphores that have no active permits (idle repos). /// Semaphores with in-flight operations are left untouched to avoid /// a race where active permits are held against a now-replaced semaphore /// while the new one grants a full set of permits — leading to over-admission. /// Those repos will pick up the new limit once all permits are returned. pub fn set_max_concurrent(max: usize) { let l = limiter(); let old_max = get_max_concurrent(); match l.max_concurrent.write() { Ok(mut guard) => { *guard = max; } Err(e) => { let mut guard = e.into_inner(); *guard = max; } } let keys: Vec = l .semaphores .iter() .filter_map(|entry| { if entry.value().max_permits == old_max { Some(entry.key().clone()) } else { None } }) .collect(); for key in keys { l.semaphores.insert( key, SemaphoreEntry { sem: Arc::new(Semaphore::new(max)), max_permits: max, last_accessed: RwLock::new(Instant::now()), }, ); } tracing::info!(max_concurrent = max, "rate limit max_concurrent updated"); }