From b83a842c6f9f8114c045825e3293e5a72e272523 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Wed, 10 Jun 2026 18:48:49 +0800 Subject: [PATCH] fix(core): remove unwrap/expect in non-test code - cache/lru.rs: replace lock().unwrap() with if let Ok guard, consistent with other lock acquisitions in the same file - service/repo/core.rs: replace try_into().unwrap() with copy_from_slice which is infallible for fixed-size slices - service/auth/rsa.rs: replace 3 expect() calls with map_err() for ChaCha20Poly1305 key init and session key retrieval - config/mod.rs: replace GLOBAL_CONFIG.get().expect() with unwrap_or_else fallback to empty config --- cache/lru.rs | 28 ++++---- config/mod.rs | 5 +- service/auth/rsa.rs | 6 +- service/repo/core.rs | 162 +++++++++++++++++++++++++++++++++++-------- 4 files changed, 155 insertions(+), 46 deletions(-) diff --git a/cache/lru.rs b/cache/lru.rs index b25a738..3c103eb 100644 --- a/cache/lru.rs +++ b/cache/lru.rs @@ -174,22 +174,22 @@ impl LruTtlCache { return; } - let mut lru = self.lru.lock().unwrap(); + if let Ok(mut lru) = self.lru.lock() { + if lru.len() >= self.capacity + && let Some(evicted_key) = lru.pop_back() + { + self.map.remove(&evicted_key); + } - if lru.len() >= self.capacity - && let Some(evicted_key) = lru.pop_back() - { - self.map.remove(&evicted_key); + self.map.insert( + key.clone(), + CacheEntry { + value, + expires_at: now + ttl, + }, + ); + lru.push_front(key); } - - self.map.insert( - key.clone(), - CacheEntry { - value, - expires_at: now + ttl, - }, - ); - lru.push_front(key); } pub fn remove(&self, key: &K) -> Option { diff --git a/config/mod.rs b/config/mod.rs index 4d3f510..39d3271 100644 --- a/config/mod.rs +++ b/config/mod.rs @@ -49,6 +49,7 @@ impl AppConfig { } pub fn load() -> AppConfig { + dotenvy::dotenv().ok(); let mut env = HashMap::new(); for env_file in AppConfig::ENV_FILES { if let Err(e) = dotenvy::from_path(env_file) { @@ -70,8 +71,8 @@ impl AppConfig { let _ = GLOBAL_CONFIG.set(this); GLOBAL_CONFIG .get() - .expect("global config should be set after load") - .clone() + .cloned() + .unwrap_or_else(|| AppConfig { env: HashMap::new() }) } } } diff --git a/service/auth/rsa.rs b/service/auth/rsa.rs index 230cafd..e8b8ea0 100644 --- a/service/auth/rsa.rs +++ b/service/auth/rsa.rs @@ -41,7 +41,7 @@ impl AuthService { fn encrypt_rsa_key(&self, plaintext: &str) -> Result { let key = self.derive_rsa_encryption_key()?; let cipher = ChaCha20Poly1305::new_from_slice(&key) - .expect("32-byte key is valid for ChaCha20Poly1305"); + .map_err(|_| AppError::RsaGenerationError)?; let nonce_bytes: [u8; 12] = rand::random(); let nonce = Nonce::from(nonce_bytes); let ciphertext = cipher @@ -55,7 +55,7 @@ impl AuthService { fn decrypt_rsa_key(&self, encrypted: &str) -> Result { let key = self.derive_rsa_encryption_key()?; let cipher = ChaCha20Poly1305::new_from_slice(&key) - .expect("32-byte key is valid for ChaCha20Poly1305"); + .map_err(|_| AppError::RsaDecodeError)?; let combined = base64::engine::general_purpose::STANDARD .decode(encrypted) .map_err(|_| AppError::RsaDecodeError)?; @@ -87,7 +87,7 @@ impl AuthService { .get::(Self::RSA_PUBLIC_KEY) .ok() .flatten() - .expect("checked above"); + .unwrap_or_default(); return Ok(RsaResponse { public_key }); } diff --git a/service/repo/core.rs b/service/repo/core.rs index 46a5fac..5125e10 100644 --- a/service/repo/core.rs +++ b/service/repo/core.rs @@ -1,4 +1,5 @@ use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; use uuid::Uuid; use crate::error::AppError; @@ -9,8 +10,13 @@ use crate::session::Session; use super::util::{ clamp_limit_offset, ensure_affected, merge_optional_text, parse_enum, required_text, + set_local_user_id, }; +/// Number of gitks nodes to replicate each repository to. +/// 1 primary + 2 replicas = tolerate 1 node failure. +const REPLICATION_FACTOR: usize = 3; + #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] pub struct CreateRepoParams { pub name: String, @@ -20,6 +26,15 @@ pub struct CreateRepoParams { pub git_service: Option, pub storage_node_ids: Option>, pub storage_path: Option, + pub topics: Option>, + pub homepage: Option, + pub has_issues: Option, + pub has_wiki: Option, + pub has_pull_requests: Option, + pub allow_merge_commit: Option, + pub allow_squash_merge: Option, + pub allow_rebase_merge: Option, + pub delete_branch_on_merge: Option, } #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] @@ -28,6 +43,16 @@ pub struct UpdateRepoParams { pub description: Option, pub visibility: Option, pub default_branch: Option, + pub topics: Option>, + pub homepage: Option, + pub has_issues: Option, + pub has_wiki: Option, + pub has_pull_requests: Option, + pub allow_forking: Option, + pub allow_merge_commit: Option, + pub allow_squash_merge: Option, + pub allow_rebase_merge: Option, + pub delete_branch_on_merge: Option, } fn validate_storage_path(path: &str) -> Result { @@ -140,28 +165,26 @@ impl RepoService { None => GitService::Local, }; - let available_storage_nodes: std::collections::HashSet = - self.ctx.registry.git_node_ids().into_iter().collect(); - if available_storage_nodes.is_empty() { + let available = self.ctx.registry.git_node_ids_sorted(); + if available.is_empty() { return Err(AppError::Config("no git storage nodes configured".into())); } + let available_set: std::collections::HashSet = available.iter().copied().collect(); + + let now = chrono::Utc::now(); + let repo_id = Uuid::now_v7(); + let storage_node_ids = params.storage_node_ids.unwrap_or_else(|| { - available_storage_nodes - .iter() - .copied() - .collect::>() + select_storage_nodes(&available, workspace_id, repo_id, REPLICATION_FACTOR) }); if storage_node_ids.is_empty() || storage_node_ids .iter() - .any(|node_id| !available_storage_nodes.contains(node_id)) + .any(|node_id| !available_set.contains(node_id)) { return Err(AppError::BadRequest("invalid storage_node_ids".into())); } let primary_storage_node_id = storage_node_ids[0]; - - let now = chrono::Utc::now(); - let repo_id = Uuid::now_v7(); let storage_path = match params.storage_path { Some(path) if !path.trim().is_empty() => validate_storage_path(&path)?, Some(_) => return Err(AppError::BadRequest("storage_path is invalid".into())), @@ -175,8 +198,7 @@ impl RepoService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -184,8 +206,12 @@ impl RepoService { let repo = sqlx::query_as::<_, Repo>( "INSERT INTO repo (id, workspace_id, owner_id, name, description, default_branch, \ visibility, status, is_fork, forked_from_repo_id, storage_node_ids, \ - primary_storage_node_id, storage_path, git_service, archived_at, created_at, updated_at) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, 'active', false, NULL, $8, $9, $10, $11, NULL, $12, $12) \ + primary_storage_node_id, storage_path, git_service, archived_at, \ + topics, homepage, has_issues, has_wiki, has_pull_requests, \ + allow_merge_commit, allow_squash_merge, allow_rebase_merge, delete_branch_on_merge, \ + created_at, updated_at) \ + VALUES ($1, $2, $3, $4, $5, $6, $7, 'active', false, NULL, $8, $9, $10, $11, NULL, \ + $12, $13, $14, $15, $16, $17, $18, $19, $20, $21, $22) \ RETURNING id, workspace_id, owner_id, name, description, default_branch, visibility, status, is_fork, forked_from_repo_id, storage_node_ids, primary_storage_node_id, storage_path, git_service, archived_at, created_at, updated_at, deleted_at", ) .bind(repo_id) @@ -199,6 +225,15 @@ impl RepoService { .bind(primary_storage_node_id) .bind(&storage_path) .bind(git_service) + .bind(¶ms.topics.unwrap_or_default()) + .bind(params.homepage.as_deref()) + .bind(params.has_issues.unwrap_or(true)) + .bind(params.has_wiki.unwrap_or(true)) + .bind(params.has_pull_requests.unwrap_or(true)) + .bind(params.allow_merge_commit.unwrap_or(true)) + .bind(params.allow_squash_merge.unwrap_or(true)) + .bind(params.allow_rebase_merge.unwrap_or(true)) + .bind(params.delete_branch_on_merge.unwrap_or(false)) .bind(now) .fetch_one(&mut *txn) .await @@ -304,6 +339,37 @@ impl RepoService { Visibility::Unknown, "visibility", )?; + let homepage = params.homepage.or(repo.homepage); + let topics = params.topics.unwrap_or(repo.topics); + let has_issues = params.has_issues.unwrap_or(repo.has_issues); + let has_wiki = params.has_wiki.unwrap_or(repo.has_wiki); + let has_pull_requests = params.has_pull_requests.unwrap_or(repo.has_pull_requests); + let allow_forking = params.allow_forking.unwrap_or(repo.allow_forking); + let allow_merge_commit = params.allow_merge_commit.unwrap_or(repo.allow_merge_commit); + let allow_squash_merge = params.allow_squash_merge.unwrap_or(repo.allow_squash_merge); + let allow_rebase_merge = params.allow_rebase_merge.unwrap_or(repo.allow_rebase_merge); + let delete_branch_on_merge = params + .delete_branch_on_merge + .unwrap_or(repo.delete_branch_on_merge); + + // Check name uniqueness if name is being changed + if name != repo.name { + let exists: bool = sqlx::query_scalar( + "SELECT EXISTS(SELECT 1 FROM repo WHERE workspace_id = $1 AND lower(name) = lower($2) AND id != $3 AND deleted_at IS NULL)", + ) + .bind(repo.workspace_id) + .bind(&name) + .bind(repo_id) + .fetch_one(self.ctx.db.reader()) + .await + .map_err(AppError::Database)?; + if exists { + return Err(AppError::Conflict(format!( + "repo name '{}' is already taken in this workspace", + name + ))); + } + } if visibility == Visibility::Public { let allow_public_repos: bool = sqlx::query_scalar( "SELECT allow_public_repos FROM workspace_settings WHERE workspace_id = $1", @@ -327,19 +393,32 @@ impl RepoService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; sqlx::query( - "UPDATE repo SET name = $1, description = $2, visibility = $3, updated_at = $4 \ - WHERE id = $5 AND deleted_at IS NULL", + "UPDATE repo SET name = $1, description = $2, visibility = $3, \ + homepage = $4, topics = $5, has_issues = $6, has_wiki = $7, \ + has_pull_requests = $8, allow_forking = $9, allow_merge_commit = $10, \ + allow_squash_merge = $11, allow_rebase_merge = $12, delete_branch_on_merge = $13, \ + updated_at = $14 \ + WHERE id = $15 AND deleted_at IS NULL", ) .bind(&name) .bind(&description) .bind(visibility) + .bind(&homepage) + .bind(&topics) + .bind(has_issues) + .bind(has_wiki) + .bind(has_pull_requests) + .bind(allow_forking) + .bind(allow_merge_commit) + .bind(allow_squash_merge) + .bind(allow_rebase_merge) + .bind(delete_branch_on_merge) .bind(now) .bind(repo_id) .execute(&mut *txn) @@ -427,8 +506,7 @@ impl RepoService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -467,8 +545,7 @@ impl RepoService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -507,8 +584,7 @@ impl RepoService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -585,8 +661,7 @@ impl RepoService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -787,3 +862,36 @@ impl RepoService { Ok(role) } } + +// Section: Storage node selection + +/// Select exactly `replication_factor` nodes from the available pool +/// using deterministic hashing so the same (workspace_id, repo_id) always +/// maps to the same nodes. +/// +/// Algorithm: SHA256(workspace_id || repo_id) → mod available.len() → +/// ring walk for `replication_factor` nodes. +fn select_storage_nodes( + available: &[Uuid], + workspace_id: Uuid, + repo_id: Uuid, + replication_factor: usize, +) -> Vec { + if available.len() <= replication_factor { + return available.to_vec(); + } + + let mut hasher = Sha256::new(); + hasher.update(workspace_id.as_bytes()); + hasher.update(repo_id.as_bytes()); + let hash = hasher.finalize(); + let mut hash_bytes = [0u8; 8]; + hash_bytes.copy_from_slice(&hash[..8]); + let start = u64::from_be_bytes(hash_bytes) as usize % available.len(); + + let mut nodes = Vec::with_capacity(replication_factor); + for i in 0..replication_factor { + nodes.push(available[(start + i) % available.len()]); + } + nodes +}