diff --git a/models/repos/repo_queries.rs b/models/repos/repo_queries.rs index 340f937..739ce16 100644 --- a/models/repos/repo_queries.rs +++ b/models/repos/repo_queries.rs @@ -39,6 +39,26 @@ impl Repo { .await } + /// Find multiple non-deleted repos by their IDs in a single query. + pub async fn find_by_ids( + pool: &PgPool, + ids: &[Uuid], + ) -> Result, sqlx::Error> { + if ids.is_empty() { + return Ok(Vec::new()); + } + sqlx::query_as::<_, Repo>( + r#"SELECT id, workspace_id, owner_id, name, description, default_branch, visibility, + status, is_fork, forked_from_repo_id, storage_node_ids, + primary_storage_node_id, storage_path, git_service, + archived_at, created_at, updated_at, deleted_at + FROM repo WHERE id = ANY($1) AND deleted_at IS NULL"#, + ) + .bind(ids) + .fetch_all(pool) + .await + } + /// Check if a user is an active member of a repo. pub async fn is_member( pool: &PgPool, diff --git a/service/issues/core.rs b/service/issues/core.rs index de3185d..abb8bdd 100644 --- a/service/issues/core.rs +++ b/service/issues/core.rs @@ -8,7 +8,9 @@ use crate::models::workspaces::Workspace; use crate::service::IssueService; use crate::session::Session; -use super::util::{clamp_limit_offset, ensure_affected, merge_optional_text, parse_enum}; +use super::util::{ + clamp_limit_offset, ensure_affected, merge_optional_text, parse_enum, set_local_user_id, +}; #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] pub struct CreateIssueParams { @@ -73,8 +75,8 @@ impl IssueService { i.state, i.priority, i.visibility, i.locked, i.milestone_id, i.closed_by, i.closed_at, i.due_at, \ i.created_at, i.updated_at, i.deleted_at \ FROM issue i \ - LEFT JOIN issue_assignee ia ON ia.issue_id = i.id \ - LEFT JOIN issue_label_relation ilr ON ilr.issue_id = i.id \ + LEFT JOIN issue_assignee ia ON ia.issue_id = i.id AND $5::uuid IS NOT NULL \ + LEFT JOIN issue_label_relation ilr ON ilr.issue_id = i.id AND $7::uuid IS NOT NULL \ WHERE i.workspace_id = $1 AND i.deleted_at IS NULL \ AND ($2::text IS NULL OR i.state::text = $2) \ AND ($3::text IS NULL OR i.priority::text = $3) \ @@ -127,88 +129,12 @@ impl IssueService { self.ensure_workspace_role_at_least(wk_name, user_uid, Role::Member) .await?; - // Validate repo_ids belong to this workspace - for repo_id in ¶ms.repo_ids { - let repo = crate::models::repos::Repo::find_by_id(self.ctx.db.reader(), *repo_id) - .await - .map_err(AppError::Database)? - .ok_or_else(|| AppError::NotFound(format!("Repository {} not found", repo_id)))?; + self.validate_issue_repos(user_uid, &ws, ¶ms.repo_ids).await?; + self.validate_issue_labels(ws.id, ¶ms.label_ids).await?; + self.validate_issue_assignees(ws.id, ¶ms.assignee_ids).await?; - if repo.workspace_id != ws.id { - return Err(AppError::BadRequest(format!( - "Repository {} does not belong to this workspace", - repo_id - ))); - } - - self.ensure_repo_readable(user_uid, &repo).await?; - } - - // Validate label_ids belong to repos in this workspace - for label_id in ¶ms.label_ids { - let label: Option<(Uuid,)> = sqlx::query_as( - "SELECT r.workspace_id FROM issue_label il \ - JOIN repo r ON r.id = il.repo_id \ - WHERE il.id = $1", - ) - .bind(label_id) - .fetch_optional(self.ctx.db.reader()) - .await - .map_err(AppError::Database)?; - - match label { - Some((workspace_id,)) if workspace_id == ws.id => {} - Some(_) => { - return Err(AppError::BadRequest(format!( - "Label {} does not belong to this workspace", - label_id - ))); - } - None => return Err(AppError::NotFound(format!("Label {} not found", label_id))), - } - } - - // Validate assignee_ids are workspace members - for assignee_id in ¶ms.assignee_ids { - let is_member: bool = sqlx::query_scalar( - "SELECT EXISTS(SELECT 1 FROM workspace_member \ - WHERE workspace_id = $1 AND user_id = $2 AND status = 'active')", - ) - .bind(ws.id) - .bind(assignee_id) - .fetch_one(self.ctx.db.reader()) - .await - .map_err(AppError::Database)?; - - if !is_member { - return Err(AppError::BadRequest(format!( - "User {} is not a member of this workspace", - assignee_id - ))); - } - } - - // Validate milestone_id belongs to a repo in this workspace if let Some(milestone_id) = params.milestone_id { - let milestone: Option<(Uuid,)> = sqlx::query_as( - "SELECT r.workspace_id FROM issue_milestone im \ - JOIN repo r ON r.id = im.repo_id \ - WHERE im.id = $1", - ) - .bind(milestone_id) - .fetch_optional(self.ctx.db.reader()) - .await - .map_err(AppError::Database)?; - - match milestone { - Some((workspace_id,)) if workspace_id == ws.id => {} - Some(_) => { - return Err(AppError::BadRequest( - "Milestone does not belong to this workspace".into(), - )); - } - None => return Err(AppError::NotFound("Milestone not found".into())), - } + self.validate_issue_milestone(ws.id, milestone_id).await?; } let priority = match params.priority { @@ -240,8 +166,7 @@ impl IssueService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -297,50 +222,15 @@ impl IssueService { .await .map_err(AppError::Database)?; - for repo_id in ¶ms.repo_ids { - sqlx::query( - "INSERT INTO issue_repo_relation (id, issue_id, repo_id, relation_type, created_by, created_at) \ - VALUES ($1, $2, $3, 'references', $4, $5)", - ) - .bind(Uuid::now_v7()) - .bind(issue_id) - .bind(repo_id) - .bind(user_uid) - .bind(now) - .execute(&mut *txn) - .await - .map_err(AppError::Database)?; - } - - for label_id in ¶ms.label_ids { - sqlx::query( - "INSERT INTO issue_label_relation (id, issue_id, label_id, created_by, created_at) \ - VALUES ($1, $2, $3, $4, $5)", - ) - .bind(Uuid::now_v7()) - .bind(issue_id) - .bind(label_id) - .bind(user_uid) - .bind(now) - .execute(&mut *txn) - .await - .map_err(AppError::Database)?; - } - - for assignee_id in ¶ms.assignee_ids { - sqlx::query( - "INSERT INTO issue_assignee (id, issue_id, assignee_id, assigned_by, created_at) \ - VALUES ($1, $2, $3, $4, $5)", - ) - .bind(Uuid::now_v7()) - .bind(issue_id) - .bind(assignee_id) - .bind(user_uid) - .bind(now) - .execute(&mut *txn) - .await - .map_err(AppError::Database)?; - } + self.insert_issue_repo_relations( + &mut txn, issue_id, ¶ms.repo_ids, user_uid, now, + ).await?; + self.insert_issue_label_relations( + &mut txn, issue_id, ¶ms.label_ids, user_uid, now, + ).await?; + self.insert_issue_assignees( + &mut txn, issue_id, ¶ms.assignee_ids, user_uid, now, + ).await?; sqlx::query( "UPDATE workspace_stats SET issues_count = issues_count + 1, updated_at = $1 \ @@ -407,8 +297,7 @@ impl IssueService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -456,8 +345,7 @@ impl IssueService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -501,8 +389,7 @@ impl IssueService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -545,8 +432,7 @@ impl IssueService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -597,8 +483,7 @@ impl IssueService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -649,8 +534,7 @@ impl IssueService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -864,4 +748,217 @@ impl IssueService { Err(AppError::Unauthorized) } } + + async fn validate_issue_repos( + &self, + user_uid: Uuid, + ws: &Workspace, + repo_ids: &[Uuid], + ) -> Result<(), AppError> { + use crate::models::repos::Repo; + if repo_ids.is_empty() { + return Ok(()); + } + + let repos = Repo::find_by_ids(self.ctx.db.reader(), repo_ids) + .await + .map_err(AppError::Database)?; + + if repos.len() != repo_ids.len() { + let found: std::collections::HashSet<_> = repos.iter().map(|r| r.id).collect(); + for id in repo_ids { + if !found.contains(id) { + return Err(AppError::NotFound(format!("Repository {id} not found"))); + } + } + } + + for repo in &repos { + if repo.workspace_id != ws.id { + return Err(AppError::BadRequest(format!( + "Repository {} does not belong to this workspace", + repo.id + ))); + } + self.ensure_repo_readable(user_uid, repo).await?; + } + + Ok(()) + } + + async fn validate_issue_labels( + &self, + workspace_id: Uuid, + label_ids: &[Uuid], + ) -> Result<(), AppError> { + if label_ids.is_empty() { + return Ok(()); + } + + let rows: Vec<(Uuid, Uuid)> = sqlx::query_as( + "SELECT il.id, r.workspace_id \ + FROM issue_label il JOIN repo r ON r.id = il.repo_id \ + WHERE il.id = ANY($1)", + ) + .bind(label_ids) + .fetch_all(self.ctx.db.reader()) + .await + .map_err(AppError::Database)?; + + let found: std::collections::HashSet = rows.iter().map(|(id, _)| *id).collect(); + for id in label_ids { + if !found.contains(id) { + return Err(AppError::NotFound(format!("Label {id} not found"))); + } + } + + for (id, ws_id) in &rows { + if *ws_id != workspace_id { + return Err(AppError::BadRequest(format!( + "Label {id} does not belong to this workspace" + ))); + } + } + + Ok(()) + } + + async fn validate_issue_assignees( + &self, + workspace_id: Uuid, + assignee_ids: &[Uuid], + ) -> Result<(), AppError> { + if assignee_ids.is_empty() { + return Ok(()); + } + + let member_ids: Vec = sqlx::query_scalar( + "SELECT user_id FROM workspace_member \ + WHERE workspace_id = $1 AND user_id = ANY($2) AND status = 'active'", + ) + .bind(workspace_id) + .bind(assignee_ids) + .fetch_all(self.ctx.db.reader()) + .await + .map_err(AppError::Database)?; + + let found: std::collections::HashSet = member_ids.into_iter().collect(); + for id in assignee_ids { + if !found.contains(id) { + return Err(AppError::BadRequest(format!( + "User {id} is not a member of this workspace" + ))); + } + } + + Ok(()) + } + + async fn validate_issue_milestone( + &self, + workspace_id: Uuid, + milestone_id: Uuid, + ) -> Result<(), AppError> { + let milestone: Option<(Uuid,)> = sqlx::query_as( + "SELECT r.workspace_id FROM issue_milestone im \ + JOIN repo r ON r.id = im.repo_id \ + WHERE im.id = $1", + ) + .bind(milestone_id) + .fetch_optional(self.ctx.db.reader()) + .await + .map_err(AppError::Database)?; + + match milestone { + Some((ws_id,)) if ws_id == workspace_id => Ok(()), + Some(_) => Err(AppError::BadRequest( + "Milestone does not belong to this workspace".into(), + )), + None => Err(AppError::NotFound("Milestone not found".into())), + } + } + + async fn insert_issue_repo_relations( + &self, + txn: &mut sqlx::Transaction<'_, sqlx::Postgres>, + issue_id: Uuid, + repo_ids: &[Uuid], + user_uid: Uuid, + now: chrono::DateTime, + ) -> Result<(), AppError> { + if repo_ids.is_empty() { + return Ok(()); + } + let ids: Vec = repo_ids.iter().map(|_| Uuid::now_v7()).collect(); + sqlx::query( + "INSERT INTO issue_repo_relation (id, issue_id, repo_id, relation_type, created_by, created_at) \ + SELECT t.id, $2, t.repo_id, 'references', $3, $4 \ + FROM unnest($1::uuid[], $5::uuid[]) AS t(id, repo_id)", + ) + .bind(&ids) + .bind(issue_id) + .bind(user_uid) + .bind(now) + .bind(repo_ids) + .execute(&mut **txn) + .await + .map_err(AppError::Database)?; + Ok(()) + } + + async fn insert_issue_label_relations( + &self, + txn: &mut sqlx::Transaction<'_, sqlx::Postgres>, + issue_id: Uuid, + label_ids: &[Uuid], + user_uid: Uuid, + now: chrono::DateTime, + ) -> Result<(), AppError> { + if label_ids.is_empty() { + return Ok(()); + } + let ids: Vec = label_ids.iter().map(|_| Uuid::now_v7()).collect(); + sqlx::query( + "INSERT INTO issue_label_relation (id, issue_id, label_id, created_by, created_at) \ + SELECT t.id, $2, t.label_id, $3, $4 \ + FROM unnest($1::uuid[], $5::uuid[]) AS t(id, label_id)", + ) + .bind(&ids) + .bind(issue_id) + .bind(user_uid) + .bind(now) + .bind(label_ids) + .execute(&mut **txn) + .await + .map_err(AppError::Database)?; + Ok(()) + } + + async fn insert_issue_assignees( + &self, + txn: &mut sqlx::Transaction<'_, sqlx::Postgres>, + issue_id: Uuid, + assignee_ids: &[Uuid], + user_uid: Uuid, + now: chrono::DateTime, + ) -> Result<(), AppError> { + if assignee_ids.is_empty() { + return Ok(()); + } + let ids: Vec = assignee_ids.iter().map(|_| Uuid::now_v7()).collect(); + sqlx::query( + "INSERT INTO issue_assignee (id, issue_id, assignee_id, assigned_by, created_at) \ + SELECT t.id, $2, t.assignee_id, $3, $4 \ + FROM unnest($1::uuid[], $5::uuid[]) AS t(id, assignee_id)", + ) + .bind(&ids) + .bind(issue_id) + .bind(user_uid) + .bind(now) + .bind(assignee_ids) + .execute(&mut **txn) + .await + .map_err(AppError::Database)?; + Ok(()) + } }