pub mod publisher; pub mod subscriber; use std::sync::Arc; use std::time::Duration; use async_nats::jetstream::Context; use async_nats::jetstream::stream::{Config as StreamConfig, RetentionPolicy}; use crate::config::AppConfig; use crate::error::{AppError, AppResult}; #[derive(Clone)] pub struct NatsQueue { inner: Arc, } struct NatsQueueInner { js: Context, client: async_nats::Client, stream_prefix: String, ack_wait: Duration, max_deliver: i64, } pub struct StreamHandle { pub name: String, pub subjects: Vec, } impl NatsQueue { pub async fn connect(config: &AppConfig) -> AppResult { let url = config.nats_url()?; let timeout = config.nats_connection_timeout_secs()?; let ping_interval = config.nats_ping_interval_secs()?; let reconnect_delay = config.nats_reconnect_delay_secs()?; let max_reconnects = config.nats_max_reconnects()?; let mut opts = async_nats::ConnectOptions::new() .connection_timeout(Duration::from_secs(timeout)) .ping_interval(Duration::from_secs(ping_interval)) .retry_on_initial_connect() .max_reconnects(Some(max_reconnects)) .reconnect_delay_callback(move |_| Duration::from_secs(reconnect_delay)) .event_callback(|event| async move { match event { async_nats::Event::Disconnected => { tracing::warn!("nats disconnected"); } async_nats::Event::Connected => { tracing::info!("nats connected"); } async_nats::Event::LameDuckMode => { tracing::warn!("nats server entering lame duck mode"); } _ => {} } }); if let (Some(user), Some(pass)) = (config.nats_username()?, config.nats_password()?) { opts = opts.user_and_password(user, pass); } else if let Some(token) = config.nats_token()? { opts = opts.token(token); } let client = async_nats::connect_with_options(&url, opts) .await .map_err(|e| AppError::Config(format!("nats connect failed: {e}")))?; let js = async_nats::jetstream::new(client.clone()); let stream_prefix = config.nats_stream_prefix()?; let ack_wait = config.nats_default_ack_wait_secs()?; let max_deliver = config.nats_default_max_deliver()?; tracing::info!(url = %url, "nats connected"); Ok(Self { inner: Arc::new(NatsQueueInner { js, client, stream_prefix, ack_wait: Duration::from_secs(ack_wait), max_deliver, }), }) } pub fn client(&self) -> &async_nats::Client { &self.inner.client } pub fn js(&self) -> &Context { &self.inner.js } pub fn stream_name(&self, name: &str) -> String { format!("{}_{}", self.inner.stream_prefix, name) } pub async fn ensure_stream( &self, name: &str, subjects: Vec, ) -> AppResult { let stream_name = self.stream_name(name); let config = StreamConfig { name: stream_name.clone(), subjects, retention: RetentionPolicy::Limits, max_messages: 100_000, max_age: Duration::from_secs(7 * 24 * 3600), duplicate_window: Duration::from_secs(120), storage: async_nats::jetstream::stream::StorageType::File, ..Default::default() }; self.inner .js .get_or_create_stream(config) .await .map_err(|e| AppError::Config(format!("ensure stream {stream_name} failed: {e}")))?; let subjects = self .inner .js .get_stream(&stream_name) .await .map_err(|e| AppError::Config(format!("get stream {stream_name} failed: {e}")))? .info() .await .map_err(|e| AppError::Config(format!("stream info {stream_name} failed: {e}")))? .config .subjects .clone(); tracing::info!(stream = %stream_name, subjects = ?subjects, "stream ready"); Ok(StreamHandle { name: stream_name, subjects, }) } pub async fn ensure_ephemeral_stream( &self, name: &str, subjects: Vec, max_age_secs: u64, ) -> AppResult { let stream_name = self.stream_name(name); let config = StreamConfig { name: stream_name.clone(), subjects, retention: RetentionPolicy::Limits, max_messages: 10_000, max_age: Duration::from_secs(max_age_secs), duplicate_window: Duration::from_secs(60), storage: async_nats::jetstream::stream::StorageType::Memory, ..Default::default() }; self.inner .js .get_or_create_stream(config) .await .map_err(|e| AppError::Config(format!("ensure stream {stream_name} failed: {e}")))?; Ok(StreamHandle { name: stream_name, subjects: vec![], }) } pub async fn delete_stream(&self, name: &str) -> AppResult<()> { let stream_name = self.stream_name(name); self.inner .js .delete_stream(&stream_name) .await .map_err(|e| AppError::Config(format!("delete stream {stream_name} failed: {e}")))?; Ok(()) } }