refactor: extract EmailksServer as library, thin main.rs to bootstrap-only
- Add EmailksServer / EmailksServerBuilder to lib.rs - Expose serve() / serve_with_shutdown() for embedding - main.rs now only handles etcd config and delegates to library
This commit is contained in:
@@ -16,3 +16,156 @@ pub mod pb {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
use std::future::Future;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
|
use crate::config::AppConfig;
|
||||||
|
use crate::email::EmailSender;
|
||||||
|
use crate::pb::email::v1::email_service_server::EmailServiceServer;
|
||||||
|
use crate::queue::EmailQueue;
|
||||||
|
use crate::server::EmailServiceImpl;
|
||||||
|
use tonic::transport::Server as TonicServer;
|
||||||
|
use tracing::{error, info};
|
||||||
|
|
||||||
|
const DEFAULT_QUEUE_CAPACITY: usize = 1_000;
|
||||||
|
|
||||||
|
/// A ready-to-run emailks gRPC server.
|
||||||
|
///
|
||||||
|
/// Use [`EmailksServerBuilder`] to construct an instance from configuration.
|
||||||
|
pub struct EmailksServer {
|
||||||
|
service: EmailServiceImpl,
|
||||||
|
worker_handle: tokio::task::JoinHandle<()>,
|
||||||
|
addr: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builder for [`EmailksServer`].
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```no_run
|
||||||
|
/// use emailks::{EmailksServer, config::AppConfig};
|
||||||
|
///
|
||||||
|
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
/// let config = AppConfig::from_env()?;
|
||||||
|
/// let server = EmailksServer::builder()
|
||||||
|
/// .config(config)
|
||||||
|
/// .build()
|
||||||
|
/// .await?;
|
||||||
|
/// server.serve().await?;
|
||||||
|
/// # Ok(())
|
||||||
|
/// # }
|
||||||
|
/// ```
|
||||||
|
pub struct EmailksServerBuilder {
|
||||||
|
config: Option<AppConfig>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EmailksServer {
|
||||||
|
/// Create a new builder.
|
||||||
|
pub fn builder() -> EmailksServerBuilder {
|
||||||
|
EmailksServerBuilder::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start the gRPC server and block until Ctrl+C (or SIGTERM on Unix).
|
||||||
|
pub async fn serve(self) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
self.serve_with_shutdown(shutdown_signal()).await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start the gRPC server and block until the provided `shutdown` future resolves.
|
||||||
|
///
|
||||||
|
/// In-flight requests are drained before the server returns.
|
||||||
|
pub async fn serve_with_shutdown(
|
||||||
|
self,
|
||||||
|
shutdown: impl Future<Output = ()>,
|
||||||
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
let svc = self.service;
|
||||||
|
let addr = self.addr;
|
||||||
|
|
||||||
|
let (health, health_svc) = tonic_health::server::health_reporter();
|
||||||
|
health
|
||||||
|
.set_serving::<EmailServiceServer<EmailServiceImpl>>()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
info!(%addr, "gRPC server starting");
|
||||||
|
TonicServer::builder()
|
||||||
|
.add_service(health_svc)
|
||||||
|
.add_service(EmailServiceServer::new(svc))
|
||||||
|
.serve_with_shutdown(addr, shutdown)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
info!("server stopped");
|
||||||
|
if let Err(e) = self.worker_handle.await {
|
||||||
|
error!(error = %e, "worker task panicked");
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl EmailksServerBuilder {
|
||||||
|
/// Set the server configuration.
|
||||||
|
pub fn config(mut self, config: AppConfig) -> Self {
|
||||||
|
self.config = Some(config);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build the server, creating the SMTP sender, queue, worker, and gRPC service.
|
||||||
|
pub async fn build(self) -> Result<EmailksServer, Box<dyn std::error::Error>> {
|
||||||
|
let config = self
|
||||||
|
.config
|
||||||
|
.unwrap_or_else(|| AppConfig::from_env().expect("failed to load emailks config"));
|
||||||
|
|
||||||
|
info!(
|
||||||
|
host = %config.smtp.host,
|
||||||
|
port = config.smtp.port,
|
||||||
|
"smtp config loaded"
|
||||||
|
);
|
||||||
|
|
||||||
|
let sender = EmailSender::new(config.smtp)?;
|
||||||
|
let (queue, worker) = match config.queue_capacity {
|
||||||
|
Some(0) => {
|
||||||
|
info!("creating unbounded queue");
|
||||||
|
EmailQueue::unbounded()
|
||||||
|
}
|
||||||
|
Some(cap) => {
|
||||||
|
info!(capacity = cap, "creating bounded queue");
|
||||||
|
EmailQueue::bounded(cap)
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
info!(
|
||||||
|
capacity = DEFAULT_QUEUE_CAPACITY,
|
||||||
|
"creating bounded queue (default)"
|
||||||
|
);
|
||||||
|
EmailQueue::bounded(DEFAULT_QUEUE_CAPACITY)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let store = queue.status_store().clone();
|
||||||
|
let worker_handle = worker.spawn(move |job| {
|
||||||
|
let s = sender.clone();
|
||||||
|
async move { s.send_job(&job).await }
|
||||||
|
});
|
||||||
|
|
||||||
|
let addr = config.listen_addr;
|
||||||
|
let service = EmailServiceImpl::new(queue, store);
|
||||||
|
|
||||||
|
Ok(EmailksServer {
|
||||||
|
service,
|
||||||
|
worker_handle,
|
||||||
|
addr,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for EmailksServerBuilder {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self { config: None }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn shutdown_signal() {
|
||||||
|
match tokio::signal::ctrl_c().await {
|
||||||
|
Ok(()) => info!("shutdown signal received"),
|
||||||
|
Err(err) => error!(%err, "failed to install CTRL+C handler"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,12 +1,9 @@
|
|||||||
use emailks::{
|
use emailks::{
|
||||||
config::AppConfig, email::EmailSender, etcd::{EtcdConfig, ServiceRegistry},
|
config::AppConfig,
|
||||||
pb::email::v1::email_service_server::EmailServiceServer, queue::EmailQueue,
|
etcd::{EtcdConfig, ServiceRegistry},
|
||||||
server::EmailServiceImpl,
|
EmailksServer,
|
||||||
};
|
};
|
||||||
use tonic::transport::Server;
|
use tracing::info;
|
||||||
use tracing::{error, info};
|
|
||||||
|
|
||||||
const DEFAULT_QUEUE_CAPACITY: usize = 1_000;
|
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
@@ -19,7 +16,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
dotenvy::dotenv().ok();
|
dotenvy::dotenv().ok();
|
||||||
|
|
||||||
// Phase 1: read etcd endpoints from env (required to bootstrap etcd)
|
|
||||||
let etcd_endpoints: Vec<String> = std::env::var("ETCD_ENDPOINTS")
|
let etcd_endpoints: Vec<String> = std::env::var("ETCD_ENDPOINTS")
|
||||||
.unwrap_or_else(|_| "http://localhost:2379".to_string())
|
.unwrap_or_else(|_| "http://localhost:2379".to_string())
|
||||||
.split(',')
|
.split(',')
|
||||||
@@ -29,15 +25,12 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
let etcd_prefix = std::env::var("ETCD_KEY_PREFIX")
|
let etcd_prefix = std::env::var("ETCD_KEY_PREFIX")
|
||||||
.unwrap_or_else(|_| "/appks/".to_string());
|
.unwrap_or_else(|_| "/appks/".to_string());
|
||||||
|
|
||||||
// Phase 2: connect etcd, create config overlay (etcd > env > default)
|
|
||||||
let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix).await?;
|
let etcd = EtcdConfig::connect(etcd_endpoints, &etcd_prefix).await?;
|
||||||
let listen_addr_str = etcd.get("EMAILKS_LISTEN_ADDR", "127.0.0.1:50051").await;
|
let listen_addr_str = etcd.get("EMAILKS_LISTEN_ADDR", "127.0.0.1:50051").await;
|
||||||
|
|
||||||
// Phase 3: register this service so other services (appks) can discover us
|
|
||||||
let registry = ServiceRegistry::new(etcd.client(), &etcd_prefix);
|
let registry = ServiceRegistry::new(etcd.client(), &etcd_prefix);
|
||||||
registry.register("emailks", &listen_addr_str).await?;
|
registry.register("emailks", &listen_addr_str).await?;
|
||||||
|
|
||||||
// Phase 4: load SMTP config — each key: etcd first, then env, then default
|
|
||||||
let smtp_host = etcd.get("APP_SMTP_HOST", "").await;
|
let smtp_host = etcd.get("APP_SMTP_HOST", "").await;
|
||||||
if smtp_host.is_empty() {
|
if smtp_host.is_empty() {
|
||||||
return Err("APP_SMTP_HOST is required (set via etcd or env)".into());
|
return Err("APP_SMTP_HOST is required (set via etcd or env)".into());
|
||||||
@@ -68,54 +61,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|||||||
|
|
||||||
info!(host = %config.smtp.host, port = config.smtp.port, "smtp config loaded (etcd priority)");
|
info!(host = %config.smtp.host, port = config.smtp.port, "smtp config loaded (etcd priority)");
|
||||||
|
|
||||||
let sender = EmailSender::new(config.smtp)?;
|
let server = EmailksServer::builder().config(config).build().await?;
|
||||||
let (queue, worker) = match config.queue_capacity {
|
server.serve().await?;
|
||||||
Some(0) => {
|
|
||||||
info!("creating unbounded queue");
|
|
||||||
EmailQueue::unbounded()
|
|
||||||
}
|
|
||||||
Some(cap) => {
|
|
||||||
info!(capacity = cap, "creating bounded queue");
|
|
||||||
EmailQueue::bounded(cap)
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
info!(capacity = DEFAULT_QUEUE_CAPACITY, "creating bounded queue (default)");
|
|
||||||
EmailQueue::bounded(DEFAULT_QUEUE_CAPACITY)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let store = queue.status_store().clone();
|
|
||||||
let worker_handle = worker.spawn(move |job| {
|
|
||||||
let s = sender.clone();
|
|
||||||
async move { s.send_job(&job).await }
|
|
||||||
});
|
|
||||||
|
|
||||||
let addr = config.listen_addr;
|
|
||||||
let svc = EmailServiceImpl::new(queue, store);
|
|
||||||
|
|
||||||
let (health, health_svc) = tonic_health::server::health_reporter();
|
|
||||||
health
|
|
||||||
.set_serving::<EmailServiceServer<EmailServiceImpl>>()
|
|
||||||
.await;
|
|
||||||
|
|
||||||
info!(%addr, "gRPC server starting");
|
|
||||||
Server::builder()
|
|
||||||
.add_service(health_svc)
|
|
||||||
.add_service(EmailServiceServer::new(svc))
|
|
||||||
.serve_with_shutdown(addr, shutdown_signal())
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
info!("server stopped");
|
|
||||||
if let Err(e) = worker_handle.await {
|
|
||||||
tracing::error!(error = %e, "worker task panicked");
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn shutdown_signal() {
|
|
||||||
match tokio::signal::ctrl_c().await {
|
|
||||||
Ok(()) => info!("shutdown signal received"),
|
|
||||||
Err(err) => error!(%err, "failed to install CTRL+C handler"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user