use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::error::AppError; use crate::immediate::{ReactionAction, ReactionEvent}; use crate::models::channels::{MessageMention, MessageReaction}; use crate::service::ImService; use crate::service::im::events::ImEvent; use super::session::ImSession; use super::util::*; #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] pub struct AddReactionParams { pub content: String, } #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] pub struct AddMentionParams { pub mentioned_user_id: Uuid, } impl ImService { pub async fn reaction_list( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, message_id: Uuid, ) -> Result, AppError> { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_readable(user_uid, &channel).await?; sqlx::query_as::<_, MessageReaction>( "SELECT id, message_id, channel_id, user_id, content, created_at \ FROM message_reaction WHERE message_id = $1 AND channel_id = $2 \ ORDER BY created_at ASC", ) .bind(message_id) .bind(channel_id) .fetch_all(self.ctx.db.reader()) .await .map_err(AppError::Database) } pub async fn reaction_add( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, message_id: Uuid, params: AddReactionParams, ) -> Result { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_readable(user_uid, &channel).await?; self.resolve_message(message_id, channel_id).await?; let content = required_text(params.content, "content")?; let now = chrono::Utc::now(); let reaction = sqlx::query_as::<_, MessageReaction>( "INSERT INTO message_reaction (id, message_id, channel_id, user_id, content, created_at) \ VALUES ($1, $2, $3, $4, $5, $6) \ ON CONFLICT (message_id, user_id, content) DO NOTHING \ RETURNING id, message_id, channel_id, user_id, content, created_at", ) .bind(Uuid::now_v7()) .bind(message_id) .bind(channel_id) .bind(user_uid) .bind(&content) .bind(now) .fetch_optional(self.ctx.db.writer()) .await .map_err(AppError::Database)?; if reaction.is_none() { return Err(AppError::Conflict("reaction already exists".into())); } let reaction = reaction.unwrap(); let request_id = Uuid::nil(); let event = ReactionEvent { channel_id, message_id, user_id: reaction.user_id, action: ReactionAction::Added, content: Some(reaction.content.clone()), }; self.publish(&format!("im.reaction.{}", channel_id), request_id, &event) .await; self.emit_event(ImEvent::Reaction { request_id, data: event, }); Ok(reaction) } pub async fn reaction_remove( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, message_id: Uuid, content: &str, ) -> Result<(), AppError> { let user_uid = ctx.user; let result = sqlx::query( "DELETE FROM message_reaction \ WHERE message_id = $1 AND channel_id = $2 AND user_id = $3 AND content = $4", ) .bind(message_id) .bind(channel_id) .bind(user_uid) .bind(content) .execute(self.ctx.db.writer()) .await .map_err(AppError::Database)?; ensure_affected(result.rows_affected(), "reaction not found")?; let request_id = Uuid::nil(); let event = ReactionEvent { channel_id, message_id, user_id: user_uid, action: ReactionAction::Removed, content: Some(content.to_string()), }; self.publish(&format!("im.reaction.{}", channel_id), request_id, &event) .await; self.emit_event(ImEvent::Reaction { request_id, data: event, }); Ok(()) } pub async fn reaction_remove_all( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, message_id: Uuid, ) -> Result<(), AppError> { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_admin(user_uid, &channel).await?; sqlx::query("DELETE FROM message_reaction WHERE message_id = $1 AND channel_id = $2") .bind(message_id) .bind(channel_id) .execute(self.ctx.db.writer()) .await .map_err(AppError::Database)?; let request_id = Uuid::nil(); let event = ReactionEvent { channel_id, message_id, user_id: user_uid, action: ReactionAction::Removed, content: None, }; self.publish(&format!("im.reaction.{}", channel_id), request_id, &event) .await; self.emit_event(ImEvent::Reaction { request_id, data: event, }); Ok(()) } pub async fn mention_list_for_user( &self, ctx: &ImSession, wk_name: &str, limit: i64, offset: i64, unread_only: bool, ) -> Result, AppError> { let user_uid = ctx.user; let _ = self.resolve_workspace(wk_name).await?; let (limit, offset) = clamp_limit_offset(limit, offset); if unread_only { sqlx::query_as::<_, MessageMention>( "SELECT id, message_id, channel_id, mentioned_user_id, mentioned_by, read_at, created_at \ FROM message_mention \ WHERE mentioned_user_id = $1 AND read_at IS NULL \ ORDER BY created_at DESC LIMIT $2 OFFSET $3", ) .bind(user_uid) .bind(limit) .bind(offset) .fetch_all(self.ctx.db.reader()) .await .map_err(AppError::Database) } else { sqlx::query_as::<_, MessageMention>( "SELECT id, message_id, channel_id, mentioned_user_id, mentioned_by, read_at, created_at \ FROM message_mention \ WHERE mentioned_user_id = $1 \ ORDER BY created_at DESC LIMIT $2 OFFSET $3", ) .bind(user_uid) .bind(limit) .bind(offset) .fetch_all(self.ctx.db.reader()) .await .map_err(AppError::Database) } } pub async fn mention_mark_read( &self, ctx: &ImSession, _wk_name: &str, mention_id: Uuid, ) -> Result<(), AppError> { let user_uid = ctx.user; let now = chrono::Utc::now(); let result = sqlx::query( "UPDATE message_mention SET read_at = $1 \ WHERE id = $2 AND mentioned_user_id = $3 AND read_at IS NULL", ) .bind(now) .bind(mention_id) .bind(user_uid) .execute(self.ctx.db.writer()) .await .map_err(AppError::Database)?; ensure_affected(result.rows_affected(), "mention not found or already read") } pub async fn mention_mark_all_read( &self, ctx: &ImSession, wk_name: &str, ) -> Result { let user_uid = ctx.user; let _ = self.resolve_workspace(wk_name).await?; let now = chrono::Utc::now(); let result = sqlx::query( "UPDATE message_mention SET read_at = $1 \ WHERE mentioned_user_id = $2 AND read_at IS NULL", ) .bind(now) .bind(user_uid) .execute(self.ctx.db.writer()) .await .map_err(AppError::Database)?; Ok(result.rows_affected()) } }