Compare commits
1 Commits
931d82cbb9
..
main
| Author | SHA1 | Date | |
|---|---|---|---|
| 47ed59c9d6 |
@@ -10,3 +10,252 @@ pub mod queue;
|
|||||||
pub mod service;
|
pub mod service;
|
||||||
pub mod session;
|
pub mod session;
|
||||||
pub mod storage;
|
pub mod storage;
|
||||||
|
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use actix_web::cookie::Key;
|
||||||
|
use actix_web::{App, HttpResponse, HttpServer, web};
|
||||||
|
use sqlx::Executor;
|
||||||
|
|
||||||
|
use crate::cache::AppCache;
|
||||||
|
use crate::cache::redis::AppRedis;
|
||||||
|
use crate::config::AppConfig;
|
||||||
|
use crate::error::{AppError, AppResult};
|
||||||
|
use crate::etcd::EtcdRegistry;
|
||||||
|
use crate::models::db::AppDatabase;
|
||||||
|
use crate::queue::NatsQueue;
|
||||||
|
use crate::service::AppService;
|
||||||
|
use crate::session::RedisSessionStore;
|
||||||
|
use crate::storage::s3::AppS3Storage;
|
||||||
|
use utoipa::OpenApi;
|
||||||
|
|
||||||
|
/// A ready-to-run appks HTTP + gRPC server.
|
||||||
|
///
|
||||||
|
/// Use [`AppksServerBuilder`] to construct an instance from configuration.
|
||||||
|
pub struct AppksServer {
|
||||||
|
service: AppService,
|
||||||
|
db: AppDatabase,
|
||||||
|
config: AppConfig,
|
||||||
|
redis: AppRedis,
|
||||||
|
rpc_addr: SocketAddr,
|
||||||
|
rpc_listener: tokio::net::TcpListener,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Builder for [`AppksServer`].
|
||||||
|
///
|
||||||
|
/// # Examples
|
||||||
|
///
|
||||||
|
/// ```no_run
|
||||||
|
/// use appks::{AppksServer, config::AppConfig};
|
||||||
|
///
|
||||||
|
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
|
/// let config = AppConfig::load();
|
||||||
|
/// let server = AppksServer::builder()
|
||||||
|
/// .config(config)
|
||||||
|
/// .build()
|
||||||
|
/// .await?;
|
||||||
|
/// server.serve().await?;
|
||||||
|
/// # Ok(())
|
||||||
|
/// # }
|
||||||
|
/// ```
|
||||||
|
pub struct AppksServerBuilder {
|
||||||
|
config: Option<AppConfig>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AppksServer {
|
||||||
|
/// Create a new builder.
|
||||||
|
pub fn builder() -> AppksServerBuilder {
|
||||||
|
AppksServerBuilder::default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Returns a reference to the inner [`AppService`].
|
||||||
|
///
|
||||||
|
/// Useful when embedding appks in another application without running
|
||||||
|
/// the built-in HTTP/gRPC servers — you can wire routes manually.
|
||||||
|
pub fn service(&self) -> &AppService {
|
||||||
|
&self.service
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Start the HTTP and gRPC servers and block until shutdown.
|
||||||
|
pub async fn serve(self) -> AppResult<()> {
|
||||||
|
// Spawn gRPC server
|
||||||
|
{
|
||||||
|
let grpc_service = self.service.clone();
|
||||||
|
let rpc_addr = self.rpc_addr;
|
||||||
|
let rpc_listener = self.rpc_listener;
|
||||||
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) =
|
||||||
|
crate::grpc::start_grpc_server(rpc_addr, rpc_listener, grpc_service).await
|
||||||
|
{
|
||||||
|
tracing::error!(error = %e, "gRPC server failed");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Background JWT key rotation
|
||||||
|
{
|
||||||
|
let token_service = self.service.internal_auth.clone();
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let mut interval = tokio::time::interval(std::time::Duration::from_secs(600));
|
||||||
|
interval.tick().await; // skip first immediate tick
|
||||||
|
loop {
|
||||||
|
interval.tick().await;
|
||||||
|
match token_service.rotate_if_needed().await {
|
||||||
|
Ok(true) => tracing::info!("signing key rotated"),
|
||||||
|
Ok(false) => tracing::debug!("signing key rotation not needed"),
|
||||||
|
Err(e) => tracing::error!(error = %e, "signing key rotation failed"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// HTTP server
|
||||||
|
let host = self
|
||||||
|
.config
|
||||||
|
.get_env_or::<String>("APP_HTTP_HOST", "0.0.0.0".to_string())?;
|
||||||
|
let port = self
|
||||||
|
.config
|
||||||
|
.get_env_or::<u16>("APP_HTTP_PORT", 8000)?;
|
||||||
|
let workers = self.config.get_env_or::<usize>(
|
||||||
|
"APP_HTTP_WORKERS",
|
||||||
|
std::thread::available_parallelism()
|
||||||
|
.map(|n| n.get())
|
||||||
|
.unwrap_or(1),
|
||||||
|
)?;
|
||||||
|
let bind_addr = format!("{host}:{port}");
|
||||||
|
let session_key = build_session_key(&self.config)?;
|
||||||
|
let session_cfg = self.config.session_config()?;
|
||||||
|
let redis = self.redis.clone();
|
||||||
|
let service = self.service.clone();
|
||||||
|
|
||||||
|
tracing::info!(addr = %bind_addr, workers, "http server listening");
|
||||||
|
|
||||||
|
HttpServer::new(move || {
|
||||||
|
let session_store = RedisSessionStore::new(redis.clone());
|
||||||
|
let session_middleware =
|
||||||
|
session_cfg.build_middleware(session_store, session_key.clone());
|
||||||
|
|
||||||
|
App::new()
|
||||||
|
.wrap(actix_web::middleware::Logger::default())
|
||||||
|
.app_data(web::Data::new(service.clone()))
|
||||||
|
.wrap(session_middleware)
|
||||||
|
.route("/healthz", web::get().to(healthz))
|
||||||
|
.route("/readyz", web::get().to(readyz))
|
||||||
|
.route("/openapi.json", web::get().to(openapi_json))
|
||||||
|
.configure(crate::api::routes::init_routes)
|
||||||
|
})
|
||||||
|
.workers(workers)
|
||||||
|
.bind(bind_addr)?
|
||||||
|
.run()
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
self.db.close().await;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AppksServerBuilder {
|
||||||
|
/// Set the server configuration.
|
||||||
|
pub fn config(mut self, config: AppConfig) -> Self {
|
||||||
|
self.config = Some(config);
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build the server, connecting to all infrastructure (DB, Redis, S3, etcd, NATS)
|
||||||
|
/// and creating the service layer.
|
||||||
|
pub async fn build(self) -> AppResult<AppksServer> {
|
||||||
|
let config = self.config.unwrap_or_else(AppConfig::load);
|
||||||
|
validate_session_secret(&config)?;
|
||||||
|
|
||||||
|
tracing::info!("starting AppKS");
|
||||||
|
|
||||||
|
let db = AppDatabase::from_config(&config).await?;
|
||||||
|
db.writer().execute("SELECT 1").await?;
|
||||||
|
sqlx::migrate!("./migrate")
|
||||||
|
.run(db.writer())
|
||||||
|
.await
|
||||||
|
.map_err(|e| AppError::Config(format!("database migration failed: {e}")))?;
|
||||||
|
|
||||||
|
let redis = AppRedis::from_config(&config).await?;
|
||||||
|
let cache = Arc::new(AppCache::from_config(&config).await?);
|
||||||
|
let storage = AppS3Storage::from_config(&config).await?;
|
||||||
|
|
||||||
|
let rpc_host = config.get_env_or::<String>("APP_RPC_SELF_HOST", "0.0.0.0".to_string())?;
|
||||||
|
let rpc_port = config.get_env_or::<u16>("APP_RPC_SELF_PORT", 50050)?;
|
||||||
|
let rpc_addr: SocketAddr = format!("{rpc_host}:{rpc_port}")
|
||||||
|
.parse()
|
||||||
|
.map_err(|e| AppError::Config(format!("invalid gRPC address: {e}")))?;
|
||||||
|
|
||||||
|
let rpc_listener = tokio::net::TcpListener::bind(rpc_addr).await.map_err(|e| {
|
||||||
|
AppError::Config(format!("gRPC bind failed on {rpc_addr}: {e}"))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let registry = Arc::new(EtcdRegistry::connect(&config).await?);
|
||||||
|
registry.start_discovery().await?;
|
||||||
|
registry
|
||||||
|
.register_self(&config.rpc_self_service_name()?)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let nats = Arc::new(NatsQueue::connect(&config).await?);
|
||||||
|
|
||||||
|
let service = AppService::new(
|
||||||
|
env!("CARGO_PKG_VERSION").to_string(),
|
||||||
|
db.clone(),
|
||||||
|
redis.clone(),
|
||||||
|
cache,
|
||||||
|
config.clone(),
|
||||||
|
storage,
|
||||||
|
registry,
|
||||||
|
nats,
|
||||||
|
)
|
||||||
|
.await;
|
||||||
|
|
||||||
|
Ok(AppksServer {
|
||||||
|
service,
|
||||||
|
db,
|
||||||
|
config,
|
||||||
|
redis,
|
||||||
|
rpc_addr,
|
||||||
|
rpc_listener,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for AppksServerBuilder {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self { config: None }
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn healthz() -> HttpResponse {
|
||||||
|
HttpResponse::Ok().json(serde_json::json!({ "status": "ok" }))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn readyz(service: web::Data<AppService>) -> Result<HttpResponse, AppError> {
|
||||||
|
service.ctx.db.writer().execute("SELECT 1").await?;
|
||||||
|
Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "ready" })))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn openapi_json() -> HttpResponse {
|
||||||
|
HttpResponse::Ok().json(crate::api::openapi::OpenApiDoc::openapi())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn build_session_key(config: &AppConfig) -> AppResult<Key> {
|
||||||
|
let secret = session_secret(config)?;
|
||||||
|
Ok(Key::derive_from(secret.as_bytes()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn validate_session_secret(config: &AppConfig) -> AppResult<()> {
|
||||||
|
session_secret(config).map(|_| ())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn session_secret(config: &AppConfig) -> AppResult<String> {
|
||||||
|
let secret = config
|
||||||
|
.env
|
||||||
|
.get("APP_SESSION_SECRET")
|
||||||
|
.map(|s| s.trim())
|
||||||
|
.filter(|s| s.len() >= 32)
|
||||||
|
.ok_or_else(|| AppError::Config("APP_SESSION_SECRET must be at least 32 bytes".into()))?;
|
||||||
|
Ok(secret.to_string())
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,160 +1,16 @@
|
|||||||
use std::sync::Arc;
|
use appks::{AppksServer, config::AppConfig};
|
||||||
|
|
||||||
use actix_web::cookie::Key;
|
#[tokio::main]
|
||||||
use actix_web::{App, HttpResponse, HttpServer, web};
|
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||||
use appks::api::openapi::OpenApiDoc;
|
|
||||||
use appks::api::routes::init_routes;
|
|
||||||
use appks::cache::AppCache;
|
|
||||||
use appks::cache::redis::AppRedis;
|
|
||||||
use appks::config::AppConfig;
|
|
||||||
use appks::error::{AppError, AppResult};
|
|
||||||
use appks::etcd::EtcdRegistry;
|
|
||||||
use appks::models::db::AppDatabase;
|
|
||||||
use appks::queue::NatsQueue;
|
|
||||||
use appks::service::AppService;
|
|
||||||
use appks::session::RedisSessionStore;
|
|
||||||
use appks::storage::s3::AppS3Storage;
|
|
||||||
use sqlx::Executor;
|
|
||||||
use utoipa::OpenApi;
|
|
||||||
|
|
||||||
#[actix_web::main]
|
|
||||||
async fn main() -> AppResult<()> {
|
|
||||||
tracing_subscriber::fmt::init();
|
tracing_subscriber::fmt::init();
|
||||||
|
|
||||||
let config = AppConfig::load();
|
let config = AppConfig::load();
|
||||||
validate_session_secret(&config)?;
|
|
||||||
|
|
||||||
tracing::info!("starting AppKS");
|
let server = AppksServer::builder()
|
||||||
|
.config(config)
|
||||||
let db = AppDatabase::from_config(&config).await?;
|
.build()
|
||||||
db.writer().execute("SELECT 1").await?;
|
|
||||||
sqlx::migrate!("./migrate")
|
|
||||||
.run(db.writer())
|
|
||||||
.await
|
|
||||||
.map_err(|e| AppError::Config(format!("database migration failed: {e}")))?;
|
|
||||||
|
|
||||||
let redis = AppRedis::from_config(&config).await?;
|
|
||||||
let cache = Arc::new(AppCache::from_config(&config).await?);
|
|
||||||
let storage = AppS3Storage::from_config(&config).await?;
|
|
||||||
|
|
||||||
let rpc_host = config.get_env_or::<String>("APP_RPC_SELF_HOST", "0.0.0.0".to_string())?;
|
|
||||||
let rpc_port = config.get_env_or::<u16>("APP_RPC_SELF_PORT", 50050)?;
|
|
||||||
let rpc_addr: std::net::SocketAddr = format!("{rpc_host}:{rpc_port}").parse()
|
|
||||||
.map_err(|e| appks::error::AppError::Config(format!("invalid gRPC address: {e}")))?;
|
|
||||||
|
|
||||||
// Bind the TCP listener FIRST so the port is reserved before etcd registration.
|
|
||||||
// This prevents peers from trying to connect before the gRPC server is ready.
|
|
||||||
let rpc_listener = tokio::net::TcpListener::bind(rpc_addr)
|
|
||||||
.await
|
|
||||||
.map_err(|e| appks::error::AppError::Config(format!("gRPC bind failed on {rpc_addr}: {e}")))?;
|
|
||||||
|
|
||||||
let registry = Arc::new(EtcdRegistry::connect(&config).await?);
|
|
||||||
registry.start_discovery().await?;
|
|
||||||
registry
|
|
||||||
.register_self(&config.rpc_self_service_name()?)
|
|
||||||
.await?;
|
.await?;
|
||||||
|
server.serve().await?;
|
||||||
|
|
||||||
let nats = Arc::new(NatsQueue::connect(&config).await?);
|
|
||||||
|
|
||||||
let service = AppService::new(
|
|
||||||
env!("CARGO_PKG_VERSION").to_string(),
|
|
||||||
db.clone(),
|
|
||||||
redis.clone(),
|
|
||||||
cache,
|
|
||||||
config.clone(),
|
|
||||||
storage,
|
|
||||||
registry,
|
|
||||||
nats,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
let grpc_service = service.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
if let Err(e) = appks::grpc::start_grpc_server(rpc_addr, rpc_listener, grpc_service).await {
|
|
||||||
tracing::error!(error = %e, "gRPC server failed");
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Background task: rotate JWT signing keys every 10 minutes.
|
|
||||||
let token_service = service.internal_auth.clone();
|
|
||||||
tokio::spawn(async move {
|
|
||||||
let mut interval = tokio::time::interval(std::time::Duration::from_secs(600));
|
|
||||||
// Skip the first immediate tick.
|
|
||||||
interval.tick().await;
|
|
||||||
loop {
|
|
||||||
interval.tick().await;
|
|
||||||
match token_service.rotate_if_needed().await {
|
|
||||||
Ok(true) => tracing::info!("signing key rotated"),
|
|
||||||
Ok(false) => tracing::debug!("signing key rotation not needed"),
|
|
||||||
Err(e) => tracing::error!(error = %e, "signing key rotation failed"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let host = config.get_env_or::<String>("APP_HTTP_HOST", "0.0.0.0".to_string())?;
|
|
||||||
let port = config.get_env_or::<u16>("APP_HTTP_PORT", 8000)?;
|
|
||||||
let workers = config.get_env_or::<usize>(
|
|
||||||
"APP_HTTP_WORKERS",
|
|
||||||
std::thread::available_parallelism()
|
|
||||||
.map(|n| n.get())
|
|
||||||
.unwrap_or(1),
|
|
||||||
)?;
|
|
||||||
let bind_addr = format!("{host}:{port}");
|
|
||||||
let session_key = build_session_key(&config)?;
|
|
||||||
let session_cfg = config.session_config()?;
|
|
||||||
|
|
||||||
tracing::info!(addr = %bind_addr, workers, "http server listening");
|
|
||||||
|
|
||||||
HttpServer::new(move || {
|
|
||||||
let session_store = RedisSessionStore::new(redis.clone());
|
|
||||||
let session_middleware = session_cfg.build_middleware(session_store, session_key.clone());
|
|
||||||
|
|
||||||
App::new()
|
|
||||||
.wrap(actix_web::middleware::Logger::default())
|
|
||||||
.app_data(web::Data::new(service.clone()))
|
|
||||||
.wrap(session_middleware)
|
|
||||||
.route("/healthz", web::get().to(healthz))
|
|
||||||
.route("/readyz", web::get().to(readyz))
|
|
||||||
.route("/openapi.json", web::get().to(openapi_json))
|
|
||||||
.configure(init_routes)
|
|
||||||
})
|
|
||||||
.workers(workers)
|
|
||||||
.bind(bind_addr)?
|
|
||||||
.run()
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
db.close().await;
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn healthz() -> HttpResponse {
|
|
||||||
HttpResponse::Ok().json(serde_json::json!({ "status": "ok" }))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn readyz(service: web::Data<AppService>) -> Result<HttpResponse, AppError> {
|
|
||||||
service.ctx.db.writer().execute("SELECT 1").await?;
|
|
||||||
Ok(HttpResponse::Ok().json(serde_json::json!({ "status": "ready" })))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn openapi_json() -> HttpResponse {
|
|
||||||
HttpResponse::Ok().json(OpenApiDoc::openapi())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn build_session_key(config: &AppConfig) -> AppResult<Key> {
|
|
||||||
let secret = session_secret(config)?;
|
|
||||||
Ok(Key::derive_from(secret.as_bytes()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn validate_session_secret(config: &AppConfig) -> AppResult<()> {
|
|
||||||
session_secret(config).map(|_| ())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn session_secret(config: &AppConfig) -> AppResult<String> {
|
|
||||||
let secret = config
|
|
||||||
.env
|
|
||||||
.get("APP_SESSION_SECRET")
|
|
||||||
.map(|s| s.trim())
|
|
||||||
.filter(|s| s.len() >= 32)
|
|
||||||
.ok_or_else(|| AppError::Config("APP_SESSION_SECRET must be at least 32 bytes".into()))?;
|
|
||||||
Ok(secret.to_string())
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user