Files
gitks/service/repo/fork.rs
T
zhenyi 420dedbc1e feat(service): expand service layer with new domain operations
- Add IM service modules: audit, channel roles, custom emojis, forum
  tags, integrations, invitations, repo links, slash commands, stages,
  voice, webhooks
- Add PR service modules: review requests, templates
- Add repo service modules: contributors, release assets, git extras
  (archive, branch rename, commit extras, diff/merge, tag, tree)
- Add user service: social (follow/block)
- Add internal auth service
- Update existing service modules with expanded functionality
- Remove deleted IM modules: articles, delivery trace, drafts,
  follows, messages, polls, presence, reactions, threads
2026-06-10 18:49:32 +08:00

317 lines
12 KiB
Rust

use chrono::Utc;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::error::AppError;
use crate::models::common::Role;
use crate::models::repos::{Repo, RepoFork};
use crate::service::RepoService;
use crate::session::Session;
use super::util::{clamp_limit_offset, set_local_user_id};
/// Request payload accepted by `RepoService::repo_fork`.
///
/// Both fields are optional; when omitted, `repo_fork` falls back to the
/// source workspace name and the source repository name respectively.
#[derive(Debug, Deserialize, Serialize, utoipa::ToSchema)]
pub struct ForkRepoParams {
/// Name of the workspace that should own the fork. `None` means "the same
/// workspace the source repository was addressed through".
pub target_workspace_name: Option<String>,
/// Name to give the forked repository. `None` means "reuse the source
/// repository's name".
pub name: Option<String>,
}
impl RepoService {
/// Lists the fork records whose parent is the addressed repository, newest
/// first.
///
/// The session must carry a user, and that user must pass
/// `ensure_repo_readable` for the repository. `limit` and `offset` are passed
/// through `clamp_limit_offset` before being bound to the query.
///
/// # Errors
/// - `AppError::Unauthorized` when the session has no user.
/// - Any error produced by `resolve_repo` or `ensure_repo_readable`.
/// - `AppError::Database` when the listing query fails.
pub async fn repo_forks(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
    limit: i64,
    offset: i64,
) -> Result<Vec<RepoFork>, AppError> {
    const LIST_FORKS_SQL: &str = "SELECT id, parent_repo_id, fork_repo_id, forked_by, created_at \
        FROM repo_fork WHERE parent_repo_id = $1 ORDER BY created_at DESC LIMIT $2 OFFSET $3";

    // Authentication first, then resolution, then the read-permission check.
    let viewer = match ctx.user() {
        Some(uid) => uid,
        None => return Err(AppError::Unauthorized),
    };
    let target = self.resolve_repo(wk_name, repo_name).await?;
    self.ensure_repo_readable(viewer, &target).await?;

    let (page_size, skip) = clamp_limit_offset(limit, offset);

    // Listing is served from the read side of the database handle.
    let rows = sqlx::query_as::<_, RepoFork>(LIST_FORKS_SQL)
        .bind(target.id)
        .bind(page_size)
        .bind(skip)
        .fetch_all(self.ctx.db.reader())
        .await
        .map_err(AppError::Database)?;
    Ok(rows)
}
/// Forks a repository into a workspace and returns the newly created `Repo`.
///
/// Steps, in order:
/// 1. Authenticate the session user and check they can read the parent repo.
/// 2. Resolve the target workspace (`params.target_workspace_name`, falling
///    back to `wk_name`) and require at least `Role::Member` in it.
/// 3. Reject the request if a non-deleted repo with the chosen name
///    (`params.name`, falling back to `repo_name`) already exists there.
/// 4. In one transaction: insert the fork's `repo` row (always `'private'` /
///    `'active'`), an `'owner'` membership for the caller, a `repo_stats`
///    row, a default-branch row, the `repo_fork` link, and bump the parent's
///    `forks_count` and the target workspace's `repos_count`.
/// 5. After commit, ask the git storage node to initialise a bare repository.
///    This step is best-effort: failures are logged, not returned, because
///    the database state is already committed.
///
/// # Errors
/// - `AppError::Unauthorized` when the session has no user.
/// - `AppError::Conflict` when the name is already taken in the target
///   workspace.
/// - `AppError::TxnError` when the transaction cannot be opened or committed.
/// - `AppError::Database` for any failing statement.
/// - Any error produced by the resolve / permission helpers.
#[tracing::instrument(skip(self, ctx, params))]
pub async fn repo_fork(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
    params: ForkRepoParams,
) -> Result<Repo, AppError> {
    let user_uid = ctx.user().ok_or(AppError::Unauthorized)?;
    let parent = self.resolve_repo(wk_name, repo_name).await?;
    self.ensure_repo_readable(user_uid, &parent).await?;

    let ws_name = params.target_workspace_name.as_deref().unwrap_or(wk_name);
    let ws = self.resolve_workspace(ws_name).await?;
    self.ensure_workspace_role_at_least(user_uid, &ws, Role::Member)
        .await?;

    // NOTE(review): `fork_name` is caller-supplied and later becomes part of
    // the storage `relative_path`; no validation is visible in this function.
    // Confirm it is validated upstream.
    let fork_name = params.name.as_deref().unwrap_or(repo_name);

    // NOTE(review): this pre-check reads from the reader handle, outside the
    // transaction, so two concurrent forks could both pass it. Presumably a
    // unique constraint on the table is the real guard; confirm.
    let existing = sqlx::query_scalar::<_, bool>(
        "SELECT EXISTS(SELECT 1 FROM repo WHERE workspace_id = $1 AND name = $2 AND deleted_at IS NULL)",
    )
    .bind(ws.id)
    .bind(fork_name)
    .fetch_one(self.ctx.db.reader())
    .await
    .map_err(AppError::Database)?;
    if existing {
        return Err(AppError::Conflict(
            "repo name already taken in target workspace".into(),
        ));
    }

    let now = Utc::now();
    let fork_id = Uuid::now_v7();
    let storage_path = format!("repos/{}/{}", ws.id, fork_id);
    // The fork is placed on the same primary storage node as its parent.
    let primary_node_id = parent.primary_storage_node_id;

    let mut txn = self
        .ctx
        .db
        .writer()
        .begin()
        .await
        .map_err(|_| AppError::TxnError)?;
    sqlx::query(set_local_user_id(user_uid))
        .execute(&mut *txn)
        .await
        .map_err(AppError::Database)?;

    // The fork row copies description, default branch, storage nodes and git
    // service from the parent; `$12` is used for both timestamps.
    let fork = sqlx::query_as::<_, Repo>(
        "INSERT INTO repo (id, workspace_id, owner_id, name, description, default_branch, \
         visibility, status, is_fork, forked_from_repo_id, storage_node_ids, \
         primary_storage_node_id, storage_path, git_service, created_at, updated_at) \
         VALUES ($1, $2, $3, $4, $5, $6, 'private', 'active', true, $7, $8, $9, $10, $11, $12, $12) \
         RETURNING id, workspace_id, owner_id, name, description, default_branch, visibility, status, \
         is_fork, forked_from_repo_id, storage_node_ids, primary_storage_node_id, storage_path, \
         git_service, archived_at, created_at, updated_at, deleted_at",
    )
    .bind(fork_id)
    .bind(ws.id)
    .bind(user_uid)
    .bind(fork_name)
    .bind(parent.description.as_deref())
    .bind(&parent.default_branch)
    .bind(parent.id)
    // Bound by reference: no need to clone the parent's node list.
    .bind(&parent.storage_node_ids)
    .bind(primary_node_id)
    .bind(&storage_path)
    .bind(parent.git_service)
    .bind(now)
    .fetch_one(&mut *txn)
    .await
    .map_err(AppError::Database)?;

    // The caller becomes the owner of the fork.
    sqlx::query(
        "INSERT INTO repo_member (id, repo_id, user_id, role, status, joined_at, created_at, updated_at) \
         VALUES ($1, $2, $3, 'owner', 'active', $4, $4, $4)",
    )
    .bind(Uuid::now_v7())
    .bind(fork_id)
    .bind(user_uid)
    .bind(now)
    .execute(&mut *txn)
    .await
    .map_err(AppError::Database)?;

    // Fresh counters; `branches_count` starts at 1 for the default branch
    // row inserted just below.
    sqlx::query(
        "INSERT INTO repo_stats (repo_id, stars_count, watchers_count, forks_count, branches_count, \
         tags_count, commits_count, releases_count, open_issues_count, open_pull_requests_count, \
         size_bytes, updated_at) \
         VALUES ($1, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, $2)",
    )
    .bind(fork_id)
    .bind(now)
    .execute(&mut *txn)
    .await
    .map_err(AppError::Database)?;

    // Default branch placeholder: the commit SHA is stored as an empty string.
    sqlx::query(
        "INSERT INTO repo_branch (id, repo_id, name, commit_sha, protected, default_branch, created_by, created_at, updated_at) \
         VALUES ($1, $2, $3, '', false, true, $4, $5, $5)",
    )
    .bind(Uuid::now_v7())
    .bind(fork_id)
    .bind(&parent.default_branch)
    .bind(user_uid)
    .bind(now)
    .execute(&mut *txn)
    .await
    .map_err(AppError::Database)?;

    // Parent -> fork link, the record listed by `repo_forks`.
    sqlx::query(
        "INSERT INTO repo_fork (id, parent_repo_id, fork_repo_id, forked_by, created_at) \
         VALUES ($1, $2, $3, $4, $5)",
    )
    .bind(Uuid::now_v7())
    .bind(parent.id)
    .bind(fork_id)
    .bind(user_uid)
    .bind(now)
    .execute(&mut *txn)
    .await
    .map_err(AppError::Database)?;

    sqlx::query(
        "UPDATE repo_stats SET forks_count = forks_count + 1, updated_at = $1 WHERE repo_id = $2",
    )
    .bind(now)
    .bind(parent.id)
    .execute(&mut *txn)
    .await
    .map_err(AppError::Database)?;

    sqlx::query(
        "UPDATE workspace_stats SET repos_count = repos_count + 1, updated_at = $1 WHERE workspace_id = $2",
    )
    .bind(now)
    .bind(ws.id)
    .execute(&mut *txn)
    .await
    .map_err(AppError::Database)?;

    txn.commit().await.map_err(|_| AppError::TxnError)?;

    // Everything below runs after the commit, so it must not return an error:
    // the fork already exists in the database. Storage initialisation is
    // best-effort and problems are only logged.
    //
    // NOTE(review): only an empty bare repository is initialised here; this
    // function does not copy any objects from the parent.
    if let Some(mut client) = self.ctx.registry.get_git_client(&primary_node_id) {
        let fork_header = crate::pb::repo::RepositoryHeader {
            storage_name: ws.name.clone(),
            relative_path: format!("{}.git", fork_name),
            storage_path: storage_path.clone(),
        };
        let init_outcome = client
            .repository
            .init_repository(tonic::Request::new(
                crate::pb::repo::InitRepositoryRequest {
                    repository: Some(fork_header),
                    bare: true,
                    object_format: crate::pb::repo::ObjectFormat::Sha1 as i32,
                    initial_branch: parent.default_branch.clone(),
                },
            ))
            .await;
        if let Err(status) = init_outcome {
            tracing::warn!(
                fork_id = %fork_id,
                error = %status,
                "Failed to initialise git storage for fork"
            );
        }
    } else {
        tracing::warn!(
            fork_id = %fork_id,
            "No git client available for the primary storage node; fork storage not initialised"
        );
    }

    tracing::info!(fork_id = %fork_id, parent_id = %parent.id, "Repo forked");
    Ok(fork)
}
/// Merges the upstream (parent) default branch into a fork's default branch
/// via the git storage service and returns the fork's `Repo` record.
///
/// The caller must be authenticated, hold at least `Role::Member` on the
/// fork, and be able to read the parent repository.
///
/// # Errors
/// - `AppError::Unauthorized` when the session has no user.
/// - `AppError::BadRequest` when the repo is not a fork or has no recorded
///   parent id.
/// - `AppError::NotFound` when the parent repo row cannot be loaded.
/// - `AppError::Database` when the parent lookup fails.
/// - `AppError::InternalServerError` when the merge RPC fails.
/// - `AppError::Conflict` when the merge reports conflicts.
/// - Any error produced by the resolve / permission helpers.
pub async fn repo_sync_fork(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
) -> Result<Repo, AppError> {
    let user_uid = ctx.user().ok_or(AppError::Unauthorized)?;
    let fork = self.resolve_repo(wk_name, repo_name).await?;
    self.ensure_repo_role_at_least(user_uid, &fork, Role::Member)
        .await?;
    if !fork.is_fork {
        return Err(AppError::BadRequest("repo is not a fork".into()));
    }

    // `ok_or_else` so the error string is only allocated on the failure path.
    let parent_id = fork
        .forked_from_repo_id
        .ok_or_else(|| AppError::BadRequest("parent repo not found".into()))?;
    let parent = Repo::find_by_id(self.ctx.db.reader(), parent_id)
        .await
        .map_err(AppError::Database)?
        .ok_or_else(|| AppError::NotFound("parent repo not found".into()))?;

    // Membership of the fork does not imply access to the upstream: the
    // caller must still be able to read the parent before content is pulled
    // from it or its workspace/name is put into the merge message.
    self.ensure_repo_readable(user_uid, &parent).await?;

    let fork_ws = self.resolve_workspace(wk_name).await?;
    let header = self.repo_header(&fork, &fork_ws);
    let parent_ws = self.find_ws_for_repo(&parent).await?;
    let mut client = self.git_client(&fork)?;

    // NOTE(review): the request names only the fork's repository; `revision`
    // is just the parent's default branch *name*. Whether the storage service
    // resolves that against the upstream repository is not visible from this
    // function. Confirm against the merge RPC contract.
    let result = client
        .merge
        .merge(tonic::Request::new(crate::pb::repo::MergeRequest {
            repository: Some(header),
            target_branch: fork.default_branch.clone(),
            source: Some(crate::pb::repo::ObjectSelector {
                selector: Some(crate::pb::repo::object_selector::Selector::Revision(
                    crate::pb::repo::ObjectName {
                        revision: parent.default_branch.clone(),
                    },
                )),
            }),
            committer: None,
            message: format!("Sync from upstream {}/{}", parent_ws.name, parent.name),
            options: None,
        }))
        .await
        .map_err(|e| AppError::InternalServerError(format!("sync failed: {e}")))?;

    // Only the "conflicts" status is treated as a failure here.
    let merge_result = result.into_inner();
    if merge_result.status
        == crate::pb::repo::merge_result::Status::MergeResultStatusConflicts as i32
    {
        return Err(AppError::Conflict(
            "sync failed: merge conflicts with upstream".into(),
        ));
    }
    Ok(fork)
}
/// Soft-deletes a fork and undoes the bookkeeping done when it was created.
///
/// In one transaction this marks the `repo` row as deleted, removes the
/// `repo_fork` link, decrements the parent's `forks_count`, and decrements
/// the owning workspace's `repos_count` (the counterpart of the increment
/// performed by `repo_fork`). Both counters are clamped at zero.
///
/// The caller must be authenticated and hold at least `Role::Admin` on the
/// fork.
///
/// # Errors
/// - `AppError::Unauthorized` when the session has no user.
/// - `AppError::BadRequest` when the repo is not a fork or has no recorded
///   parent id.
/// - `AppError::NotFound` when the repo was already deleted by the time the
///   transaction runs.
/// - `AppError::TxnError` when the transaction cannot be opened or committed.
/// - `AppError::Database` for any failing statement.
/// - Any error produced by the resolve / permission helpers.
pub async fn repo_delete_fork(
    &self,
    ctx: &Session,
    wk_name: &str,
    repo_name: &str,
) -> Result<(), AppError> {
    let user_uid = ctx.user().ok_or(AppError::Unauthorized)?;
    let repo = self.resolve_repo(wk_name, repo_name).await?;
    if !repo.is_fork {
        return Err(AppError::BadRequest("repo is not a fork".into()));
    }
    self.ensure_repo_role_at_least(user_uid, &repo, Role::Admin)
        .await?;

    // `ok_or_else` so the error string is only allocated on the failure path.
    let parent_id = repo
        .forked_from_repo_id
        .ok_or_else(|| AppError::BadRequest("parent repo not found".into()))?;

    let now = chrono::Utc::now();
    let mut txn = self
        .ctx
        .db
        .writer()
        .begin()
        .await
        .map_err(|_| AppError::TxnError)?;
    sqlx::query(set_local_user_id(user_uid))
        .execute(&mut *txn)
        .await
        .map_err(AppError::Database)?;

    // Soft-delete first, and only if the row is still live. If a concurrent
    // request already deleted it, bail out (dropping the transaction without
    // committing) so the counters below are not decremented twice.
    let soft_delete = sqlx::query(
        "UPDATE repo SET deleted_at = $1, status = 'deleted', updated_at = $1 \
         WHERE id = $2 AND deleted_at IS NULL",
    )
    .bind(now)
    .bind(repo.id)
    .execute(&mut *txn)
    .await
    .map_err(AppError::Database)?;
    if soft_delete.rows_affected() == 0 {
        return Err(AppError::NotFound("repo not found".into()));
    }

    sqlx::query("DELETE FROM repo_fork WHERE fork_repo_id = $1")
        .bind(repo.id)
        .execute(&mut *txn)
        .await
        .map_err(AppError::Database)?;

    sqlx::query(
        "UPDATE repo_stats SET forks_count = GREATEST(forks_count - 1, 0), updated_at = $1 WHERE repo_id = $2",
    )
    .bind(now)
    .bind(parent_id)
    .execute(&mut *txn)
    .await
    .map_err(AppError::Database)?;

    // Mirror of the `repos_count + 1` done when the fork was created;
    // without it the workspace counter drifts upwards on every fork deletion.
    sqlx::query(
        "UPDATE workspace_stats SET repos_count = GREATEST(repos_count - 1, 0), updated_at = $1 WHERE workspace_id = $2",
    )
    .bind(now)
    .bind(repo.workspace_id)
    .execute(&mut *txn)
    .await
    .map_err(AppError::Database)?;

    txn.commit().await.map_err(|_| AppError::TxnError)?;
    Ok(())
}
/// Loads the non-deleted workspace that owns `repo`, looked up by
/// `repo.workspace_id` on the reader handle.
///
/// # Errors
/// - `AppError::Database` when the query fails.
/// - `AppError::NotFound` when no live workspace row matches.
pub(crate) async fn find_ws_for_repo(
    &self,
    repo: &Repo,
) -> Result<crate::models::workspaces::Workspace, AppError> {
    let lookup = sqlx::query_as::<_, crate::models::workspaces::Workspace>(
        "SELECT id, owner_id, name, description, avatar_url, visibility, plan, status, \
         default_role, is_personal, archived_at, created_at, updated_at, deleted_at \
         FROM workspace WHERE id = $1 AND deleted_at IS NULL",
    )
    .bind(repo.workspace_id)
    .fetch_optional(self.ctx.db.reader())
    .await;

    // Collapse the three outcomes (row, no row, driver error) in one place.
    match lookup {
        Ok(Some(workspace)) => Ok(workspace),
        Ok(None) => Err(AppError::NotFound("workspace not found".into())),
        Err(db_err) => Err(AppError::Database(db_err)),
    }
}
}