use serde::{Deserialize, Serialize}; use uuid::Uuid; use crate::error::AppError; use crate::immediate::{FollowAction, FollowEvent}; use crate::models::channels::{ArticleCrossPost, ChannelFollow}; use crate::service::ImService; use crate::service::im::events::ImEvent; use super::session::ImSession; use super::util::*; #[derive(Deserialize, Serialize, Clone, Debug, utoipa::ToSchema)] pub struct FollowChannelParams { pub target_workspace_id: Uuid, pub target_channel_id: Option, pub webhook_url: Option, } impl ImService { async fn follow_realtime(&self, channel_id: Uuid, follow_id: Uuid, action: FollowAction) { let request_id = Uuid::nil(); let event = FollowEvent { channel_id, follow_id, action, }; self.publish(&format!("im.follow.{channel_id}"), request_id, &event) .await; self.emit_event(ImEvent::Follow { request_id, data: event, }); } pub async fn follow_list( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, ) -> Result, AppError> { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_admin(user_uid, &channel).await?; sqlx::query_as::<_, ChannelFollow>( "SELECT id, source_channel_id, target_workspace_id, target_channel_id, \ webhook_url, webhook_secret_ciphertext, enabled, followed_by, \ unfollowed_at, last_delivery_at, last_delivery_status, created_at, updated_at \ FROM channel_follow WHERE source_channel_id = $1 AND unfollowed_at IS NULL \ ORDER BY created_at DESC", ) .bind(channel_id) .fetch_all(self.ctx.db.reader()) .await .map_err(AppError::Database) } pub async fn follow_create( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, params: FollowChannelParams, ) -> Result { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_readable(user_uid, &channel).await?; let now = chrono::Utc::now(); let follow = sqlx::query_as::<_, ChannelFollow>( "INSERT INTO channel_follow \ (id, source_channel_id, target_workspace_id, target_channel_id, \ webhook_url, enabled, followed_by, created_at, updated_at) \ VALUES ($1, $2, $3, $4, $5, true, $6, $7, $7) \ ON CONFLICT (source_channel_id, target_workspace_id, target_channel_id) \ DO UPDATE SET enabled = true, unfollowed_at = NULL, updated_at = $7 \ RETURNING id, source_channel_id, target_workspace_id, target_channel_id, \ webhook_url, webhook_secret_ciphertext, enabled, followed_by, \ unfollowed_at, last_delivery_at, last_delivery_status, created_at, updated_at", ) .bind(Uuid::now_v7()) .bind(channel_id) .bind(params.target_workspace_id) .bind(params.target_channel_id) .bind(params.webhook_url.as_deref()) .bind(user_uid) .bind(now) .fetch_one(self.ctx.db.writer()) .await .map_err(AppError::Database)?; self.follow_realtime(channel_id, follow.id, FollowAction::Created) .await; Ok(follow) } pub async fn follow_delete( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, follow_id: Uuid, ) -> Result<(), AppError> { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_admin(user_uid, &channel).await?; let now = chrono::Utc::now(); let result = sqlx::query( "UPDATE channel_follow SET unfollowed_at = $1, enabled = false, updated_at = $1 \ WHERE id = $2 AND source_channel_id = $3 AND unfollowed_at IS NULL", ) .bind(now) .bind(follow_id) .bind(channel_id) .execute(self.ctx.db.writer()) .await .map_err(AppError::Database)?; ensure_affected(result.rows_affected(), "follow not found")?; self.follow_realtime(channel_id, follow_id, FollowAction::Deleted) .await; Ok(()) } pub(crate) async fn cross_post_article( &self, article_id: Uuid, channel_id: Uuid, _actor_id: Uuid, ) -> Result { let followers = sqlx::query_as::<_, ChannelFollow>( "SELECT id, source_channel_id, target_workspace_id, target_channel_id, \ webhook_url, webhook_secret_ciphertext, enabled, followed_by, \ unfollowed_at, last_delivery_at, last_delivery_status, created_at, updated_at \ FROM channel_follow WHERE source_channel_id = $1 AND enabled AND unfollowed_at IS NULL", ) .bind(channel_id) .fetch_all(self.ctx.db.reader()) .await .map_err(AppError::Database)?; let now = chrono::Utc::now(); let mut count = 0u64; for follow in &followers { sqlx::query( "INSERT INTO article_cross_post \ (id, article_id, follow_id, target_workspace_id, target_channel_id, \ status, attempts, created_at) \ VALUES ($1, $2, $3, $4, $5, 'pending', 0, $6) \ ON CONFLICT DO NOTHING", ) .bind(Uuid::now_v7()) .bind(article_id) .bind(follow.id) .bind(follow.target_workspace_id) .bind(follow.target_channel_id) .bind(now) .execute(self.ctx.db.writer()) .await .map_err(AppError::Database)?; count += 1; } if count > 0 { sqlx::query("UPDATE article SET cross_posted = true WHERE id = $1") .bind(article_id) .execute(self.ctx.db.writer()) .await .map_err(AppError::Database)?; } tracing::info!( article_id = %article_id, followers = count, "Cross-post jobs created" ); Ok(count) } pub async fn cross_post_list( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, article_id: Uuid, ) -> Result, AppError> { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_admin(user_uid, &channel).await?; sqlx::query_as::<_, ArticleCrossPost>( "SELECT id, article_id, follow_id, target_workspace_id, target_channel_id, \ status, attempts, last_error, sent_at, delivered_at, failed_at, created_at \ FROM article_cross_post WHERE article_id = $1 ORDER BY created_at ASC", ) .bind(article_id) .fetch_all(self.ctx.db.reader()) .await .map_err(AppError::Database) } pub async fn cross_post_retry( &self, ctx: &ImSession, _wk_name: &str, channel_id: Uuid, cross_post_id: Uuid, ) -> Result { let user_uid = ctx.user; let channel = self.resolve_channel(channel_id).await?; self.ensure_channel_admin(user_uid, &channel).await?; let post = sqlx::query_as::<_, ArticleCrossPost>( "UPDATE article_cross_post SET status = 'pending', attempts = 0, \ last_error = NULL, failed_at = NULL \ WHERE id = $1 AND status = 'failed' \ RETURNING id, article_id, follow_id, target_workspace_id, target_channel_id, \ status, attempts, last_error, sent_at, delivered_at, failed_at, created_at", ) .bind(cross_post_id) .fetch_one(self.ctx.db.writer()) .await .map_err(AppError::Database)?; self.follow_realtime(channel_id, post.follow_id, FollowAction::Retried) .await; Ok(post) } }