Files
2026-06-07 11:30:56 +08:00

131 lines
3.3 KiB
Rust

use crate::config::AppConfig;
use crate::error::AppResult;
use sqlx::PgPool;
use sqlx::postgres::PgPoolOptions;
use std::ops::Deref;
use std::time::Duration;
pub trait PoolProvider: Clone + Send + Sync + 'static {
fn read(&self) -> &PgPool;
fn write(&self) -> &PgPool;
}
#[derive(Clone, Debug)]
pub struct AppDatabase {
primary: PgPool,
replica: Option<PgPool>,
}
impl AppDatabase {
pub fn reader(&self) -> &PgPool {
self.replica.as_ref().unwrap_or(&self.primary)
}
pub fn writer(&self) -> &PgPool {
&self.primary
}
pub fn new(primary: PgPool) -> Self {
Self {
primary,
replica: None,
}
}
pub fn with_replica(primary: PgPool, replica: PgPool) -> Self {
Self {
primary,
replica: Some(replica),
}
}
pub async fn from_config(config: &AppConfig) -> AppResult<Self> {
let primary_url = config.database_url()?;
let primary = Self::build_pool(&primary_url, config, false).await?;
if config.database_read_write_split()? {
let replicas = config.database_read_replicas()?;
if let Some(replica_url) = replicas.first() {
let replica = Self::build_pool(replica_url, config, true).await?;
return Ok(Self {
primary,
replica: Some(replica),
});
}
}
Ok(Self {
primary,
replica: None,
})
}
async fn build_pool(url: &str, config: &AppConfig, is_replica: bool) -> AppResult<PgPool> {
let (max_conn, min_conn, idle_timeout, max_lifetime, conn_timeout) = if is_replica {
(
config.database_replica_max_connections()?,
config.database_replica_min_connections()?,
config.database_replica_idle_timeout()?,
config.database_replica_max_lifetime()?,
config.database_replica_connection_timeout()?,
)
} else {
(
config.database_max_connections()?,
config.database_min_connections()?,
config.database_idle_timeout()?,
config.database_max_lifetime()?,
config.database_connection_timeout()?,
)
};
Ok(PgPoolOptions::new()
.max_connections(max_conn)
.min_connections(min_conn)
.idle_timeout(Duration::from_secs(idle_timeout))
.max_lifetime(Duration::from_secs(max_lifetime))
.acquire_timeout(Duration::from_secs(conn_timeout))
.connect(url)
.await?)
}
pub fn has_replica(&self) -> bool {
self.replica.is_some()
}
pub async fn close(&self) {
self.primary.close().await;
if let Some(replica) = &self.replica {
replica.close().await;
}
}
}
impl PoolProvider for AppDatabase {
fn read(&self) -> &PgPool {
self.replica.as_ref().unwrap_or(&self.primary)
}
fn write(&self) -> &PgPool {
&self.primary
}
}
impl Deref for AppDatabase {
type Target = PgPool;
fn deref(&self) -> &Self::Target {
&self.primary
}
}
impl PoolProvider for PgPool {
fn read(&self) -> &PgPool {
self
}
fn write(&self) -> &PgPool {
self
}
}