fix(rate-limit): avoid over-admission when updating max_concurrent
Only replace semaphores that have no active permits (idle repos). Previously all semaphores were replaced, allowing active permits from old semaphores plus full permits from new ones simultaneously.
This commit is contained in:
+16
-5
@@ -157,27 +157,38 @@ pub fn remove_repository(repo_relative_path: &str) {
|
||||
}
|
||||
|
||||
/// Update the max concurrent limit at runtime.
|
||||
/// This properly updates the limit and recreates all existing semaphores.
|
||||
///
|
||||
/// Only replaces semaphores that have no active permits (idle repos).
|
||||
/// Semaphores with in-flight operations are left untouched to avoid
|
||||
/// a race where active permits are held against a now-replaced semaphore
|
||||
/// while the new one grants a full set of permits — leading to over-admission.
|
||||
/// Those repos will pick up the new limit once all permits are returned.
|
||||
pub fn set_max_concurrent(max: usize) {
|
||||
let l = limiter();
|
||||
|
||||
// Update the max_concurrent value
|
||||
let old_max = get_max_concurrent();
|
||||
|
||||
match l.max_concurrent.write() {
|
||||
Ok(mut guard) => {
|
||||
*guard = max;
|
||||
}
|
||||
Err(e) => {
|
||||
// Poisoned lock - recover and update
|
||||
let mut guard = e.into_inner();
|
||||
*guard = max;
|
||||
}
|
||||
}
|
||||
|
||||
// Recreate all existing semaphores with the new limit
|
||||
let keys: Vec<String> = l
|
||||
.semaphores
|
||||
.iter()
|
||||
.map(|entry| entry.key().clone())
|
||||
.filter_map(|entry| {
|
||||
let sem = entry.value();
|
||||
if sem.available_permits() == old_max {
|
||||
Some(entry.key().clone())
|
||||
} else {
|
||||
None
|
||||
}
|
||||
})
|
||||
.collect();
|
||||
|
||||
for key in keys {
|
||||
|
||||
Reference in New Issue
Block a user