diff --git a/models/common.rs b/models/common.rs index 4268cbc..1d8524b 100644 --- a/models/common.rs +++ b/models/common.rs @@ -203,6 +203,12 @@ string_enum! { } } +impl Default for Visibility { + fn default() -> Self { + Visibility::Unknown + } +} + string_enum! { pub enum Priority { None => "none", @@ -792,3 +798,18 @@ string_enum! { Unknown => "unknown", } } + +string_enum! { + /// Pull request review states. + pub enum ReviewState { + Pending => "pending", + Approved => "approved", + ChangesRequested => "changes_requested", + Commented => "commented", + Dismissed => "dismissed", + Unknown => "unknown", + } +} + +/// Default git revision reference. +pub const DEFAULT_REVISION: &str = "HEAD"; diff --git a/service/im/invitations.rs b/service/im/invitations.rs new file mode 100644 index 0000000..d39e187 --- /dev/null +++ b/service/im/invitations.rs @@ -0,0 +1,143 @@ +use serde::{Deserialize, Serialize}; +use sha2::{Digest, Sha256}; +use uuid::Uuid; + +use crate::error::AppError; +use crate::models::channels::ChannelInvitation; +use crate::models::common::Role; +use crate::service::ImService; + +use super::session::ImSession; + +#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] +pub struct CreateInvitationParams { + pub invited_user_id: Option, + pub email: Option, + pub role: Option, + pub expires_in_hours: Option, +} + +impl ImService { + pub async fn invitation_list( + &self, + _ctx: &ImSession, + channel_id: Uuid, + ) -> Result, AppError> { + sqlx::query_as::<_, ChannelInvitation>( + "SELECT id, channel_id, workspace_id, invited_user_id, email, role, token_hash, \ + invited_by, accepted_at, revoked_at, expires_at, created_at \ + FROM channel_invitation WHERE channel_id = $1 \ + AND accepted_at IS NULL AND revoked_at IS NULL \ + ORDER BY created_at DESC", + ) + .bind(channel_id) + .fetch_all(self.ctx.db.reader()) + .await + .map_err(AppError::Database) + } + + pub async fn invitation_create( + &self, + ctx: &ImSession, + channel_id: Uuid, + workspace_id: Uuid, + params: CreateInvitationParams, + ) -> Result { + let now = chrono::Utc::now(); + let expires_at = now + chrono::Duration::hours(params.expires_in_hours.unwrap_or(168)); + let token = Uuid::now_v7().to_string(); + let token_hash = format!("{:x}", Sha256::digest(token.as_bytes())); + let role = params + .role + .as_deref() + .and_then(|s| s.parse::().ok()) + .filter(|r| *r != Role::Unknown) + .unwrap_or(Role::Member); + + sqlx::query_as::<_, ChannelInvitation>( + "INSERT INTO channel_invitation \ + (id, channel_id, workspace_id, invited_user_id, email, role, token_hash, \ + invited_by, expires_at, created_at) \ + VALUES ($1, $2, $3, $4, $5, $6::role, $7, $8, $9, $10) \ + RETURNING id, channel_id, workspace_id, invited_user_id, email, role, token_hash, \ + invited_by, accepted_at, revoked_at, expires_at, created_at", + ) + .bind(Uuid::now_v7()) + .bind(channel_id) + .bind(workspace_id) + .bind(params.invited_user_id) + .bind(params.email.as_deref()) + .bind(role) + .bind(&token_hash) + .bind(ctx.user) + .bind(expires_at) + .bind(now) + .fetch_one(self.ctx.db.writer()) + .await + .map_err(AppError::Database) + } + + pub async fn invitation_accept( + &self, + ctx: &ImSession, + invitation_id: Uuid, + ) -> Result { + let now = chrono::Utc::now(); + let mut txn = self + .ctx + .db + .writer() + .begin() + .await + .map_err(|_| AppError::TxnError)?; + + let invitation = sqlx::query_as::<_, ChannelInvitation>( + "UPDATE channel_invitation SET accepted_at = $1 \ + WHERE id = $2 AND accepted_at IS NULL AND revoked_at IS NULL AND expires_at > $1 \ + RETURNING id, channel_id, workspace_id, invited_user_id, email, role, token_hash, \ + invited_by, accepted_at, revoked_at, expires_at, created_at", + ) + .bind(now) + .bind(invitation_id) + .fetch_one(&mut *txn) + .await + .map_err(AppError::Database)?; + + sqlx::query( + "INSERT INTO channel_member \ + (id, channel_id, user_id, role, status, muted, pinned, joined_at, created_at, updated_at) \ + VALUES ($1, $2, $3, $4::role, 'active', false, false, $5, $5, $5) \ + ON CONFLICT (channel_id, user_id) DO NOTHING", + ) + .bind(Uuid::now_v7()) + .bind(invitation.channel_id) + .bind(ctx.user) + .bind(invitation.role.to_string()) + .bind(now) + .execute(&mut *txn) + .await + .map_err(AppError::Database)?; + + txn.commit().await.map_err(|_| AppError::TxnError)?; + Ok(invitation) + } + + pub async fn invitation_revoke( + &self, + _ctx: &ImSession, + invitation_id: Uuid, + ) -> Result { + let now = chrono::Utc::now(); + sqlx::query_as::<_, ChannelInvitation>( + "UPDATE channel_invitation SET revoked_at = $1 \ + WHERE id = $2 AND accepted_at IS NULL AND revoked_at IS NULL \ + RETURNING id, channel_id, workspace_id, invited_user_id, email, role, token_hash, \ + invited_by, accepted_at, revoked_at, expires_at, created_at", + ) + .bind(now) + .bind(invitation_id) + .fetch_one(self.ctx.db.writer()) + .await + .map_err(AppError::Database) + } +} diff --git a/service/im/stages.rs b/service/im/stages.rs new file mode 100644 index 0000000..c724bd6 --- /dev/null +++ b/service/im/stages.rs @@ -0,0 +1,119 @@ +use serde::{Deserialize, Serialize}; +use uuid::Uuid; + +use crate::error::AppError; +use crate::models::channels::Stage; +use crate::models::common::StagePrivacyLevel; +use crate::service::ImService; + +use super::session::ImSession; + +#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] +pub struct CreateStageParams { + pub topic: String, + pub privacy_level: Option, + pub discoverable: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] +pub struct UpdateStageParams { + pub topic: Option, + pub privacy_level: Option, + pub discoverable: Option, +} + +impl ImService { + pub async fn stage_get( + &self, + _ctx: &ImSession, + channel_id: Uuid, + ) -> Result, AppError> { + sqlx::query_as::<_, Stage>( + "SELECT id, channel_id, topic, privacy_level, discoverable, \ + started_by, started_at, ended_at, created_at, updated_at \ + FROM stage WHERE channel_id = $1 AND ended_at IS NULL \ + ORDER BY started_at DESC LIMIT 1", + ) + .bind(channel_id) + .fetch_optional(self.ctx.db.reader()) + .await + .map_err(AppError::Database) + } + + pub async fn stage_create( + &self, + ctx: &ImSession, + channel_id: Uuid, + params: CreateStageParams, + ) -> Result { + let now = chrono::Utc::now(); + let privacy = params + .privacy_level + .as_deref() + .and_then(|s| s.parse::().ok()) + .filter(|s| *s != StagePrivacyLevel::Unknown) + .unwrap_or(StagePrivacyLevel::GuildOnly); + sqlx::query_as::<_, Stage>( + "INSERT INTO stage \ + (id, channel_id, topic, privacy_level, discoverable, \ + started_by, started_at, created_at, updated_at) \ + VALUES ($1, $2, $3, $4::stage_privacy_level, $5, $6, $7, $7, $7) \ + RETURNING id, channel_id, topic, privacy_level, discoverable, \ + started_by, started_at, ended_at, created_at, updated_at", + ) + .bind(Uuid::now_v7()) + .bind(channel_id) + .bind(¶ms.topic) + .bind(privacy) + .bind(params.discoverable.unwrap_or(false)) + .bind(ctx.user) + .bind(now) + .fetch_one(self.ctx.db.writer()) + .await + .map_err(AppError::Database) + } + + pub async fn stage_update( + &self, + _ctx: &ImSession, + stage_id: Uuid, + params: UpdateStageParams, + ) -> Result { + let now = chrono::Utc::now(); + sqlx::query_as::<_, Stage>( + "UPDATE stage SET \ + topic = COALESCE($1, topic), \ + privacy_level = COALESCE($2::stage_privacy_level, privacy_level), \ + discoverable = COALESCE($3, discoverable), \ + updated_at = $4 \ + WHERE id = $5 \ + RETURNING id, channel_id, topic, privacy_level, discoverable, \ + started_by, started_at, ended_at, created_at, updated_at", + ) + .bind(params.topic.as_deref()) + .bind(params.privacy_level.as_deref()) + .bind(params.discoverable) + .bind(now) + .bind(stage_id) + .fetch_one(self.ctx.db.writer()) + .await + .map_err(AppError::Database) + } + + pub async fn stage_delete( + &self, + _ctx: &ImSession, + stage_id: Uuid, + ) -> Result<(), AppError> { + let now = chrono::Utc::now(); + sqlx::query( + "UPDATE stage SET ended_at = $1, updated_at = $1 WHERE id = $2 AND ended_at IS NULL", + ) + .bind(now) + .bind(stage_id) + .execute(self.ctx.db.writer()) + .await + .map_err(AppError::Database)?; + Ok(()) + } +} diff --git a/service/pr/core.rs b/service/pr/core.rs index 7713769..f33affc 100644 --- a/service/pr/core.rs +++ b/service/pr/core.rs @@ -2,14 +2,14 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::error::AppError; -use crate::models::common::{Role, State}; +use crate::models::common::{MergeStrategyKind, Role, State}; use crate::models::prs::PullRequest; use crate::models::repos::Repo; use crate::models::workspaces::Workspace; use crate::service::PrService; use crate::session::Session; -use super::util::{clamp_limit_offset, ensure_affected, merge_optional_text}; +use super::util::{clamp_limit_offset, ensure_affected, merge_optional_text, set_local_user_id}; #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] pub struct CreatePrParams { @@ -175,8 +175,7 @@ impl PrService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -280,8 +279,7 @@ impl PrService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -326,8 +324,7 @@ impl PrService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -378,8 +375,7 @@ impl PrService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -421,8 +417,7 @@ impl PrService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -464,8 +459,7 @@ impl PrService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -502,8 +496,7 @@ impl PrService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -626,8 +619,7 @@ impl PrService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -924,21 +916,26 @@ impl PrService { }; // Determine merge strategy - let strategy = params.strategy.as_deref().unwrap_or("merge"); + let strategy = params + .strategy + .as_deref() + .and_then(|s| s.parse::().ok()) + .filter(|s| *s != MergeStrategyKind::Unknown) + .unwrap_or(MergeStrategyKind::Merge); let merge_strategy = match strategy { - "squash" => pb::merge_options::Strategy::MergeStrategyOrt as i32, - "rebase" => pb::merge_options::Strategy::MergeStrategyRecursive as i32, + MergeStrategyKind::Squash => pb::merge_options::Strategy::MergeStrategyOrt as i32, + MergeStrategyKind::Rebase => pb::merge_options::Strategy::MergeStrategyRecursive as i32, _ => pb::merge_options::Strategy::MergeStrategyOrt as i32, }; let options = pb::MergeOptions { strategy: merge_strategy, - fast_forward: if strategy == "rebase" { + fast_forward: if strategy == MergeStrategyKind::Rebase { pb::merge_options::FastForwardMode::MergeFastForwardModeNoFf as i32 } else { pb::merge_options::FastForwardMode::MergeFastForwardModeAllowed as i32 }, - squash: strategy == "squash", + squash: strategy == MergeStrategyKind::Squash, no_commit: false, allow_unrelated_histories: false, strategy_options: vec![], @@ -954,7 +951,7 @@ impl PrService { }), committer: None, message: params.squash_message.clone().unwrap_or_else(|| { - if strategy == "squash" { + if strategy == MergeStrategyKind::Squash { params .squash_title .clone() diff --git a/service/pr/reviews.rs b/service/pr/reviews.rs index d845955..083836a 100644 --- a/service/pr/reviews.rs +++ b/service/pr/reviews.rs @@ -3,12 +3,12 @@ use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::error::AppError; -use crate::models::common::Role; +use crate::models::common::{ReviewState, Role}; use crate::models::prs::{PrReview, PrReviewComment}; use crate::service::PrService; use crate::session::Session; -use super::util::{clamp_limit_offset, ensure_affected, required_text}; +use super::util::{clamp_limit_offset, ensure_affected, required_text, set_local_user_id}; #[derive(Debug, Deserialize, Serialize, utoipa::ToSchema)] pub struct CreateReviewParams { @@ -83,15 +83,25 @@ impl PrService { let pr = self.resolve_pr(wk_name, repo_name, number).await?; self.ensure_pr_readable(user_uid, &pr).await?; - let state = params.state.as_deref().unwrap_or("pending"); - if !["pending", "approved", "changes_requested", "commented"].contains(&state) { + let state = params + .state + .as_deref() + .and_then(|s| s.parse::().ok()) + .filter(|s| *s != ReviewState::Unknown) + .unwrap_or(ReviewState::Pending); + if matches!( + state, + ReviewState::Pending | ReviewState::Approved | ReviewState::ChangesRequested | ReviewState::Commented + ) { + // valid state + } else { return Err(AppError::BadRequest("invalid review state".into())); } - if matches!(state, "approved" | "changes_requested") { + if matches!(state, ReviewState::Approved | ReviewState::ChangesRequested) { let repo = self.resolve_repo(wk_name, repo_name).await?; self.ensure_repo_role_at_least(user_uid, &repo, Role::Member) .await?; - if state == "approved" && pr.author_id == user_uid { + if state == ReviewState::Approved && pr.author_id == user_uid { return Err(AppError::BadRequest( "PR authors cannot approve their own pull requests".into(), )); @@ -108,8 +118,7 @@ impl PrService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -132,7 +141,7 @@ impl PrService { .as_deref() .or(Some(pr.head_commit_sha.as_str())), ) - .bind(if state != "pending" { Some(now) } else { None }) + .bind(if state != ReviewState::Pending { Some(now) } else { None }) .bind(now) .fetch_one(&mut *txn) .await @@ -158,7 +167,7 @@ impl PrService { } } - if matches!(state, "approved" | "changes_requested") { + if matches!(state, ReviewState::Approved | ReviewState::ChangesRequested) { sqlx::query( "UPDATE pr_status SET approvals_count = (SELECT COUNT(*) FROM pr_review r \ JOIN pull_request pr ON pr.id = r.pull_request_id \ @@ -190,16 +199,18 @@ impl PrService { let pr = self.resolve_pr(wk_name, repo_name, number).await?; self.ensure_pr_readable(user_uid, &pr).await?; - let state = params.state.as_str(); - if !["approved", "changes_requested", "commented"].contains(&state) { - return Err(AppError::BadRequest("invalid review state".into())); - } + let state = params + .state + .parse::() + .ok() + .filter(|s| *s != ReviewState::Unknown) + .ok_or_else(|| AppError::BadRequest("invalid review state".into()))?; - if matches!(state, "approved" | "changes_requested") { + if matches!(state, ReviewState::Approved | ReviewState::ChangesRequested) { let repo = self.resolve_repo(wk_name, repo_name).await?; self.ensure_repo_role_at_least(user_uid, &repo, Role::Member) .await?; - if state == "approved" && pr.author_id == user_uid { + if state == ReviewState::Approved && pr.author_id == user_uid { return Err(AppError::BadRequest( "PR authors cannot approve their own pull requests".into(), )); @@ -214,8 +225,7 @@ impl PrService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; @@ -231,7 +241,7 @@ impl PrService { .fetch_optional(&mut *txn).await.map_err(AppError::Database)? .ok_or(AppError::NotFound("review not found or already submitted".into()))?; - if state == "approved" || state == "changes_requested" { + if state == ReviewState::Approved || state == ReviewState::ChangesRequested { sqlx::query( "UPDATE pr_status SET approvals_count = (SELECT COUNT(*) FROM pr_review r \ JOIN pull_request pr ON pr.id = r.pull_request_id \ @@ -274,8 +284,7 @@ impl PrService { .begin() .await .map_err(|_| AppError::TxnError)?; - sqlx::query("SET LOCAL app.current_user_id = $1") - .bind(user_uid) + sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?;