use emailks::{ error::EmailError, pb::email::v1::{EmailAddress, SendEmailRequest}, queue::EmailQueue, }; use std::sync::{ Arc, atomic::{AtomicUsize, Ordering}, }; use tokio::sync::Notify; fn dummy_request() -> SendEmailRequest { SendEmailRequest { from: None, to: vec![], cc: vec![], bcc: vec![], subject: "test".into(), text_body: "body".into(), html_body: String::new(), attachments: vec![], priority: 0, headers: Default::default(), reply_to: String::new(), } } #[tokio::test] async fn enqueue_and_consume_success() { let (queue, worker) = EmailQueue::unbounded(); let counter = Arc::new(AtomicUsize::new(0)); let c = counter.clone(); let notify = Arc::new(Notify::new()); let n = notify.clone(); let mut req = dummy_request(); req.to.push(EmailAddress { email: "test@example.com".into(), name: String::new(), }); let id = queue.enqueue(req).expect("enqueue should work"); assert!(id > 0); worker.spawn(move |_job| { let c = c.clone(); let n = n.clone(); async move { c.fetch_add(1, Ordering::SeqCst); n.notify_one(); Ok::<(), EmailError>(()) } }); // Wait for async consumption with proper signalling notify.notified().await; assert_eq!(counter.load(Ordering::SeqCst), 1); } #[tokio::test] async fn retry_then_succeed() { let (queue, worker) = EmailQueue::unbounded(); let attempts = Arc::new(AtomicUsize::new(0)); let a = attempts.clone(); let notify = Arc::new(Notify::new()); let n = notify.clone(); let _id = queue.enqueue(dummy_request()).unwrap(); worker.spawn(move |_job| { let a = a.clone(); let n = n.clone(); async move { let count = a.fetch_add(1, Ordering::SeqCst); if count < 2 { Err(EmailError::Send("temp failure".into())) } else { n.notify_one(); Ok(()) } } }); // Wait for final success notification notify.notified().await; assert_eq!(attempts.load(Ordering::SeqCst), 3); // 2 fails + 1 success } #[tokio::test] async fn terminal_error_destroyed_immediately() { let (queue, worker) = EmailQueue::unbounded(); let store = queue.status_store().clone(); let attempts = Arc::new(AtomicUsize::new(0)); let a = attempts.clone(); let notify = Arc::new(Notify::new()); let n = notify.clone(); let id = queue.enqueue(dummy_request()).unwrap(); worker.spawn(move |_job| { let a = a.clone(); let n = n.clone(); async move { a.fetch_add(1, Ordering::SeqCst); n.notify_one(); Err(EmailError::MissingRecipients) } }); notify.notified().await; assert_eq!(attempts.load(Ordering::SeqCst), 1); let entry = store.get(id).expect("should have status entry"); assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Failed); } #[tokio::test] async fn bounded_channel_blocks_when_full() { let (queue, _worker) = EmailQueue::bounded(1); let _id1 = queue.enqueue(dummy_request()).unwrap(); // Second enqueue should fail with Full let err = queue.enqueue(dummy_request()).unwrap_err(); assert!(matches!(err, emailks::error::QueueError::Full)); } #[tokio::test] async fn status_store_tracks_lifecycle() { let (queue, worker) = EmailQueue::unbounded(); let store = queue.status_store().clone(); let notify = Arc::new(Notify::new()); let n = notify.clone(); let id = queue.enqueue(dummy_request()).unwrap(); // Status should be Queued let entry = store.get(id).unwrap(); assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Queued); worker.spawn(move |_job| { let n = n.clone(); async move { n.notify_one(); Ok::<(), EmailError>(()) } }); notify.notified().await; let entry = store.get(id).unwrap(); assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Sent); }