322 lines
11 KiB
Rust
322 lines
11 KiB
Rust
use serde::{Deserialize, Serialize};
|
|
use uuid::Uuid;
|
|
|
|
use crate::error::AppError;
|
|
use crate::immediate::{ThreadAction, ThreadEvent};
|
|
use crate::models::channels::MessageThread;
|
|
use crate::service::ImService;
|
|
use crate::service::im::events::ImEvent;
|
|
|
|
use super::session::ImSession;
|
|
use super::util::*;
|
|
|
|
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
|
|
pub struct CreateThreadParams {
|
|
pub title: Option<String>,
|
|
pub root_message_id: Uuid,
|
|
pub tags: Option<Vec<String>>,
|
|
pub auto_archive_duration: Option<i32>,
|
|
}
|
|
|
|
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
|
|
pub struct UpdateThreadParams {
|
|
pub title: Option<String>,
|
|
pub tags: Option<Vec<String>>,
|
|
pub pinned: Option<bool>,
|
|
pub locked: Option<bool>,
|
|
pub rate_limit_per_user: Option<i32>,
|
|
pub resolved: Option<bool>,
|
|
}
|
|
|
|
#[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)]
|
|
pub struct ThreadListFilters {
|
|
pub pinned: Option<bool>,
|
|
pub locked: Option<bool>,
|
|
pub resolved: Option<bool>,
|
|
}
|
|
|
|
impl ImService {
|
|
pub async fn thread_list(
|
|
&self,
|
|
ctx: &ImSession,
|
|
_wk_name: &str,
|
|
channel_id: Uuid,
|
|
filters: ThreadListFilters,
|
|
limit: i64,
|
|
offset: i64,
|
|
) -> Result<Vec<MessageThread>, AppError> {
|
|
let user_uid = ctx.user;
|
|
let channel = self.resolve_channel(channel_id).await?;
|
|
self.ensure_channel_readable(user_uid, &channel).await?;
|
|
let (limit, offset) = clamp_limit_offset(limit, offset);
|
|
|
|
sqlx::query_as::<_, MessageThread>(
|
|
"SELECT id, channel_id, root_message_id, created_by, replies_count, \
|
|
participants_count, last_reply_message_id, last_reply_at, resolved, \
|
|
resolved_by, resolved_at, title, tags, pinned, locked, \
|
|
rate_limit_per_user, auto_archive_at, created_at, updated_at \
|
|
FROM message_thread WHERE channel_id = $1 \
|
|
AND ($2::bool IS NULL OR pinned = $2) \
|
|
AND ($3::bool IS NULL OR locked = $3) \
|
|
AND ($4::bool IS NULL OR resolved = $4) \
|
|
ORDER BY last_reply_at DESC NULLS LAST, created_at DESC \
|
|
LIMIT $5 OFFSET $6",
|
|
)
|
|
.bind(channel_id)
|
|
.bind(filters.pinned)
|
|
.bind(filters.locked)
|
|
.bind(filters.resolved)
|
|
.bind(limit)
|
|
.bind(offset)
|
|
.fetch_all(self.ctx.db.reader())
|
|
.await
|
|
.map_err(AppError::Database)
|
|
}
|
|
|
|
pub async fn thread_get(
|
|
&self,
|
|
ctx: &ImSession,
|
|
_wk_name: &str,
|
|
channel_id: Uuid,
|
|
thread_id: Uuid,
|
|
) -> Result<MessageThread, AppError> {
|
|
let user_uid = ctx.user;
|
|
let channel = self.resolve_channel(channel_id).await?;
|
|
self.ensure_channel_readable(user_uid, &channel).await?;
|
|
self.resolve_thread(thread_id, channel_id).await
|
|
}
|
|
|
|
pub async fn thread_create(
|
|
&self,
|
|
ctx: &ImSession,
|
|
_wk_name: &str,
|
|
channel_id: Uuid,
|
|
params: CreateThreadParams,
|
|
) -> Result<MessageThread, AppError> {
|
|
let user_uid = ctx.user;
|
|
let channel = self.resolve_channel(channel_id).await?;
|
|
self.ensure_channel_readable(user_uid, &channel).await?;
|
|
|
|
self.resolve_message(params.root_message_id, channel_id)
|
|
.await?;
|
|
|
|
let now = chrono::Utc::now();
|
|
let thread_id = Uuid::now_v7();
|
|
let tags = params.tags.unwrap_or_default();
|
|
let auto_archive_at = params
|
|
.auto_archive_duration
|
|
.map(|d| now + chrono::Duration::minutes(d as i64));
|
|
|
|
let thread = sqlx::query_as::<_, MessageThread>(
|
|
"INSERT INTO message_thread \
|
|
(id, channel_id, root_message_id, created_by, replies_count, \
|
|
participants_count, last_reply_message_id, last_reply_at, resolved, \
|
|
title, tags, pinned, locked, auto_archive_at, created_at, updated_at) \
|
|
VALUES ($1, $2, $3, $4, 0, 0, NULL, NULL, false, $5, $6, false, false, $7, $8, $8) \
|
|
RETURNING id, channel_id, root_message_id, created_by, replies_count, \
|
|
participants_count, last_reply_message_id, last_reply_at, resolved, \
|
|
resolved_by, resolved_at, title, tags, pinned, locked, \
|
|
rate_limit_per_user, auto_archive_at, created_at, updated_at",
|
|
)
|
|
.bind(thread_id)
|
|
.bind(channel_id)
|
|
.bind(params.root_message_id)
|
|
.bind(user_uid)
|
|
.bind(params.title.as_deref())
|
|
.bind(&tags)
|
|
.bind(auto_archive_at)
|
|
.bind(now)
|
|
.fetch_one(self.ctx.db.writer())
|
|
.await
|
|
.map_err(AppError::Database)?;
|
|
|
|
tracing::info!(thread_id = %thread_id, channel_id = %channel_id, "Thread created");
|
|
let request_id = Uuid::nil();
|
|
let event = ThreadEvent {
|
|
channel_id,
|
|
thread_id,
|
|
action: ThreadAction::Created,
|
|
};
|
|
self.publish(
|
|
&format!("im.thread.{}.{}", channel_id, thread_id),
|
|
request_id,
|
|
&event,
|
|
)
|
|
.await;
|
|
self.emit_event(ImEvent::Thread {
|
|
request_id,
|
|
data: event,
|
|
});
|
|
Ok(thread)
|
|
}
|
|
|
|
pub async fn thread_update(
|
|
&self,
|
|
ctx: &ImSession,
|
|
_wk_name: &str,
|
|
channel_id: Uuid,
|
|
thread_id: Uuid,
|
|
params: UpdateThreadParams,
|
|
) -> Result<MessageThread, AppError> {
|
|
let user_uid = ctx.user;
|
|
let channel = self.resolve_channel(channel_id).await?;
|
|
let thread = self.resolve_thread(thread_id, channel_id).await?;
|
|
|
|
let is_owner = thread.created_by == user_uid;
|
|
if !is_owner {
|
|
self.ensure_channel_editable(user_uid, &channel).await?;
|
|
}
|
|
|
|
let now = chrono::Utc::now();
|
|
let resolved_by = if params.resolved == Some(true) && !thread.resolved {
|
|
Some(user_uid)
|
|
} else {
|
|
thread.resolved_by
|
|
};
|
|
let resolved_at = if params.resolved == Some(true) && !thread.resolved {
|
|
Some(now)
|
|
} else if params.resolved == Some(false) {
|
|
None
|
|
} else {
|
|
thread.resolved_at
|
|
};
|
|
|
|
let updated = sqlx::query_as::<_, MessageThread>(
|
|
"UPDATE message_thread SET \
|
|
title = COALESCE($1, title), \
|
|
tags = COALESCE($2, tags), \
|
|
pinned = COALESCE($3, pinned), \
|
|
locked = COALESCE($4, locked), \
|
|
rate_limit_per_user = COALESCE($5, rate_limit_per_user), \
|
|
resolved = COALESCE($6, resolved), \
|
|
resolved_by = $7, resolved_at = $8, \
|
|
updated_at = $9 \
|
|
WHERE id = $10 \
|
|
RETURNING id, channel_id, root_message_id, created_by, replies_count, \
|
|
participants_count, last_reply_message_id, last_reply_at, resolved, \
|
|
resolved_by, resolved_at, title, tags, pinned, locked, \
|
|
rate_limit_per_user, auto_archive_at, created_at, updated_at",
|
|
)
|
|
.bind(params.title.as_deref())
|
|
.bind(params.tags.as_deref())
|
|
.bind(params.pinned)
|
|
.bind(params.locked)
|
|
.bind(params.rate_limit_per_user)
|
|
.bind(params.resolved)
|
|
.bind(resolved_by)
|
|
.bind(resolved_at)
|
|
.bind(now)
|
|
.bind(thread_id)
|
|
.fetch_one(self.ctx.db.writer())
|
|
.await
|
|
.map_err(AppError::Database)?;
|
|
let request_id = Uuid::nil();
|
|
let event = ThreadEvent {
|
|
channel_id,
|
|
thread_id,
|
|
action: ThreadAction::Updated,
|
|
};
|
|
self.publish(
|
|
&format!("im.thread.{}.{}", channel_id, thread_id),
|
|
request_id,
|
|
&event,
|
|
)
|
|
.await;
|
|
self.emit_event(ImEvent::Thread {
|
|
request_id,
|
|
data: event,
|
|
});
|
|
Ok(updated)
|
|
}
|
|
|
|
pub async fn thread_delete(
|
|
&self,
|
|
ctx: &ImSession,
|
|
_wk_name: &str,
|
|
channel_id: Uuid,
|
|
thread_id: Uuid,
|
|
) -> Result<(), AppError> {
|
|
let user_uid = ctx.user;
|
|
let channel = self.resolve_channel(channel_id).await?;
|
|
self.ensure_channel_admin(user_uid, &channel).await?;
|
|
|
|
let result = sqlx::query("DELETE FROM message_thread WHERE id = $1 AND channel_id = $2")
|
|
.bind(thread_id)
|
|
.bind(channel_id)
|
|
.execute(self.ctx.db.writer())
|
|
.await
|
|
.map_err(AppError::Database)?;
|
|
|
|
ensure_affected(result.rows_affected(), "thread not found")?;
|
|
let request_id = Uuid::nil();
|
|
let event = ThreadEvent {
|
|
channel_id,
|
|
thread_id,
|
|
action: ThreadAction::Deleted,
|
|
};
|
|
self.publish(
|
|
&format!("im.thread.{}.{}", channel_id, thread_id),
|
|
request_id,
|
|
&event,
|
|
)
|
|
.await;
|
|
self.emit_event(ImEvent::Thread {
|
|
request_id,
|
|
data: event,
|
|
});
|
|
Ok(())
|
|
}
|
|
|
|
pub async fn thread_read_state_update(
|
|
&self,
|
|
ctx: &ImSession,
|
|
_wk_name: &str,
|
|
channel_id: Uuid,
|
|
thread_id: Uuid,
|
|
message_id: Uuid,
|
|
) -> Result<(), AppError> {
|
|
let user_uid = ctx.user;
|
|
let channel = self.resolve_channel(channel_id).await?;
|
|
self.ensure_channel_readable(user_uid, &channel).await?;
|
|
|
|
let now = chrono::Utc::now();
|
|
sqlx::query(
|
|
"INSERT INTO thread_read_state (id, user_id, thread_id, channel_id, last_read_message_id, last_read_at, updated_at) \
|
|
VALUES ($1, $2, $3, $4, $5, $6, $6) \
|
|
ON CONFLICT (user_id, thread_id) DO UPDATE SET \
|
|
last_read_message_id = $5, last_read_at = $6, updated_at = $6",
|
|
)
|
|
.bind(Uuid::now_v7())
|
|
.bind(user_uid)
|
|
.bind(thread_id)
|
|
.bind(channel_id)
|
|
.bind(message_id)
|
|
.bind(now)
|
|
.execute(self.ctx.db.writer())
|
|
.await
|
|
.map_err(AppError::Database)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
pub(crate) async fn resolve_thread(
|
|
&self,
|
|
thread_id: Uuid,
|
|
channel_id: Uuid,
|
|
) -> Result<MessageThread, AppError> {
|
|
sqlx::query_as::<_, MessageThread>(
|
|
"SELECT id, channel_id, root_message_id, created_by, replies_count, \
|
|
participants_count, last_reply_message_id, last_reply_at, resolved, \
|
|
resolved_by, resolved_at, title, tags, pinned, locked, \
|
|
rate_limit_per_user, auto_archive_at, created_at, updated_at \
|
|
FROM message_thread WHERE id = $1 AND channel_id = $2",
|
|
)
|
|
.bind(thread_id)
|
|
.bind(channel_id)
|
|
.fetch_optional(self.ctx.db.reader())
|
|
.await
|
|
.map_err(AppError::Database)?
|
|
.ok_or(AppError::NotFound("thread not found".into()))
|
|
}
|
|
}
|