From 47ed59c9d645c3d37712892da84363369e2b9539 Mon Sep 17 00:00:00 2001 From: zhenyi <434836402@qq.com> Date: Fri, 12 Jun 2026 21:37:07 +0800 Subject: [PATCH] refactor: extract AppksServer as library, thin main.rs to bootstrap-only - Add AppksServer / AppksServerBuilder to lib.rs - Move DB/Redis/Cache/S3/etcd/NATS init into builder - Move gRPC server spawn, JWT rotation, actix-web HTTP into serve() - Expose service() getter for embedding without the built-in servers - main.rs reduced to ~10 lines --- lib.rs | 249 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++ main.rs | 158 ++--------------------------------- 2 files changed, 256 insertions(+), 151 deletions(-) diff --git a/lib.rs b/lib.rs index 27775f0..617b664 100644 --- a/lib.rs +++ b/lib.rs @@ -10,3 +10,252 @@ pub mod queue; pub mod service; pub mod session; pub mod storage; + +use std::net::SocketAddr; +use std::sync::Arc; + +use actix_web::cookie::Key; +use actix_web::{App, HttpResponse, HttpServer, web}; +use sqlx::Executor; + +use crate::cache::AppCache; +use crate::cache::redis::AppRedis; +use crate::config::AppConfig; +use crate::error::{AppError, AppResult}; +use crate::etcd::EtcdRegistry; +use crate::models::db::AppDatabase; +use crate::queue::NatsQueue; +use crate::service::AppService; +use crate::session::RedisSessionStore; +use crate::storage::s3::AppS3Storage; +use utoipa::OpenApi; + +/// A ready-to-run appks HTTP + gRPC server. +/// +/// Use [`AppksServerBuilder`] to construct an instance from configuration. +pub struct AppksServer { + service: AppService, + db: AppDatabase, + config: AppConfig, + redis: AppRedis, + rpc_addr: SocketAddr, + rpc_listener: tokio::net::TcpListener, +} + +/// Builder for [`AppksServer`]. +/// +/// # Examples +/// +/// ```no_run +/// use appks::{AppksServer, config::AppConfig}; +/// +/// # async fn example() -> Result<(), Box> { +/// let config = AppConfig::load(); +/// let server = AppksServer::builder() +/// .config(config) +/// .build() +/// .await?; +/// server.serve().await?; +/// # Ok(()) +/// # } +/// ``` +pub struct AppksServerBuilder { + config: Option, +} + +impl AppksServer { + /// Create a new builder. + pub fn builder() -> AppksServerBuilder { + AppksServerBuilder::default() + } + + /// Returns a reference to the inner [`AppService`]. + /// + /// Useful when embedding appks in another application without running + /// the built-in HTTP/gRPC servers — you can wire routes manually. + pub fn service(&self) -> &AppService { + &self.service + } + + /// Start the HTTP and gRPC servers and block until shutdown. + pub async fn serve(self) -> AppResult<()> { + // Spawn gRPC server + { + let grpc_service = self.service.clone(); + let rpc_addr = self.rpc_addr; + let rpc_listener = self.rpc_listener; + tokio::spawn(async move { + if let Err(e) = + crate::grpc::start_grpc_server(rpc_addr, rpc_listener, grpc_service).await + { + tracing::error!(error = %e, "gRPC server failed"); + } + }); + } + + // Background JWT key rotation + { + let token_service = self.service.internal_auth.clone(); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(600)); + interval.tick().await; // skip first immediate tick + loop { + interval.tick().await; + match token_service.rotate_if_needed().await { + Ok(true) => tracing::info!("signing key rotated"), + Ok(false) => tracing::debug!("signing key rotation not needed"), + Err(e) => tracing::error!(error = %e, "signing key rotation failed"), + } + } + }); + } + + // HTTP server + let host = self + .config + .get_env_or::("APP_HTTP_HOST", "0.0.0.0".to_string())?; + let port = self + .config + .get_env_or::("APP_HTTP_PORT", 8000)?; + let workers = self.config.get_env_or::( + "APP_HTTP_WORKERS", + std::thread::available_parallelism() + .map(|n| n.get()) + .unwrap_or(1), + )?; + let bind_addr = format!("{host}:{port}"); + let session_key = build_session_key(&self.config)?; + let session_cfg = self.config.session_config()?; + let redis = self.redis.clone(); + let service = self.service.clone(); + + tracing::info!(addr = %bind_addr, workers, "http server listening"); + + HttpServer::new(move || { + let session_store = RedisSessionStore::new(redis.clone()); + let session_middleware = + session_cfg.build_middleware(session_store, session_key.clone()); + + App::new() + .wrap(actix_web::middleware::Logger::default()) + .app_data(web::Data::new(service.clone())) + .wrap(session_middleware) + .route("/healthz", web::get().to(healthz)) + .route("/readyz", web::get().to(readyz)) + .route("/openapi.json", web::get().to(openapi_json)) + .configure(crate::api::routes::init_routes) + }) + .workers(workers) + .bind(bind_addr)? + .run() + .await?; + + self.db.close().await; + Ok(()) + } +} + +impl AppksServerBuilder { + /// Set the server configuration. + pub fn config(mut self, config: AppConfig) -> Self { + self.config = Some(config); + self + } + + /// Build the server, connecting to all infrastructure (DB, Redis, S3, etcd, NATS) + /// and creating the service layer. + pub async fn build(self) -> AppResult { + let config = self.config.unwrap_or_else(AppConfig::load); + validate_session_secret(&config)?; + + tracing::info!("starting AppKS"); + + let db = AppDatabase::from_config(&config).await?; + db.writer().execute("SELECT 1").await?; + sqlx::migrate!("./migrate") + .run(db.writer()) + .await + .map_err(|e| AppError::Config(format!("database migration failed: {e}")))?; + + let redis = AppRedis::from_config(&config).await?; + let cache = Arc::new(AppCache::from_config(&config).await?); + let storage = AppS3Storage::from_config(&config).await?; + + let rpc_host = config.get_env_or::("APP_RPC_SELF_HOST", "0.0.0.0".to_string())?; + let rpc_port = config.get_env_or::("APP_RPC_SELF_PORT", 50050)?; + let rpc_addr: SocketAddr = format!("{rpc_host}:{rpc_port}") + .parse() + .map_err(|e| AppError::Config(format!("invalid gRPC address: {e}")))?; + + let rpc_listener = tokio::net::TcpListener::bind(rpc_addr).await.map_err(|e| { + AppError::Config(format!("gRPC bind failed on {rpc_addr}: {e}")) + })?; + + let registry = Arc::new(EtcdRegistry::connect(&config).await?); + registry.start_discovery().await?; + registry + .register_self(&config.rpc_self_service_name()?) + .await?; + + let nats = Arc::new(NatsQueue::connect(&config).await?); + + let service = AppService::new( + env!("CARGO_PKG_VERSION").to_string(), + db.clone(), + redis.clone(), + cache, + config.clone(), + storage, + registry, + nats, + ) + .await; + + Ok(AppksServer { + service, + db, + config, + redis, + rpc_addr, + rpc_listener, + }) + } +} + +impl Default for AppksServerBuilder { + fn default() -> Self { + Self { config: None } + } +} + +async fn healthz() -> HttpResponse { + HttpResponse::Ok().json(serde_json::json!({ "status": "ok" })) +} + +async fn readyz(service: web::Data) -> Result { + service.ctx.db.writer().execute("SELECT 1").await?; + Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "ready" }))) +} + +async fn openapi_json() -> HttpResponse { + HttpResponse::Ok().json(crate::api::openapi::OpenApiDoc::openapi()) +} + +fn build_session_key(config: &AppConfig) -> AppResult { + let secret = session_secret(config)?; + Ok(Key::derive_from(secret.as_bytes())) +} + +fn validate_session_secret(config: &AppConfig) -> AppResult<()> { + session_secret(config).map(|_| ()) +} + +fn session_secret(config: &AppConfig) -> AppResult { + let secret = config + .env + .get("APP_SESSION_SECRET") + .map(|s| s.trim()) + .filter(|s| s.len() >= 32) + .ok_or_else(|| AppError::Config("APP_SESSION_SECRET must be at least 32 bytes".into()))?; + Ok(secret.to_string()) +} diff --git a/main.rs b/main.rs index c65aa13..917fa26 100644 --- a/main.rs +++ b/main.rs @@ -1,160 +1,16 @@ -use std::sync::Arc; +use appks::{AppksServer, config::AppConfig}; -use actix_web::cookie::Key; -use actix_web::{App, HttpResponse, HttpServer, web}; -use appks::api::openapi::OpenApiDoc; -use appks::api::routes::init_routes; -use appks::cache::AppCache; -use appks::cache::redis::AppRedis; -use appks::config::AppConfig; -use appks::error::{AppError, AppResult}; -use appks::etcd::EtcdRegistry; -use appks::models::db::AppDatabase; -use appks::queue::NatsQueue; -use appks::service::AppService; -use appks::session::RedisSessionStore; -use appks::storage::s3::AppS3Storage; -use sqlx::Executor; -use utoipa::OpenApi; - -#[actix_web::main] -async fn main() -> AppResult<()> { +#[tokio::main] +async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); let config = AppConfig::load(); - validate_session_secret(&config)?; - tracing::info!("starting AppKS"); - - let db = AppDatabase::from_config(&config).await?; - db.writer().execute("SELECT 1").await?; - sqlx::migrate!("./migrate") - .run(db.writer()) - .await - .map_err(|e| AppError::Config(format!("database migration failed: {e}")))?; - - let redis = AppRedis::from_config(&config).await?; - let cache = Arc::new(AppCache::from_config(&config).await?); - let storage = AppS3Storage::from_config(&config).await?; - - let rpc_host = config.get_env_or::("APP_RPC_SELF_HOST", "0.0.0.0".to_string())?; - let rpc_port = config.get_env_or::("APP_RPC_SELF_PORT", 50050)?; - let rpc_addr: std::net::SocketAddr = format!("{rpc_host}:{rpc_port}").parse() - .map_err(|e| appks::error::AppError::Config(format!("invalid gRPC address: {e}")))?; - - // Bind the TCP listener FIRST so the port is reserved before etcd registration. - // This prevents peers from trying to connect before the gRPC server is ready. - let rpc_listener = tokio::net::TcpListener::bind(rpc_addr) - .await - .map_err(|e| appks::error::AppError::Config(format!("gRPC bind failed on {rpc_addr}: {e}")))?; - - let registry = Arc::new(EtcdRegistry::connect(&config).await?); - registry.start_discovery().await?; - registry - .register_self(&config.rpc_self_service_name()?) + let server = AppksServer::builder() + .config(config) + .build() .await?; + server.serve().await?; - let nats = Arc::new(NatsQueue::connect(&config).await?); - - let service = AppService::new( - env!("CARGO_PKG_VERSION").to_string(), - db.clone(), - redis.clone(), - cache, - config.clone(), - storage, - registry, - nats, - ) - .await; - - let grpc_service = service.clone(); - tokio::spawn(async move { - if let Err(e) = appks::grpc::start_grpc_server(rpc_addr, rpc_listener, grpc_service).await { - tracing::error!(error = %e, "gRPC server failed"); - } - }); - - // Background task: rotate JWT signing keys every 10 minutes. - let token_service = service.internal_auth.clone(); - tokio::spawn(async move { - let mut interval = tokio::time::interval(std::time::Duration::from_secs(600)); - // Skip the first immediate tick. - interval.tick().await; - loop { - interval.tick().await; - match token_service.rotate_if_needed().await { - Ok(true) => tracing::info!("signing key rotated"), - Ok(false) => tracing::debug!("signing key rotation not needed"), - Err(e) => tracing::error!(error = %e, "signing key rotation failed"), - } - } - }); - - let host = config.get_env_or::("APP_HTTP_HOST", "0.0.0.0".to_string())?; - let port = config.get_env_or::("APP_HTTP_PORT", 8000)?; - let workers = config.get_env_or::( - "APP_HTTP_WORKERS", - std::thread::available_parallelism() - .map(|n| n.get()) - .unwrap_or(1), - )?; - let bind_addr = format!("{host}:{port}"); - let session_key = build_session_key(&config)?; - let session_cfg = config.session_config()?; - - tracing::info!(addr = %bind_addr, workers, "http server listening"); - - HttpServer::new(move || { - let session_store = RedisSessionStore::new(redis.clone()); - let session_middleware = session_cfg.build_middleware(session_store, session_key.clone()); - - App::new() - .wrap(actix_web::middleware::Logger::default()) - .app_data(web::Data::new(service.clone())) - .wrap(session_middleware) - .route("/healthz", web::get().to(healthz)) - .route("/readyz", web::get().to(readyz)) - .route("/openapi.json", web::get().to(openapi_json)) - .configure(init_routes) - }) - .workers(workers) - .bind(bind_addr)? - .run() - .await?; - - db.close().await; Ok(()) } - -async fn healthz() -> HttpResponse { - HttpResponse::Ok().json(serde_json::json!({ "status": "ok" })) -} - -async fn readyz(service: web::Data) -> Result { - service.ctx.db.writer().execute("SELECT 1").await?; - Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "ready" }))) -} - -async fn openapi_json() -> HttpResponse { - HttpResponse::Ok().json(OpenApiDoc::openapi()) -} - -fn build_session_key(config: &AppConfig) -> AppResult { - let secret = session_secret(config)?; - Ok(Key::derive_from(secret.as_bytes())) -} - -fn validate_session_secret(config: &AppConfig) -> AppResult<()> { - session_secret(config).map(|_| ()) -} - -fn session_secret(config: &AppConfig) -> AppResult { - let secret = config - .env - .get("APP_SESSION_SECRET") - .map(|s| s.trim()) - .filter(|s| s.len() >= 32) - .ok_or_else(|| AppError::Config("APP_SESSION_SECRET must be at least 32 bytes".into()))?; - Ok(secret.to_string()) -}