use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::error::AppError; use crate::service::im::events::{MemberAction, MemberEvent}; use crate::models::base_info::UserBaseInfo; use crate::models::channels::ChannelMember; use crate::models::common::Role; use crate::models::workspaces::Workspace; use crate::service::ImService; use crate::service::im::events::ImEvent; use super::session::ImSession; use super::util::*; #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] pub struct InviteMemberParams { pub user_id: Uuid, pub role: Option, } #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] pub struct UpdateMemberParams { pub role: Option, pub muted: Option, pub pinned: Option, } impl ImService { pub async fn member_list( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, limit: i64, offset: i64, ) -> Result, AppError> { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_readable(user_uid, &channel).await?; let (limit, offset) = clamp_limit_offset(limit, offset); sqlx::query_as::<_, ChannelMember>( "SELECT id, channel_id, user_id, role, status, muted, pinned, \ last_read_message_id, last_read_at, joined_at, left_at, created_at, updated_at \ FROM channel_member WHERE channel_id = $1 AND status = 'active' \ ORDER BY joined_at ASC LIMIT $2 OFFSET $3", ) .bind(channel_id) .bind(limit) .bind(offset) .fetch_all(self.ctx.db.reader()) .await .map_err(AppError::Database) } pub async fn member_invite( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, params: InviteMemberParams, ) -> Result { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_editable(user_uid, &channel).await?; let ws = Workspace::find_by_id(self.ctx.db.reader(), channel.workspace_id) .await .map_err(AppError::Database)? .ok_or(AppError::NotFound("workspace not found".into()))?; let ws_role = Workspace::user_role(self.ctx.db.reader(), ws.id, params.user_id, ws.owner_id) .await .map_err(AppError::Database)?; if ws_role == Some(Role::Unknown) || ws_role.is_none() { return Err(AppError::BadRequest( "invited user is not a workspace member".into(), )); } let is_already = self.is_channel_member(channel_id, params.user_id).await?; if is_already { return Err(AppError::Conflict("user is already a member".into())); } let role = parse_enum(params.role, Role::Member, Role::Unknown, "role")?; let now = chrono::Utc::now(); let mut txn = self .ctx .db .writer() .begin() .await .map_err(|_| AppError::TxnError)?; sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; let member = sqlx::query_as::<_, ChannelMember>( "INSERT INTO channel_member \ (id, channel_id, user_id, role, status, muted, pinned, joined_at, created_at, updated_at) \ VALUES ($1, $2, $3, $4, 'active', false, false, $5, $5, $5) \ RETURNING id, channel_id, user_id, role, status, muted, pinned, \ last_read_message_id, last_read_at, joined_at, left_at, created_at, updated_at", ) .bind(Uuid::now_v7()) .bind(channel_id) .bind(params.user_id) .bind(role) .bind(now) .fetch_one(&mut *txn) .await .map_err(AppError::Database)?; self.increment_channel_stat(channel_id, 1, now, &mut txn) .await?; txn.commit().await.map_err(|_| AppError::TxnError)?; tracing::info!(channel_id = %channel_id, user_id = %params.user_id, "Member invited"); let request_id = Uuid::nil(); let event = MemberEvent { channel_id, user: UserBaseInfo::placeholder(member.user_id), user_id: member.user_id, action: MemberAction::Joined, }; self.publish(&format!("im.member.{}", channel_id), request_id, &event) .await; self.emit_event(ImEvent::Member { request_id, data: event, }); Ok(member) } pub async fn member_update( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, member_user_id: Uuid, params: UpdateMemberParams, ) -> Result { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_admin(user_uid, &channel).await?; let role = match params.role { Some(ref v) => parse_enum(Some(v.clone()), Role::Member, Role::Unknown, "role")?, None => { // Fetch current role sqlx::query_scalar::<_, String>( "SELECT role::text FROM channel_member \ WHERE channel_id = $1 AND user_id = $2 AND status = 'active'", ) .bind(channel_id) .bind(member_user_id) .fetch_optional(self.ctx.db.reader()) .await .map_err(AppError::Database)? .map(|s| s.parse::().unwrap_or(Role::Member)) .unwrap_or(Role::Member) } }; let now = chrono::Utc::now(); let member = sqlx::query_as::<_, ChannelMember>( "UPDATE channel_member SET role = $1, muted = COALESCE($2, muted), \ pinned = COALESCE($3, pinned), updated_at = $4 \ WHERE channel_id = $5 AND user_id = $6 AND status = 'active' \ RETURNING id, channel_id, user_id, role, status, muted, pinned, \ last_read_message_id, last_read_at, joined_at, left_at, created_at, updated_at", ) .bind(role) .bind(params.muted) .bind(params.pinned) .bind(now) .bind(channel_id) .bind(member_user_id) .fetch_one(self.ctx.db.writer()) .await .map_err(AppError::Database)?; let request_id = Uuid::nil(); let event = MemberEvent { channel_id, user: UserBaseInfo::placeholder(member.user_id), user_id: member.user_id, action: MemberAction::Updated, }; self.publish(&format!("im.member.{}", channel_id), request_id, &event) .await; self.emit_event(ImEvent::Member { request_id, data: event, }); Ok(member) } pub async fn member_kick( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, member_user_id: Uuid, ) -> Result<(), AppError> { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_admin(user_uid, &channel).await?; if member_user_id == channel.created_by { return Err(AppError::Forbidden("cannot kick channel owner".into())); } let ws = Workspace::find_by_id(self.ctx.db.reader(), channel.workspace_id) .await .map_err(AppError::Database)? .ok_or(AppError::NotFound("workspace not found".into()))?; if member_user_id == ws.owner_id { return Err(AppError::Forbidden("cannot kick workspace owner".into())); } let now = chrono::Utc::now(); let mut txn = self .ctx .db .writer() .begin() .await .map_err(|_| AppError::TxnError)?; sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; let result = sqlx::query( "UPDATE channel_member SET status = 'inactive', left_at = $1, updated_at = $1 \ WHERE channel_id = $2 AND user_id = $3 AND status = 'active'", ) .bind(now) .bind(channel_id) .bind(member_user_id) .execute(&mut *txn) .await .map_err(AppError::Database)?; ensure_affected(result.rows_affected(), "member not found")?; self.increment_channel_stat(channel_id, -1, now, &mut txn) .await?; txn.commit().await.map_err(|_| AppError::TxnError)?; tracing::info!(channel_id = %channel_id, user_id = %member_user_id, "Member kicked"); let request_id = Uuid::nil(); let event = MemberEvent { channel_id, user: UserBaseInfo::placeholder(member_user_id), user_id: member_user_id, action: MemberAction::Kicked, }; self.publish(&format!("im.member.{}", channel_id), request_id, &event) .await; self.emit_event(ImEvent::Member { request_id, data: event, }); Ok(()) } pub async fn member_leave( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, ) -> Result<(), AppError> { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; if channel.created_by == user_uid { return Err(AppError::Forbidden("channel owner cannot leave".into())); } let now = chrono::Utc::now(); let mut txn = self .ctx .db .writer() .begin() .await .map_err(|_| AppError::TxnError)?; sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; let result = sqlx::query( "UPDATE channel_member SET status = 'inactive', left_at = $1, updated_at = $1 \ WHERE channel_id = $2 AND user_id = $3 AND status = 'active'", ) .bind(now) .bind(channel_id) .bind(user_uid) .execute(&mut *txn) .await .map_err(AppError::Database)?; ensure_affected(result.rows_affected(), "not a member")?; self.increment_channel_stat(channel_id, -1, now, &mut txn) .await?; txn.commit().await.map_err(|_| AppError::TxnError)?; let request_id = Uuid::nil(); let event = MemberEvent { channel_id, user: UserBaseInfo::placeholder(user_uid), user_id: user_uid, action: MemberAction::Left, }; self.publish(&format!("im.member.{}", channel_id), request_id, &event) .await; self.emit_event(ImEvent::Member { request_id, data: event, }); Ok(()) } pub async fn member_join( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, ) -> Result { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_readable(user_uid, &channel).await?; let is_already = self.is_channel_member(channel_id, user_uid).await?; if is_already { return Err(AppError::Conflict("already a member".into())); } let now = chrono::Utc::now(); let mut txn = self .ctx .db .writer() .begin() .await .map_err(|_| AppError::TxnError)?; sqlx::query(set_local_user_id(user_uid)) .execute(&mut *txn) .await .map_err(AppError::Database)?; let member = sqlx::query_as::<_, ChannelMember>( "INSERT INTO channel_member \ (id, channel_id, user_id, role, status, muted, pinned, joined_at, created_at, updated_at) \ VALUES ($1, $2, $3, 'member', 'active', false, false, $4, $4, $4) \ RETURNING id, channel_id, user_id, role, status, muted, pinned, \ last_read_message_id, last_read_at, joined_at, left_at, created_at, updated_at", ) .bind(Uuid::now_v7()) .bind(channel_id) .bind(user_uid) .bind(now) .fetch_one(&mut *txn) .await .map_err(AppError::Database)?; self.increment_channel_stat(channel_id, 1, now, &mut txn) .await?; txn.commit().await.map_err(|_| AppError::TxnError)?; let request_id = Uuid::nil(); let event = MemberEvent { channel_id, user: UserBaseInfo::placeholder(member.user_id), user_id: member.user_id, action: MemberAction::Joined, }; self.publish(&format!("im.member.{}", channel_id), request_id, &event) .await; self.emit_event(ImEvent::Member { request_id, data: event, }); Ok(member) } }