245 lines
7.9 KiB
Rust
245 lines
7.9 KiB
Rust
use serde::{Deserialize, Serialize};
|
|
use uuid::Uuid;
|
|
|
|
use crate::error::AppError;
|
|
use crate::immediate::{PresenceEvent, TypingEvent};
|
|
use crate::models::common::PresenceStatus;
|
|
use crate::models::users::UserPresence;
|
|
use crate::service::ImService;
|
|
use crate::service::im::events::ImEvent;
|
|
|
|
use super::session::ImSession;
|
|
use super::util::*;
|
|
|
|
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
|
|
pub struct UpdatePresenceParams {
|
|
pub status: String,
|
|
pub custom_status_text: Option<String>,
|
|
pub custom_status_emoji: Option<String>,
|
|
}
|
|
|
|
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
|
|
pub struct TypingParams {
|
|
pub channel_id: Uuid,
|
|
pub thread_id: Option<Uuid>,
|
|
}
|
|
|
|
impl ImService {
|
|
pub async fn presence_update(
|
|
&self,
|
|
ctx: &ImSession,
|
|
wk_name: &str,
|
|
params: UpdatePresenceParams,
|
|
) -> Result<UserPresence, AppError> {
|
|
let user_uid = ctx.user;
|
|
let _ = self.resolve_workspace(wk_name).await?;
|
|
|
|
let status = parse_enum(
|
|
Some(params.status),
|
|
PresenceStatus::Online,
|
|
PresenceStatus::Unknown,
|
|
"status",
|
|
)?;
|
|
|
|
let now = chrono::Utc::now();
|
|
|
|
let presence = sqlx::query_as::<_, UserPresence>(
|
|
"INSERT INTO user_presence (id, user_id, status, custom_status_text, custom_status_emoji, \
|
|
last_active_at, created_at, updated_at) \
|
|
VALUES ($1, $2, $3, $4, $5, $6, $6, $6) \
|
|
ON CONFLICT (user_id) DO UPDATE SET \
|
|
status = $3, custom_status_text = $4, custom_status_emoji = $5, \
|
|
last_active_at = $6, updated_at = $6 \
|
|
RETURNING id, user_id, status, custom_status_text, custom_status_emoji, \
|
|
device_type, ip_address, last_active_at, last_seen_at, created_at, updated_at",
|
|
)
|
|
.bind(Uuid::now_v7())
|
|
.bind(user_uid)
|
|
.bind(status)
|
|
.bind(params.custom_status_text.as_deref())
|
|
.bind(params.custom_status_emoji.as_deref())
|
|
.bind(now)
|
|
.fetch_one(self.ctx.db.writer())
|
|
.await
|
|
.map_err(AppError::Database)?;
|
|
|
|
// Cache in Redis for fast lookup
|
|
let key = format!("{PRESENCE_PREFIX}{user_uid}");
|
|
if let Ok(mut conn) = self.ctx.redis.get_connection() {
|
|
let _ = redis::cmd("SETEX")
|
|
.arg(&key)
|
|
.arg(PRESENCE_TTL_SECS as u64)
|
|
.arg(status.to_string())
|
|
.query::<()>(&mut *conn.inner_mut());
|
|
}
|
|
|
|
let request_id = Uuid::nil();
|
|
let event = PresenceEvent {
|
|
user_id: user_uid,
|
|
status: presence.status.to_string(),
|
|
custom_status_text: presence.custom_status_text.clone(),
|
|
custom_status_emoji: presence.custom_status_emoji.clone(),
|
|
};
|
|
self.publish(&format!("im.presence.{}", user_uid), request_id, &event)
|
|
.await;
|
|
self.emit_event(ImEvent::Presence {
|
|
request_id,
|
|
data: event,
|
|
});
|
|
Ok(presence)
|
|
}
|
|
|
|
pub async fn presence_get(
|
|
&self,
|
|
ctx: &ImSession,
|
|
wk_name: &str,
|
|
user_id: Uuid,
|
|
) -> Result<Option<UserPresence>, AppError> {
|
|
let user_uid = ctx.user;
|
|
let ws = self.resolve_workspace(wk_name).await?;
|
|
self.ensure_workspace_readable(user_uid, &ws).await?;
|
|
|
|
// Try DB first (has full record)
|
|
if let Some(p) = sqlx::query_as::<_, UserPresence>(
|
|
"SELECT id, user_id, status, custom_status_text, custom_status_emoji, \
|
|
device_type, ip_address, last_active_at, last_seen_at, created_at, updated_at \
|
|
FROM user_presence WHERE user_id = $1",
|
|
)
|
|
.bind(user_id)
|
|
.fetch_optional(self.ctx.db.reader())
|
|
.await
|
|
.map_err(AppError::Database)?
|
|
{
|
|
return Ok(Some(p));
|
|
}
|
|
|
|
// Fallback: check Redis for a cached status
|
|
let key = format!("{PRESENCE_PREFIX}{user_id}");
|
|
if let Ok(mut conn) = self.ctx.redis.get_connection() {
|
|
let cached: Option<String> = redis::cmd("GET")
|
|
.arg(&key)
|
|
.query(&mut *conn.inner_mut())
|
|
.ok()
|
|
.flatten();
|
|
|
|
if let Some(status_str) = cached
|
|
&& let Ok(status) = status_str.parse::<PresenceStatus>()
|
|
{
|
|
let now = chrono::Utc::now();
|
|
return Ok(Some(UserPresence {
|
|
id: Uuid::nil(),
|
|
user_id,
|
|
status,
|
|
custom_status_text: None,
|
|
custom_status_emoji: None,
|
|
device_type: None,
|
|
ip_address: None,
|
|
last_active_at: now,
|
|
last_seen_at: None,
|
|
created_at: now,
|
|
updated_at: now,
|
|
}));
|
|
}
|
|
}
|
|
|
|
Ok(None)
|
|
}
|
|
|
|
pub async fn presence_heartbeat(&self, ctx: &ImSession, wk_name: &str) -> Result<(), AppError> {
|
|
let user_uid = ctx.user;
|
|
let ws = self.resolve_workspace(wk_name).await?;
|
|
self.ensure_workspace_readable(user_uid, &ws).await?;
|
|
|
|
let key = format!("{PRESENCE_PREFIX}{user_uid}");
|
|
if let Ok(mut conn) = self.ctx.redis.get_connection()
|
|
&& let Err(e) = redis::cmd("SETEX")
|
|
.arg(&key)
|
|
.arg(PRESENCE_TTL_SECS as u64)
|
|
.arg("online")
|
|
.query::<()>(&mut *conn.inner_mut())
|
|
{
|
|
tracing::warn!(error = %e, "redis presence heartbeat failed");
|
|
}
|
|
|
|
let now = chrono::Utc::now();
|
|
if let Err(e) = sqlx::query(
|
|
"UPDATE user_presence SET last_active_at = $1, updated_at = $1 WHERE user_id = $2",
|
|
)
|
|
.bind(now)
|
|
.bind(user_uid)
|
|
.execute(self.ctx.db.writer())
|
|
.await
|
|
{
|
|
tracing::warn!(error = %e, "db presence heartbeat failed");
|
|
}
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn typing_start(
|
|
&self,
|
|
ctx: &ImSession,
|
|
wk_name: &str,
|
|
params: TypingParams,
|
|
) -> Result<(), AppError> {
|
|
let user_uid = ctx.user;
|
|
let ws = self.resolve_workspace(wk_name).await?;
|
|
self.ensure_workspace_readable(user_uid, &ws).await?;
|
|
|
|
let channel = self.resolve_channel(params.channel_id).await?;
|
|
self.ensure_channel_readable(user_uid, &channel).await?;
|
|
|
|
let key = typing_key(params.channel_id, params.thread_id, user_uid);
|
|
let mut conn = self.ctx.redis.get_connection()?;
|
|
redis::cmd("SETEX")
|
|
.arg(&key)
|
|
.arg(TYPING_TTL_SECS as u64)
|
|
.arg("1")
|
|
.query::<()>(&mut *conn.inner_mut())?;
|
|
|
|
let request_id = Uuid::nil();
|
|
let event = TypingEvent {
|
|
channel_id: params.channel_id,
|
|
thread_id: params.thread_id,
|
|
user_id: user_uid,
|
|
};
|
|
self.publish(
|
|
&format!("im.typing.{}", params.channel_id),
|
|
request_id,
|
|
&event,
|
|
)
|
|
.await;
|
|
self.emit_event(ImEvent::Typing {
|
|
request_id,
|
|
data: event,
|
|
});
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn typing_stop(
|
|
&self,
|
|
ctx: &ImSession,
|
|
wk_name: &str,
|
|
params: TypingParams,
|
|
) -> Result<(), AppError> {
|
|
let user_uid = ctx.user;
|
|
let ws = self.resolve_workspace(wk_name).await?;
|
|
self.ensure_workspace_readable(user_uid, &ws).await?;
|
|
|
|
let key = typing_key(params.channel_id, params.thread_id, user_uid);
|
|
let mut conn = self.ctx.redis.get_connection()?;
|
|
redis::cmd("DEL")
|
|
.arg(&key)
|
|
.query::<()>(&mut *conn.inner_mut())?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
fn typing_key(channel_id: Uuid, thread_id: Option<Uuid>, user_id: Uuid) -> String {
|
|
match thread_id {
|
|
Some(tid) => format!("{TYPING_PREFIX}{channel_id}:{tid}:{user_id}"),
|
|
None => format!("{TYPING_PREFIX}{channel_id}:{user_id}"),
|
|
}
|
|
}
|