c8729d38bc
Only replace semaphores that have no active permits (idle repos). Previously all semaphores were replaced, allowing active permits from old semaphores plus full permits from new ones simultaneously.
201 lines
6.2 KiB
Rust
201 lines
6.2 KiB
Rust
//! Repository-level rate limiting via per-repo semaphores.
|
|
//!
|
|
//! Prevents any single repository from consuming all server resources.
|
|
//! Uses `tokio::sync::Semaphore` with configurable max concurrent operations.
|
|
//!
|
|
//! Integration pattern:
|
|
//! let _guard = rate_limit::acquire_or_reject(repo_relative_path).await?;
|
|
//! // ... handle request ...
|
|
//! // guard is dropped here → permit released
|
|
|
|
use dashmap::DashMap;
|
|
use std::sync::{Arc, OnceLock, RwLock};
|
|
use tokio::sync::Semaphore;
|
|
|
|
/// Default max concurrent operations per repository.
|
|
/// Fallback used by `limiter()` when `GITKS_RATE_LIMIT_MAX_CONCURRENT` is unset or unparsable.
const DEFAULT_MAX_CONCURRENT: usize = 5;
|
|
|
|
/// Global rate limiter state.
|
|
struct RateLimiter {
|
|
/// Per-repository semaphores. Key = repository relative_path.
|
|
semaphores: DashMap<String, Arc<Semaphore>>,
|
|
/// Max concurrent operations per repository (protected by RwLock for runtime updates).
|
|
max_concurrent: RwLock<usize>,
|
|
}
|
|
|
|
/// Process-wide limiter instance; initialized on first use by `limiter()`.
static RATE_LIMITER: OnceLock<RateLimiter> = OnceLock::new();
|
|
|
|
fn limiter() -> &'static RateLimiter {
|
|
RATE_LIMITER.get_or_init(|| {
|
|
let max = std::env::var("GITKS_RATE_LIMIT_MAX_CONCURRENT")
|
|
.ok()
|
|
.and_then(|v| v.parse().ok())
|
|
.unwrap_or(DEFAULT_MAX_CONCURRENT);
|
|
|
|
tracing::info!(
|
|
max_concurrent = max,
|
|
"repository-level rate limiter initialized"
|
|
);
|
|
|
|
RateLimiter {
|
|
semaphores: DashMap::new(),
|
|
max_concurrent: RwLock::new(max),
|
|
}
|
|
})
|
|
}
|
|
|
|
/// Get the current max_concurrent value.
|
|
fn get_max_concurrent() -> usize {
|
|
*limiter()
|
|
.max_concurrent
|
|
.read()
|
|
.unwrap_or_else(|e| e.into_inner())
|
|
}
|
|
|
|
/// A guard that holds a rate-limit permit. The permit is released on drop.
|
|
pub struct RateLimitGuard {
|
|
/// The semaphore permit. Dropping this releases the permit.
|
|
_permit: tokio::sync::OwnedSemaphorePermit,
|
|
}
|
|
|
|
/// Acquire a rate-limit permit for the given repository.
|
|
///
|
|
/// If the repository has `max_concurrent` operations already in flight,
|
|
/// this will wait asynchronously until a permit becomes available.
|
|
///
|
|
/// Returns `None` if no repository header is provided (e.g., global health checks).
|
|
pub async fn acquire(repo_relative_path: Option<&str>) -> Option<RateLimitGuard> {
|
|
let repo = repo_relative_path?;
|
|
if repo.is_empty() {
|
|
return None;
|
|
}
|
|
let max_concurrent = get_max_concurrent();
|
|
if max_concurrent == 0 {
|
|
// Unlimited
|
|
return None;
|
|
}
|
|
|
|
let sem = limiter()
|
|
.semaphores
|
|
.entry(repo.to_string())
|
|
.or_insert_with(|| Arc::new(Semaphore::new(max_concurrent)))
|
|
.value()
|
|
.clone();
|
|
|
|
// Release DashMap reference before awaiting
|
|
let _ = repo;
|
|
|
|
match tokio::time::timeout(
|
|
std::time::Duration::from_secs(30),
|
|
sem.clone().acquire_owned(),
|
|
)
|
|
.await
|
|
{
|
|
Ok(Ok(permit)) => {
|
|
tracing::debug!(
|
|
repo = %repo_relative_path.unwrap_or(""),
|
|
available = sem.available_permits(),
|
|
"rate limit permit acquired"
|
|
);
|
|
Some(RateLimitGuard { _permit: permit })
|
|
}
|
|
Ok(Err(_closed)) => {
|
|
// Semaphore was closed — recreate it
|
|
tracing::warn!(
|
|
repo = %repo_relative_path.unwrap_or(""),
|
|
"rate limit semaphore closed, recreating"
|
|
);
|
|
let new_sem = Arc::new(Semaphore::new(get_max_concurrent()));
|
|
let permit = new_sem
|
|
.clone()
|
|
.acquire_owned()
|
|
.await
|
|
.expect("newly created semaphore should have permits");
|
|
limiter()
|
|
.semaphores
|
|
.insert(repo_relative_path.unwrap_or("").to_string(), new_sem);
|
|
Some(RateLimitGuard { _permit: permit })
|
|
}
|
|
Err(_elapsed) => {
|
|
tracing::warn!(
|
|
repo = %repo_relative_path.unwrap_or(""),
|
|
max_concurrent = get_max_concurrent(),
|
|
"rate limit timeout waiting for permit"
|
|
);
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Acquire a rate-limit permit, returning a tonic error on timeout / overload.
|
|
pub async fn acquire_or_reject(
|
|
repo_relative_path: Option<&str>,
|
|
) -> Result<Option<RateLimitGuard>, tonic::Status> {
|
|
let repo = repo_relative_path.unwrap_or("");
|
|
if repo.is_empty() {
|
|
return Ok(None);
|
|
}
|
|
match acquire(Some(repo)).await {
|
|
Some(guard) => Ok(Some(guard)),
|
|
None => {
|
|
if get_max_concurrent() == 0 {
|
|
return Ok(None);
|
|
}
|
|
// Timeout — reject with resource exhausted
|
|
Err(tonic::Status::resource_exhausted(format!(
|
|
"rate limit exceeded for repository '{repo}': max {max} concurrent operations",
|
|
max = get_max_concurrent()
|
|
)))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Remove the semaphore for a repository (called on repo deletion).
|
|
pub fn remove_repository(repo_relative_path: &str) {
|
|
limiter().semaphores.remove(repo_relative_path);
|
|
tracing::debug!(repo = %repo_relative_path, "rate limit semaphore removed");
|
|
}
|
|
|
|
/// Update the max concurrent limit at runtime.
|
|
///
|
|
/// Only replaces semaphores that have no active permits (idle repos).
|
|
/// Semaphores with in-flight operations are left untouched to avoid
|
|
/// a race where active permits are held against a now-replaced semaphore
|
|
/// while the new one grants a full set of permits — leading to over-admission.
|
|
/// Those repos will pick up the new limit once all permits are returned.
|
|
pub fn set_max_concurrent(max: usize) {
|
|
let l = limiter();
|
|
|
|
let old_max = get_max_concurrent();
|
|
|
|
match l.max_concurrent.write() {
|
|
Ok(mut guard) => {
|
|
*guard = max;
|
|
}
|
|
Err(e) => {
|
|
let mut guard = e.into_inner();
|
|
*guard = max;
|
|
}
|
|
}
|
|
|
|
let keys: Vec<String> = l
|
|
.semaphores
|
|
.iter()
|
|
.filter_map(|entry| {
|
|
let sem = entry.value();
|
|
if sem.available_permits() == old_max {
|
|
Some(entry.key().clone())
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.collect();
|
|
|
|
for key in keys {
|
|
l.semaphores
|
|
.insert(key, Arc::new(Semaphore::new(max)));
|
|
}
|
|
|
|
tracing::info!(max_concurrent = max, "rate limit max_concurrent updated");
|
|
}
|