use chrono::Utc; use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::error::AppError; use crate::models::common::Role; use crate::models::repos::{Repo, RepoFork}; use crate::service::RepoService; use crate::session::Session; use super::util::{clamp_limit_offset, set_local_user_id}; #[derive(Debug, Deserialize, Serialize, utoipa::ToSchema)] pub struct ForkRepoParams { pub target_workspace_name: Option, pub name: Option, } impl RepoService { pub async fn repo_forks( &self, ctx: &Session, wk_name: &str, repo_name: &str, limit: i64, offset: i64, ) -> Result, AppError> { let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; let repo = self.resolve_repo(wk_name, repo_name).await?; self.ensure_repo_readable(user_uid, &repo).await?; let (limit, offset) = clamp_limit_offset(limit, offset); sqlx::query_as::<_, RepoFork>( "SELECT id, parent_repo_id, fork_repo_id, forked_by, created_at \ FROM repo_fork WHERE parent_repo_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3", ) .bind(repo.id) .bind(limit) .bind(offset) .fetch_all(self.ctx.db.reader()) .await .map_err(AppError::Database) } #[tracing::instrument(skip(self, ctx, params))] pub async fn repo_fork( &self, ctx: &Session, wk_name: &str, repo_name: &str, params: ForkRepoParams, ) -> Result { let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; let parent = self.resolve_repo(wk_name, repo_name).await?; self.ensure_repo_readable(user_uid, &parent).await?; let ws_name = params.target_workspace_name.as_deref().unwrap_or(wk_name); let ws = self.resolve_workspace(ws_name).await?; self.ensure_workspace_role_at_least(user_uid, &ws, Role::Member) .await?; let fork_name = params.name.as_deref().unwrap_or(repo_name); let existing = sqlx::query_scalar::<_, bool>( "SELECT EXISTS(SELECT 1 FROM repo WHERE workspace_id = $1 AND name = $2 AND deleted_at IS NULL)", ) .bind(ws.id).bind(fork_name) .fetch_one(self.ctx.db.reader()).await.map_err(AppError::Database)?; if existing { return Err(AppError::Conflict( "repo name already taken in target workspace".into(), )); } let now = Utc::now(); let fork_id = Uuid::now_v7(); let storage_path = format!("repos/{}/{}", ws.id, fork_id); let storage_node_ids = parent.storage_node_ids.clone(); let primary_node_id = parent.primary_storage_node_id; let mut txn = self .ctx .db .writer() .begin() .await .map_err(|_| AppError::TxnError)?; sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; let fork = sqlx::query_as::<_, Repo>( "INSERT INTO repo (id, workspace_id, owner_id, name, description, default_branch, \ visibility, status, is_fork, forked_from_repo_id, storage_node_ids, \ primary_storage_node_id, storage_path, git_service, created_at, updated_at) \ VALUES ($1, $2, $3, $4, $5, $6, 'private', 'active', true, $7, $8, $9, $10, $11, $12, $12) \ RETURNING id, workspace_id, owner_id, name, description, default_branch, visibility, status, \ is_fork, forked_from_repo_id, storage_node_ids, primary_storage_node_id, storage_path, \ git_service, archived_at, created_at, updated_at, deleted_at", ) .bind(fork_id).bind(ws.id).bind(user_uid) .bind(fork_name).bind(parent.description.as_deref()) .bind(&parent.default_branch).bind(parent.id) .bind(&storage_node_ids).bind(primary_node_id) .bind(&storage_path).bind(parent.git_service) .bind(now) .fetch_one(&mut *txn).await.map_err(AppError::Database)?; sqlx::query( "INSERT INTO repo_member (id, repo_id, user_id, role, status, joined_at, created_at, updated_at) \ VALUES ($1, $2, $3, 'owner', 'active', $4, $4, $4)", ) .bind(Uuid::now_v7()).bind(fork_id).bind(user_uid).bind(now) .execute(&mut *txn).await.map_err(AppError::Database)?; sqlx::query( "INSERT INTO repo_stats (repo_id, stars_count, watchers_count, forks_count, branches_count, \ tags_count, commits_count, releases_count, open_issues_count, open_pull_requests_count, \ size_bytes, updated_at) \ VALUES ($1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, $2)", ) .bind(fork_id).bind(now) .execute(&mut *txn).await.map_err(AppError::Database)?; sqlx::query( "INSERT INTO repo_branch (id, repo_id, name, commit_sha, protected, default_branch, created_by, created_at, updated_at) \ VALUES ($1, $2, $3, '', false, true, $4, $5, $5)", ) .bind(Uuid::now_v7()).bind(fork_id).bind(&parent.default_branch).bind(user_uid).bind(now) .execute(&mut *txn).await.map_err(AppError::Database)?; sqlx::query( "INSERT INTO repo_fork (id, parent_repo_id, fork_repo_id, forked_by, created_at) \ VALUES ($1, $2, $3, $4, $5)", ) .bind(Uuid::now_v7()) .bind(parent.id) .bind(fork_id) .bind(user_uid) .bind(now) .execute(&mut *txn) .await .map_err(AppError::Database)?; sqlx::query( "UPDATE repo_stats SET forks_count = forks_count + 1, updated_at = $1 WHERE repo_id = $2", ).bind(now).bind(parent.id).execute(&mut *txn).await.map_err(AppError::Database)?; sqlx::query( "UPDATE workspace_stats SET repos_count = repos_count + 1, updated_at = $1 WHERE workspace_id = $2", ).bind(now).bind(ws.id).execute(&mut *txn).await.map_err(AppError::Database)?; txn.commit().await.map_err(|_| AppError::TxnError)?; if let Some(mut client) = self.ctx.registry.get_git_client(&primary_node_id) { let parent_ws = self.resolve_workspace(wk_name).await?; let _header = crate::pb::repo::RepositoryHeader { storage_name: parent_ws.name.clone(), relative_path: format!("{}.git", parent.name), storage_path: parent.storage_path.clone(), }; let fork_header = crate::pb::repo::RepositoryHeader { storage_name: ws.name.clone(), relative_path: format!("{}.git", fork_name), storage_path: storage_path.clone(), }; let _ = client .repository .init_repository(tonic::Request::new( crate::pb::repo::InitRepositoryRequest { repository: Some(fork_header), bare: true, object_format: crate::pb::repo::ObjectFormat::Sha1 as i32, initial_branch: parent.default_branch.clone(), }, )) .await; } tracing::info!(fork_id = %fork_id, parent_id = %parent.id, "Repo forked"); Ok(fork) } pub async fn repo_sync_fork( &self, ctx: &Session, wk_name: &str, repo_name: &str, ) -> Result { let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; let fork = self.resolve_repo(wk_name, repo_name).await?; self.ensure_repo_role_at_least(user_uid, &fork, Role::Member) .await?; if !fork.is_fork { return Err(AppError::BadRequest("repo is not a fork".into())); } let parent_id = fork .forked_from_repo_id .ok_or(AppError::BadRequest("parent repo not found".into()))?; let parent = Repo::find_by_id(self.ctx.db.reader(), parent_id) .await .map_err(AppError::Database)? .ok_or(AppError::NotFound("parent repo not found".into()))?; let header = self.repo_header(&fork, &self.resolve_workspace(wk_name).await?); let parent_ws = self.find_ws_for_repo(&parent).await?; let _parent_header = self.repo_header(&parent, &parent_ws); let mut client = self.git_client(&fork)?; let result = client .merge .merge(tonic::Request::new(crate::pb::repo::MergeRequest { repository: Some(header), target_branch: fork.default_branch.clone(), source: Some(crate::pb::repo::ObjectSelector { selector: Some(crate::pb::repo::object_selector::Selector::Revision( crate::pb::repo::ObjectName { revision: parent.default_branch.clone(), }, )), }), committer: None, message: format!("Sync from upstream {}/{}", parent_ws.name, parent.name), options: None, })) .await .map_err(|e| AppError::InternalServerError(format!("sync failed: {e}")))?; let merge_result = result.into_inner(); if merge_result.status == crate::pb::repo::merge_result::Status::MergeResultStatusConflicts as i32 { return Err(AppError::Conflict( "sync failed: merge conflicts with upstream".into(), )); } Ok(fork) } pub async fn repo_delete_fork( &self, ctx: &Session, wk_name: &str, repo_name: &str, ) -> Result<(), AppError> { let user_uid = ctx.user().ok_or(AppError::Unauthorized)?; let repo = self.resolve_repo(wk_name, repo_name).await?; if !repo.is_fork { return Err(AppError::BadRequest("repo is not a fork".into())); } self.ensure_repo_role_at_least(user_uid, &repo, Role::Admin) .await?; let parent_id = repo .forked_from_repo_id .ok_or(AppError::BadRequest("parent repo not found".into()))?; let now = chrono::Utc::now(); let mut txn = self .ctx .db .writer() .begin() .await .map_err(|_| AppError::TxnError)?; sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; sqlx::query("DELETE FROM repo_fork WHERE fork_repo_id = $1") .bind(repo.id) .execute(&mut *txn) .await .map_err(AppError::Database)?; sqlx::query( "UPDATE repo SET deleted_at = $1, status = 'deleted', updated_at = $1 WHERE id = $2", ) .bind(now) .bind(repo.id) .execute(&mut *txn) .await .map_err(AppError::Database)?; sqlx::query( "UPDATE repo_stats SET forks_count = GREATEST(forks_count - 1, 0), updated_at = $1 WHERE repo_id = $2", ) .bind(now) .bind(parent_id) .execute(&mut *txn) .await .map_err(AppError::Database)?; txn.commit().await.map_err(|_| AppError::TxnError)?; Ok(()) } pub(crate) async fn find_ws_for_repo( &self, repo: &Repo, ) -> Result { sqlx::query_as::<_, crate::models::workspaces::Workspace>( "SELECT id, owner_id, name, description, avatar_url, visibility, plan, status, \ default_role, is_personal, archived_at, created_at, updated_at, deleted_at \ FROM workspace WHERE id = $1 AND deleted_at IS NULL", ) .bind(repo.workspace_id) .fetch_optional(self.ctx.db.reader()) .await .map_err(AppError::Database)? .ok_or(AppError::NotFound("workspace not found".into())) } }