Files
imks/repo/message_scheduled.rs
zhenyi 821537186e refactor(tests): reformat code and update dependency management
- Reorganized import statements in adapter tests for better readability
- Replaced or_insert_with(Vec::new) with or_default() in test closures
- Updated Cargo.lock with new dependency versions and checksums
- Added TLS features to tonic dependency configuration
- Included sqlx, chrono, and uuid dependencies with specific features
- Added jsonwebtoken and arc-swap as project dependencies
- Reformatted assertion statements to comply with line length limits
- Adjusted base64 import order in engine codec module
- Updated protobuf include statement formatting
2026-06-11 12:11:05 +08:00

160 lines
4.5 KiB
Rust

//! Scheduled message CRUD operations on `MessageRepo`.
use chrono::Utc;
use uuid::Uuid;
use crate::ImksResult;
use crate::models::message_scheduled::MessageScheduled;
use super::message_repo::MessageRepo;
impl MessageRepo {
/// Schedule a message to be sent later.
#[allow(clippy::too_many_arguments)]
pub async fn schedule_message(
&self,
channel_id: Uuid,
author_id: Uuid,
thread_id: Option<Uuid>,
reply_to_message_id: Option<Uuid>,
body: &str,
metadata: Option<serde_json::Value>,
scheduled_at: chrono::DateTime<Utc>,
) -> ImksResult<MessageScheduled> {
let id = Uuid::now_v7();
let now = Utc::now();
sqlx::query_as::<_, MessageScheduled>(
r#"
INSERT INTO message_scheduled (
id, channel_id, author_id, thread_id, reply_to_message_id,
body, metadata, scheduled_at, status, created_at, updated_at
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, 'pending', $9, $9)
RETURNING *
"#,
)
.bind(id)
.bind(channel_id)
.bind(author_id)
.bind(thread_id)
.bind(reply_to_message_id)
.bind(body)
.bind(metadata)
.bind(scheduled_at)
.bind(now)
.fetch_one(self.pool())
.await
.map_err(Into::into)
}
/// Cancel a scheduled message (only if still pending).
pub async fn cancel_scheduled(&self, scheduled_id: Uuid) -> ImksResult<bool> {
let result = sqlx::query(
r#"
UPDATE message_scheduled
SET status = 'cancelled', updated_at = $1
WHERE id = $2 AND status = 'pending'
"#,
)
.bind(Utc::now())
.bind(scheduled_id)
.execute(self.pool())
.await?;
Ok(result.rows_affected() > 0)
}
/// List a user's scheduled messages.
pub async fn list_scheduled(
&self,
channel_id: Uuid,
author_id: Uuid,
) -> ImksResult<Vec<MessageScheduled>> {
sqlx::query_as::<_, MessageScheduled>(
r#"
SELECT * FROM message_scheduled
WHERE channel_id = $1 AND author_id = $2 AND status = 'pending'
ORDER BY scheduled_at ASC
"#,
)
.bind(channel_id)
.bind(author_id)
.fetch_all(self.pool())
.await
.map_err(Into::into)
}
/// Atomically claim due scheduled messages for background dispatch.
pub async fn claim_due_scheduled(&self) -> ImksResult<Vec<MessageScheduled>> {
let mut tx = self.pool().begin().await?;
let now = Utc::now();
let rows = sqlx::query_as::<_, MessageScheduled>(
r#"
UPDATE message_scheduled
SET status = 'processing', updated_at = $1
WHERE id IN (
SELECT id
FROM message_scheduled
WHERE status = 'pending' AND scheduled_at <= $1
ORDER BY scheduled_at ASC
LIMIT 100
FOR UPDATE SKIP LOCKED
)
RETURNING *
"#,
)
.bind(now)
.fetch_all(&mut *tx)
.await?;
tx.commit().await?;
Ok(rows)
}
/// Get all pending scheduled messages whose time has come (for background dispatch).
pub async fn get_due_scheduled(&self) -> ImksResult<Vec<MessageScheduled>> {
self.claim_due_scheduled().await
}
/// Mark a scheduled message as sent.
pub async fn mark_scheduled_sent(
&self,
scheduled_id: Uuid,
sent_message_id: Uuid,
) -> ImksResult<()> {
sqlx::query(
r#"
UPDATE message_scheduled
SET status = 'sent', sent_message_id = $1, updated_at = $2
WHERE id = $3 AND status = 'processing'
"#,
)
.bind(sent_message_id)
.bind(Utc::now())
.bind(scheduled_id)
.execute(self.pool())
.await?;
Ok(())
}
/// Mark a scheduled message as failed.
pub async fn mark_scheduled_failed(&self, scheduled_id: Uuid, error: &str) -> ImksResult<()> {
sqlx::query(
r#"
UPDATE message_scheduled
SET status = 'failed', error = $1, updated_at = $2
WHERE id = $3 AND status = 'processing'
"#,
)
.bind(error)
.bind(Utc::now())
.bind(scheduled_id)
.execute(self.pool())
.await?;
Ok(())
}
}