feat(k8s): add Kubernetes Helm chart for emailks service
- Create Helm chart structure with Chart.yaml and values.yaml - Add deployment template with container configuration and environment variables - Implement service template for gRPC port exposure - Add service account template with security configuration - Include horizontal pod autoscaler template for scaling capabilities - Add helper templates for naming and label management - Configure SMTP settings as configurable parameters in values.yaml - Set up resource limits and requests for container performance - Implement liveness and readiness probes for health checks - Add support for existing secrets and custom configurations
This commit is contained in:
+1
-1
@@ -15,7 +15,7 @@ COPY build.rs ./
|
|||||||
COPY proto/ proto/
|
COPY proto/ proto/
|
||||||
RUN echo '' >lib.rs && \
|
RUN echo '' >lib.rs && \
|
||||||
echo 'fn main() {}' >main.rs && \
|
echo 'fn main() {}' >main.rs && \
|
||||||
cargo build --release; \
|
cargo build --release --bin emailks; \
|
||||||
rm -f lib.rs main.rs
|
rm -f lib.rs main.rs
|
||||||
|
|
||||||
# Build real binary
|
# Build real binary
|
||||||
|
|||||||
@@ -0,0 +1,6 @@
|
|||||||
|
apiVersion: v2
|
||||||
|
name: emailks
|
||||||
|
description: Internal gRPC email relay service
|
||||||
|
type: application
|
||||||
|
version: 0.1.0
|
||||||
|
appVersion: "0.1.0"
|
||||||
@@ -0,0 +1,38 @@
|
|||||||
|
{{- define "emailks.name" -}}
|
||||||
|
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
|
||||||
|
{{- end }}
|
||||||
|
|
||||||
|
{{- define "emailks.fullname" -}}
|
||||||
|
{{- if .Values.fullnameOverride }}
|
||||||
|
{{- .Values.fullnameOverride | trunc 63 | trimSuffix "-" }}
|
||||||
|
{{- else }}
|
||||||
|
{{- $name := default .Chart.Name .Values.nameOverride }}
|
||||||
|
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
|
||||||
|
{{- end }}
|
||||||
|
{{- end }}
|
||||||
|
|
||||||
|
{{- define "emailks.chart" -}}
|
||||||
|
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" }}
|
||||||
|
{{- end }}
|
||||||
|
|
||||||
|
{{- define "emailks.labels" -}}
|
||||||
|
helm.sh/chart: {{ include "emailks.chart" . }}
|
||||||
|
{{ include "emailks.selectorLabels" . }}
|
||||||
|
{{- if .Chart.AppVersion }}
|
||||||
|
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
|
||||||
|
{{- end }}
|
||||||
|
app.kubernetes.io/managed-by: {{ .Release.Service }}
|
||||||
|
{{- end }}
|
||||||
|
|
||||||
|
{{- define "emailks.selectorLabels" -}}
|
||||||
|
app.kubernetes.io/name: {{ include "emailks.name" . }}
|
||||||
|
app.kubernetes.io/instance: {{ .Release.Name }}
|
||||||
|
{{- end }}
|
||||||
|
|
||||||
|
{{- define "emailks.serviceAccountName" -}}
|
||||||
|
{{- if .Values.serviceAccount.create }}
|
||||||
|
{{- default (include "emailks.fullname" .) .Values.serviceAccount.name }}
|
||||||
|
{{- else }}
|
||||||
|
{{- default "default" .Values.serviceAccount.name }}
|
||||||
|
{{- end }}
|
||||||
|
{{- end }}
|
||||||
@@ -0,0 +1,109 @@
|
|||||||
|
apiVersion: apps/v1
|
||||||
|
kind: Deployment
|
||||||
|
metadata:
|
||||||
|
name: {{ include "emailks.fullname" . }}
|
||||||
|
labels:
|
||||||
|
{{- include "emailks.labels" . | nindent 4 }}
|
||||||
|
spec:
|
||||||
|
replicas: {{ .Values.replicaCount }}
|
||||||
|
selector:
|
||||||
|
matchLabels:
|
||||||
|
{{- include "emailks.selectorLabels" . | nindent 6 }}
|
||||||
|
template:
|
||||||
|
metadata:
|
||||||
|
{{- with .Values.podAnnotations }}
|
||||||
|
annotations:
|
||||||
|
{{- toYaml . | nindent 8 }}
|
||||||
|
{{- end }}
|
||||||
|
labels:
|
||||||
|
{{- include "emailks.selectorLabels" . | nindent 8 }}
|
||||||
|
{{- with .Values.podLabels }}
|
||||||
|
{{- toYaml . | nindent 8 }}
|
||||||
|
{{- end }}
|
||||||
|
spec:
|
||||||
|
{{- with .Values.imagePullSecrets }}
|
||||||
|
imagePullSecrets:
|
||||||
|
{{- toYaml . | nindent 8 }}
|
||||||
|
{{- end }}
|
||||||
|
serviceAccountName: {{ include "emailks.serviceAccountName" . }}
|
||||||
|
securityContext:
|
||||||
|
{{- toYaml .Values.podSecurityContext | nindent 8 }}
|
||||||
|
containers:
|
||||||
|
- name: {{ .Chart.Name }}
|
||||||
|
securityContext:
|
||||||
|
{{- toYaml .Values.securityContext | nindent 12 }}
|
||||||
|
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
|
||||||
|
imagePullPolicy: {{ .Values.image.pullPolicy }}
|
||||||
|
ports:
|
||||||
|
- name: grpc
|
||||||
|
containerPort: 50051
|
||||||
|
protocol: TCP
|
||||||
|
env:
|
||||||
|
- name: APP_SMTP_HOST
|
||||||
|
value: {{ required "smtp.host is required" .Values.smtp.host | quote }}
|
||||||
|
- name: APP_SMTP_PORT
|
||||||
|
value: {{ .Values.smtp.port | quote }}
|
||||||
|
- name: APP_SMTP_TLS
|
||||||
|
value: {{ .Values.smtp.tls | quote }}
|
||||||
|
{{- with .Values.smtp.username }}
|
||||||
|
- name: APP_SMTP_USERNAME
|
||||||
|
value: {{ . | quote }}
|
||||||
|
{{- end }}
|
||||||
|
{{- if .Values.existingSecret.name }}
|
||||||
|
- name: APP_SMTP_PASSWORD
|
||||||
|
valueFrom:
|
||||||
|
secretKeyRef:
|
||||||
|
name: {{ .Values.existingSecret.name }}
|
||||||
|
key: {{ .Values.existingSecret.passwordKey }}
|
||||||
|
{{- else if .Values.smtp.password }}
|
||||||
|
- name: APP_SMTP_PASSWORD
|
||||||
|
value: {{ .Values.smtp.password | quote }}
|
||||||
|
{{- end }}
|
||||||
|
- name: APP_SMTP_FROM_EMAIL
|
||||||
|
value: {{ .Values.smtp.fromEmail | quote }}
|
||||||
|
{{- with .Values.smtp.fromName }}
|
||||||
|
- name: APP_SMTP_FROM_NAME
|
||||||
|
value: {{ . | quote }}
|
||||||
|
{{- end }}
|
||||||
|
{{- with .Values.smtp.replyTo }}
|
||||||
|
- name: APP_SMTP_REPLY_TO
|
||||||
|
value: {{ . | quote }}
|
||||||
|
{{- end }}
|
||||||
|
- name: APP_SMTP_TIMEOUT_SECS
|
||||||
|
value: {{ .Values.smtp.timeoutSecs | quote }}
|
||||||
|
{{- with .Values.smtp.heloName }}
|
||||||
|
- name: APP_SMTP_HELO_NAME
|
||||||
|
value: {{ . | quote }}
|
||||||
|
{{- end }}
|
||||||
|
- name: APP_SMTP_ALLOW_REQUEST_FROM
|
||||||
|
value: {{ .Values.smtp.allowRequestFrom | quote }}
|
||||||
|
- name: APP_SMTP_QUEUE_CAPACITY
|
||||||
|
value: {{ .Values.queue.capacity | quote }}
|
||||||
|
- name: APP_SMTP_LISTEN_ADDR
|
||||||
|
value: {{ .Values.listenAddr | quote }}
|
||||||
|
- name: RUST_LOG
|
||||||
|
value: {{ .Values.logLevel | quote }}
|
||||||
|
livenessProbe:
|
||||||
|
tcpSocket:
|
||||||
|
port: grpc
|
||||||
|
initialDelaySeconds: 5
|
||||||
|
periodSeconds: 10
|
||||||
|
readinessProbe:
|
||||||
|
tcpSocket:
|
||||||
|
port: grpc
|
||||||
|
initialDelaySeconds: 3
|
||||||
|
periodSeconds: 5
|
||||||
|
resources:
|
||||||
|
{{- toYaml .Values.resources | nindent 12 }}
|
||||||
|
{{- with .Values.nodeSelector }}
|
||||||
|
nodeSelector:
|
||||||
|
{{- toYaml . | nindent 8 }}
|
||||||
|
{{- end }}
|
||||||
|
{{- with .Values.affinity }}
|
||||||
|
affinity:
|
||||||
|
{{- toYaml . | nindent 8 }}
|
||||||
|
{{- end }}
|
||||||
|
{{- with .Values.tolerations }}
|
||||||
|
tolerations:
|
||||||
|
{{- toYaml . | nindent 8 }}
|
||||||
|
{{- end }}
|
||||||
@@ -0,0 +1,24 @@
|
|||||||
|
{{- if .Values.autoscaling.enabled }}
|
||||||
|
apiVersion: autoscaling/v2
|
||||||
|
kind: HorizontalPodAutoscaler
|
||||||
|
metadata:
|
||||||
|
name: {{ include "emailks.fullname" . }}
|
||||||
|
labels:
|
||||||
|
{{- include "emailks.labels" . | nindent 4 }}
|
||||||
|
spec:
|
||||||
|
scaleTargetRef:
|
||||||
|
apiVersion: apps/v1
|
||||||
|
kind: Deployment
|
||||||
|
name: {{ include "emailks.fullname" . }}
|
||||||
|
minReplicas: {{ .Values.autoscaling.minReplicas }}
|
||||||
|
maxReplicas: {{ .Values.autoscaling.maxReplicas }}
|
||||||
|
metrics:
|
||||||
|
{{- if .Values.autoscaling.targetCPUUtilizationPercentage }}
|
||||||
|
- type: Resource
|
||||||
|
resource:
|
||||||
|
name: cpu
|
||||||
|
target:
|
||||||
|
type: Utilization
|
||||||
|
averageUtilization: {{ .Values.autoscaling.targetCPUUtilizationPercentage }}
|
||||||
|
{{- end }}
|
||||||
|
{{- end }}
|
||||||
@@ -0,0 +1,15 @@
|
|||||||
|
apiVersion: v1
|
||||||
|
kind: Service
|
||||||
|
metadata:
|
||||||
|
name: {{ include "emailks.fullname" . }}
|
||||||
|
labels:
|
||||||
|
{{- include "emailks.labels" . | nindent 4 }}
|
||||||
|
spec:
|
||||||
|
type: {{ .Values.service.type }}
|
||||||
|
ports:
|
||||||
|
- port: {{ .Values.service.port }}
|
||||||
|
targetPort: {{ .Values.service.targetPort }}
|
||||||
|
protocol: TCP
|
||||||
|
name: grpc
|
||||||
|
selector:
|
||||||
|
{{- include "emailks.selectorLabels" . | nindent 4 }}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
{{- if .Values.serviceAccount.create -}}
|
||||||
|
apiVersion: v1
|
||||||
|
kind: ServiceAccount
|
||||||
|
metadata:
|
||||||
|
name: {{ include "emailks.serviceAccountName" . }}
|
||||||
|
labels:
|
||||||
|
{{- include "emailks.labels" . | nindent 4 }}
|
||||||
|
{{- with .Values.serviceAccount.annotations }}
|
||||||
|
annotations:
|
||||||
|
{{- toYaml . | nindent 4 }}
|
||||||
|
{{- end }}
|
||||||
|
automountServiceAccountToken: false
|
||||||
|
{{- end }}
|
||||||
@@ -0,0 +1,68 @@
|
|||||||
|
replicaCount: 1
|
||||||
|
|
||||||
|
image:
|
||||||
|
repository: emailks
|
||||||
|
pullPolicy: IfNotPresent
|
||||||
|
tag: ""
|
||||||
|
|
||||||
|
imagePullSecrets: []
|
||||||
|
nameOverride: ""
|
||||||
|
fullnameOverride: ""
|
||||||
|
|
||||||
|
serviceAccount:
|
||||||
|
create: true
|
||||||
|
annotations: {}
|
||||||
|
name: ""
|
||||||
|
|
||||||
|
podAnnotations: {}
|
||||||
|
podLabels: {}
|
||||||
|
|
||||||
|
podSecurityContext: {}
|
||||||
|
securityContext: {}
|
||||||
|
|
||||||
|
service:
|
||||||
|
type: ClusterIP
|
||||||
|
port: 50051
|
||||||
|
targetPort: 50051
|
||||||
|
|
||||||
|
resources:
|
||||||
|
limits:
|
||||||
|
cpu: 500m
|
||||||
|
memory: 256Mi
|
||||||
|
requests:
|
||||||
|
cpu: 100m
|
||||||
|
memory: 64Mi
|
||||||
|
|
||||||
|
autoscaling:
|
||||||
|
enabled: false
|
||||||
|
minReplicas: 1
|
||||||
|
maxReplicas: 5
|
||||||
|
targetCPUUtilizationPercentage: 80
|
||||||
|
|
||||||
|
nodeSelector: {}
|
||||||
|
tolerations: []
|
||||||
|
affinity: {}
|
||||||
|
|
||||||
|
smtp:
|
||||||
|
host: smtp.example.com
|
||||||
|
port: 587
|
||||||
|
tls: starttls
|
||||||
|
username: ""
|
||||||
|
password: ""
|
||||||
|
fromEmail: noreply@example.com
|
||||||
|
fromName: EmailKS
|
||||||
|
replyTo: ""
|
||||||
|
timeoutSecs: 30
|
||||||
|
heloName: ""
|
||||||
|
allowRequestFrom: false
|
||||||
|
|
||||||
|
queue:
|
||||||
|
capacity: 1000
|
||||||
|
|
||||||
|
listenAddr: "0.0.0.0:50051"
|
||||||
|
|
||||||
|
logLevel: info
|
||||||
|
|
||||||
|
existingSecret:
|
||||||
|
name: ""
|
||||||
|
passwordKey: "password"
|
||||||
@@ -1,9 +1,8 @@
|
|||||||
use std::{env, fmt, net::SocketAddr, time::Duration};
|
use std::{env, fmt, net::SocketAddr, time::Duration};
|
||||||
|
|
||||||
pub use crate::error::ConfigError;
|
pub use crate::error::ConfigError;
|
||||||
|
use crate::ENV_PREFIX;
|
||||||
use tracing;
|
use tracing;
|
||||||
|
|
||||||
const ENV_PREFIX: &str = "APP_SMTP_";
|
|
||||||
const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
const DEFAULT_LISTEN_ADDR: &str = "127.0.0.1:50051";
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ use lettre::{
|
|||||||
},
|
},
|
||||||
transport::smtp::{self, authentication::Credentials, extension::ClientId},
|
transport::smtp::{self, authentication::Credentials, extension::ClientId},
|
||||||
};
|
};
|
||||||
|
use tracing;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
config::{SmtpConfig, SmtpTls},
|
config::{SmtpConfig, SmtpTls},
|
||||||
@@ -105,6 +106,13 @@ pub(crate) fn build_single_attachment(
|
|||||||
url: att.url.clone(),
|
url: att.url.clone(),
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
if !att.data.is_empty() && !att.url.trim().is_empty() {
|
||||||
|
tracing::warn!(
|
||||||
|
filename = att.filename,
|
||||||
|
url = att.url,
|
||||||
|
"attachment has both inline data and url; url will be ignored"
|
||||||
|
);
|
||||||
|
}
|
||||||
let ct: ContentType = if att.content_type.trim().is_empty() {
|
let ct: ContentType = if att.content_type.trim().is_empty() {
|
||||||
"application/octet-stream"
|
"application/octet-stream"
|
||||||
.parse()
|
.parse()
|
||||||
@@ -228,6 +236,9 @@ pub fn build_message_from_parts(
|
|||||||
}
|
}
|
||||||
|
|
||||||
let from = resolve_sender(config, request)?;
|
let from = resolve_sender(config, request)?;
|
||||||
|
if request.subject.trim().is_empty() {
|
||||||
|
tracing::warn!("email subject is empty — some SMTP servers may reject it");
|
||||||
|
}
|
||||||
let mut builder = Message::builder()
|
let mut builder = Message::builder()
|
||||||
.from(from)
|
.from(from)
|
||||||
.subject(request.subject.clone());
|
.subject(request.subject.clone());
|
||||||
|
|||||||
@@ -1,6 +1,6 @@
|
|||||||
use std::fmt;
|
use std::fmt;
|
||||||
|
|
||||||
const ENV_PREFIX: &str = "APP_SMTP_";
|
use crate::ENV_PREFIX;
|
||||||
|
|
||||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
pub enum ConfigError {
|
pub enum ConfigError {
|
||||||
|
|||||||
@@ -6,6 +6,8 @@ pub mod queue;
|
|||||||
pub mod server;
|
pub mod server;
|
||||||
pub mod status;
|
pub mod status;
|
||||||
|
|
||||||
|
pub(crate) const ENV_PREFIX: &str = "APP_SMTP_";
|
||||||
|
|
||||||
pub mod pb {
|
pub mod pb {
|
||||||
pub mod email {
|
pub mod email {
|
||||||
pub mod v1 {
|
pub mod v1 {
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
let sender = EmailSender::new(config.smtp)?;
|
let sender = EmailSender::new(config.smtp)?;
|
||||||
let (queue, worker) = match config.queue_capacity {
|
let (queue, worker) = match config.queue_capacity {
|
||||||
|
// `Some(0)` explicitly opts into an unbounded queue (mainly for testing).
|
||||||
Some(0) => {
|
Some(0) => {
|
||||||
info!("creating unbounded queue by explicit configuration");
|
info!("creating unbounded queue by explicit configuration");
|
||||||
EmailQueue::unbounded()
|
EmailQueue::unbounded()
|
||||||
|
|||||||
@@ -156,6 +156,8 @@ impl EmailQueue {
|
|||||||
if *self.shutdown.tx.borrow() {
|
if *self.shutdown.tx.borrow() {
|
||||||
return Err(QueueError::Closed);
|
return Err(QueueError::Closed);
|
||||||
}
|
}
|
||||||
|
// NOTE: ID is consumed even if the send below fails (channel full, closed).
|
||||||
|
// The u64 space is large enough that this is inconsequential in practice.
|
||||||
let id = self.next_job_id()?;
|
let id = self.next_job_id()?;
|
||||||
self.status_store.set_queued(id);
|
self.status_store.set_queued(id);
|
||||||
let job = EmailJob {
|
let job = EmailJob {
|
||||||
|
|||||||
@@ -8,6 +8,8 @@ use tonic::{Request, Response, Status};
|
|||||||
use tracing::warn;
|
use tracing::warn;
|
||||||
|
|
||||||
const STREAM_STATUS_POLL_INTERVAL: Duration = Duration::from_millis(300);
|
const STREAM_STATUS_POLL_INTERVAL: Duration = Duration::from_millis(300);
|
||||||
|
/// Maximum lifetime of a single streaming batch-status RPC.
|
||||||
|
/// Protects against leaked streams when jobs never reach a terminal state.
|
||||||
const STREAM_STATUS_TIMEOUT: Duration = Duration::from_secs(10 * 60);
|
const STREAM_STATUS_TIMEOUT: Duration = Duration::from_secs(10 * 60);
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -75,12 +77,12 @@ impl EmailService for EmailServiceImpl {
|
|||||||
request: Request<BatchSendEmailRequest>,
|
request: Request<BatchSendEmailRequest>,
|
||||||
) -> Result<Response<BatchSendEmailResponse>, Status> {
|
) -> Result<Response<BatchSendEmailResponse>, Status> {
|
||||||
let req = request.into_inner();
|
let req = request.into_inner();
|
||||||
let total = req.emails.len();
|
let total = req.emails.len() as i32;
|
||||||
let mut success = 0i32;
|
let mut success = 0i32;
|
||||||
let mut failures = 0i32;
|
let mut failures = 0i32;
|
||||||
let mut results = Vec::with_capacity(total);
|
let mut results = Vec::with_capacity(total as usize);
|
||||||
|
|
||||||
for email in req.emails {
|
for (i, email) in req.emails.into_iter().enumerate() {
|
||||||
match self.queue.enqueue(email) {
|
match self.queue.enqueue(email) {
|
||||||
Ok(id) => {
|
Ok(id) => {
|
||||||
success += 1;
|
success += 1;
|
||||||
@@ -90,6 +92,8 @@ impl EmailService for EmailServiceImpl {
|
|||||||
failures += 1;
|
failures += 1;
|
||||||
warn!(%e, "batch enqueue failed for one email");
|
warn!(%e, "batch enqueue failed for one email");
|
||||||
if req.fail_fast {
|
if req.fail_fast {
|
||||||
|
// Count remaining unprocessed emails as failures too.
|
||||||
|
failures += total - (i as i32) - 1;
|
||||||
warn!(
|
warn!(
|
||||||
successful = success,
|
successful = success,
|
||||||
failed = failures,
|
failed = failures,
|
||||||
@@ -152,6 +156,7 @@ impl EmailService for EmailServiceImpl {
|
|||||||
|
|
||||||
let id_set: std::collections::HashSet<u64> = ids.iter().copied().collect();
|
let id_set: std::collections::HashSet<u64> = ids.iter().copied().collect();
|
||||||
let store = self.store.clone();
|
let store = self.store.clone();
|
||||||
|
let mut missing_streak: std::collections::HashMap<u64, u32> = std::collections::HashMap::new();
|
||||||
let (tx, rx) = mpsc::channel(ids.len().saturating_add(immediate_results.len()).max(1));
|
let (tx, rx) = mpsc::channel(ids.len().saturating_add(immediate_results.len()).max(1));
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
@@ -186,6 +191,7 @@ impl EmailService for EmailServiceImpl {
|
|||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
if let Some(entry) = store.get(*id) {
|
if let Some(entry) = store.get(*id) {
|
||||||
|
missing_streak.remove(id);
|
||||||
match entry.status {
|
match entry.status {
|
||||||
SendStatus::Sent => {
|
SendStatus::Sent => {
|
||||||
if tx
|
if tx
|
||||||
@@ -209,6 +215,20 @@ impl EmailService for EmailServiceImpl {
|
|||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// Status entry may have been evicted under memory pressure.
|
||||||
|
// Report as failed after a few consecutive misses.
|
||||||
|
let streak = missing_streak.entry(*id).and_modify(|c| *c += 1).or_insert(1);
|
||||||
|
if *streak >= 5 {
|
||||||
|
if tx.send(Ok(build_failed_response(
|
||||||
|
Some(*id),
|
||||||
|
"status entry evicted before terminal state".into(),
|
||||||
|
))).await.is_err() {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
reported.insert(*id);
|
||||||
|
missing_streak.remove(id);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -68,18 +68,7 @@ impl JobStatusStore {
|
|||||||
guard.remove(&id);
|
guard.remove(&id);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn all_done(&self, ids: &[u64]) -> bool {
|
|
||||||
let guard = match self.inner.read() {
|
|
||||||
Ok(g) => g,
|
|
||||||
Err(_) => return false,
|
|
||||||
};
|
|
||||||
ids.iter().all(|id| {
|
|
||||||
matches!(
|
|
||||||
guard.get(id).map(|e| e.status),
|
|
||||||
Some(SendStatus::Sent | SendStatus::Failed)
|
|
||||||
)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn write(&self, id: u64, status: SendStatus, error: Option<String>) {
|
fn write(&self, id: u64, status: SendStatus, error: Option<String>) {
|
||||||
let mut guard = match self.inner.write() {
|
let mut guard = match self.inner.write() {
|
||||||
@@ -105,14 +94,15 @@ fn prune_statuses(entries: &mut HashMap<u64, JobStatusEntry>) {
|
|||||||
let now = Instant::now();
|
let now = Instant::now();
|
||||||
entries.retain(|_, entry| now.duration_since(entry.updated_at) <= STATUS_TTL);
|
entries.retain(|_, entry| now.duration_since(entry.updated_at) <= STATUS_TTL);
|
||||||
|
|
||||||
while entries.len() >= MAX_STATUS_ENTRIES {
|
// Evict at most one entry per write to avoid O(n²) behaviour under load.
|
||||||
let Some(oldest_id) = entries
|
// Repeated writes will gradually shrink the map if it stays above capacity.
|
||||||
|
if entries.len() >= MAX_STATUS_ENTRIES {
|
||||||
|
if let Some(oldest_id) = entries
|
||||||
.iter()
|
.iter()
|
||||||
.min_by_key(|(_, entry)| entry.updated_at)
|
.min_by_key(|(_, entry)| entry.updated_at)
|
||||||
.map(|(id, _)| *id)
|
.map(|(id, _)| *id)
|
||||||
else {
|
{
|
||||||
break;
|
|
||||||
};
|
|
||||||
entries.remove(&oldest_id);
|
entries.remove(&oldest_id);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|||||||
+30
-8
@@ -7,6 +7,7 @@ use std::sync::{
|
|||||||
Arc,
|
Arc,
|
||||||
atomic::{AtomicUsize, Ordering},
|
atomic::{AtomicUsize, Ordering},
|
||||||
};
|
};
|
||||||
|
use tokio::sync::Notify;
|
||||||
|
|
||||||
fn dummy_request() -> SendEmailRequest {
|
fn dummy_request() -> SendEmailRequest {
|
||||||
SendEmailRequest {
|
SendEmailRequest {
|
||||||
@@ -29,6 +30,8 @@ async fn enqueue_and_consume_success() {
|
|||||||
let (queue, worker) = EmailQueue::unbounded();
|
let (queue, worker) = EmailQueue::unbounded();
|
||||||
let counter = Arc::new(AtomicUsize::new(0));
|
let counter = Arc::new(AtomicUsize::new(0));
|
||||||
let c = counter.clone();
|
let c = counter.clone();
|
||||||
|
let notify = Arc::new(Notify::new());
|
||||||
|
let n = notify.clone();
|
||||||
|
|
||||||
let mut req = dummy_request();
|
let mut req = dummy_request();
|
||||||
req.to.push(EmailAddress {
|
req.to.push(EmailAddress {
|
||||||
@@ -41,14 +44,16 @@ async fn enqueue_and_consume_success() {
|
|||||||
|
|
||||||
worker.spawn(move |_job| {
|
worker.spawn(move |_job| {
|
||||||
let c = c.clone();
|
let c = c.clone();
|
||||||
|
let n = n.clone();
|
||||||
async move {
|
async move {
|
||||||
c.fetch_add(1, Ordering::SeqCst);
|
c.fetch_add(1, Ordering::SeqCst);
|
||||||
|
n.notify_one();
|
||||||
Ok::<(), EmailError>(())
|
Ok::<(), EmailError>(())
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
// Wait for async consumption
|
// Wait for async consumption with proper signalling
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
notify.notified().await;
|
||||||
assert_eq!(counter.load(Ordering::SeqCst), 1);
|
assert_eq!(counter.load(Ordering::SeqCst), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -57,22 +62,27 @@ async fn retry_then_succeed() {
|
|||||||
let (queue, worker) = EmailQueue::unbounded();
|
let (queue, worker) = EmailQueue::unbounded();
|
||||||
let attempts = Arc::new(AtomicUsize::new(0));
|
let attempts = Arc::new(AtomicUsize::new(0));
|
||||||
let a = attempts.clone();
|
let a = attempts.clone();
|
||||||
|
let notify = Arc::new(Notify::new());
|
||||||
|
let n = notify.clone();
|
||||||
|
|
||||||
let _id = queue.enqueue(dummy_request()).unwrap();
|
let _id = queue.enqueue(dummy_request()).unwrap();
|
||||||
|
|
||||||
worker.spawn(move |_job| {
|
worker.spawn(move |_job| {
|
||||||
let a = a.clone();
|
let a = a.clone();
|
||||||
|
let n = n.clone();
|
||||||
async move {
|
async move {
|
||||||
let n = a.fetch_add(1, Ordering::SeqCst);
|
let count = a.fetch_add(1, Ordering::SeqCst);
|
||||||
if n < 2 {
|
if count < 2 {
|
||||||
Err(EmailError::Send("temp failure".into()))
|
Err(EmailError::Send("temp failure".into()))
|
||||||
} else {
|
} else {
|
||||||
|
n.notify_one();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(700)).await;
|
// Wait for final success notification
|
||||||
|
notify.notified().await;
|
||||||
assert_eq!(attempts.load(Ordering::SeqCst), 3); // 2 fails + 1 success
|
assert_eq!(attempts.load(Ordering::SeqCst), 3); // 2 fails + 1 success
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -82,18 +92,22 @@ async fn terminal_error_destroyed_immediately() {
|
|||||||
let store = queue.status_store().clone();
|
let store = queue.status_store().clone();
|
||||||
let attempts = Arc::new(AtomicUsize::new(0));
|
let attempts = Arc::new(AtomicUsize::new(0));
|
||||||
let a = attempts.clone();
|
let a = attempts.clone();
|
||||||
|
let notify = Arc::new(Notify::new());
|
||||||
|
let n = notify.clone();
|
||||||
|
|
||||||
let id = queue.enqueue(dummy_request()).unwrap();
|
let id = queue.enqueue(dummy_request()).unwrap();
|
||||||
|
|
||||||
worker.spawn(move |_job| {
|
worker.spawn(move |_job| {
|
||||||
let a = a.clone();
|
let a = a.clone();
|
||||||
|
let n = n.clone();
|
||||||
async move {
|
async move {
|
||||||
a.fetch_add(1, Ordering::SeqCst);
|
a.fetch_add(1, Ordering::SeqCst);
|
||||||
|
n.notify_one();
|
||||||
Err(EmailError::MissingRecipients)
|
Err(EmailError::MissingRecipients)
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
notify.notified().await;
|
||||||
assert_eq!(attempts.load(Ordering::SeqCst), 1);
|
assert_eq!(attempts.load(Ordering::SeqCst), 1);
|
||||||
|
|
||||||
let entry = store.get(id).expect("should have status entry");
|
let entry = store.get(id).expect("should have status entry");
|
||||||
@@ -113,15 +127,23 @@ async fn bounded_channel_blocks_when_full() {
|
|||||||
async fn status_store_tracks_lifecycle() {
|
async fn status_store_tracks_lifecycle() {
|
||||||
let (queue, worker) = EmailQueue::unbounded();
|
let (queue, worker) = EmailQueue::unbounded();
|
||||||
let store = queue.status_store().clone();
|
let store = queue.status_store().clone();
|
||||||
|
let notify = Arc::new(Notify::new());
|
||||||
|
let n = notify.clone();
|
||||||
|
|
||||||
let id = queue.enqueue(dummy_request()).unwrap();
|
let id = queue.enqueue(dummy_request()).unwrap();
|
||||||
// Status should be Queued
|
// Status should be Queued
|
||||||
let entry = store.get(id).unwrap();
|
let entry = store.get(id).unwrap();
|
||||||
assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Queued);
|
assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Queued);
|
||||||
|
|
||||||
worker.spawn(move |_job| async move { Ok::<(), EmailError>(()) });
|
worker.spawn(move |_job| {
|
||||||
|
let n = n.clone();
|
||||||
|
async move {
|
||||||
|
n.notify_one();
|
||||||
|
Ok::<(), EmailError>(())
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
tokio::time::sleep(std::time::Duration::from_millis(100)).await;
|
notify.notified().await;
|
||||||
let entry = store.get(id).unwrap();
|
let entry = store.get(id).unwrap();
|
||||||
assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Sent);
|
assert_eq!(entry.status, emailks::pb::email::v1::SendStatus::Sent);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user