From 3251fa08e3acdee2dd18c55d6eccfe59d3d61984 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Fri, 12 Jun 2026 21:36:42 +0800 Subject: [PATCH] refactor: extract EmailksServer as library, thin main.rs to bootstrap-only - Add EmailksServer / EmailksServerBuilder to lib.rs - Expose serve() / serve_with_shutdown() for embedding - main.rs now only handles etcd config and delegates to library --- lib.rs | 153 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ main.rs | 65 +++--------------------- 2 files changed, 159 insertions(+), 59 deletions(-) diff --git a/lib.rs b/lib.rs index d33b3fb..12269db 100644 --- a/lib.rs +++ b/lib.rs @@ -16,3 +16,156 @@ pub mod pb { } } } + +use std::future::Future; +use std::net::SocketAddr; + +use crate::config::AppConfig; +use crate::email::EmailSender; +use crate::pb::email::v1::email_service_server::EmailServiceServer; +use crate::queue::EmailQueue; +use crate::server::EmailServiceImpl; +use tonic::transport::Server as TonicServer; +use tracing::{error, info}; + +const DEFAULT_QUEUE_CAPACITY: usize = 1_000; + +/// A ready-to-run emailks gRPC server. +/// +/// Use [`EmailksServerBuilder`] to construct an instance from configuration. +pub struct EmailksServer { + service: EmailServiceImpl, + worker_handle: tokio::task::JoinHandle<()>, + addr: SocketAddr, +} + +/// Builder for [`EmailksServer`]. +/// +/// # Examples +/// +/// ```no_run +/// use emailks::{EmailksServer, config::AppConfig}; +/// +/// # async fn example() -> Result<(), Box> { +/// let config = AppConfig::from_env()?; +/// let server = EmailksServer::builder() +/// .config(config) +/// .build() +/// .await?; +/// server.serve().await?; +/// # Ok(()) +/// # } +/// ``` +pub struct EmailksServerBuilder { + config: Option, +} + +impl EmailksServer { + /// Create a new builder. + pub fn builder() -> EmailksServerBuilder { + EmailksServerBuilder::default() + } + + /// Start the gRPC server and block until Ctrl+C (or SIGTERM on Unix). + pub async fn serve(self) -> Result<(), Box> { + self.serve_with_shutdown(shutdown_signal()).await + } + + /// Start the gRPC server and block until the provided `shutdown` future resolves. + /// + /// In-flight requests are drained before the server returns. + pub async fn serve_with_shutdown( + self, + shutdown: impl Future, + ) -> Result<(), Box> { + let svc = self.service; + let addr = self.addr; + + let (health, health_svc) = tonic_health::server::health_reporter(); + health + .set_serving::>() + .await; + + info!(%addr, "gRPC server starting"); + TonicServer::builder() + .add_service(health_svc) + .add_service(EmailServiceServer::new(svc)) + .serve_with_shutdown(addr, shutdown) + .await?; + + info!("server stopped"); + if let Err(e) = self.worker_handle.await { + error!(error = %e, "worker task panicked"); + } + + Ok(()) + } +} + +impl EmailksServerBuilder { + /// Set the server configuration. + pub fn config(mut self, config: AppConfig) -> Self { + self.config = Some(config); + self + } + + /// Build the server, creating the SMTP sender, queue, worker, and gRPC service. + pub async fn build(self) -> Result> { + let config = self + .config + .unwrap_or_else(|| AppConfig::from_env().expect("failed to load emailks config")); + + info!( + host = %config.smtp.host, + port = config.smtp.port, + "smtp config loaded" + ); + + let sender = EmailSender::new(config.smtp)?; + let (queue, worker) = match config.queue_capacity { + Some(0) => { + info!("creating unbounded queue"); + EmailQueue::unbounded() + } + Some(cap) => { + info!(capacity = cap, "creating bounded queue"); + EmailQueue::bounded(cap) + } + None => { + info!( + capacity = DEFAULT_QUEUE_CAPACITY, + "creating bounded queue (default)" + ); + EmailQueue::bounded(DEFAULT_QUEUE_CAPACITY) + } + }; + + let store = queue.status_store().clone(); + let worker_handle = worker.spawn(move |job| { + let s = sender.clone(); + async move { s.send_job(&job).await } + }); + + let addr = config.listen_addr; + let service = EmailServiceImpl::new(queue, store); + + Ok(EmailksServer { + service, + worker_handle, + addr, + }) + } +} + +impl Default for EmailksServerBuilder { + fn default() -> Self { + Self { config: None } + } +} + +async fn shutdown_signal() { + match tokio::signal::ctrl_c().await { + Ok(()) => info!("shutdown signal received"), + Err(err) => error!(%err, "failed to install CTRL+C handler"), + } +} diff --git a/main.rs b/main.rs index 3ef9bb0..88a3816 100644 --- a/main.rs +++ b/main.rs @@ -1,12 +1,9 @@ use emailks::{ - config::AppConfig, email::EmailSender, etcd::{EtcdConfig, ServiceRegistry}, - pb::email::v1::email_service_server::EmailServiceServer, queue::EmailQueue, - server::EmailServiceImpl, + config::AppConfig, + etcd::{EtcdConfig, ServiceRegistry}, + EmailksServer, }; -use tonic::transport::Server; -use tracing::{error, info}; - -const DEFAULT_QUEUE_CAPACITY: usize = 1_000; +use tracing::info; #[tokio::main] async fn main() -> Result<(), Box> { @@ -19,7 +16,6 @@ async fn main() -> Result<(), Box> { dotenvy::dotenv().ok(); - // Phase 1: read etcd endpoints from env (required to bootstrap etcd) let etcd_endpoints: Vec = std::env::var("ETCD_ENDPOINTS") .unwrap_or_else(|_| "http://localhost:2379".to_string()) .split(',') @@ -29,15 +25,12 @@ async fn main() -> Result<(), Box> { let etcd_prefix = std::env::var("ETCD_KEY_PREFIX") .unwrap_or_else(|_| "/appks/".to_string()); - // Phase 2: connect etcd, create config overlay (etcd > env > default) let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix).await?; let listen_addr_str = etcd.get("EMAILKS_LISTEN_ADDR", "127.0.0.1:50051").await; - // Phase 3: register this service so other services (appks) can discover us let registry = ServiceRegistry::new(etcd.client(), &etcd_prefix); registry.register("emailks", &listen_addr_str).await?; - // Phase 4: load SMTP config — each key: etcd first, then env, then default let smtp_host = etcd.get("APP_SMTP_HOST", "").await; if smtp_host.is_empty() { return Err("APP_SMTP_HOST is required (set via etcd or env)".into()); @@ -68,54 +61,8 @@ async fn main() -> Result<(), Box> { info!(host = %config.smtp.host, port = config.smtp.port, "smtp config loaded (etcd priority)"); - let sender = EmailSender::new(config.smtp)?; - let (queue, worker) = match config.queue_capacity { - Some(0) => { - info!("creating unbounded queue"); - EmailQueue::unbounded() - } - Some(cap) => { - info!(capacity = cap, "creating bounded queue"); - EmailQueue::bounded(cap) - } - None => { - info!(capacity = DEFAULT_QUEUE_CAPACITY, "creating bounded queue (default)"); - EmailQueue::bounded(DEFAULT_QUEUE_CAPACITY) - } - }; - - let store = queue.status_store().clone(); - let worker_handle = worker.spawn(move |job| { - let s = sender.clone(); - async move { s.send_job(&job).await } - }); - - let addr = config.listen_addr; - let svc = EmailServiceImpl::new(queue, store); - - let (health, health_svc) = tonic_health::server::health_reporter(); - health - .set_serving::>() - .await; - - info!(%addr, "gRPC server starting"); - Server::builder() - .add_service(health_svc) - .add_service(EmailServiceServer::new(svc)) - .serve_with_shutdown(addr, shutdown_signal()) - .await?; - - info!("server stopped"); - if let Err(e) = worker_handle.await { - tracing::error!(error = %e, "worker task panicked"); - } + let server = EmailksServer::builder().config(config).build().await?; + server.serve().await?; Ok(()) } - -async fn shutdown_signal() { - match tokio::signal::ctrl_c().await { - Ok(()) => info!("shutdown signal received"), - Err(err) => error!(%err, "failed to install CTRL+C handler"), - } -}