Files
zhenyi c4824ef261 feat(k8s): add Kubernetes Helm chart for emailks service
- Create Helm chart structure with Chart.yaml and values.yaml
- Add deployment template with container configuration and environment variables
- Implement service template for gRPC port exposure
- Add service account template with security configuration
- Include horizontal pod autoscaler template for scaling capabilities
- Add helper templates for naming and label management
- Configure SMTP settings as configurable parameters in values.yaml
- Set up resource limits and requests for container performance
- Implement liveness and readiness probes for health checks
- Add support for existing secrets and custom configurations
2026-06-07 22:59:06 +08:00

150 lines
4.1 KiB
Rust

use emailks::{
error::EmailError,
pb::email::v1::{EmailAddress, SendEmailRequest},
queue::EmailQueue,
};
use std::sync::{
Arc,
atomic::{AtomicUsize, Ordering},
};
use tokio::sync::Notify;
/// Builds the minimal [`SendEmailRequest`] fixture shared by the queue tests.
///
/// The subject is `"test"` and the plain-text body is `"body"`. The sender is
/// unset, every list field is empty, the priority is `0`, and the remaining
/// fields hold empty/default values. Tests that need a recipient push one onto
/// `to` themselves.
fn dummy_request() -> SendEmailRequest {
    SendEmailRequest {
        from: None,
        to: Vec::new(),
        cc: Vec::new(),
        bcc: Vec::new(),
        subject: String::from("test"),
        text_body: String::from("body"),
        html_body: String::new(),
        attachments: Vec::new(),
        priority: 0,
        headers: Default::default(),
        reply_to: String::new(),
    }
}
#[tokio::test]
async fn enqueue_and_consume_success() {
let (queue, worker) = EmailQueue::unbounded();
let counter = Arc::new(AtomicUsize::new(0));
let c = counter.clone();
let notify = Arc::new(Notify::new());
let n = notify.clone();
let mut req = dummy_request();
req.to.push(EmailAddress {
email: "test@example.com".into(),
name: String::new(),
});
let id = queue.enqueue(req).expect("enqueue should work");
assert!(id > 0);
worker.spawn(move |_job| {
let c = c.clone();
let n = n.clone();
async move {
c.fetch_add(1, Ordering::SeqCst);
n.notify_one();
Ok::<(), EmailError>(())
}
});
// Wait for async consumption with proper signalling
notify.notified().await;
assert_eq!(counter.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn retry_then_succeed() {
let (queue, worker) = EmailQueue::unbounded();
let attempts = Arc::new(AtomicUsize::new(0));
let a = attempts.clone();
let notify = Arc::new(Notify::new());
let n = notify.clone();
let _id = queue.enqueue(dummy_request()).unwrap();
worker.spawn(move |_job| {
let a = a.clone();
let n = n.clone();
async move {
let count = a.fetch_add(1, Ordering::SeqCst);
if count < 2 {
Err(EmailError::Send("temp failure".into()))
} else {
n.notify_one();
Ok(())
}
}
});
// Wait for final success notification
notify.notified().await;
assert_eq!(attempts.load(Ordering::SeqCst), 3); // 2 fails + 1 success
}
#[tokio::test]
async fn terminal_error_destroyed_immediately() {
let (queue, worker) = EmailQueue::unbounded();
let store = queue.status_store().clone();
let attempts = Arc::new(AtomicUsize::new(0));
let a = attempts.clone();
let notify = Arc::new(Notify::new());
let n = notify.clone();
let id = queue.enqueue(dummy_request()).unwrap();
worker.spawn(move |_job| {
let a = a.clone();
let n = n.clone();
async move {
a.fetch_add(1, Ordering::SeqCst);
n.notify_one();
Err(EmailError::MissingRecipients)
}
});
notify.notified().await;
assert_eq!(attempts.load(Ordering::SeqCst), 1);
let entry = store.get(id).expect("should have status entry");
assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Failed);
}
#[tokio::test]
async fn bounded_channel_blocks_when_full() {
let (queue, _worker) = EmailQueue::bounded(1);
let _id1 = queue.enqueue(dummy_request()).unwrap();
// Second enqueue should fail with Full
let err = queue.enqueue(dummy_request()).unwrap_err();
assert!(matches!(err, emailks::error::QueueError::Full));
}
#[tokio::test]
async fn status_store_tracks_lifecycle() {
let (queue, worker) = EmailQueue::unbounded();
let store = queue.status_store().clone();
let notify = Arc::new(Notify::new());
let n = notify.clone();
let id = queue.enqueue(dummy_request()).unwrap();
// Status should be Queued
let entry = store.get(id).unwrap();
assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Queued);
worker.spawn(move |_job| {
let n = n.clone();
async move {
n.notify_one();
Ok::<(), EmailError>(())
}
});
notify.notified().await;
let entry = store.get(id).unwrap();
assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Sent);
}