use std::{ fmt, future::Future, sync::{ Arc, atomic::{AtomicU64, Ordering}, }, time::Duration, }; use tokio::sync::{mpsc, watch}; use tracing::{error, info, warn}; pub use crate::error::QueueError; use crate::{error::EmailError, pb::email::v1::SendEmailRequest, status::JobStatusStore}; pub const MAX_FAILURES: u8 = 3; const RETRY_BASE_DELAY: Duration = Duration::from_millis(100); const RETRY_MAX_DELAY: Duration = Duration::from_secs(5); #[derive(Clone)] pub struct EmailQueue { sender: QueueSender, next_id: Arc, status_store: JobStatusStore, shutdown: Arc, } pub struct EmailQueueWorker { receiver: QueueReceiver, requeue_sender: QueueSender, status_store: JobStatusStore, shutdown_rx: watch::Receiver, } #[derive(Clone)] pub struct EmailJob { pub id: u64, pub request: Arc, pub failed_attempts: u8, } #[derive(Clone)] enum QueueSender { Unbounded(mpsc::UnboundedSender), Bounded(mpsc::Sender), } enum QueueReceiver { Unbounded(mpsc::UnboundedReceiver), Bounded(mpsc::Receiver), } struct QueueShutdown { tx: watch::Sender, } impl Drop for QueueShutdown { fn drop(&mut self) { let _ = self.tx.send(true); } } fn new_shutdown_pair() -> (Arc, watch::Receiver) { let (tx, rx) = watch::channel(false); (Arc::new(QueueShutdown { tx }), rx) } fn retry_delay(attempt: u8, job_id: u64) -> Duration { let multiplier = 1u32 << u32::from(attempt.saturating_sub(1)).min(8); let base = RETRY_BASE_DELAY.saturating_mul(multiplier); let jitter = Duration::from_millis(job_id % 100); base.saturating_add(jitter).min(RETRY_MAX_DELAY) } impl QueueSender { fn send(&self, job: EmailJob) -> Result<(), QueueError> { match self { Self::Unbounded(tx) => tx.send(job).map_err(|_| QueueError::Closed), Self::Bounded(tx) => tx.try_send(job).map_err(|e| match e { mpsc::error::TrySendError::Full(_) => QueueError::Full, mpsc::error::TrySendError::Closed(_) => QueueError::Closed, }), } } } impl QueueReceiver { async fn recv(&mut self) -> Option { match self { Self::Unbounded(rx) => rx.recv().await, Self::Bounded(rx) => rx.recv().await, } } } impl EmailQueue { pub fn unbounded() -> (Self, EmailQueueWorker) { let store = JobStatusStore::new(); Self::build(mpsc::unbounded_channel(), store) } pub fn bounded(capacity: usize) -> (Self, EmailQueueWorker) { let store = JobStatusStore::new(); Self::build_bounded(mpsc::channel(capacity), store) } fn build( (tx, rx): ( mpsc::UnboundedSender, mpsc::UnboundedReceiver, ), store: JobStatusStore, ) -> (Self, EmailQueueWorker) { let (shutdown, shutdown_rx) = new_shutdown_pair(); let sender = QueueSender::Unbounded(tx); let queue = Self { sender: sender.clone(), next_id: Arc::new(AtomicU64::new(1)), status_store: store.clone(), shutdown, }; let worker = EmailQueueWorker { receiver: QueueReceiver::Unbounded(rx), requeue_sender: sender, status_store: store, shutdown_rx, }; (queue, worker) } fn build_bounded( (tx, rx): (mpsc::Sender, mpsc::Receiver), store: JobStatusStore, ) -> (Self, EmailQueueWorker) { let (shutdown, shutdown_rx) = new_shutdown_pair(); let sender = QueueSender::Bounded(tx.clone()); let queue = Self { sender: sender.clone(), next_id: Arc::new(AtomicU64::new(1)), status_store: store.clone(), shutdown, }; let worker = EmailQueueWorker { receiver: QueueReceiver::Bounded(rx), requeue_sender: sender, status_store: store, shutdown_rx, }; (queue, worker) } } impl EmailQueue { pub fn enqueue(&self, request: SendEmailRequest) -> Result { if *self.shutdown.tx.borrow() { return Err(QueueError::Closed); } let id = self.next_job_id()?; self.status_store.set_queued(id); let job = EmailJob { id, request: Arc::new(request), failed_attempts: 0, }; if let Err(err) = self.sender.send(job) { self.status_store.remove(id); return Err(err); } info!(id, "email job enqueued"); Ok(id) } /// Enqueues multiple email requests. /// /// Returns `Ok(ids)` on full success. On partial failure, returns /// `Err(QueueError)` — note that some emails may have already been /// enqueued before the failure. Callers should handle duplicates /// if retrying the full batch. pub fn enqueue_batch(&self, requests: I) -> Result, QueueError> where I: IntoIterator, { requests.into_iter().map(|r| self.enqueue(r)).collect() } pub fn status_store(&self) -> &JobStatusStore { &self.status_store } fn next_job_id(&self) -> Result { self.next_id .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |id| id.checked_add(1)) .map_err(|_| QueueError::IdExhausted) } } impl EmailQueueWorker { pub async fn run(mut self, mut consume: F) where F: FnMut(EmailJob) -> Fut, Fut: Future>, { loop { tokio::select! { changed = self.shutdown_rx.changed() => { if changed.is_err() || *self.shutdown_rx.borrow() { info!("queue worker stopped: shutdown requested"); break; } } job = self.receiver.recv() => { let Some(job) = job else { info!("queue worker stopped: channel closed"); break; }; self.consume_job(job, &mut consume).await; } } } } pub fn spawn(self, consume: F) -> tokio::task::JoinHandle<()> where F: FnMut(EmailJob) -> Fut + Send + 'static, Fut: Future> + Send + 'static, { tokio::spawn(async move { self.run(consume).await }) } async fn consume_job(&self, mut job: EmailJob, consume: &mut F) where F: FnMut(EmailJob) -> Fut, Fut: Future>, { self.status_store.set_sending(job.id); match consume(job.clone()).await { Ok(()) => { info!(id = job.id, "email sent"); self.status_store.set_sent(job.id); } Err(err) => { if err.is_terminal() { warn!(id = job.id, %err, "terminal error, destroying job"); self.status_store.set_failed(job.id, err.to_string()); return; } job.failed_attempts = job.failed_attempts.saturating_add(1); if job.failed_attempts >= MAX_FAILURES { error!( id = job.id, %err, attempts = job.failed_attempts, "max failures reached, destroying job" ); self.status_store.set_failed(job.id, err.to_string()); return; } let delay = retry_delay(job.failed_attempts, job.id); warn!( id = job.id, %err, attempt = job.failed_attempts, max = MAX_FAILURES, delay_ms = delay.as_millis(), "retryable failure, requeuing after backoff" ); if *self.shutdown_rx.borrow() { self.status_store .set_failed(job.id, "shutdown before retry".to_owned()); return; } let mut shutdown_rx = self.shutdown_rx.clone(); tokio::select! { _ = tokio::time::sleep(delay) => {} changed = shutdown_rx.changed() => { if changed.is_err() || *shutdown_rx.borrow() { self.status_store.set_failed(job.id, "shutdown before retry".to_owned()); return; } } } let requeue_id = job.id; if let Err(e) = self.requeue_sender.send(job) { error!(id = requeue_id, %e, "failed to requeue"); self.status_store .set_failed(requeue_id, format!("requeue failed: {e}")); } } } } pub fn status_store(&self) -> &JobStatusStore { &self.status_store } } impl fmt::Debug for EmailJob { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("EmailJob") .field("id", &self.id) .field("failed_attempts", &self.failed_attempts) .finish_non_exhaustive() } }