c4824ef261
- Create Helm chart structure with Chart.yaml and values.yaml - Add deployment template with container configuration and environment variables - Implement service template for gRPC port exposure - Add service account template with security configuration - Include horizontal pod autoscaler template for scaling capabilities - Add helper templates for naming and label management - Configure SMTP settings as configurable parameters in values.yaml - Set up resource limits and requests for container performance - Implement liveness and readiness probes for health checks - Add support for existing secrets and custom configurations
150 lines
4.1 KiB
Rust
150 lines
4.1 KiB
Rust
use emailks::{
|
|
error::EmailError,
|
|
pb::email::v1::{EmailAddress, SendEmailRequest},
|
|
queue::EmailQueue,
|
|
};
|
|
use std::sync::{
|
|
Arc,
|
|
atomic::{AtomicUsize, Ordering},
|
|
};
|
|
use tokio::sync::Notify;
|
|
|
|
fn dummy_request() -> SendEmailRequest {
|
|
SendEmailRequest {
|
|
from: None,
|
|
to: vec![],
|
|
cc: vec![],
|
|
bcc: vec![],
|
|
subject: "test".into(),
|
|
text_body: "body".into(),
|
|
html_body: String::new(),
|
|
attachments: vec![],
|
|
priority: 0,
|
|
headers: Default::default(),
|
|
reply_to: String::new(),
|
|
}
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn enqueue_and_consume_success() {
|
|
let (queue, worker) = EmailQueue::unbounded();
|
|
let counter = Arc::new(AtomicUsize::new(0));
|
|
let c = counter.clone();
|
|
let notify = Arc::new(Notify::new());
|
|
let n = notify.clone();
|
|
|
|
let mut req = dummy_request();
|
|
req.to.push(EmailAddress {
|
|
email: "test@example.com".into(),
|
|
name: String::new(),
|
|
});
|
|
|
|
let id = queue.enqueue(req).expect("enqueue should work");
|
|
assert!(id > 0);
|
|
|
|
worker.spawn(move |_job| {
|
|
let c = c.clone();
|
|
let n = n.clone();
|
|
async move {
|
|
c.fetch_add(1, Ordering::SeqCst);
|
|
n.notify_one();
|
|
Ok::<(), EmailError>(())
|
|
}
|
|
});
|
|
|
|
// Wait for async consumption with proper signalling
|
|
notify.notified().await;
|
|
assert_eq!(counter.load(Ordering::SeqCst), 1);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn retry_then_succeed() {
|
|
let (queue, worker) = EmailQueue::unbounded();
|
|
let attempts = Arc::new(AtomicUsize::new(0));
|
|
let a = attempts.clone();
|
|
let notify = Arc::new(Notify::new());
|
|
let n = notify.clone();
|
|
|
|
let _id = queue.enqueue(dummy_request()).unwrap();
|
|
|
|
worker.spawn(move |_job| {
|
|
let a = a.clone();
|
|
let n = n.clone();
|
|
async move {
|
|
let count = a.fetch_add(1, Ordering::SeqCst);
|
|
if count < 2 {
|
|
Err(EmailError::Send("temp failure".into()))
|
|
} else {
|
|
n.notify_one();
|
|
Ok(())
|
|
}
|
|
}
|
|
});
|
|
|
|
// Wait for final success notification
|
|
notify.notified().await;
|
|
assert_eq!(attempts.load(Ordering::SeqCst), 3); // 2 fails + 1 success
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn terminal_error_destroyed_immediately() {
|
|
let (queue, worker) = EmailQueue::unbounded();
|
|
let store = queue.status_store().clone();
|
|
let attempts = Arc::new(AtomicUsize::new(0));
|
|
let a = attempts.clone();
|
|
let notify = Arc::new(Notify::new());
|
|
let n = notify.clone();
|
|
|
|
let id = queue.enqueue(dummy_request()).unwrap();
|
|
|
|
worker.spawn(move |_job| {
|
|
let a = a.clone();
|
|
let n = n.clone();
|
|
async move {
|
|
a.fetch_add(1, Ordering::SeqCst);
|
|
n.notify_one();
|
|
Err(EmailError::MissingRecipients)
|
|
}
|
|
});
|
|
|
|
notify.notified().await;
|
|
assert_eq!(attempts.load(Ordering::SeqCst), 1);
|
|
|
|
let entry = store.get(id).expect("should have status entry");
|
|
assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Failed);
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn bounded_channel_blocks_when_full() {
|
|
let (queue, _worker) = EmailQueue::bounded(1);
|
|
let _id1 = queue.enqueue(dummy_request()).unwrap();
|
|
// Second enqueue should fail with Full
|
|
let err = queue.enqueue(dummy_request()).unwrap_err();
|
|
assert!(matches!(err, emailks::error::QueueError::Full));
|
|
}
|
|
|
|
#[tokio::test]
|
|
async fn status_store_tracks_lifecycle() {
|
|
let (queue, worker) = EmailQueue::unbounded();
|
|
let store = queue.status_store().clone();
|
|
let notify = Arc::new(Notify::new());
|
|
let n = notify.clone();
|
|
|
|
let id = queue.enqueue(dummy_request()).unwrap();
|
|
// Status should be Queued
|
|
let entry = store.get(id).unwrap();
|
|
assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Queued);
|
|
|
|
worker.spawn(move |_job| {
|
|
let n = n.clone();
|
|
async move {
|
|
n.notify_one();
|
|
Ok::<(), EmailError>(())
|
|
}
|
|
});
|
|
|
|
notify.notified().await;
|
|
let entry = store.get(id).unwrap();
|
|
assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Sent);
|
|
}
|