pub mod api; pub mod cache; pub mod config; pub mod error; pub mod etcd; pub mod grpc; pub mod models; pub mod pb; pub mod queue; pub mod service; pub mod session; pub mod storage; use std::net::SocketAddr; use std::sync::Arc; use actix_web::cookie::Key; use actix_web::{App, HttpResponse, HttpServer, web}; use sqlx::Executor; use crate::cache::AppCache; use crate::cache::redis::AppRedis; use crate::config::AppConfig; use crate::error::{AppError, AppResult}; use crate::etcd::EtcdRegistry; use crate::models::db::AppDatabase; use crate::queue::NatsQueue; use crate::service::AppService; use crate::session::RedisSessionStore; use crate::storage::s3::AppS3Storage; use utoipa::OpenApi; /// A ready-to-run appks HTTP + gRPC server. /// /// Use [`AppksServerBuilder`] to construct an instance from configuration. pub struct AppksServer { service: AppService, db: AppDatabase, config: AppConfig, redis: AppRedis, rpc_addr: SocketAddr, rpc_listener: tokio::net::TcpListener, } /// Builder for [`AppksServer`]. /// /// # Examples /// /// ```no_run /// use appks::{AppksServer, config::AppConfig}; /// /// # async fn example() -> Result<(), Box> { /// let config = AppConfig::load(); /// let server = AppksServer::builder() /// .config(config) /// .build() /// .await?; /// server.serve().await?; /// # Ok(()) /// # } /// ``` pub struct AppksServerBuilder { config: Option, } impl AppksServer { /// Create a new builder. pub fn builder() -> AppksServerBuilder { AppksServerBuilder::default() } /// Returns a reference to the inner [`AppService`]. /// /// Useful when embedding appks in another application without running /// the built-in HTTP/gRPC servers — you can wire routes manually. pub fn service(&self) -> &AppService { &self.service } /// Start the HTTP and gRPC servers and block until shutdown. pub async fn serve(self) -> AppResult<()> { // Spawn gRPC server { let grpc_service = self.service.clone(); let rpc_addr = self.rpc_addr; let rpc_listener = self.rpc_listener; tokio::spawn(async move { if let Err(e) = crate::grpc::start_grpc_server(rpc_addr, rpc_listener, grpc_service).await { tracing::error!(error = %e, "gRPC server failed"); } }); } // Background JWT key rotation { let token_service = self.service.internal_auth.clone(); tokio::spawn(async move { let mut interval = tokio::time::interval(std::time::Duration::from_secs(600)); interval.tick().await; // skip first immediate tick loop { interval.tick().await; match token_service.rotate_if_needed().await { Ok(true) => tracing::info!("signing key rotated"), Ok(false) => tracing::debug!("signing key rotation not needed"), Err(e) => tracing::error!(error = %e, "signing key rotation failed"), } } }); } // HTTP server let host = self .config .get_env_or::("APP_HTTP_HOST", "0.0.0.0".to_string())?; let port = self .config .get_env_or::("APP_HTTP_PORT", 8000)?; let workers = self.config.get_env_or::( "APP_HTTP_WORKERS", std::thread::available_parallelism() .map(|n| n.get()) .unwrap_or(1), )?; let bind_addr = format!("{host}:{port}"); let session_key = build_session_key(&self.config)?; let session_cfg = self.config.session_config()?; let redis = self.redis.clone(); let service = self.service.clone(); tracing::info!(addr = %bind_addr, workers, "http server listening"); HttpServer::new(move || { let session_store = RedisSessionStore::new(redis.clone()); let session_middleware = session_cfg.build_middleware(session_store, session_key.clone()); App::new() .wrap(actix_web::middleware::Logger::default()) .app_data(web::Data::new(service.clone())) .wrap(session_middleware) .route("/healthz", web::get().to(healthz)) .route("/readyz", web::get().to(readyz)) .route("/openapi.json", web::get().to(openapi_json)) .configure(crate::api::routes::init_routes) }) .workers(workers) .bind(bind_addr)? .run() .await?; self.db.close().await; Ok(()) } } impl AppksServerBuilder { /// Set the server configuration. pub fn config(mut self, config: AppConfig) -> Self { self.config = Some(config); self } /// Build the server, connecting to all infrastructure (DB, Redis, S3, etcd, NATS) /// and creating the service layer. pub async fn build(self) -> AppResult { let config = self.config.unwrap_or_else(AppConfig::load); validate_session_secret(&config)?; tracing::info!("starting AppKS"); let db = AppDatabase::from_config(&config).await?; db.writer().execute("SELECT 1").await?; sqlx::migrate!("./migrate") .run(db.writer()) .await .map_err(|e| AppError::Config(format!("database migration failed: {e}")))?; let redis = AppRedis::from_config(&config).await?; let cache = Arc::new(AppCache::from_config(&config).await?); let storage = AppS3Storage::from_config(&config).await?; let rpc_host = config.get_env_or::("APP_RPC_SELF_HOST", "0.0.0.0".to_string())?; let rpc_port = config.get_env_or::("APP_RPC_SELF_PORT", 50050)?; let rpc_addr: SocketAddr = format!("{rpc_host}:{rpc_port}") .parse() .map_err(|e| AppError::Config(format!("invalid gRPC address: {e}")))?; let rpc_listener = tokio::net::TcpListener::bind(rpc_addr).await.map_err(|e| { AppError::Config(format!("gRPC bind failed on {rpc_addr}: {e}")) })?; let registry = Arc::new(EtcdRegistry::connect(&config).await?); registry.start_discovery().await?; registry .register_self(&config.rpc_self_service_name()?) .await?; let nats = Arc::new(NatsQueue::connect(&config).await?); let service = AppService::new( env!("CARGO_PKG_VERSION").to_string(), db.clone(), redis.clone(), cache, config.clone(), storage, registry, nats, ) .await; Ok(AppksServer { service, db, config, redis, rpc_addr, rpc_listener, }) } } impl Default for AppksServerBuilder { fn default() -> Self { Self { config: None } } } async fn healthz() -> HttpResponse { HttpResponse::Ok().json(serde_json::json!({ "status": "ok" })) } async fn readyz(service: web::Data) -> Result { service.ctx.db.writer().execute("SELECT 1").await?; Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "ready" }))) } async fn openapi_json() -> HttpResponse { HttpResponse::Ok().json(crate::api::openapi::OpenApiDoc::openapi()) } fn build_session_key(config: &AppConfig) -> AppResult { let secret = session_secret(config)?; Ok(Key::derive_from(secret.as_bytes())) } fn validate_session_secret(config: &AppConfig) -> AppResult<()> { session_secret(config).map(|_| ()) } fn session_secret(config: &AppConfig) -> AppResult { let secret = config .env .get("APP_SESSION_SECRET") .map(|s| s.trim()) .filter(|s| s.len() >= 32) .ok_or_else(|| AppError::Config("APP_SESSION_SECRET must be at least 32 bytes".into()))?; Ok(secret.to_string()) }