From c4824ef261a3bf424692bf26518dbff90acaea35 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Sun, 7 Jun 2026 22:59:06 +0800 Subject: [PATCH] feat(k8s): add Kubernetes Helm chart for emailks service - Create Helm chart structure with Chart.yaml and values.yaml - Add deployment template with container configuration and environment variables - Implement service template for gRPC port exposure - Add service account template with security configuration - Include horizontal pod autoscaler template for scaling capabilities - Add helper templates for naming and label management - Configure SMTP settings as configurable parameters in values.yaml - Set up resource limits and requests for container performance - Implement liveness and readiness probes for health checks - Add support for existing secrets and custom configurations --- Dockerfile | 2 +- charts/Chart.yaml | 6 ++ charts/templates/_helpers.tpl | 38 ++++++++++ charts/templates/deployment.yaml | 109 +++++++++++++++++++++++++++ charts/templates/hpa.yaml | 24 ++++++ charts/templates/service.yaml | 15 ++++ charts/templates/serviceaccount.yaml | 13 ++++ charts/values.yaml | 68 +++++++++++++++++ config.rs | 3 +- email_build.rs | 11 +++ error.rs | 2 +- lib.rs | 2 + main.rs | 1 + queue.rs | 2 + server.rs | 26 ++++++- status.rs | 26 ++----- tests/queue_tests.rs | 38 ++++++++-- 17 files changed, 353 insertions(+), 33 deletions(-) create mode 100644 charts/Chart.yaml create mode 100644 charts/templates/_helpers.tpl create mode 100644 charts/templates/deployment.yaml create mode 100644 charts/templates/hpa.yaml create mode 100644 charts/templates/service.yaml create mode 100644 charts/templates/serviceaccount.yaml create mode 100644 charts/values.yaml diff --git a/Dockerfile b/Dockerfile index 649a53a..57f7a66 100644 --- a/Dockerfile +++ b/Dockerfile @@ -15,7 +15,7 @@ COPY build.rs ./ COPY proto/ proto/ RUN echo '' >lib.rs && \ echo 'fn main() {}' >main.rs && \ - cargo build --release; \ + cargo build --release --bin emailks; \ rm -f lib.rs main.rs # Build real binary diff --git a/charts/Chart.yaml b/charts/Chart.yaml new file mode 100644 index 0000000..e1d9196 --- /dev/null +++ b/charts/Chart.yaml @@ -0,0 +1,6 @@ +apiVersion: v2 +name: emailks +description: Internal gRPC email relay service +type: application +version: 0.1.0 +appVersion: "0.1.0" diff --git a/charts/templates/_helpers.tpl b/charts/templates/_helpers.tpl new file mode 100644 index 0000000..c6f40c8 --- /dev/null +++ b/charts/templates/_helpers.tpl @@ -0,0 +1,38 @@ +{{- define "emailks.name" -}} +{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{- define "emailks.fullname" -}} +{{- if .Values.fullnameOverride }} +{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }} +{{- else }} +{{- $name := default .Chart.Name .Values.nameOverride }} +{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} +{{- end }} +{{- end }} + +{{- define "emailks.chart" -}} +{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{- define "emailks.labels" -}} +helm.sh/chart: {{ include "emailks.chart" . }} +{{ include "emailks.selectorLabels" . }} +{{- if .Chart.AppVersion }} +app.kubernetes.io/version: {{ .Chart.AppVersion | quote }} +{{- end }} +app.kubernetes.io/managed-by: {{ .Release.Service }} +{{- end }} + +{{- define "emailks.selectorLabels" -}} +app.kubernetes.io/name: {{ include "emailks.name" . }} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end }} + +{{- define "emailks.serviceAccountName" -}} +{{- if .Values.serviceAccount.create }} +{{- default (include "emailks.fullname" .) .Values.serviceAccount.name }} +{{- else }} +{{- default "default" .Values.serviceAccount.name }} +{{- end }} +{{- end }} diff --git a/charts/templates/deployment.yaml b/charts/templates/deployment.yaml new file mode 100644 index 0000000..ab2e434 --- /dev/null +++ b/charts/templates/deployment.yaml @@ -0,0 +1,109 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "emailks.fullname" . }} + labels: + {{- include "emailks.labels" . | nindent 4 }} +spec: + replicas: {{ .Values.replicaCount }} + selector: + matchLabels: + {{- include "emailks.selectorLabels" . | nindent 6 }} + template: + metadata: + {{- with .Values.podAnnotations }} + annotations: + {{- toYaml . | nindent 8 }} + {{- end }} + labels: + {{- include "emailks.selectorLabels" . | nindent 8 }} + {{- with .Values.podLabels }} + {{- toYaml . | nindent 8 }} + {{- end }} + spec: + {{- with .Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 8 }} + {{- end }} + serviceAccountName: {{ include "emailks.serviceAccountName" . }} + securityContext: + {{- toYaml .Values.podSecurityContext | nindent 8 }} + containers: + - name: {{ .Chart.Name }} + securityContext: + {{- toYaml .Values.securityContext | nindent 12 }} + image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + ports: + - name: grpc + containerPort: 50051 + protocol: TCP + env: + - name: APP_SMTP_HOST + value: {{ required "smtp.host is required" .Values.smtp.host | quote }} + - name: APP_SMTP_PORT + value: {{ .Values.smtp.port | quote }} + - name: APP_SMTP_TLS + value: {{ .Values.smtp.tls | quote }} + {{- with .Values.smtp.username }} + - name: APP_SMTP_USERNAME + value: {{ . | quote }} + {{- end }} + {{- if .Values.existingSecret.name }} + - name: APP_SMTP_PASSWORD + valueFrom: + secretKeyRef: + name: {{ .Values.existingSecret.name }} + key: {{ .Values.existingSecret.passwordKey }} + {{- else if .Values.smtp.password }} + - name: APP_SMTP_PASSWORD + value: {{ .Values.smtp.password | quote }} + {{- end }} + - name: APP_SMTP_FROM_EMAIL + value: {{ .Values.smtp.fromEmail | quote }} + {{- with .Values.smtp.fromName }} + - name: APP_SMTP_FROM_NAME + value: {{ . | quote }} + {{- end }} + {{- with .Values.smtp.replyTo }} + - name: APP_SMTP_REPLY_TO + value: {{ . | quote }} + {{- end }} + - name: APP_SMTP_TIMEOUT_SECS + value: {{ .Values.smtp.timeoutSecs | quote }} + {{- with .Values.smtp.heloName }} + - name: APP_SMTP_HELO_NAME + value: {{ . | quote }} + {{- end }} + - name: APP_SMTP_ALLOW_REQUEST_FROM + value: {{ .Values.smtp.allowRequestFrom | quote }} + - name: APP_SMTP_QUEUE_CAPACITY + value: {{ .Values.queue.capacity | quote }} + - name: APP_SMTP_LISTEN_ADDR + value: {{ .Values.listenAddr | quote }} + - name: RUST_LOG + value: {{ .Values.logLevel | quote }} + livenessProbe: + tcpSocket: + port: grpc + initialDelaySeconds: 5 + periodSeconds: 10 + readinessProbe: + tcpSocket: + port: grpc + initialDelaySeconds: 3 + periodSeconds: 5 + resources: + {{- toYaml .Values.resources | nindent 12 }} + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} diff --git a/charts/templates/hpa.yaml b/charts/templates/hpa.yaml new file mode 100644 index 0000000..1f02df9 --- /dev/null +++ b/charts/templates/hpa.yaml @@ -0,0 +1,24 @@ +{{- if .Values.autoscaling.enabled }} +apiVersion: autoscaling/v2 +kind: HorizontalPodAutoscaler +metadata: + name: {{ include "emailks.fullname" . }} + labels: + {{- include "emailks.labels" . | nindent 4 }} +spec: + scaleTargetRef: + apiVersion: apps/v1 + kind: Deployment + name: {{ include "emailks.fullname" . }} + minReplicas: {{ .Values.autoscaling.minReplicas }} + maxReplicas: {{ .Values.autoscaling.maxReplicas }} + metrics: + {{- if .Values.autoscaling.targetCPUUtilizationPercentage }} + - type: Resource + resource: + name: cpu + target: + type: Utilization + averageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }} + {{- end }} +{{- end }} diff --git a/charts/templates/service.yaml b/charts/templates/service.yaml new file mode 100644 index 0000000..7292316 --- /dev/null +++ b/charts/templates/service.yaml @@ -0,0 +1,15 @@ +apiVersion: v1 +kind: Service +metadata: + name: {{ include "emailks.fullname" . }} + labels: + {{- include "emailks.labels" . | nindent 4 }} +spec: + type: {{ .Values.service.type }} + ports: + - port: {{ .Values.service.port }} + targetPort: {{ .Values.service.targetPort }} + protocol: TCP + name: grpc + selector: + {{- include "emailks.selectorLabels" . | nindent 4 }} diff --git a/charts/templates/serviceaccount.yaml b/charts/templates/serviceaccount.yaml new file mode 100644 index 0000000..3d3d5df --- /dev/null +++ b/charts/templates/serviceaccount.yaml @@ -0,0 +1,13 @@ +{{- if .Values.serviceAccount.create -}} +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ include "emailks.serviceAccountName" . }} + labels: + {{- include "emailks.labels" . | nindent 4 }} + {{- with .Values.serviceAccount.annotations }} + annotations: + {{- toYaml . | nindent 4 }} + {{- end }} +automountServiceAccountToken: false +{{- end }} diff --git a/charts/values.yaml b/charts/values.yaml new file mode 100644 index 0000000..39854cf --- /dev/null +++ b/charts/values.yaml @@ -0,0 +1,68 @@ +replicaCount: 1 + +image: + repository: emailks + pullPolicy: IfNotPresent + tag: "" + +imagePullSecrets: [] +nameOverride: "" +fullnameOverride: "" + +serviceAccount: + create: true + annotations: {} + name: "" + +podAnnotations: {} +podLabels: {} + +podSecurityContext: {} +securityContext: {} + +service: + type: ClusterIP + port: 50051 + targetPort: 50051 + +resources: + limits: + cpu: 500m + memory: 256Mi + requests: + cpu: 100m + memory: 64Mi + +autoscaling: + enabled: false + minReplicas: 1 + maxReplicas: 5 + targetCPUUtilizationPercentage: 80 + +nodeSelector: {} +tolerations: [] +affinity: {} + +smtp: + host: smtp.example.com + port: 587 + tls: starttls + username: "" + password: "" + fromEmail: noreply@example.com + fromName: EmailKS + replyTo: "" + timeoutSecs: 30 + heloName: "" + allowRequestFrom: false + +queue: + capacity: 1000 + +listenAddr: "0.0.0.0:50051" + +logLevel: info + +existingSecret: + name: "" + passwordKey: "password" diff --git a/config.rs b/config.rs index 4a5377f..3f80e37 100644 --- a/config.rs +++ b/config.rs @@ -1,9 +1,8 @@ use std::{env, fmt, net::SocketAddr, time::Duration}; pub use crate::error::ConfigError; +use crate::ENV_PREFIX; use tracing; - -const ENV_PREFIX: &str = "APP_SMTP_"; const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051"; #[derive(Debug, Clone, PartialEq, Eq)] diff --git a/email_build.rs b/email_build.rs index 0bb2b83..d893986 100644 --- a/email_build.rs +++ b/email_build.rs @@ -6,6 +6,7 @@ use lettre::{ }, transport::smtp::{self, authentication::Credentials, extension::ClientId}, }; +use tracing; use crate::{ config::{SmtpConfig, SmtpTls}, @@ -105,6 +106,13 @@ pub(crate) fn build_single_attachment( url: att.url.clone(), }); } + if !att.data.is_empty() && !att.url.trim().is_empty() { + tracing::warn!( + filename = att.filename, + url = att.url, + "attachment has both inline data and url; url will be ignored" + ); + } let ct: ContentType = if att.content_type.trim().is_empty() { "application/octet-stream" .parse() @@ -228,6 +236,9 @@ pub fn build_message_from_parts( } let from = resolve_sender(config, request)?; + if request.subject.trim().is_empty() { + tracing::warn!("email subject is empty — some SMTP servers may reject it"); + } let mut builder = Message::builder() .from(from) .subject(request.subject.clone()); diff --git a/error.rs b/error.rs index 49837d9..094ba60 100644 --- a/error.rs +++ b/error.rs @@ -1,6 +1,6 @@ use std::fmt; -const ENV_PREFIX: &str = "APP_SMTP_"; +use crate::ENV_PREFIX; #[derive(Debug, Clone, PartialEq, Eq)] pub enum ConfigError { diff --git a/lib.rs b/lib.rs index 5acd80e..6a05a14 100644 --- a/lib.rs +++ b/lib.rs @@ -6,6 +6,8 @@ pub mod queue; pub mod server; pub mod status; +pub(crate) const ENV_PREFIX: &str = "APP_SMTP_"; + pub mod pb { pub mod email { pub mod v1 { diff --git a/main.rs b/main.rs index 324fc06..77b0315 100644 --- a/main.rs +++ b/main.rs @@ -20,6 +20,7 @@ async fn main() -> Result<(), Box> { let sender = EmailSender::new(config.smtp)?; let (queue, worker) = match config.queue_capacity { + // `Some(0)` explicitly opts into an unbounded queue (mainly for testing). Some(0) => { info!("creating unbounded queue by explicit configuration"); EmailQueue::unbounded() diff --git a/queue.rs b/queue.rs index a769f21..580c532 100644 --- a/queue.rs +++ b/queue.rs @@ -156,6 +156,8 @@ impl EmailQueue { if *self.shutdown.tx.borrow() { return Err(QueueError::Closed); } + // NOTE: ID is consumed even if the send below fails (channel full, closed). + // The u64 space is large enough that this is inconsequential in practice. let id = self.next_job_id()?; self.status_store.set_queued(id); let job = EmailJob { diff --git a/server.rs b/server.rs index 06bd275..84696aa 100644 --- a/server.rs +++ b/server.rs @@ -8,6 +8,8 @@ use tonic::{Request, Response, Status}; use tracing::warn; const STREAM_STATUS_POLL_INTERVAL: Duration = Duration::from_millis(300); +/// Maximum lifetime of a single streaming batch-status RPC. +/// Protects against leaked streams when jobs never reach a terminal state. const STREAM_STATUS_TIMEOUT: Duration = Duration::from_secs(10 * 60); use crate::{ @@ -75,12 +77,12 @@ impl EmailService for EmailServiceImpl { request: Request, ) -> Result, Status> { let req = request.into_inner(); - let total = req.emails.len(); + let total = req.emails.len() as i32; let mut success = 0i32; let mut failures = 0i32; - let mut results = Vec::with_capacity(total); + let mut results = Vec::with_capacity(total as usize); - for email in req.emails { + for (i, email) in req.emails.into_iter().enumerate() { match self.queue.enqueue(email) { Ok(id) => { success += 1; @@ -90,6 +92,8 @@ impl EmailService for EmailServiceImpl { failures += 1; warn!(%e, "batch enqueue failed for one email"); if req.fail_fast { + // Count remaining unprocessed emails as failures too. + failures += total - (i as i32) - 1; warn!( successful = success, failed = failures, @@ -152,6 +156,7 @@ impl EmailService for EmailServiceImpl { let id_set: std::collections::HashSet = ids.iter().copied().collect(); let store = self.store.clone(); + let mut missing_streak: std::collections::HashMap = std::collections::HashMap::new(); let (tx, rx) = mpsc::channel(ids.len().saturating_add(immediate_results.len()).max(1)); tokio::spawn(async move { @@ -186,6 +191,7 @@ impl EmailService for EmailServiceImpl { continue; } if let Some(entry) = store.get(*id) { + missing_streak.remove(id); match entry.status { SendStatus::Sent => { if tx @@ -209,6 +215,20 @@ impl EmailService for EmailServiceImpl { } _ => {} } + } else { + // Status entry may have been evicted under memory pressure. + // Report as failed after a few consecutive misses. + let streak = missing_streak.entry(*id).and_modify(|c| *c += 1).or_insert(1); + if *streak >= 5 { + if tx.send(Ok(build_failed_response( + Some(*id), + "status entry evicted before terminal state".into(), + ))).await.is_err() { + return; + } + reported.insert(*id); + missing_streak.remove(id); + } } } diff --git a/status.rs b/status.rs index 54a46cc..0d95236 100644 --- a/status.rs +++ b/status.rs @@ -68,18 +68,7 @@ impl JobStatusStore { guard.remove(&id); } - pub fn all_done(&self, ids: &[u64]) -> bool { - let guard = match self.inner.read() { - Ok(g) => g, - Err(_) => return false, - }; - ids.iter().all(|id| { - matches!( - guard.get(id).map(|e| e.status), - Some(SendStatus::Sent | SendStatus::Failed) - ) - }) - } + fn write(&self, id: u64, status: SendStatus, error: Option) { let mut guard = match self.inner.write() { @@ -105,14 +94,15 @@ fn prune_statuses(entries: &mut HashMap) { let now = Instant::now(); entries.retain(|_, entry| now.duration_since(entry.updated_at) <= STATUS_TTL); - while entries.len() >= MAX_STATUS_ENTRIES { - let Some(oldest_id) = entries + // Evict at most one entry per write to avoid O(n²) behaviour under load. + // Repeated writes will gradually shrink the map if it stays above capacity. + if entries.len() >= MAX_STATUS_ENTRIES { + if let Some(oldest_id) = entries .iter() .min_by_key(|(_, entry)| entry.updated_at) .map(|(id, _)| *id) - else { - break; - }; - entries.remove(&oldest_id); + { + entries.remove(&oldest_id); + } } } diff --git a/tests/queue_tests.rs b/tests/queue_tests.rs index 1d19ac9..c54ff66 100644 --- a/tests/queue_tests.rs +++ b/tests/queue_tests.rs @@ -7,6 +7,7 @@ use std::sync::{ Arc, atomic::{AtomicUsize, Ordering}, }; +use tokio::sync::Notify; fn dummy_request() -> SendEmailRequest { SendEmailRequest { @@ -29,6 +30,8 @@ async fn enqueue_and_consume_success() { let (queue, worker) = EmailQueue::unbounded(); let counter = Arc::new(AtomicUsize::new(0)); let c = counter.clone(); + let notify = Arc::new(Notify::new()); + let n = notify.clone(); let mut req = dummy_request(); req.to.push(EmailAddress { @@ -41,14 +44,16 @@ async fn enqueue_and_consume_success() { worker.spawn(move |_job| { let c = c.clone(); + let n = n.clone(); async move { c.fetch_add(1, Ordering::SeqCst); + n.notify_one(); Ok::<(), EmailError>(()) } }); - // Wait for async consumption - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + // Wait for async consumption with proper signalling + notify.notified().await; assert_eq!(counter.load(Ordering::SeqCst), 1); } @@ -57,22 +62,27 @@ async fn retry_then_succeed() { let (queue, worker) = EmailQueue::unbounded(); let attempts = Arc::new(AtomicUsize::new(0)); let a = attempts.clone(); + let notify = Arc::new(Notify::new()); + let n = notify.clone(); let _id = queue.enqueue(dummy_request()).unwrap(); worker.spawn(move |_job| { let a = a.clone(); + let n = n.clone(); async move { - let n = a.fetch_add(1, Ordering::SeqCst); - if n < 2 { + let count = a.fetch_add(1, Ordering::SeqCst); + if count < 2 { Err(EmailError::Send("temp failure".into())) } else { + n.notify_one(); Ok(()) } } }); - tokio::time::sleep(std::time::Duration::from_millis(700)).await; + // Wait for final success notification + notify.notified().await; assert_eq!(attempts.load(Ordering::SeqCst), 3); // 2 fails + 1 success } @@ -82,18 +92,22 @@ async fn terminal_error_destroyed_immediately() { let store = queue.status_store().clone(); let attempts = Arc::new(AtomicUsize::new(0)); let a = attempts.clone(); + let notify = Arc::new(Notify::new()); + let n = notify.clone(); let id = queue.enqueue(dummy_request()).unwrap(); worker.spawn(move |_job| { let a = a.clone(); + let n = n.clone(); async move { a.fetch_add(1, Ordering::SeqCst); + n.notify_one(); Err(EmailError::MissingRecipients) } }); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + notify.notified().await; assert_eq!(attempts.load(Ordering::SeqCst), 1); let entry = store.get(id).expect("should have status entry"); @@ -113,15 +127,23 @@ async fn bounded_channel_blocks_when_full() { async fn status_store_tracks_lifecycle() { let (queue, worker) = EmailQueue::unbounded(); let store = queue.status_store().clone(); + let notify = Arc::new(Notify::new()); + let n = notify.clone(); let id = queue.enqueue(dummy_request()).unwrap(); // Status should be Queued let entry = store.get(id).unwrap(); assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Queued); - worker.spawn(move |_job| async move { Ok::<(), EmailError>(()) }); + worker.spawn(move |_job| { + let n = n.clone(); + async move { + n.notify_one(); + Ok::<(), EmailError>(()) + } + }); - tokio::time::sleep(std::time::Duration::from_millis(100)).await; + notify.notified().await; let entry = store.get(id).unwrap(); assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Sent); }