Files
appks/service/pr/core.rs
T
zhenyi 6205a6de0a refactor(models): replace hardcoded strings with typed enums
- Add ReviewState enum (pending, approved, changes_requested, etc.)
- Add DEFAULT_REVISION constant for git HEAD references
- service/pr/reviews.rs: use ReviewState for review creation and
  submission state validation
- service/pr/core.rs: use MergeStrategyKind for merge strategy
  selection
- service/im/stages.rs: use StagePrivacyLevel for stage creation
- service/im/invitations.rs: use Role enum for invitation role
  defaults
2026-06-10 18:49:06 +08:00

1082 lines
39 KiB
Rust

use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::error::AppError;
use crate::models::common::{MergeStrategyKind, Role, State};
use crate::models::prs::PullRequest;
use crate::models::repos::Repo;
use crate::models::workspaces::Workspace;
use crate::service::PrService;
use crate::session::Session;
use super::util::{clamp_limit_offset, ensure_affected, merge_optional_text, set_local_user_id};
/// Request payload for opening a new pull request.
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
pub struct CreatePrParams {
/// Title of the pull request.
pub title: String,
/// Optional description body.
pub body: Option<String>,
/// Repository holding the source branch: the target repository itself or a fork of it.
pub source_repo_id: Uuid,
/// Branch in the source repository to merge from; must exist.
pub source_branch: String,
/// Branch in the target repository to merge into; must exist.
pub target_branch: String,
/// Commit the pull request points at; must exist in the source repository.
pub head_commit_sha: String,
/// Optional base commit, stored as given.
pub base_commit_sha: Option<String>,
/// Open the pull request as a draft; treated as `false` when omitted.
pub draft: Option<bool>,
}
/// Request payload for editing an existing pull request.
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
pub struct UpdatePrParams {
/// New title, combined with the current one by `merge_optional_text`.
pub title: Option<String>,
/// New body, combined with the current one by `merge_optional_text`.
pub body: Option<String>,
/// New target branch; the current one is kept when omitted.
pub target_branch: Option<String>,
/// New draft flag; the current one is kept when omitted.
pub draft: Option<bool>,
}
/// Request payload for merging a pull request.
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
pub struct MergePrParams {
/// Merge strategy name, parsed as `MergeStrategyKind`; a missing, unparsable or
/// unknown value falls back to a regular merge.
pub strategy: Option<String>,
/// Commit message for squash merges, used only when `squash_message` is absent.
pub squash_title: Option<String>,
/// Commit message; when present it is used for every strategy.
pub squash_message: Option<String>,
/// Delete the source branch after merging; only honoured when source and target
/// live in the same repository.
pub delete_source_branch: Option<bool>,
}
/// Optional filters applied when listing pull requests.
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
pub struct PrListFilters {
/// State name, parsed as `State`; an unparsable or unknown value disables the filter.
pub state: Option<String>,
/// Restrict the listing to pull requests written by this user.
pub author_id: Option<Uuid>,
/// Restrict the listing to drafts (`true`) or non-drafts (`false`).
pub draft: Option<bool>,
}
impl PrService {
/// Lists the non-deleted pull requests of a repository, newest number first.
///
/// The caller must be signed in and able to read the repository. `limit` and
/// `offset` are normalised by `clamp_limit_offset`; each filter is skipped when
/// it is `None`, and a state that does not parse (or parses to `State::Unknown`)
/// is treated as no state filter.
pub async fn pr_list(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
    filters: PrListFilters,
    limit: i64,
    offset: i64,
) -> Result<Vec<PullRequest>, AppError> {
    let Some(viewer) = ctx.user() else {
        return Err(AppError::Unauthorized);
    };
    let repo = self.resolve_repo(wk_name, repo_name).await?;
    self.ensure_repo_readable(viewer, &repo).await?;
    let (page_size, skip) = clamp_limit_offset(limit, offset);

    // Only a recognised, non-Unknown state is forwarded to the query.
    let state_filter = match filters.state.as_deref().map(str::parse::<State>) {
        Some(Ok(parsed)) if parsed != State::Unknown => Some(parsed.to_string()),
        _ => None,
    };

    let listing = sqlx::query_as::<_, PullRequest>(
        "SELECT id, repo_id, author_id, number, title, body, state, source_repo_id, \
         source_branch, target_repo_id, target_branch, base_commit_sha, head_commit_sha, \
         merge_commit_sha, draft, locked, merged_by, merged_at, closed_by, closed_at, \
         created_at, updated_at, deleted_at \
         FROM pull_request WHERE repo_id = $1 AND deleted_at IS NULL \
         AND ($2::text IS NULL OR state::text = $2) \
         AND ($3::uuid IS NULL OR author_id = $3) \
         AND ($4::bool IS NULL OR draft = $4) \
         ORDER BY number DESC LIMIT $5 OFFSET $6",
    )
    .bind(repo.id)
    .bind(state_filter)
    .bind(filters.author_id)
    .bind(filters.draft)
    .bind(page_size)
    .bind(skip)
    .fetch_all(self.ctx.db.reader())
    .await;

    listing.map_err(AppError::Database)
}
/// Returns one pull request, addressed by workspace, repository and number.
///
/// Fails with `Unauthorized` when nobody is signed in or the repository is not
/// readable by the caller, and with `NotFound` when the PR or its repository
/// cannot be found.
pub async fn pr_get(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
    number: i64,
) -> Result<PullRequest, AppError> {
    let Some(viewer) = ctx.user() else {
        return Err(AppError::Unauthorized);
    };
    let pr = self.resolve_pr(wk_name, repo_name, number).await?;
    let owning_repo = match Repo::find_by_id(self.ctx.db.reader(), pr.repo_id).await {
        Ok(Some(found)) => found,
        Ok(None) => return Err(AppError::NotFound("repo not found".into())),
        Err(db_err) => return Err(AppError::Database(db_err)),
    };
    self.ensure_repo_readable(viewer, &owning_repo).await?;
    Ok(pr)
}
/// Opens a new pull request against `repo_name` in workspace `wk_name`.
///
/// Validation happens before any write, in this order: caller is at least a
/// `Role::Member` of the target repository, the title passes `required_text`,
/// the source repository exists and is readable, the target branch, source
/// branch and head commit exist, and a cross-repository source is a fork of
/// the target. The PR row, its status row, the author's subscription and the
/// repository's open-PR counter are then written in one transaction.
#[tracing::instrument(skip(self, ctx, params), fields(title = %params.title))]
pub async fn pr_create(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
    params: CreatePrParams,
) -> Result<PullRequest, AppError> {
    let Some(author) = ctx.user() else {
        return Err(AppError::Unauthorized);
    };
    let target_repo = self.resolve_repo(wk_name, repo_name).await?;
    self.ensure_repo_role_at_least(author, &target_repo, Role::Member)
        .await?;
    let title = crate::service::util::required_text(params.title, "title")?;

    // The source repository must exist and be visible to the author.
    let origin_repo = Repo::find_by_id(self.ctx.db.reader(), params.source_repo_id)
        .await
        .map_err(AppError::Database)?
        .ok_or_else(|| AppError::NotFound("Source repository not found".into()))?;
    self.ensure_repo_readable(author, &origin_repo).await?;

    // Both branches and the head commit are checked against the git backend.
    let target_present = self
        .branch_exists(&target_repo, &params.target_branch)
        .await?;
    if !target_present {
        return Err(AppError::BadRequest(format!(
            "Target branch '{}' does not exist",
            params.target_branch
        )));
    }
    let source_present = self
        .branch_exists(&origin_repo, &params.source_branch)
        .await?;
    if !source_present {
        return Err(AppError::BadRequest(format!(
            "Source branch '{}' does not exist",
            params.source_branch
        )));
    }
    let head_present = self
        .commit_exists(&origin_repo, &params.head_commit_sha)
        .await?;
    if !head_present {
        return Err(AppError::BadRequest(format!(
            "Head commit '{}' does not exist",
            params.head_commit_sha
        )));
    }

    // A PR from another repository is only accepted from a fork of the target.
    let is_same_repo = origin_repo.id == target_repo.id;
    let is_fork_of_target = origin_repo.forked_from_repo_id == Some(target_repo.id);
    if !(is_same_repo || is_fork_of_target) {
        return Err(AppError::BadRequest(
            "Source repository is not a fork of the target repository".into(),
        ));
    }

    let created_at = chrono::Utc::now();
    let new_id = Uuid::now_v7();
    let mut tx = match self.ctx.db.writer().begin().await {
        Ok(tx) => tx,
        Err(_) => return Err(AppError::TxnError),
    };
    sqlx::query(set_local_user_id(author))
        .execute(&mut *tx)
        .await
        .map_err(AppError::Database)?;

    let number = PullRequest::next_number(&mut *tx, target_repo.id)
        .await
        .map_err(AppError::Database)?;

    let created = sqlx::query_as::<_, PullRequest>(
        "INSERT INTO pull_request (id, repo_id, author_id, number, title, body, state, \
         source_repo_id, source_branch, target_repo_id, target_branch, base_commit_sha, \
         head_commit_sha, merge_commit_sha, draft, locked, merged_by, merged_at, \
         closed_by, closed_at, created_at, updated_at) \
         VALUES ($1, $2, $3, $4, $5, $6, 'open', $7, $8, $9, $10, $11, $12, NULL, $13, false, \
         NULL, NULL, NULL, NULL, $14, $14) \
         RETURNING id, repo_id, author_id, number, title, body, state, source_repo_id, \
         source_branch, target_repo_id, target_branch, base_commit_sha, head_commit_sha, \
         merge_commit_sha, draft, locked, merged_by, merged_at, closed_by, closed_at, \
         created_at, updated_at, deleted_at",
    )
    .bind(new_id)
    .bind(target_repo.id)
    .bind(author)
    .bind(number)
    .bind(&title)
    .bind(params.body.as_deref())
    .bind(params.source_repo_id)
    .bind(&params.source_branch)
    .bind(target_repo.id)
    .bind(&params.target_branch)
    .bind(params.base_commit_sha.as_deref())
    .bind(&params.head_commit_sha)
    .bind(params.draft.unwrap_or(false))
    .bind(created_at)
    .fetch_one(&mut *tx)
    .await
    .map_err(AppError::Database)?;

    // Initial status row: nothing checked, nothing counted yet.
    sqlx::query(
        "INSERT INTO pr_status (pull_request_id, head_commit_sha, checks_state, mergeable_state, \
         conflicts, approvals_count, requested_reviews_count, changed_files_count, \
         additions_count, deletions_count, updated_at) \
         VALUES ($1, $2, 'pending', 'unknown', false, 0, 0, 0, 0, 0, $3)",
    )
    .bind(new_id)
    .bind(&params.head_commit_sha)
    .bind(created_at)
    .execute(&mut *tx)
    .await
    .map_err(AppError::Database)?;

    // The author is subscribed to their own pull request.
    sqlx::query(
        "INSERT INTO pr_subscription (id, pull_request_id, user_id, reason, muted, created_at, updated_at) \
         VALUES ($1, $2, $3, 'author', false, $4, $4)",
    )
    .bind(Uuid::now_v7())
    .bind(new_id)
    .bind(author)
    .bind(created_at)
    .execute(&mut *tx)
    .await
    .map_err(AppError::Database)?;

    sqlx::query(
        "UPDATE repo_stats SET open_pull_requests_count = open_pull_requests_count + 1, updated_at = $1 WHERE repo_id = $2",
    )
    .bind(created_at)
    .bind(target_repo.id)
    .execute(&mut *tx)
    .await
    .map_err(AppError::Database)?;

    if tx.commit().await.is_err() {
        return Err(AppError::TxnError);
    }
    tracing::info!(pr_id = %new_id, number = number, "Pull request created");
    Ok(created)
}
pub async fn pr_update(
&self,
ctx: &Session,
wk_name: &str,
repo_name: &str,
number: i64,
params: UpdatePrParams,
) -> Result<PullRequest, AppError> {
let user_uid = ctx.user().ok_or(AppError::Unauthorized)?;
let pr = self.resolve_pr(wk_name, repo_name, number).await?;
self.ensure_pr_editable(user_uid, &pr).await?;
let title =
merge_optional_text(params.title, Some(pr.title.clone())).unwrap_or(pr.title.clone());
let body = merge_optional_text(params.body, pr.body.clone());
let target_branch = params.target_branch.unwrap_or(pr.target_branch.clone());
let draft = params.draft.unwrap_or(pr.draft);
let now = chrono::Utc::now();
let mut txn = self
.ctx
.db
.writer()
.begin()
.await
.map_err(|_| AppError::TxnError)?;
sqlx::query(set_local_user_id(user_uid))
.execute(&mut *txn)
.await
.map_err(AppError::Database)?;
let result = sqlx::query_as::<_, PullRequest>(
"UPDATE pull_request SET title = $1, body = $2, target_branch = $3, draft = $4, updated_at = $5 \
WHERE id = $6 AND deleted_at IS NULL \
RETURNING id, repo_id, author_id, number, title, body, state, source_repo_id, \
source_branch, target_repo_id, target_branch, base_commit_sha, head_commit_sha, \
merge_commit_sha, draft, locked, merged_by, merged_at, closed_by, closed_at, \
created_at, updated_at, deleted_at",
)
.bind(&title).bind(&body).bind(&target_branch).bind(draft).bind(now).bind(pr.id)
.fetch_one(&mut *txn).await.map_err(AppError::Database)?;
txn.commit().await.map_err(|_| AppError::TxnError)?;
Ok(result)
}
/// Flips a draft pull request to "ready for review" and records a
/// `DraftReady` event.
///
/// Returns `BadRequest` when the PR is not currently a draft.
pub async fn pr_mark_ready(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
    number: i64,
) -> Result<PullRequest, AppError> {
    let Some(actor) = ctx.user() else {
        return Err(AppError::Unauthorized);
    };
    let pr = self.resolve_pr(wk_name, repo_name, number).await?;
    self.ensure_pr_editable(actor, &pr).await?;
    if !pr.draft {
        return Err(AppError::BadRequest(
            "PR is already ready for review".into(),
        ));
    }

    let changed_at = chrono::Utc::now();
    let mut tx = match self.ctx.db.writer().begin().await {
        Ok(tx) => tx,
        Err(_) => return Err(AppError::TxnError),
    };
    sqlx::query(set_local_user_id(actor))
        .execute(&mut *tx)
        .await
        .map_err(AppError::Database)?;

    let ready = sqlx::query_as::<_, PullRequest>(
        "UPDATE pull_request SET draft = false, updated_at = $1 \
         WHERE id = $2 AND deleted_at IS NULL \
         RETURNING id, repo_id, author_id, number, title, body, state, source_repo_id, \
         source_branch, target_repo_id, target_branch, base_commit_sha, head_commit_sha, \
         merge_commit_sha, draft, locked, merged_by, merged_at, closed_by, closed_at, \
         created_at, updated_at, deleted_at",
    )
    .bind(changed_at)
    .bind(pr.id)
    .fetch_one(&mut *tx)
    .await
    .map_err(AppError::Database)?;

    // NOTE(review): the event helper is not handed the open transaction, so it
    // presumably writes on its own connection before the commit below — confirm
    // whether an event can outlive a failed commit.
    let before = serde_json::json!({"draft": true});
    let after = serde_json::json!({"draft": false});
    self.create_pr_event(
        pr.id,
        Some(actor),
        crate::models::common::EventType::DraftReady,
        Some(before),
        Some(after),
        None,
    )
    .await?;

    if tx.commit().await.is_err() {
        return Err(AppError::TxnError);
    }
    Ok(ready)
}
/// Closes an open pull request without merging it.
///
/// The state change and the repository's open-PR counter decrement share one
/// transaction. A PR that is missing, deleted or not in the `open` state yields
/// `NotFound`.
pub async fn pr_close(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
    number: i64,
) -> Result<PullRequest, AppError> {
    let Some(actor) = ctx.user() else {
        return Err(AppError::Unauthorized);
    };
    let pr = self.resolve_pr(wk_name, repo_name, number).await?;
    self.ensure_pr_editable(actor, &pr).await?;

    let closed_at = chrono::Utc::now();
    let mut tx = match self.ctx.db.writer().begin().await {
        Ok(tx) => tx,
        Err(_) => return Err(AppError::TxnError),
    };
    sqlx::query(set_local_user_id(actor))
        .execute(&mut *tx)
        .await
        .map_err(AppError::Database)?;

    // The `state = 'open'` guard makes a repeated close a no-op that reports NotFound.
    let maybe_closed = sqlx::query_as::<_, PullRequest>(
        "UPDATE pull_request SET state = 'closed', closed_by = $1, closed_at = $2, updated_at = $2 \
         WHERE id = $3 AND deleted_at IS NULL AND state = 'open' \
         RETURNING id, repo_id, author_id, number, title, body, state, source_repo_id, \
         source_branch, target_repo_id, target_branch, base_commit_sha, head_commit_sha, \
         merge_commit_sha, draft, locked, merged_by, merged_at, closed_by, closed_at, \
         created_at, updated_at, deleted_at",
    )
    .bind(actor)
    .bind(closed_at)
    .bind(pr.id)
    .fetch_optional(&mut *tx)
    .await
    .map_err(AppError::Database)?;
    let Some(closed) = maybe_closed else {
        return Err(AppError::NotFound("PR not found or already closed".into()));
    };

    sqlx::query("UPDATE repo_stats SET open_pull_requests_count = GREATEST(open_pull_requests_count - 1, 0), updated_at = $1 WHERE repo_id = $2")
        .bind(closed_at)
        .bind(pr.repo_id)
        .execute(&mut *tx)
        .await
        .map_err(AppError::Database)?;

    if tx.commit().await.is_err() {
        return Err(AppError::TxnError);
    }
    Ok(closed)
}
/// Reopens a closed, unmerged pull request.
///
/// Clears `closed_by`/`closed_at`, sets the state back to `open` and bumps the
/// repository's open-PR counter, all in one transaction. A PR that is missing,
/// deleted, not closed, or already merged yields `NotFound`.
pub async fn pr_reopen(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
    number: i64,
) -> Result<PullRequest, AppError> {
    let Some(actor) = ctx.user() else {
        return Err(AppError::Unauthorized);
    };
    let pr = self.resolve_pr(wk_name, repo_name, number).await?;
    self.ensure_pr_editable(actor, &pr).await?;

    let reopened_at = chrono::Utc::now();
    let mut tx = match self.ctx.db.writer().begin().await {
        Ok(tx) => tx,
        Err(_) => return Err(AppError::TxnError),
    };
    sqlx::query(set_local_user_id(actor))
        .execute(&mut *tx)
        .await
        .map_err(AppError::Database)?;

    let maybe_reopened = sqlx::query_as::<_, PullRequest>(
        "UPDATE pull_request SET state = 'open', closed_by = NULL, closed_at = NULL, updated_at = $1 \
         WHERE id = $2 AND deleted_at IS NULL AND state = 'closed' AND merged_at IS NULL \
         RETURNING id, repo_id, author_id, number, title, body, state, source_repo_id, \
         source_branch, target_repo_id, target_branch, base_commit_sha, head_commit_sha, \
         merge_commit_sha, draft, locked, merged_by, merged_at, closed_by, closed_at, \
         created_at, updated_at, deleted_at",
    )
    .bind(reopened_at)
    .bind(pr.id)
    .fetch_optional(&mut *tx)
    .await
    .map_err(AppError::Database)?;
    let Some(reopened) = maybe_reopened else {
        return Err(AppError::NotFound(
            "PR not found, not closed, or already merged".into(),
        ));
    };

    sqlx::query("UPDATE repo_stats SET open_pull_requests_count = open_pull_requests_count + 1, updated_at = $1 WHERE repo_id = $2")
        .bind(reopened_at)
        .bind(pr.repo_id)
        .execute(&mut *tx)
        .await
        .map_err(AppError::Database)?;

    if tx.commit().await.is_err() {
        return Err(AppError::TxnError);
    }
    Ok(reopened)
}
/// Soft-deletes a pull request by stamping `deleted_at`.
///
/// Requires the caller to pass `ensure_pr_admin`. When the PR was open at
/// lookup time the repository's open-PR counter is decremented in the same
/// transaction.
pub async fn pr_delete(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
    number: i64,
) -> Result<(), AppError> {
    let Some(actor) = ctx.user() else {
        return Err(AppError::Unauthorized);
    };
    let pr = self.resolve_pr(wk_name, repo_name, number).await?;
    self.ensure_pr_admin(actor, &pr).await?;

    let deleted_at = chrono::Utc::now();
    let mut tx = match self.ctx.db.writer().begin().await {
        Ok(tx) => tx,
        Err(_) => return Err(AppError::TxnError),
    };
    sqlx::query(set_local_user_id(actor))
        .execute(&mut *tx)
        .await
        .map_err(AppError::Database)?;

    let outcome = sqlx::query("UPDATE pull_request SET deleted_at = $1, updated_at = $1 WHERE id = $2 AND deleted_at IS NULL")
        .bind(deleted_at)
        .bind(pr.id)
        .execute(&mut *tx)
        .await
        .map_err(AppError::Database)?;
    ensure_affected(outcome.rows_affected(), "PR not found")?;

    // Only PRs that counted as open need to leave the open counter.
    let was_open = pr.state == State::Open;
    if was_open {
        sqlx::query("UPDATE repo_stats SET open_pull_requests_count = GREATEST(open_pull_requests_count - 1, 0), updated_at = $1 WHERE repo_id = $2")
            .bind(deleted_at)
            .bind(pr.repo_id)
            .execute(&mut *tx)
            .await
            .map_err(AppError::Database)?;
    }

    match tx.commit().await {
        Ok(()) => Ok(()),
        Err(_) => Err(AppError::TxnError),
    }
}
/// Sets or clears the `locked` flag of a pull request and returns the updated row.
///
/// The caller must pass `ensure_pr_editable`.
pub async fn pr_lock(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
    number: i64,
    locked: bool,
) -> Result<PullRequest, AppError> {
    let Some(actor) = ctx.user() else {
        return Err(AppError::Unauthorized);
    };
    let pr = self.resolve_pr(wk_name, repo_name, number).await?;
    self.ensure_pr_editable(actor, &pr).await?;

    let changed_at = chrono::Utc::now();
    let mut tx = match self.ctx.db.writer().begin().await {
        Ok(tx) => tx,
        Err(_) => return Err(AppError::TxnError),
    };
    sqlx::query(set_local_user_id(actor))
        .execute(&mut *tx)
        .await
        .map_err(AppError::Database)?;

    let updated = sqlx::query_as::<_, PullRequest>(
        "UPDATE pull_request SET locked = $1, updated_at = $2 WHERE id = $3 AND deleted_at IS NULL \
         RETURNING id, repo_id, author_id, number, title, body, state, source_repo_id, \
         source_branch, target_repo_id, target_branch, base_commit_sha, head_commit_sha, \
         merge_commit_sha, draft, locked, merged_by, merged_at, closed_by, closed_at, \
         created_at, updated_at, deleted_at",
    )
    .bind(locked)
    .bind(changed_at)
    .bind(pr.id)
    .fetch_one(&mut *tx)
    .await
    .map_err(AppError::Database)?;

    if tx.commit().await.is_err() {
        return Err(AppError::TxnError);
    }
    Ok(updated)
}
/// Merges an open, non-draft pull request into its target branch.
///
/// Steps, in order:
/// 1. The caller must hold at least `Role::Maintainer` on the repository.
/// 2. If a branch protection rule matches the target branch and the caller
///    ranks below `Role::Admin`, the rule's approval count (the author's own
///    approvals do not count), required status checks and self-merge
///    restriction are enforced. Admins bypass the rule.
/// 3. The git merge is executed through the storage RPC.
/// 4. One transaction marks the PR as merged and decrements the repository's
///    open-PR counter.
/// 5. After the commit, and only when requested for a same-repository PR, the
///    source branch is deleted on a best-effort basis; failures are logged and
///    never fail the merge.
///
/// # Errors
/// - `Unauthorized` when nobody is signed in or the role is insufficient.
/// - `BadRequest` when the PR is not open, is a draft, or violates the
///   protection rule.
/// - `NotFound` when the PR/repository is missing or the PR stopped being open
///   before the state update.
/// - Errors from the git merge RPC and from the database are propagated.
pub async fn pr_merge(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
    number: i64,
    params: MergePrParams,
) -> Result<PullRequest, AppError> {
    let user_uid = ctx.user().ok_or(AppError::Unauthorized)?;
    let pr = self.resolve_pr(wk_name, repo_name, number).await?;
    let repo = Repo::find_by_id(self.ctx.db.reader(), pr.repo_id)
        .await
        .map_err(AppError::Database)?
        .ok_or_else(|| AppError::NotFound("Repo not found".into()))?;
    // Merging needs Maintainer or above; Admins additionally bypass branch
    // protection further down.
    let user_role = self
        .ensure_repo_role_at_least(user_uid, &repo, Role::Maintainer)
        .await?;
    if pr.state != State::Open {
        return Err(AppError::BadRequest("PR is not open".into()));
    }
    if pr.draft {
        return Err(AppError::BadRequest("cannot merge a draft PR".into()));
    }
    // Check branch protection rules
    let ws = self.resolve_workspace(wk_name).await?;
    let protection_rule = self
        .check_branch_protection(&repo, &pr.target_branch, &ws)
        .await?;
    if let Some(rule) = &protection_rule {
        // Admins can bypass protection rules, others must satisfy them
        if crate::service::util::role_level(user_role)
            < crate::service::util::role_level(Role::Admin)
        {
            // Check approval count; author self-approval must not satisfy protection.
            let approval_count: i64 = sqlx::query_scalar(
                "SELECT COUNT(*) FROM pr_review \
                 WHERE pull_request_id = $1 AND state = 'approved' AND dismissed_at IS NULL \
                 AND submitted_at IS NOT NULL AND author_id <> $2",
            )
            .bind(pr.id)
            .bind(pr.author_id)
            .fetch_one(self.ctx.db.reader())
            .await
            .map_err(AppError::Database)?;
            if approval_count < rule.require_approvals as i64 {
                return Err(AppError::BadRequest(format!(
                    "PR requires {} approvals, only {} received",
                    rule.require_approvals, approval_count
                )));
            }
            // Check required status checks
            if rule.require_status_checks && !rule.required_status_checks.is_empty() {
                let required_checks = &rule.required_status_checks;
                // Successful check runs recorded for the head commit
                let passed_checks: Vec<String> = sqlx::query_scalar(
                    "SELECT name FROM pr_check_run \
                     WHERE pull_request_id = $1 AND commit_sha = $2 AND status = 'success'",
                )
                .bind(pr.id)
                .bind(&pr.head_commit_sha)
                .fetch_all(self.ctx.db.reader())
                .await
                .map_err(AppError::Database)?;
                for required_check in required_checks {
                    if !passed_checks.contains(required_check) {
                        return Err(AppError::BadRequest(format!(
                            "Required status check '{}' has not passed",
                            required_check
                        )));
                    }
                }
            }
            // Prevent self-merge when the rule asks for approvals
            if rule.require_approvals > 0 && pr.author_id == user_uid {
                return Err(AppError::BadRequest(
                    "Cannot self-merge PRs that require approvals".to_string(),
                ));
            }
        }
    }
    // Perform actual Git merge via RPC.
    // NOTE(review): the git merge happens before the database update; if the
    // update below fails, repository and database disagree. Not addressed here.
    let merge_result = self.perform_git_merge(&repo, &pr, &ws, &params).await?;
    let merge_commit_sha = merge_result.commit_sha;
    let now = chrono::Utc::now();
    let mut txn = self
        .ctx
        .db
        .writer()
        .begin()
        .await
        .map_err(|_| AppError::TxnError)?;
    sqlx::query(set_local_user_id(user_uid))
        .execute(&mut *txn)
        .await
        .map_err(AppError::Database)?;
    let result = sqlx::query_as::<_, PullRequest>(
        "UPDATE pull_request SET state = 'merged', merged_by = $1, merged_at = $2, \
         merge_commit_sha = $3, closed_by = $1, closed_at = $2, updated_at = $2 \
         WHERE id = $4 AND deleted_at IS NULL AND state = 'open' \
         RETURNING id, repo_id, author_id, number, title, body, state, source_repo_id, \
         source_branch, target_repo_id, target_branch, base_commit_sha, head_commit_sha, \
         merge_commit_sha, draft, locked, merged_by, merged_at, closed_by, closed_at, \
         created_at, updated_at, deleted_at",
    )
    .bind(user_uid)
    .bind(now)
    .bind(&merge_commit_sha)
    .bind(pr.id)
    .fetch_optional(&mut *txn)
    .await
    .map_err(AppError::Database)?
    .ok_or_else(|| AppError::NotFound("PR not found or already merged".into()))?;
    sqlx::query("UPDATE repo_stats SET open_pull_requests_count = GREATEST(open_pull_requests_count - 1, 0), updated_at = $1 WHERE repo_id = $2")
        .bind(now)
        .bind(pr.repo_id)
        .execute(&mut *txn)
        .await
        .map_err(AppError::Database)?;
    txn.commit().await.map_err(|_| AppError::TxnError)?;
    tracing::info!(pr_id = %pr.id, number = number, merge_sha = %merge_commit_sha, "Pull request merged");

    // Best-effort source-branch cleanup, deliberately AFTER the merge has been
    // committed: a failing statement inside the merge transaction would abort
    // it, and the subsequent COMMIT would then roll the merge bookkeeping back
    // while this method still reported success. Cross-repository PRs keep
    // their source branch.
    if params.delete_source_branch.unwrap_or(false) && pr.source_repo_id == pr.target_repo_id {
        if let Err(err) = self.delete_git_branch(&repo, &ws, &pr.source_branch).await {
            tracing::warn!(
                pr_id = %pr.id,
                branch = %pr.source_branch,
                error = ?err,
                "Failed to delete source branch after merge"
            );
        }
        // Own short transaction so the row removal still runs with the
        // acting user set, like every other write in this service.
        let row_cleanup: Result<(), AppError> = async {
            let mut cleanup_txn = self
                .ctx
                .db
                .writer()
                .begin()
                .await
                .map_err(|_| AppError::TxnError)?;
            sqlx::query(set_local_user_id(user_uid))
                .execute(&mut *cleanup_txn)
                .await
                .map_err(AppError::Database)?;
            sqlx::query("DELETE FROM repo_branch WHERE repo_id = $1 AND name = $2")
                .bind(pr.source_repo_id)
                .bind(&pr.source_branch)
                .execute(&mut *cleanup_txn)
                .await
                .map_err(AppError::Database)?;
            cleanup_txn.commit().await.map_err(|_| AppError::TxnError)
        }
        .await;
        if let Err(err) = row_cleanup {
            tracing::warn!(
                pr_id = %pr.id,
                branch = %pr.source_branch,
                error = ?err,
                "Failed to remove source branch record after merge"
            );
        }
    }
    Ok(result)
}
/// Looks a workspace up by name, mapping a missing row to `NotFound`.
pub(crate) async fn resolve_workspace(&self, wk_name: &str) -> Result<Workspace, AppError> {
    match Workspace::find_by_name(self.ctx.db.reader(), wk_name).await {
        Ok(Some(workspace)) => Ok(workspace),
        Ok(None) => Err(AppError::NotFound("workspace not found".into())),
        Err(db_err) => Err(AppError::Database(db_err)),
    }
}
/// Resolves a repository from its workspace name and repository name.
///
/// The workspace is resolved first, so an unknown workspace is reported before
/// an unknown repository.
pub(crate) async fn resolve_repo(
    &self,
    wk_name: &str,
    repo_name: &str,
) -> Result<Repo, AppError> {
    let workspace = self.resolve_workspace(wk_name).await?;
    let lookup = Repo::find_by_name(self.ctx.db.reader(), workspace.id, repo_name).await;
    match lookup {
        Ok(Some(repo)) => Ok(repo),
        Ok(None) => Err(AppError::NotFound("repo not found".into())),
        Err(db_err) => Err(AppError::Database(db_err)),
    }
}
/// Resolves a pull request from workspace name, repository name and PR number.
pub(crate) async fn resolve_pr(
    &self,
    wk_name: &str,
    repo_name: &str,
    number: i64,
) -> Result<PullRequest, AppError> {
    let repo = self.resolve_repo(wk_name, repo_name).await?;
    let lookup = PullRequest::find_by_number(self.ctx.db.reader(), repo.id, number).await;
    match lookup {
        Ok(Some(pr)) => Ok(pr),
        Ok(None) => Err(AppError::NotFound("PR not found".into())),
        Err(db_err) => Err(AppError::Database(db_err)),
    }
}
/// Succeeds when `Repo::is_readable` reports the repository as readable for
/// `user_uid`; otherwise returns `Unauthorized`.
pub(crate) async fn ensure_repo_readable(
    &self,
    user_uid: Uuid,
    repo: &Repo,
) -> Result<(), AppError> {
    let readable = Repo::is_readable(self.ctx.db.reader(), repo, user_uid)
        .await
        .map_err(AppError::Database)?;
    if !readable {
        return Err(AppError::Unauthorized);
    }
    Ok(())
}
/// Returns the caller's role on `repo` when it ranks at least as high as
/// `min_role` according to `role_level`; otherwise returns `Unauthorized`.
///
/// A user without any recorded role is treated as `Role::Unknown`.
pub(crate) async fn ensure_repo_role_at_least(
    &self,
    user_uid: Uuid,
    repo: &Repo,
    min_role: Role,
) -> Result<Role, AppError> {
    let recorded = Repo::user_role(self.ctx.db.reader(), repo.id, user_uid, repo.owner_id)
        .await
        .map_err(AppError::Database)?;
    let role = match recorded {
        Some(role) => role,
        None => Role::Unknown,
    };
    let actual_rank = crate::service::util::role_level(role);
    let required_rank = crate::service::util::role_level(min_role);
    if actual_rank >= required_rank {
        Ok(role)
    } else {
        Err(AppError::Unauthorized)
    }
}
/// Checks that `user_uid` may read the repository a pull request belongs to.
pub(crate) async fn ensure_pr_readable(
    &self,
    user_uid: Uuid,
    pr: &PullRequest,
) -> Result<(), AppError> {
    let owning_repo = match Repo::find_by_id(self.ctx.db.reader(), pr.repo_id).await {
        Ok(Some(repo)) => repo,
        Ok(None) => return Err(AppError::NotFound("repo not found".into())),
        Err(db_err) => return Err(AppError::Database(db_err)),
    };
    self.ensure_repo_readable(user_uid, &owning_repo).await
}
/// Checks that `user_uid` may edit a pull request.
///
/// The PR's author is always allowed; anyone else needs a repository role that
/// ranks at least `Role::Member`.
pub(crate) async fn ensure_pr_editable(
    &self,
    user_uid: Uuid,
    pr: &PullRequest,
) -> Result<(), AppError> {
    // Authors skip the repository and role lookups entirely.
    if user_uid == pr.author_id {
        return Ok(());
    }
    let repo = Repo::find_by_id(self.ctx.db.reader(), pr.repo_id)
        .await
        .map_err(AppError::Database)?
        .ok_or_else(|| AppError::NotFound("repo not found".into()))?;
    let recorded = Repo::user_role(self.ctx.db.reader(), repo.id, user_uid, repo.owner_id)
        .await
        .map_err(AppError::Database)?;
    let rank = crate::service::util::role_level(recorded.unwrap_or(Role::Unknown));
    if rank < crate::service::util::role_level(Role::Member) {
        return Err(AppError::Unauthorized);
    }
    Ok(())
}
/// Checks that `user_uid` holds a role ranking at least `Role::Admin` on the
/// repository the pull request belongs to.
pub(crate) async fn ensure_pr_admin(
    &self,
    user_uid: Uuid,
    pr: &PullRequest,
) -> Result<(), AppError> {
    let repo = Repo::find_by_id(self.ctx.db.reader(), pr.repo_id)
        .await
        .map_err(AppError::Database)?
        .ok_or_else(|| AppError::NotFound("repo not found".into()))?;
    let recorded = Repo::user_role(self.ctx.db.reader(), repo.id, user_uid, repo.owner_id)
        .await
        .map_err(AppError::Database)?;
    let rank = crate::service::util::role_level(recorded.unwrap_or(Role::Unknown));
    if rank < crate::service::util::role_level(Role::Admin) {
        return Err(AppError::Unauthorized);
    }
    Ok(())
}
/// Finds the branch protection rule that applies to `branch_name`, if any.
///
/// The repository's rules are loaded in ascending `pattern` order and the
/// first one whose pattern matches (per `glob_match`) is returned. The
/// workspace argument is currently unused.
async fn check_branch_protection(
    &self,
    repo: &Repo,
    branch_name: &str,
    _ws: &Workspace,
) -> Result<Option<crate::models::repos::BranchProtectionRule>, AppError> {
    use crate::models::repos::BranchProtectionRule;
    let candidates: Vec<BranchProtectionRule> = sqlx::query_as(
        "SELECT id, repo_id, pattern, require_approvals, require_status_checks, \
         required_status_checks, require_linear_history, allow_force_pushes, allow_deletions, \
         require_signed_commits, require_code_owner_review, dismiss_stale_reviews, \
         restrict_pushes, push_allowances, restrict_review_dismissal, dismissal_allowances, \
         require_conversation_resolution, created_by, created_at, updated_at \
         FROM branch_protection_rule WHERE repo_id = $1 ORDER BY pattern ASC",
    )
    .bind(repo.id)
    .fetch_all(self.ctx.db.reader())
    .await
    .map_err(AppError::Database)?;

    for candidate in candidates {
        if glob_match(&candidate.pattern, branch_name) {
            return Ok(Some(candidate));
        }
    }
    Ok(None)
}
/// Asks the git backend whether `branch_name` exists in `repo`.
///
/// A `NotFound` status from the RPC means "does not exist"; any other RPC
/// failure is reported as an internal error.
async fn branch_exists(&self, repo: &Repo, branch_name: &str) -> Result<bool, AppError> {
    use crate::pb::repo::{self as pb};
    let workspace = self.resolve_workspace_by_repo(repo).await?;
    let Some(mut client) = self
        .ctx
        .registry
        .get_git_client(&repo.primary_storage_node_id)
    else {
        return Err(AppError::Config("Git client not available".into()));
    };

    let request = pb::GetBranchRequest {
        repository: Some(pb::RepositoryHeader {
            storage_name: workspace.name.clone(),
            relative_path: format!("{}.git", repo.name),
            storage_path: repo.storage_path.clone(),
        }),
        name: branch_name.to_string(),
    };

    let reply = client.branch.get_branch(tonic::Request::new(request)).await;
    match reply {
        Ok(_) => Ok(true),
        Err(status) => {
            if status.code() == tonic::Code::NotFound {
                Ok(false)
            } else {
                Err(AppError::InternalServerError(format!(
                    "Failed to check branch: {}",
                    status
                )))
            }
        }
    }
}
/// Asks the git backend whether `commit_sha` resolves to a commit in `repo`.
///
/// A `NotFound` status from the RPC means "does not exist"; any other RPC
/// failure is reported as an internal error.
async fn commit_exists(&self, repo: &Repo, commit_sha: &str) -> Result<bool, AppError> {
    use crate::pb::repo::{self as pb};
    let workspace = self.resolve_workspace_by_repo(repo).await?;
    let Some(mut client) = self
        .ctx
        .registry
        .get_git_client(&repo.primary_storage_node_id)
    else {
        return Err(AppError::Config("Git client not available".into()));
    };

    let repository = pb::RepositoryHeader {
        storage_name: workspace.name.clone(),
        relative_path: format!("{}.git", repo.name),
        storage_path: repo.storage_path.clone(),
    };
    let revision = pb::ObjectSelector {
        selector: Some(pb::object_selector::Selector::Revision(pb::ObjectName {
            revision: commit_sha.to_string(),
        })),
    };
    // Stats and raw data are not needed for a pure existence check.
    let request = pb::GetCommitRequest {
        repository: Some(repository),
        revision: Some(revision),
        include_stats: false,
        include_raw: false,
    };

    let reply = client.commit.get_commit(tonic::Request::new(request)).await;
    match reply {
        Ok(_) => Ok(true),
        Err(status) => {
            if status.code() == tonic::Code::NotFound {
                Ok(false)
            } else {
                Err(AppError::InternalServerError(format!(
                    "Failed to check commit: {}",
                    status
                )))
            }
        }
    }
}
/// Loads the non-deleted workspace that owns `repo`, or `NotFound` if there is none.
async fn resolve_workspace_by_repo(&self, repo: &Repo) -> Result<Workspace, AppError> {
    let lookup = sqlx::query_as::<_, Workspace>(
        "SELECT id, owner_id, name, description, avatar_url, visibility, plan, status, \
         default_role, is_personal, archived_at, created_at, updated_at, deleted_at \
         FROM workspace WHERE id = $1 AND deleted_at IS NULL",
    )
    .bind(repo.workspace_id)
    .fetch_optional(self.ctx.db.reader())
    .await;

    match lookup {
        Ok(Some(workspace)) => Ok(workspace),
        Ok(None) => Err(AppError::NotFound("Workspace not found".into())),
        Err(db_err) => Err(AppError::Database(db_err)),
    }
}
/// Runs the merge of `pr`'s head commit into its target branch through the
/// git RPC and returns the resulting commit.
///
/// The strategy comes from `params.strategy`; a missing, unparsable or
/// `Unknown` value means a regular merge. The commit message is
/// `squash_message` when given; otherwise squash merges use `squash_title` (or
/// a generated title) and other strategies use a generated "Merge pull
/// request" line. Any RPC status other than merged/fast-forward is an error.
async fn perform_git_merge(
    &self,
    repo: &Repo,
    pr: &PullRequest,
    ws: &Workspace,
    params: &MergePrParams,
) -> Result<MergeResult, AppError> {
    use crate::pb::repo::{self as pb};
    let Some(mut git_client) = self
        .ctx
        .registry
        .get_git_client(&repo.primary_storage_node_id)
    else {
        return Err(AppError::Config("Git client not available".into()));
    };

    // Determine merge strategy, defaulting to a regular merge.
    let requested = params
        .strategy
        .as_deref()
        .and_then(|raw| raw.parse::<MergeStrategyKind>().ok());
    let strategy = match requested {
        Some(kind) if kind != MergeStrategyKind::Unknown => kind,
        _ => MergeStrategyKind::Merge,
    };
    let is_squash = strategy == MergeStrategyKind::Squash;
    let is_rebase = strategy == MergeStrategyKind::Rebase;

    let merge_strategy = if is_rebase {
        pb::merge_options::Strategy::MergeStrategyRecursive as i32
    } else {
        pb::merge_options::Strategy::MergeStrategyOrt as i32
    };
    let fast_forward = if is_rebase {
        pb::merge_options::FastForwardMode::MergeFastForwardModeNoFf as i32
    } else {
        pb::merge_options::FastForwardMode::MergeFastForwardModeAllowed as i32
    };

    // An explicit squash_message wins regardless of strategy.
    let message = match &params.squash_message {
        Some(custom) => custom.clone(),
        None if is_squash => match &params.squash_title {
            Some(title) => title.clone(),
            None => format!("Squash merge PR #{}", pr.number),
        },
        None => format!(
            "Merge pull request #{} from {}",
            pr.number, pr.source_branch
        ),
    };

    let request = pb::MergeRequest {
        repository: Some(pb::RepositoryHeader {
            storage_name: ws.name.clone(),
            relative_path: format!("{}.git", repo.name),
            storage_path: repo.storage_path.clone(),
        }),
        target_branch: pr.target_branch.clone(),
        source: Some(pb::ObjectSelector {
            selector: Some(pb::object_selector::Selector::Revision(pb::ObjectName {
                revision: pr.head_commit_sha.clone(),
            })),
        }),
        committer: None,
        message,
        options: Some(pb::MergeOptions {
            strategy: merge_strategy,
            fast_forward,
            squash: is_squash,
            no_commit: false,
            allow_unrelated_histories: false,
            strategy_options: vec![],
        }),
    };

    let merge_response = git_client
        .merge
        .merge(tonic::Request::new(request))
        .await
        .map_err(|e| AppError::InternalServerError(format!("Git merge failed: {}", e)))?
        .into_inner();

    // Only "merged" and "fast-forward" count as success.
    let status = merge_response.status();
    let succeeded = status == pb::merge_result::Status::MergeResultStatusMerged
        || status == pb::merge_result::Status::MergeResultStatusFastForward;
    if !succeeded {
        return Err(AppError::InternalServerError(format!(
            "Git merge failed with status: {:?}",
            status
        )));
    }

    let commit_sha = merge_response
        .commit
        .and_then(|c| c.oid)
        .and_then(|oid| oid.hex.into())
        .ok_or_else(|| {
            AppError::InternalServerError("Git merge did not return commit SHA".into())
        })?;
    Ok(MergeResult {
        commit_sha,
        merged: true,
    })
}
/// Deletes `branch_name` from `repo` through the git RPC (non-forced).
///
/// Any RPC failure is reported as an internal error.
async fn delete_git_branch(
    &self,
    repo: &Repo,
    ws: &Workspace,
    branch_name: &str,
) -> Result<(), AppError> {
    use crate::pb::repo::{self as pb};
    let Some(mut git_client) = self
        .ctx
        .registry
        .get_git_client(&repo.primary_storage_node_id)
    else {
        return Err(AppError::Config("Git client not available".into()));
    };

    let request = pb::DeleteBranchRequest {
        repository: Some(pb::RepositoryHeader {
            storage_name: ws.name.clone(),
            relative_path: format!("{}.git", repo.name),
            storage_path: repo.storage_path.clone(),
        }),
        name: branch_name.to_string(),
        force: false,
    };

    let reply = git_client
        .branch
        .delete_branch(tonic::Request::new(request))
        .await;
    match reply {
        Ok(_) => Ok(()),
        Err(e) => Err(AppError::InternalServerError(format!(
            "Failed to delete branch: {}",
            e
        ))),
    }
}
}
/// Matches `text` against a shell-style `pattern`.
///
/// Supported wildcards: `*` matches any run of characters (including none and
/// including `/`), `?` matches exactly one character. Every other character
/// matches itself. The whole text must be covered by the pattern.
///
/// The matcher is iterative: it remembers the most recent `*` and, on a
/// mismatch, lets that `*` absorb one more character and retries from there.
fn glob_match(pattern: &str, text: &str) -> bool {
    // Fast paths: identical strings and the catch-all pattern.
    if pattern == text || pattern == "*" {
        return true;
    }
    let p: Vec<char> = pattern.chars().collect();
    let t: Vec<char> = text.chars().collect();
    let (mut pi, mut ti) = (0usize, 0usize);
    // (index of the last `*` in the pattern, text index where its match currently ends)
    let mut star: Option<(usize, usize)> = None;

    while ti < t.len() {
        match p.get(pi) {
            // A `*` in the pattern is always a wildcard, even when the text
            // happens to contain a literal `*` at this position.
            Some('*') => {
                star = Some((pi, ti));
                pi += 1;
            }
            Some(&pc) if pc == '?' || pc == t[ti] => {
                pi += 1;
                ti += 1;
            }
            // Mismatch or pattern exhausted with text left over: widen the
            // last `*` by one character, or give up if there is none.
            _ => match star {
                Some((star_pi, star_ti)) => {
                    pi = star_pi + 1;
                    ti = star_ti + 1;
                    star = Some((star_pi, star_ti + 1));
                }
                None => return false,
            },
        }
    }
    // Text is fully consumed; whatever pattern remains may only be `*`s.
    p[pi..].iter().all(|&c| c == '*')
}
/// Result of a git merge operation
// NOTE(review): within this file only `commit_sha` is read back, which is
// presumably why dead-code warnings are silenced — confirm before removing.
#[allow(dead_code)]
struct MergeResult {
/// Identifier of the commit reported by the merge RPC.
commit_sha: String,
/// Set to `true` where this file builds the value; failed merges are
/// returned as errors instead.
merged: bool,
}