use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::error::AppError; use crate::immediate::{PresenceEvent, TypingEvent}; use crate::models::common::PresenceStatus; use crate::models::users::UserPresence; use crate::service::ImService; use crate::service::im::events::ImEvent; use super::session::ImSession; use super::util::*; #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] pub struct UpdatePresenceParams { pub status: String, pub custom_status_text: Option, pub custom_status_emoji: Option, } #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] pub struct TypingParams { pub channel_id: Uuid, pub thread_id: Option, } impl ImService { pub async fn presence_update( &self, ctx: &ImSession, wk_name: &str, params: UpdatePresenceParams, ) -> Result { let user_uid = ctx.user; let _ = self.resolve_workspace(wk_name).await?; let status = parse_enum( Some(params.status), PresenceStatus::Online, PresenceStatus::Unknown, "status", )?; let now = chrono::Utc::now(); let presence = sqlx::query_as::<_, UserPresence>( "INSERT INTO user_presence (id, user_id, status, custom_status_text, custom_status_emoji, \ last_active_at, created_at, updated_at) \ VALUES ($1, $2, $3, $4, $5, $6, $6, $6) \ ON CONFLICT (user_id) DO UPDATE SET \ status = $3, custom_status_text = $4, custom_status_emoji = $5, \ last_active_at = $6, updated_at = $6 \ RETURNING id, user_id, status, custom_status_text, custom_status_emoji, \ device_type, ip_address, last_active_at, last_seen_at, created_at, updated_at", ) .bind(Uuid::now_v7()) .bind(user_uid) .bind(status) .bind(params.custom_status_text.as_deref()) .bind(params.custom_status_emoji.as_deref()) .bind(now) .fetch_one(self.ctx.db.writer()) .await .map_err(AppError::Database)?; // Cache in Redis for fast lookup let key = format!("{PRESENCE_PREFIX}{user_uid}"); if let Ok(mut conn) = self.ctx.redis.get_connection() { let _ = redis::cmd("SETEX") .arg(&key) .arg(PRESENCE_TTL_SECS as u64) .arg(status.to_string()) .query::<()>(&mut *conn.inner_mut()); } let request_id = Uuid::nil(); let event = PresenceEvent { user_id: user_uid, status: presence.status.to_string(), custom_status_text: presence.custom_status_text.clone(), custom_status_emoji: presence.custom_status_emoji.clone(), }; self.publish(&format!("im.presence.{}", user_uid), request_id, &event) .await; self.emit_event(ImEvent::Presence { request_id, data: event, }); Ok(presence) } pub async fn presence_get( &self, ctx: &ImSession, wk_name: &str, user_id: Uuid, ) -> Result, AppError> { let user_uid = ctx.user; let ws = self.resolve_workspace(wk_name).await?; self.ensure_workspace_readable(user_uid, &ws).await?; // Try DB first (has full record) if let Some(p) = sqlx::query_as::<_, UserPresence>( "SELECT id, user_id, status, custom_status_text, custom_status_emoji, \ device_type, ip_address, last_active_at, last_seen_at, created_at, updated_at \ FROM user_presence WHERE user_id = $1", ) .bind(user_id) .fetch_optional(self.ctx.db.reader()) .await .map_err(AppError::Database)? { return Ok(Some(p)); } // Fallback: check Redis for a cached status let key = format!("{PRESENCE_PREFIX}{user_id}"); if let Ok(mut conn) = self.ctx.redis.get_connection() { let cached: Option = redis::cmd("GET") .arg(&key) .query(&mut *conn.inner_mut()) .ok() .flatten(); if let Some(status_str) = cached && let Ok(status) = status_str.parse::() { let now = chrono::Utc::now(); return Ok(Some(UserPresence { id: Uuid::nil(), user_id, status, custom_status_text: None, custom_status_emoji: None, device_type: None, ip_address: None, last_active_at: now, last_seen_at: None, created_at: now, updated_at: now, })); } } Ok(None) } pub async fn presence_heartbeat(&self, ctx: &ImSession, wk_name: &str) -> Result<(), AppError> { let user_uid = ctx.user; let ws = self.resolve_workspace(wk_name).await?; self.ensure_workspace_readable(user_uid, &ws).await?; let key = format!("{PRESENCE_PREFIX}{user_uid}"); if let Ok(mut conn) = self.ctx.redis.get_connection() && let Err(e) = redis::cmd("SETEX") .arg(&key) .arg(PRESENCE_TTL_SECS as u64) .arg("online") .query::<()>(&mut *conn.inner_mut()) { tracing::warn!(error = %e, "redis presence heartbeat failed"); } let now = chrono::Utc::now(); if let Err(e) = sqlx::query( "UPDATE user_presence SET last_active_at = $1, updated_at = $1 WHERE user_id = $2", ) .bind(now) .bind(user_uid) .execute(self.ctx.db.writer()) .await { tracing::warn!(error = %e, "db presence heartbeat failed"); } Ok(()) } pub async fn typing_start( &self, ctx: &ImSession, wk_name: &str, params: TypingParams, ) -> Result<(), AppError> { let user_uid = ctx.user; let ws = self.resolve_workspace(wk_name).await?; self.ensure_workspace_readable(user_uid, &ws).await?; let channel = self.resolve_channel(params.channel_id).await?; self.ensure_channel_readable(user_uid, &channel).await?; let key = typing_key(params.channel_id, params.thread_id, user_uid); let mut conn = self.ctx.redis.get_connection()?; redis::cmd("SETEX") .arg(&key) .arg(TYPING_TTL_SECS as u64) .arg("1") .query::<()>(&mut *conn.inner_mut())?; let request_id = Uuid::nil(); let event = TypingEvent { channel_id: params.channel_id, thread_id: params.thread_id, user_id: user_uid, }; self.publish( &format!("im.typing.{}", params.channel_id), request_id, &event, ) .await; self.emit_event(ImEvent::Typing { request_id, data: event, }); Ok(()) } pub async fn typing_stop( &self, ctx: &ImSession, wk_name: &str, params: TypingParams, ) -> Result<(), AppError> { let user_uid = ctx.user; let ws = self.resolve_workspace(wk_name).await?; self.ensure_workspace_readable(user_uid, &ws).await?; let key = typing_key(params.channel_id, params.thread_id, user_uid); let mut conn = self.ctx.redis.get_connection()?; redis::cmd("DEL") .arg(&key) .query::<()>(&mut *conn.inner_mut())?; Ok(()) } } fn typing_key(channel_id: Uuid, thread_id: Option, user_id: Uuid) -> String { match thread_id { Some(tid) => format!("{TYPING_PREFIX}{channel_id}:{tid}:{user_id}"), None => format!("{TYPING_PREFIX}{channel_id}:{user_id}"), } }