281 lines
8.7 KiB
Rust
281 lines
8.7 KiB
Rust
//! Copyright (c) 2022-2026 GitDataAi All rights reserved.
|
|
|
|
//! Repository-level rate limiting via per-repo semaphores.
|
|
//!
|
|
//! Prevents any single repository from consuming all server resources.
|
|
//! Uses `tokio::sync::Semaphore` with configurable max concurrent operations.
|
|
//!
|
|
//! Integration pattern:
|
|
//! let _guard = rate_limit::acquire_or_reject(repo_relative_path).await?;
|
|
//! // ... handle request ...
|
|
//! // guard is dropped here → permit released
|
|
|
|
use dashmap::DashMap;
|
|
use std::sync::{Arc, OnceLock, RwLock};
|
|
use std::time::Instant;
|
|
use tokio::sync::Semaphore;
|
|
|
|
use crate::config::{DEFAULT_MAX_CONCURRENT_OPS, SEMAPHORE_IDLE_THRESHOLD_SECS};
|
|
|
|
/// Per-repository rate limiter entry with usage tracking.
|
|
struct SemaphoreEntry {
|
|
sem: Arc<Semaphore>,
|
|
max_permits: usize,
|
|
last_accessed: RwLock<Instant>,
|
|
}
|
|
|
|
/// Global rate limiter state.
|
|
struct RateLimiter {
|
|
/// Per-repository semaphores. Key = repository relative_path.
|
|
semaphores: DashMap<String, SemaphoreEntry>,
|
|
/// Max concurrent operations per repository (protected by RwLock for runtime updates).
|
|
max_concurrent: RwLock<usize>,
|
|
}
|
|
|
|
static RATE_LIMITER: OnceLock<RateLimiter> = OnceLock::new();
|
|
|
|
fn limiter() -> &'static RateLimiter {
|
|
RATE_LIMITER.get_or_init(|| {
|
|
let max = std::env::var("GITKS_RATE_LIMIT_MAX_CONCURRENT")
|
|
.ok()
|
|
.and_then(|v| v.parse().ok())
|
|
.unwrap_or(DEFAULT_MAX_CONCURRENT_OPS);
|
|
|
|
tracing::info!(
|
|
max_concurrent = max,
|
|
"repository-level rate limiter initialized"
|
|
);
|
|
|
|
RateLimiter {
|
|
semaphores: DashMap::new(),
|
|
max_concurrent: RwLock::new(max),
|
|
}
|
|
})
|
|
}
|
|
|
|
/// Get the current max_concurrent value.
|
|
fn get_max_concurrent() -> usize {
|
|
*limiter()
|
|
.max_concurrent
|
|
.read()
|
|
.unwrap_or_else(|e| e.into_inner())
|
|
}
|
|
|
|
|
|
|
|
/// A guard that holds a rate-limit permit. The permit is released on drop.
|
|
pub struct RateLimitGuard {
|
|
/// The semaphore permit. Dropping this releases the permit.
|
|
_permit: tokio::sync::OwnedSemaphorePermit,
|
|
}
|
|
|
|
/// Acquire a rate-limit permit for the given repository.
|
|
///
|
|
/// If the repository has `max_concurrent` operations already in flight,
|
|
/// this will wait asynchronously until a permit becomes available.
|
|
///
|
|
/// Returns `None` if no repository header is provided (e.g., global health checks).
|
|
pub async fn acquire(repo_relative_path: Option<&str>) -> Option<RateLimitGuard> {
|
|
let repo = repo_relative_path?;
|
|
if repo.is_empty() {
|
|
return None;
|
|
}
|
|
let max_concurrent = get_max_concurrent();
|
|
if max_concurrent == 0 {
|
|
return None;
|
|
}
|
|
|
|
let sem = {
|
|
let entry = limiter()
|
|
.semaphores
|
|
.entry(repo.to_string())
|
|
.or_insert_with(|| SemaphoreEntry {
|
|
sem: Arc::new(Semaphore::new(max_concurrent)),
|
|
max_permits: max_concurrent,
|
|
last_accessed: RwLock::new(Instant::now()),
|
|
});
|
|
if let Ok(mut last) = entry.last_accessed.write() {
|
|
*last = Instant::now();
|
|
}
|
|
entry.sem.clone()
|
|
};
|
|
|
|
let _ = repo;
|
|
|
|
match tokio::time::timeout(
|
|
std::time::Duration::from_secs(30),
|
|
sem.clone().acquire_owned(),
|
|
)
|
|
.await
|
|
{
|
|
Ok(Ok(permit)) => {
|
|
tracing::debug!(
|
|
repo = %repo_relative_path.unwrap_or(""),
|
|
available = sem.available_permits(),
|
|
"rate limit permit acquired"
|
|
);
|
|
crate::metrics::record_rate_limit_acquire(repo_relative_path.unwrap_or(""));
|
|
Some(RateLimitGuard { _permit: permit })
|
|
}
|
|
Ok(Err(_closed)) => {
|
|
// Semaphore was closed — recreate it
|
|
tracing::warn!(
|
|
repo = %repo_relative_path.unwrap_or(""),
|
|
"rate limit semaphore closed, recreating"
|
|
);
|
|
let max = get_max_concurrent();
|
|
let new_sem = Arc::new(Semaphore::new(max));
|
|
let permit = match new_sem.clone().acquire_owned().await {
|
|
Ok(permit) => permit,
|
|
Err(_closed) => {
|
|
tracing::warn!(
|
|
repo = %repo_relative_path.unwrap_or(""),
|
|
"new rate limit semaphore closed unexpectedly"
|
|
);
|
|
return None;
|
|
}
|
|
};
|
|
limiter().semaphores.insert(
|
|
repo_relative_path.unwrap_or("").to_string(),
|
|
SemaphoreEntry {
|
|
sem: new_sem,
|
|
max_permits: get_max_concurrent(),
|
|
last_accessed: RwLock::new(Instant::now()),
|
|
},
|
|
);
|
|
Some(RateLimitGuard { _permit: permit })
|
|
}
|
|
Err(_elapsed) => {
|
|
tracing::warn!(
|
|
repo = %repo_relative_path.unwrap_or(""),
|
|
max_concurrent = get_max_concurrent(),
|
|
"rate limit timeout waiting for permit"
|
|
);
|
|
None
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Acquire a rate-limit permit, returning a tonic error on timeout / overload.
|
|
pub async fn acquire_or_reject(
|
|
repo_relative_path: Option<&str>,
|
|
) -> Result<Option<RateLimitGuard>, tonic::Status> {
|
|
let repo = repo_relative_path.unwrap_or("");
|
|
if repo.is_empty() {
|
|
return Ok(None);
|
|
}
|
|
match acquire(Some(repo)).await {
|
|
Some(guard) => Ok(Some(guard)),
|
|
None => {
|
|
if get_max_concurrent() == 0 {
|
|
return Ok(None);
|
|
}
|
|
crate::metrics::record_rate_limit_reject(repo);
|
|
Err(tonic::Status::resource_exhausted(format!(
|
|
"rate limit exceeded for repository '{repo}': max {max} concurrent operations",
|
|
max = get_max_concurrent()
|
|
)))
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Remove the semaphore for a repository (called on repo deletion).
|
|
pub fn remove_repository(repo_relative_path: &str) {
|
|
limiter().semaphores.remove(repo_relative_path);
|
|
tracing::debug!(repo = %repo_relative_path, "rate limit semaphore removed");
|
|
}
|
|
|
|
/// Clean up idle semaphores that have no active permits and haven't been
|
|
/// accessed within the idle threshold.
|
|
///
|
|
/// Call this periodically (e.g., from a background task) to prevent
|
|
/// unbounded growth of the semaphore map.
|
|
pub fn cleanup_idle_semaphores() {
|
|
let threshold = std::time::Duration::from_secs(SEMAPHORE_IDLE_THRESHOLD_SECS);
|
|
let now = Instant::now();
|
|
let max_concurrent = get_max_concurrent();
|
|
let mut removed = 0u64;
|
|
|
|
limiter().semaphores.retain(|_key, entry| {
|
|
let is_idle = entry.sem.available_permits() == max_concurrent;
|
|
let is_stale = entry
|
|
.last_accessed
|
|
.read()
|
|
.map(|last| now.duration_since(*last) > threshold)
|
|
.unwrap_or(false);
|
|
|
|
let keep = !(is_idle && is_stale);
|
|
if !keep {
|
|
removed += 1;
|
|
}
|
|
keep
|
|
});
|
|
|
|
if removed > 0 {
|
|
tracing::info!(
|
|
removed = removed,
|
|
"cleaned up idle rate-limit semaphores"
|
|
);
|
|
}
|
|
}
|
|
|
|
/// Start a background task to periodically clean up idle semaphores.
|
|
pub fn start_semaphore_cleanup_task() -> tokio::task::JoinHandle<()> {
|
|
let interval = std::time::Duration::from_secs(60);
|
|
tokio::spawn(async move {
|
|
let mut ticker = tokio::time::interval(interval);
|
|
loop {
|
|
ticker.tick().await;
|
|
cleanup_idle_semaphores();
|
|
}
|
|
})
|
|
}
|
|
|
|
/// Update the max concurrent limit at runtime.
|
|
///
|
|
/// Only replaces semaphores that have no active permits (idle repos).
|
|
/// Semaphores with in-flight operations are left untouched to avoid
|
|
/// a race where active permits are held against a now-replaced semaphore
|
|
/// while the new one grants a full set of permits — leading to over-admission.
|
|
/// Those repos will pick up the new limit once all permits are returned.
|
|
pub fn set_max_concurrent(max: usize) {
|
|
let l = limiter();
|
|
|
|
let old_max = get_max_concurrent();
|
|
|
|
match l.max_concurrent.write() {
|
|
Ok(mut guard) => {
|
|
*guard = max;
|
|
}
|
|
Err(e) => {
|
|
let mut guard = e.into_inner();
|
|
*guard = max;
|
|
}
|
|
}
|
|
|
|
let keys: Vec<String> = l
|
|
.semaphores
|
|
.iter()
|
|
.filter_map(|entry| {
|
|
if entry.value().max_permits == old_max {
|
|
Some(entry.key().clone())
|
|
} else {
|
|
None
|
|
}
|
|
})
|
|
.collect();
|
|
|
|
for key in keys {
|
|
l.semaphores.insert(
|
|
key,
|
|
SemaphoreEntry {
|
|
sem: Arc::new(Semaphore::new(max)),
|
|
max_permits: max,
|
|
last_accessed: RwLock::new(Instant::now()),
|
|
},
|
|
);
|
|
}
|
|
|
|
tracing::info!(max_concurrent = max, "rate limit max_concurrent updated");
|
|
}
|