Files
appks/service/im/messages.rs
T
2026-06-07 11:30:56 +08:00

890 lines
30 KiB
Rust

use std::sync::OnceLock;
use serde::{Deserialize, Serialize};
use uuid::Uuid;
use crate::error::AppError;
use crate::immediate::{MessageAction, MessageEvent};
use crate::models::channels::{Message, MessageBookmark, MessageEditHistory, SavedMessage};
use crate::models::common::{JsonValue, MessageType};
use crate::service::ImService;
use crate::service::im::delivery_trace::trace_message;
use crate::service::im::events::ImEvent;
use ::redis::Cmd;
use super::session::ImSession;
use super::util::*;
/// Lua script evaluated by Redis to hand out the next sequence number for a counter key.
///
/// `KEYS[1]` is the counter key and `ARGV[1]` is a floor value: when the key is missing
/// or holds a smaller number it is first set to `ARGV[1]`; the key is then `INCR`-ed and
/// the incremented value is returned.
const MESSAGE_SEQ_SCRIPT: &str = "local cur = redis.call('GET', KEYS[1]); if (not cur) or (tonumber(cur) < tonumber(ARGV[1])) then redis.call('SET', KEYS[1], ARGV[1]); end; return redis.call('INCR', KEYS[1]);";
/// Process-wide cache of the SHA returned by `SCRIPT LOAD` for [`MESSAGE_SEQ_SCRIPT`].
/// It is filled on first use by `message_seq_sha` so later calls can issue `EVALSHA`.
static MESSAGE_SEQ_SHA: OnceLock<String> = OnceLock::new();
/// Request payload consumed by `ImService::message_send`.
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
pub struct SendMessageParams {
/// Message text. `message_send` passes it through `required_text` and rejects it when
/// its byte length exceeds `MAX_MESSAGE_BODY`.
pub body: String,
/// Optional message type name, converted to a `MessageType` by `parse_enum`.
pub message_type: Option<String>,
/// Thread to post into. When set, it is resolved against the target channel and the
/// thread's reply counters are updated.
pub thread_id: Option<Uuid>,
/// ID of the message being replied to; stored on the new row.
pub reply_to_message_id: Option<Uuid>,
/// Initial value of the message's `pinned` column; treated as `false` when omitted.
pub pinned: Option<bool>,
/// Attachments inserted into `message_attachment` together with the message.
pub attachments: Option<Vec<CreateAttachmentParams>>,
/// Embeds inserted into `message_embed` together with the message.
pub embeds: Option<Vec<CreateEmbedParams>>,
}
/// Request payload consumed by `ImService::message_edit`.
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
pub struct EditMessageParams {
/// Replacement message text. `message_edit` passes it through `required_text` and
/// rejects it when its byte length exceeds `MAX_MESSAGE_BODY`.
pub body: String,
}
/// One attachment supplied with `SendMessageParams`; each entry becomes a row in
/// `message_attachment`.
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
pub struct CreateAttachmentParams {
/// File name; validated with `required_text` before insertion.
pub filename: String,
/// Attachment URL; validated with `required_text` before insertion.
pub url: String,
/// Optional value for the `proxy_url` column.
pub proxy_url: Option<String>,
/// Value for the `size_bytes` column.
pub size_bytes: i64,
/// MIME type; validated with `required_text` before insertion.
pub mime_type: String,
/// Optional value for the `width` column.
pub width: Option<i32>,
/// Optional value for the `height` column.
pub height: Option<i32>,
/// Optional value for the `duration_ms` column.
pub duration_ms: Option<i64>,
/// Optional value for the `thumbnail_url` column.
pub thumbnail_url: Option<String>,
/// Optional value for the `blurhash` column.
pub blurhash: Option<String>,
}
/// One embed supplied with `SendMessageParams`; each entry becomes a row in
/// `message_embed`. Every field is optional and is stored in the column of the same name.
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
pub struct CreateEmbedParams {
/// Embed kind; `message_send` stores `"rich"` when omitted.
pub embed_type: Option<String>,
/// Embed title.
pub title: Option<String>,
/// Embed description text.
pub description: Option<String>,
/// URL associated with the embed.
pub url: Option<String>,
/// Author display name.
pub author_name: Option<String>,
/// Author link URL.
pub author_url: Option<String>,
/// Author icon URL.
pub author_icon_url: Option<String>,
/// Thumbnail URL.
pub thumbnail_url: Option<String>,
/// Image URL.
pub image_url: Option<String>,
/// Value for the `color` column.
pub color: Option<i32>,
/// JSON value for the `fields` column.
pub fields: Option<JsonValue>,
/// Footer text.
pub footer_text: Option<String>,
/// Footer icon URL.
pub footer_icon_url: Option<String>,
/// Provider name.
pub provider_name: Option<String>,
/// Value for the `"timestamp"` column.
pub timestamp: Option<chrono::DateTime<chrono::Utc>>,
}
/// Optional filters applied by `ImService::message_list`; a `None` field is ignored.
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
pub struct MessageListFilters {
/// Only return messages whose `thread_id` equals this value.
pub thread_id: Option<Uuid>,
/// Only return messages whose `author_id` equals this value.
pub author_id: Option<Uuid>,
/// Only return messages whose `pinned` flag equals this value.
pub pinned: Option<bool>,
/// Only return messages whose `seq` is lower than the `seq` of the message with this ID.
pub before: Option<Uuid>,
/// Only return messages whose `seq` is higher than the `seq` of the message with this ID.
pub after: Option<Uuid>,
}
impl ImService {
/// Lists the non-deleted messages of a channel, ordered by `seq` descending.
///
/// The caller must pass `ensure_channel_readable` for the channel. `limit` and `offset`
/// are normalised by `clamp_limit_offset`. Each `filters` field narrows the result only
/// when it is `Some`.
///
/// The `before` / `after` cursors are looked up inside the requested channel only, so a
/// message ID that belongs to another channel (or does not exist) yields an empty page
/// instead of being compared against a foreign sequence number.
///
/// # Errors
/// Propagates channel resolution / permission errors and wraps query failures in
/// `AppError::Database`.
pub async fn message_list(
    &self,
    ctx: &ImSession,
    _wk_name: &str,
    channel_id: Uuid,
    filters: MessageListFilters,
    limit: i64,
    offset: i64,
) -> Result<Vec<Message>, AppError> {
    let user_uid = ctx.user;
    let channel = self.resolve_channel(channel_id).await?;
    self.ensure_channel_readable(user_uid, &channel).await?;
    let (limit, offset) = clamp_limit_offset(limit, offset);
    sqlx::query_as::<_, Message>(
        "SELECT id, channel_id, author_id, thread_id, reply_to_message_id, seq, \
         message_type, body, metadata, pinned, system, edited_at, deleted_at, \
         created_at, updated_at \
         FROM message \
         WHERE channel_id = $1 AND deleted_at IS NULL \
         AND ($2::uuid IS NULL OR thread_id = $2) \
         AND ($3::uuid IS NULL OR author_id = $3) \
         AND ($4::bool IS NULL OR pinned = $4) \
         AND ($5::uuid IS NULL OR seq < (SELECT seq FROM message WHERE id = $5 AND channel_id = $1)) \
         AND ($6::uuid IS NULL OR seq > (SELECT seq FROM message WHERE id = $6 AND channel_id = $1)) \
         ORDER BY seq DESC LIMIT $7 OFFSET $8",
    )
    .bind(channel_id)
    .bind(filters.thread_id)
    .bind(filters.author_id)
    .bind(filters.pinned)
    .bind(filters.before)
    .bind(filters.after)
    .bind(limit)
    .bind(offset)
    .fetch_all(self.ctx.db.reader())
    .await
    .map_err(AppError::Database)
}
/// Posts a new message to a channel and broadcasts a `MessageAction::Created` event.
///
/// Permission checks: the caller must pass `ensure_channel_member`; when the channel is
/// read-only, or when the request asks for the message to be created pinned, the caller
/// must also pass `ensure_channel_editable` (the same check `message_pin` applies).
///
/// Inside one transaction this inserts the message, its attachments and embeds, an
/// optional `message_pin` row, updates the thread counters (when `thread_id` is set) and
/// the channel's last-message columns, then calls `update_channel_stats`. After commit the
/// event is published on `im.message.{channel_id}` and emitted as `ImEvent::Message`.
///
/// # Errors
/// `AppError::BadRequest` when the body exceeds `MAX_MESSAGE_BODY` bytes;
/// `AppError::TxnError` when the transaction cannot be started or committed;
/// `AppError::Database` for statement failures; plus any error from the permission,
/// validation and sequence helpers.
#[tracing::instrument(skip(self, ctx, params))]
pub async fn message_send(
    &self,
    ctx: &ImSession,
    _wk_name: &str,
    channel_id: Uuid,
    params: SendMessageParams,
    request_id: Uuid,
) -> Result<Message, AppError> {
    let user_uid = ctx.user;
    let channel = self.resolve_channel(channel_id).await?;
    self.ensure_channel_member(user_uid, &channel).await?;
    let pinned = params.pinned.unwrap_or(false);
    // Creating a message already pinned must not bypass the permission `message_pin` enforces.
    if channel.read_only || pinned {
        self.ensure_channel_editable(user_uid, &channel).await?;
    }
    let body = required_text(params.body, "body")?;
    if body.len() > MAX_MESSAGE_BODY {
        return Err(AppError::BadRequest("message body too long".into()));
    }
    let msg_type = parse_enum(
        params.message_type,
        MessageType::Text,
        MessageType::Unknown,
        "message_type",
    )?;
    let thread_id = params.thread_id;
    if let Some(thread_id) = thread_id {
        self.resolve_thread(thread_id, channel_id).await?;
    }
    let now = chrono::Utc::now();
    let message_id = Uuid::now_v7();
    let seq = self.next_message_seq(channel_id).await?;
    let mut txn = self
        .ctx
        .db
        .writer()
        .begin()
        .await
        .map_err(|_| AppError::TxnError)?;
    // PostgreSQL's `SET` statement cannot take bind parameters, so the transaction-local
    // setting is applied through `set_config(name, value, is_local = true)` instead.
    sqlx::query("SELECT set_config('app.current_user_id', $1, true)")
        .bind(user_uid.to_string())
        .execute(&mut *txn)
        .await
        .map_err(AppError::Database)?;
    let message = sqlx::query_as::<_, Message>(
        "INSERT INTO message \
         (id, channel_id, author_id, thread_id, reply_to_message_id, seq, \
         message_type, body, metadata, pinned, system, created_at, updated_at) \
         VALUES ($1, $2, $3, $4, $5, $6, $7, $8, NULL, $9, false, $10, $10) \
         RETURNING id, channel_id, author_id, thread_id, reply_to_message_id, seq, \
         message_type, body, metadata, pinned, system, edited_at, deleted_at, \
         created_at, updated_at",
    )
    .bind(message_id)
    .bind(channel_id)
    .bind(user_uid)
    .bind(thread_id)
    .bind(params.reply_to_message_id)
    .bind(seq)
    .bind(msg_type)
    .bind(&body)
    .bind(pinned)
    .bind(now)
    .fetch_one(&mut *txn)
    .await
    .map_err(AppError::Database)?;
    if pinned {
        // `message_list_pinned` joins on `message_pin`, so a message created pinned needs
        // the same row `message_pin` would write.
        sqlx::query(
            "INSERT INTO message_pin (id, message_id, channel_id, pinned_by, pinned_at) \
             VALUES ($1, $2, $3, $4, $5) \
             ON CONFLICT (message_id) DO NOTHING",
        )
        .bind(Uuid::now_v7())
        .bind(message_id)
        .bind(channel_id)
        .bind(user_uid)
        .bind(now)
        .execute(&mut *txn)
        .await
        .map_err(AppError::Database)?;
    }
    // Insert attachments. Iterating by value lets the validated strings be moved instead
    // of cloned; a validation error drops `txn`, which rolls the transaction back.
    if let Some(attachments) = params.attachments {
        for att in attachments {
            let att_filename = required_text(att.filename, "filename")?;
            let att_url = required_text(att.url, "url")?;
            let att_mime = required_text(att.mime_type, "mime_type")?;
            sqlx::query(
                "INSERT INTO message_attachment \
                 (id, message_id, channel_id, filename, url, proxy_url, \
                 size_bytes, mime_type, width, height, duration_ms, \
                 thumbnail_url, blurhash, created_at) \
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)",
            )
            .bind(Uuid::now_v7())
            .bind(message_id)
            .bind(channel_id)
            .bind(&att_filename)
            .bind(&att_url)
            .bind(att.proxy_url.as_deref())
            .bind(att.size_bytes)
            .bind(&att_mime)
            .bind(att.width)
            .bind(att.height)
            .bind(att.duration_ms)
            .bind(att.thumbnail_url.as_deref())
            .bind(att.blurhash.as_deref())
            .bind(now)
            .execute(&mut *txn)
            .await
            .map_err(AppError::Database)?;
        }
    }
    // Insert embeds
    if let Some(embeds) = params.embeds {
        for emb in &embeds {
            sqlx::query(
                "INSERT INTO message_embed \
                 (id, message_id, embed_type, title, description, url, \
                 author_name, author_url, author_icon_url, thumbnail_url, \
                 image_url, color, fields, footer_text, footer_icon_url, \
                 provider_name, \"timestamp\", created_at) \
                 VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, \
                 $11, $12, $13, $14, $15, $16, $17, $18)",
            )
            .bind(Uuid::now_v7())
            .bind(message_id)
            .bind(emb.embed_type.as_deref().unwrap_or("rich"))
            .bind(emb.title.as_deref())
            .bind(emb.description.as_deref())
            .bind(emb.url.as_deref())
            .bind(emb.author_name.as_deref())
            .bind(emb.author_url.as_deref())
            .bind(emb.author_icon_url.as_deref())
            .bind(emb.thumbnail_url.as_deref())
            .bind(emb.image_url.as_deref())
            .bind(emb.color)
            .bind(emb.fields.clone())
            .bind(emb.footer_text.as_deref())
            .bind(emb.footer_icon_url.as_deref())
            .bind(emb.provider_name.as_deref())
            .bind(emb.timestamp)
            .bind(now)
            .execute(&mut *txn)
            .await
            .map_err(AppError::Database)?;
        }
    }
    if let Some(thread_id) = thread_id {
        // The participant count is recomputed inside the transaction, so it already sees
        // the row inserted above.
        sqlx::query(
            "UPDATE message_thread SET replies_count = replies_count + 1, \
             participants_count = (SELECT COUNT(DISTINCT author_id)::int FROM message WHERE thread_id = $3 AND deleted_at IS NULL), \
             last_reply_message_id = $1, last_reply_at = $2, updated_at = $2 \
             WHERE id = $3 AND channel_id = $4",
        )
        .bind(message_id)
        .bind(now)
        .bind(thread_id)
        .bind(channel_id)
        .execute(&mut *txn)
        .await
        .map_err(AppError::Database)?;
    }
    // Update channel last_message
    sqlx::query(
        "UPDATE channel SET last_message_id = $1, last_message_at = $2, updated_at = $2 \
         WHERE id = $3",
    )
    .bind(message_id)
    .bind(now)
    .bind(channel_id)
    .execute(&mut *txn)
    .await
    .map_err(AppError::Database)?;
    self.update_channel_stats(channel_id, now, &mut txn).await?;
    txn.commit().await.map_err(|_| AppError::TxnError)?;
    tracing::info!(message_id = %message_id, channel_id = %channel_id, "Message sent");
    trace_message(
        "committed",
        request_id,
        channel_id,
        message.id,
        Some(message.seq),
    );
    // Notifications go out only after the commit succeeded.
    let event = MessageEvent {
        channel_id,
        thread_id: message.thread_id,
        message_id: message.id,
        author_id: message.author_id,
        action: MessageAction::Created,
        body: Some(message.body.clone()),
        seq: Some(message.seq),
    };
    self.publish(&format!("im.message.{}", channel_id), request_id, &event)
        .await;
    self.emit_event(ImEvent::Message {
        request_id,
        data: event,
    });
    Ok(message)
}
/// Replaces the body of a message, records the previous body in
/// `message_edit_history`, and broadcasts a `MessageAction::Edited` event.
///
/// The caller must pass `ensure_channel_readable`; editing someone else's message also
/// requires `ensure_channel_admin`. The history insert and the update run in one
/// transaction.
///
/// # Errors
/// `AppError::BadRequest` when the body exceeds `MAX_MESSAGE_BODY` bytes;
/// `AppError::NotFound` when the message does not exist, is deleted, or disappears before
/// the update runs; `AppError::TxnError` / `AppError::Database` for transaction and
/// statement failures.
pub async fn message_edit(
    &self,
    ctx: &ImSession,
    _wk_name: &str,
    channel_id: Uuid,
    message_id: Uuid,
    params: EditMessageParams,
    request_id: Uuid,
) -> Result<Message, AppError> {
    let user_uid = ctx.user;
    let channel = self.resolve_channel(channel_id).await?;
    self.ensure_channel_readable(user_uid, &channel).await?;
    let body = required_text(params.body, "body")?;
    if body.len() > MAX_MESSAGE_BODY {
        return Err(AppError::BadRequest("message body too long".into()));
    }
    let existing = self.resolve_message(message_id, channel_id).await?;
    if existing.author_id != user_uid {
        self.ensure_channel_admin(user_uid, &channel).await?;
    }
    let now = chrono::Utc::now();
    let mut txn = self
        .ctx
        .db
        .writer()
        .begin()
        .await
        .map_err(|_| AppError::TxnError)?;
    // PostgreSQL's `SET` statement cannot take bind parameters, so the transaction-local
    // setting is applied through `set_config(name, value, is_local = true)` instead.
    sqlx::query("SELECT set_config('app.current_user_id', $1, true)")
        .bind(user_uid.to_string())
        .execute(&mut *txn)
        .await
        .map_err(AppError::Database)?;
    // Save edit history
    sqlx::query(
        "INSERT INTO message_edit_history (id, message_id, channel_id, previous_body, edited_by, edited_at) \
         VALUES ($1, $2, $3, $4, $5, $6)",
    )
    .bind(Uuid::now_v7())
    .bind(message_id)
    .bind(channel_id)
    .bind(&existing.body)
    .bind(user_uid)
    .bind(now)
    .execute(&mut *txn)
    .await
    .map_err(AppError::Database)?;
    // The message may have been deleted since `resolve_message`; report that as NotFound
    // rather than a database error. Returning early drops `txn`, which rolls back the
    // history row inserted above.
    let updated = sqlx::query_as::<_, Message>(
        "UPDATE message SET body = $1, edited_at = $2, updated_at = $2 \
         WHERE id = $3 AND channel_id = $4 AND deleted_at IS NULL \
         RETURNING id, channel_id, author_id, thread_id, reply_to_message_id, seq, \
         message_type, body, metadata, pinned, system, edited_at, deleted_at, \
         created_at, updated_at",
    )
    .bind(&body)
    .bind(now)
    .bind(message_id)
    .bind(channel_id)
    .fetch_optional(&mut *txn)
    .await
    .map_err(AppError::Database)?
    .ok_or_else(|| AppError::NotFound("message not found".into()))?;
    txn.commit().await.map_err(|_| AppError::TxnError)?;
    let event = MessageEvent {
        channel_id,
        thread_id: updated.thread_id,
        message_id: updated.id,
        author_id: updated.author_id,
        action: MessageAction::Edited,
        body: Some(updated.body.clone()),
        seq: Some(updated.seq),
    };
    self.publish(&format!("im.message.{}", channel_id), request_id, &event)
        .await;
    self.emit_event(ImEvent::Message {
        request_id,
        data: event,
    });
    Ok(updated)
}
/// Soft-deletes a message (sets `deleted_at`) and broadcasts a `MessageAction::Deleted`
/// event.
///
/// The caller must pass `ensure_channel_readable` before the message is looked up, so
/// callers without channel access cannot probe message IDs; deleting someone else's
/// message also requires `ensure_channel_admin`. The update and `update_channel_stats`
/// run in one transaction.
///
/// # Errors
/// `AppError::NotFound` (via `resolve_message` / `ensure_affected`) when the message is
/// missing or already deleted; `AppError::TxnError` / `AppError::Database` for transaction
/// and statement failures.
pub async fn message_delete(
    &self,
    ctx: &ImSession,
    _wk_name: &str,
    channel_id: Uuid,
    message_id: Uuid,
    request_id: Uuid,
) -> Result<(), AppError> {
    let user_uid = ctx.user;
    let channel = self.resolve_channel(channel_id).await?;
    // Same access gate as `message_edit`: check channel access before touching messages.
    self.ensure_channel_readable(user_uid, &channel).await?;
    let existing = self.resolve_message(message_id, channel_id).await?;
    if existing.author_id != user_uid {
        self.ensure_channel_admin(user_uid, &channel).await?;
    }
    let now = chrono::Utc::now();
    let mut txn = self
        .ctx
        .db
        .writer()
        .begin()
        .await
        .map_err(|_| AppError::TxnError)?;
    // PostgreSQL's `SET` statement cannot take bind parameters, so the transaction-local
    // setting is applied through `set_config(name, value, is_local = true)` instead.
    sqlx::query("SELECT set_config('app.current_user_id', $1, true)")
        .bind(user_uid.to_string())
        .execute(&mut *txn)
        .await
        .map_err(AppError::Database)?;
    let result = sqlx::query(
        "UPDATE message SET deleted_at = $1, updated_at = $1 \
         WHERE id = $2 AND channel_id = $3 AND deleted_at IS NULL",
    )
    .bind(now)
    .bind(message_id)
    .bind(channel_id)
    .execute(&mut *txn)
    .await
    .map_err(AppError::Database)?;
    ensure_affected(result.rows_affected(), "message not found")?;
    self.update_channel_stats(channel_id, now, &mut txn).await?;
    txn.commit().await.map_err(|_| AppError::TxnError)?;
    let event = MessageEvent {
        channel_id,
        // Carry the thread so the event matches the shape of the created/edited events.
        thread_id: existing.thread_id,
        message_id,
        author_id: existing.author_id,
        action: MessageAction::Deleted,
        body: None,
        seq: Some(existing.seq),
    };
    self.publish(&format!("im.message.{}", channel_id), request_id, &event)
        .await;
    self.emit_event(ImEvent::Message {
        request_id,
        data: event,
    });
    Ok(())
}
pub async fn message_pin(
&self,
ctx: &ImSession,
_wk_name: &str,
channel_id: Uuid,
message_id: Uuid,
request_id: Uuid,
) -> Result<(), AppError> {
let user_uid = ctx.user;
let channel = self.resolve_channel(channel_id).await?;
self.ensure_channel_editable(user_uid, &channel).await?;
let message = self.resolve_message(message_id, channel_id).await?;
let now = chrono::Utc::now();
let mut txn = self
.ctx
.db
.writer()
.begin()
.await
.map_err(|_| AppError::TxnError)?;
sqlx::query("SET LOCAL app.current_user_id = $1")
.bind(user_uid)
.execute(&mut *txn)
.await
.map_err(AppError::Database)?;
sqlx::query("UPDATE message SET pinned = true, updated_at = $1 WHERE id = $2 AND deleted_at IS NULL")
.bind(now)
.bind(message_id)
.execute(&mut *txn)
.await
.map_err(AppError::Database)?;
sqlx::query(
"INSERT INTO message_pin (id, message_id, channel_id, pinned_by, pinned_at) \
VALUES ($1, $2, $3, $4, $5) \
ON CONFLICT (message_id) DO NOTHING",
)
.bind(Uuid::now_v7())
.bind(message_id)
.bind(channel_id)
.bind(user_uid)
.bind(now)
.execute(&mut *txn)
.await
.map_err(AppError::Database)?;
txn.commit().await.map_err(|_| AppError::TxnError)?;
let event = MessageEvent {
channel_id,
thread_id: None,
message_id,
author_id: ctx.user,
action: MessageAction::Pinned,
body: None,
seq: Some(message.seq),
};
self.publish(&format!("im.message.{}", channel_id), request_id, &event)
.await;
self.emit_event(ImEvent::Message {
request_id,
data: event,
});
Ok(())
}
pub async fn message_unpin(
&self,
ctx: &ImSession,
_wk_name: &str,
channel_id: Uuid,
message_id: Uuid,
request_id: Uuid,
) -> Result<(), AppError> {
let user_uid = ctx.user;
let channel = self.resolve_channel(channel_id).await?;
self.ensure_channel_editable(user_uid, &channel).await?;
let message = self.resolve_message(message_id, channel_id).await?;
let now = chrono::Utc::now();
let mut txn = self
.ctx
.db
.writer()
.begin()
.await
.map_err(|_| AppError::TxnError)?;
sqlx::query("SET LOCAL app.current_user_id = $1")
.bind(user_uid)
.execute(&mut *txn)
.await
.map_err(AppError::Database)?;
sqlx::query("UPDATE message SET pinned = false, updated_at = $1 WHERE id = $2 AND deleted_at IS NULL")
.bind(now)
.bind(message_id)
.execute(&mut *txn)
.await
.map_err(AppError::Database)?;
sqlx::query("DELETE FROM message_pin WHERE message_id = $1")
.bind(message_id)
.execute(&mut *txn)
.await
.map_err(AppError::Database)?;
txn.commit().await.map_err(|_| AppError::TxnError)?;
let event = MessageEvent {
channel_id,
thread_id: None,
message_id,
author_id: ctx.user,
action: MessageAction::Unpinned,
body: None,
seq: Some(message.seq),
};
self.publish(&format!("im.message.{}", channel_id), request_id, &event)
.await;
self.emit_event(ImEvent::Message {
request_id,
data: event,
});
Ok(())
}
/// Returns the pinned, non-deleted messages of a channel, most recently pinned first.
///
/// Only messages that have a `message_pin` row are returned (inner join). The caller must
/// pass `ensure_channel_readable`. Query failures are wrapped in `AppError::Database`.
pub async fn message_list_pinned(
    &self,
    ctx: &ImSession,
    _wk_name: &str,
    channel_id: Uuid,
) -> Result<Vec<Message>, AppError> {
    const PINNED_SQL: &str =
        "SELECT m.id, m.channel_id, m.author_id, m.thread_id, m.reply_to_message_id, m.seq, \
         m.message_type, m.body, m.metadata, m.pinned, m.system, m.edited_at, m.deleted_at, \
         m.created_at, m.updated_at \
         FROM message m \
         JOIN message_pin mp ON mp.message_id = m.id \
         WHERE m.channel_id = $1 AND m.deleted_at IS NULL AND m.pinned \
         ORDER BY mp.pinned_at DESC";

    let target = self.resolve_channel(channel_id).await?;
    self.ensure_channel_readable(ctx.user, &target).await?;

    let pinned = sqlx::query_as::<_, Message>(PINNED_SQL)
        .bind(channel_id)
        .fetch_all(self.ctx.db.reader())
        .await
        .map_err(AppError::Database)?;
    Ok(pinned)
}
/// Returns the recorded edit history of a message, oldest edit first.
///
/// The caller must pass `ensure_channel_readable`. The message itself is not looked up;
/// an ID without history rows yields an empty list. Query failures are wrapped in
/// `AppError::Database`.
pub async fn message_edit_history(
    &self,
    ctx: &ImSession,
    _wk_name: &str,
    channel_id: Uuid,
    message_id: Uuid,
) -> Result<Vec<MessageEditHistory>, AppError> {
    const HISTORY_SQL: &str =
        "SELECT id, message_id, channel_id, previous_body, edited_by, edited_at \
         FROM message_edit_history \
         WHERE message_id = $1 AND channel_id = $2 \
         ORDER BY edited_at ASC";

    let target = self.resolve_channel(channel_id).await?;
    self.ensure_channel_readable(ctx.user, &target).await?;

    let revisions = sqlx::query_as::<_, MessageEditHistory>(HISTORY_SQL)
        .bind(message_id)
        .bind(channel_id)
        .fetch_all(self.ctx.db.reader())
        .await
        .map_err(AppError::Database)?;
    Ok(revisions)
}
/// Creates or refreshes the caller's bookmark on a message.
///
/// The caller must pass `ensure_channel_readable` and the message must resolve in the
/// channel. When a bookmark already exists for `(message_id, user_id)` its `updated_at`
/// is bumped and its note is replaced only if a new `note` is supplied.
/// Query failures are wrapped in `AppError::Database`.
pub async fn message_bookmark(
    &self,
    ctx: &ImSession,
    _wk_name: &str,
    channel_id: Uuid,
    message_id: Uuid,
    note: Option<String>,
) -> Result<MessageBookmark, AppError> {
    const UPSERT_SQL: &str =
        "INSERT INTO message_bookmark (id, message_id, channel_id, user_id, note, created_at, updated_at) \
         VALUES ($1, $2, $3, $4, $5, $6, $6) \
         ON CONFLICT (message_id, user_id) DO UPDATE SET note = COALESCE($5, message_bookmark.note), updated_at = $6 \
         RETURNING id, message_id, channel_id, user_id, note, created_at, updated_at";

    let owner = ctx.user;
    let target = self.resolve_channel(channel_id).await?;
    self.ensure_channel_readable(owner, &target).await?;
    // Existence check only; the resolved row is not needed.
    self.resolve_message(message_id, channel_id).await?;

    let stamp = chrono::Utc::now();
    let bookmark_id = Uuid::now_v7();
    let bookmark = sqlx::query_as::<_, MessageBookmark>(UPSERT_SQL)
        .bind(bookmark_id)
        .bind(message_id)
        .bind(channel_id)
        .bind(owner)
        .bind(note.as_deref())
        .bind(stamp)
        .fetch_one(self.ctx.db.writer())
        .await
        .map_err(AppError::Database)?;
    Ok(bookmark)
}
/// Removes the caller's bookmark on a message.
///
/// The caller must pass `ensure_channel_readable`. The number of deleted rows is handed to
/// `ensure_affected` with the message "bookmark not found". Query failures are wrapped in
/// `AppError::Database`.
pub async fn message_unbookmark(
    &self,
    ctx: &ImSession,
    _wk_name: &str,
    channel_id: Uuid,
    message_id: Uuid,
) -> Result<(), AppError> {
    let owner = ctx.user;
    let target = self.resolve_channel(channel_id).await?;
    self.ensure_channel_readable(owner, &target).await?;

    let outcome = sqlx::query(
        "DELETE FROM message_bookmark WHERE message_id = $1 AND user_id = $2 AND channel_id = $3",
    )
    .bind(message_id)
    .bind(owner)
    .bind(channel_id)
    .execute(self.ctx.db.writer())
    .await
    .map_err(AppError::Database)?;

    let removed = outcome.rows_affected();
    ensure_affected(removed, "bookmark not found")
}
/// Lists the caller's bookmarks within a workspace, newest first.
///
/// The workspace is resolved from `wk_name` and must pass `ensure_workspace_readable`.
/// `limit` and `offset` are normalised by `clamp_limit_offset`. Query failures are wrapped
/// in `AppError::Database`.
pub async fn message_list_bookmarks(
    &self,
    ctx: &ImSession,
    wk_name: &str,
    limit: i64,
    offset: i64,
) -> Result<Vec<MessageBookmark>, AppError> {
    const BOOKMARKS_SQL: &str =
        "SELECT mb.id, mb.message_id, mb.channel_id, mb.user_id, mb.note, mb.created_at, mb.updated_at \
         FROM message_bookmark mb \
         JOIN channel c ON c.id = mb.channel_id \
         WHERE mb.user_id = $1 AND c.workspace_id = $2 \
         ORDER BY mb.created_at DESC LIMIT $3 OFFSET $4";

    let owner = ctx.user;
    let workspace = self.resolve_workspace(wk_name).await?;
    self.ensure_workspace_readable(owner, &workspace).await?;

    let (page_size, skip) = clamp_limit_offset(limit, offset);
    let bookmarks = sqlx::query_as::<_, MessageBookmark>(BOOKMARKS_SQL)
        .bind(owner)
        .bind(workspace.id)
        .bind(page_size)
        .bind(skip)
        .fetch_all(self.ctx.db.reader())
        .await
        .map_err(AppError::Database)?;
    Ok(bookmarks)
}
/// Saves a message for the caller and returns the saved-message row.
///
/// The caller must pass `ensure_channel_readable` and the message must resolve in the
/// channel. Saving is idempotent: when the `(user_id, message_id)` pair already exists the
/// existing row is returned, with its note replaced only if a new `note` is supplied.
///
/// The conflict branch uses `DO UPDATE` rather than `DO NOTHING` because `RETURNING`
/// yields no row for a skipped insert, which made `fetch_one` fail on every repeated save.
///
/// # Errors
/// Propagates channel / message resolution errors and wraps query failures in
/// `AppError::Database`.
pub async fn message_save(
    &self,
    ctx: &ImSession,
    _wk_name: &str,
    channel_id: Uuid,
    message_id: Uuid,
    note: Option<String>,
) -> Result<SavedMessage, AppError> {
    let user_uid = ctx.user;
    let channel = self.resolve_channel(channel_id).await?;
    self.ensure_channel_readable(user_uid, &channel).await?;
    self.resolve_message(message_id, channel_id).await?;
    let now = chrono::Utc::now();
    sqlx::query_as::<_, SavedMessage>(
        "INSERT INTO saved_message (id, user_id, message_id, channel_id, note, created_at) \
         VALUES ($1, $2, $3, $4, $5, $6) \
         ON CONFLICT (user_id, message_id) DO UPDATE \
         SET note = COALESCE(EXCLUDED.note, saved_message.note) \
         RETURNING id, user_id, message_id, channel_id, note, created_at",
    )
    .bind(Uuid::now_v7())
    .bind(user_uid)
    .bind(message_id)
    .bind(channel_id)
    .bind(note.as_deref())
    .bind(now)
    .fetch_one(self.ctx.db.writer())
    .await
    .map_err(AppError::Database)
}
/// Removes a message from the caller's saved list.
///
/// The workspace is resolved from `wk_name` and must pass `ensure_workspace_readable`.
/// The row is matched by `(user_id, message_id)`; the number of deleted rows is handed to
/// `ensure_affected` with the message "saved message not found". Query failures are
/// wrapped in `AppError::Database`.
pub async fn message_unsave(
    &self,
    ctx: &ImSession,
    wk_name: &str,
    message_id: Uuid,
) -> Result<(), AppError> {
    let owner = ctx.user;
    let workspace = self.resolve_workspace(wk_name).await?;
    self.ensure_workspace_readable(owner, &workspace).await?;

    let delete_saved =
        sqlx::query("DELETE FROM saved_message WHERE user_id = $1 AND message_id = $2")
            .bind(owner)
            .bind(message_id);
    let outcome = delete_saved
        .execute(self.ctx.db.writer())
        .await
        .map_err(AppError::Database)?;

    let removed = outcome.rows_affected();
    ensure_affected(removed, "saved message not found")
}
/// Lists the caller's saved messages within a workspace, newest first.
///
/// The workspace is resolved from `wk_name` and must pass `ensure_workspace_readable`.
/// `limit` and `offset` are normalised by `clamp_limit_offset`. Query failures are wrapped
/// in `AppError::Database`.
pub async fn message_list_saved(
    &self,
    ctx: &ImSession,
    wk_name: &str,
    limit: i64,
    offset: i64,
) -> Result<Vec<SavedMessage>, AppError> {
    const SAVED_SQL: &str =
        "SELECT sm.id, sm.user_id, sm.message_id, sm.channel_id, sm.note, sm.created_at \
         FROM saved_message sm \
         JOIN channel c ON c.id = sm.channel_id \
         WHERE sm.user_id = $1 AND c.workspace_id = $2 \
         ORDER BY sm.created_at DESC LIMIT $3 OFFSET $4";

    let owner = ctx.user;
    let workspace = self.resolve_workspace(wk_name).await?;
    self.ensure_workspace_readable(owner, &workspace).await?;

    let (page_size, skip) = clamp_limit_offset(limit, offset);
    let saved = sqlx::query_as::<_, SavedMessage>(SAVED_SQL)
        .bind(owner)
        .bind(workspace.id)
        .bind(page_size)
        .bind(skip)
        .fetch_all(self.ctx.db.reader())
        .await
        .map_err(AppError::Database)?;
    Ok(saved)
}
/// Allocates the next sequence number for a channel from the Redis counter
/// `im:seq:{channel_id}`.
///
/// When the counter key does not exist, the highest `seq` stored for the channel is read
/// from the database and passed to `MESSAGE_SEQ_SCRIPT` as the floor, so the counter
/// resumes above existing rows. The script is invoked with `EVALSHA`; if Redis answers
/// `NOSCRIPT` the full script is sent with `EVAL` instead.
///
/// # Errors
/// `AppError::Redis` for Redis command failures, `AppError::Database` when the seed query
/// fails, plus whatever `get_connection` / `message_seq_sha` return.
async fn next_message_seq(&self, channel_id: Uuid) -> Result<i64, AppError> {
    let key = format!("im:seq:{channel_id}");
    // Resolve the script SHA first: `message_seq_sha` may open its own connection, and
    // doing it here avoids holding two connections at once.
    let sha = self.message_seq_sha()?;
    let mut conn = self.ctx.redis.get_connection()?;
    let exists: bool = Cmd::new()
        .arg("EXISTS")
        .arg(&key)
        .query(&mut *conn.inner_mut())
        .map_err(AppError::Redis)?;
    // The explicit `i64` matters: without it nothing pins the integer type, and decoding
    // the `bigint` column as the fallback `i32` would fail at runtime.
    let db_max: i64 = if exists {
        0
    } else {
        // Seed from the writer so a lagging reader cannot hand back a stale maximum and
        // cause already-used sequence numbers to be issued again.
        sqlx::query_scalar::<_, i64>(
            "SELECT COALESCE(MAX(seq), 0)::bigint FROM message WHERE channel_id = $1",
        )
        .bind(channel_id)
        .fetch_one(self.ctx.db.writer())
        .await
        .map_err(AppError::Database)?
    };
    let result: Result<i64, redis::RedisError> = Cmd::new()
        .arg("EVALSHA")
        .arg(&sha)
        .arg(1)
        .arg(&key)
        .arg(db_max)
        .query(&mut *conn.inner_mut());
    match result {
        Ok(seq) => Ok(seq),
        // The cached SHA is unknown to this Redis instance; send the script body instead.
        Err(e) if e.to_string().contains("NOSCRIPT") => Cmd::new()
            .arg("EVAL")
            .arg(MESSAGE_SEQ_SCRIPT)
            .arg(1)
            .arg(&key)
            .arg(db_max)
            .query(&mut *conn.inner_mut())
            .map_err(AppError::Redis),
        Err(e) => Err(AppError::Redis(e)),
    }
}
/// Returns the SHA under which Redis knows `MESSAGE_SEQ_SCRIPT`.
///
/// The value cached in `MESSAGE_SEQ_SHA` is returned when present; otherwise the script is
/// registered with `SCRIPT LOAD`, the resulting SHA is stored in the cache (a failed
/// `set` is ignored) and returned. Redis command failures are wrapped in
/// `AppError::Redis`.
fn message_seq_sha(&self) -> Result<String, AppError> {
    match MESSAGE_SEQ_SHA.get() {
        Some(cached) => Ok(cached.clone()),
        None => {
            let mut conn = self.ctx.redis.get_connection()?;
            let mut load_cmd = Cmd::new();
            load_cmd.arg("SCRIPT").arg("LOAD").arg(MESSAGE_SEQ_SCRIPT);
            let loaded: String = load_cmd
                .query(&mut *conn.inner_mut())
                .map_err(AppError::Redis)?;
            // `set` only fails when the cell is already filled; that result is ignored.
            let _ = MESSAGE_SEQ_SHA.set(loaded.clone());
            Ok(loaded)
        }
    }
}
/// Loads a non-deleted message by ID, scoped to the given channel.
///
/// # Errors
/// `AppError::NotFound("message not found")` when no matching row exists (wrong channel,
/// unknown ID, or soft-deleted); `AppError::Database` when the query fails.
pub(crate) async fn resolve_message(
    &self,
    message_id: Uuid,
    channel_id: Uuid,
) -> Result<Message, AppError> {
    sqlx::query_as::<_, Message>(
        "SELECT id, channel_id, author_id, thread_id, reply_to_message_id, seq, \
         message_type, body, metadata, pinned, system, edited_at, deleted_at, \
         created_at, updated_at \
         FROM message WHERE id = $1 AND channel_id = $2 AND deleted_at IS NULL",
    )
    .bind(message_id)
    .bind(channel_id)
    .fetch_optional(self.ctx.db.reader())
    .await
    .map_err(AppError::Database)?
    // Build the error lazily so the success path does not allocate the message string.
    .ok_or_else(|| AppError::NotFound("message not found".into()))
}
}