use crate::config::AppConfig; use crate::error::AppResult; use sqlx::PgPool; use sqlx::postgres::PgPoolOptions; use std::ops::Deref; use std::time::Duration; pub trait PoolProvider: Clone + Send + Sync + 'static { fn read(&self) -> &PgPool; fn write(&self) -> &PgPool; } #[derive(Clone, Debug)] pub struct AppDatabase { primary: PgPool, replica: Option, } impl AppDatabase { pub fn reader(&self) -> &PgPool { self.replica.as_ref().unwrap_or(&self.primary) } pub fn writer(&self) -> &PgPool { &self.primary } pub fn new(primary: PgPool) -> Self { Self { primary, replica: None, } } pub fn with_replica(primary: PgPool, replica: PgPool) -> Self { Self { primary, replica: Some(replica), } } pub async fn from_config(config: &AppConfig) -> AppResult { let primary_url = config.database_url()?; let primary = Self::build_pool(&primary_url, config, false).await?; if config.database_read_write_split()? { let replicas = config.database_read_replicas()?; if let Some(replica_url) = replicas.first() { let replica = Self::build_pool(replica_url, config, true).await?; return Ok(Self { primary, replica: Some(replica), }); } } Ok(Self { primary, replica: None, }) } async fn build_pool(url: &str, config: &AppConfig, is_replica: bool) -> AppResult { let (max_conn, min_conn, idle_timeout, max_lifetime, conn_timeout) = if is_replica { ( config.database_replica_max_connections()?, config.database_replica_min_connections()?, config.database_replica_idle_timeout()?, config.database_replica_max_lifetime()?, config.database_replica_connection_timeout()?, ) } else { ( config.database_max_connections()?, config.database_min_connections()?, config.database_idle_timeout()?, config.database_max_lifetime()?, config.database_connection_timeout()?, ) }; Ok(PgPoolOptions::new() .max_connections(max_conn) .min_connections(min_conn) .idle_timeout(Duration::from_secs(idle_timeout)) .max_lifetime(Duration::from_secs(max_lifetime)) .acquire_timeout(Duration::from_secs(conn_timeout)) .connect(url) .await?) } pub fn has_replica(&self) -> bool { self.replica.is_some() } pub async fn close(&self) { self.primary.close().await; if let Some(replica) = &self.replica { replica.close().await; } } } impl PoolProvider for AppDatabase { fn read(&self) -> &PgPool { self.replica.as_ref().unwrap_or(&self.primary) } fn write(&self) -> &PgPool { &self.primary } } impl Deref for AppDatabase { type Target = PgPool; fn deref(&self) -> &Self::Target { &self.primary } } impl PoolProvider for PgPool { fn read(&self) -> &PgPool { self } fn write(&self) -> &PgPool { self } }