184 lines
5.6 KiB
Rust
184 lines
5.6 KiB
Rust
pub mod publisher;
|
|
pub mod subscriber;
|
|
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use async_nats::jetstream::Context;
|
|
use async_nats::jetstream::stream::{Config as StreamConfig, RetentionPolicy};
|
|
|
|
use crate::config::AppConfig;
|
|
use crate::error::{AppError, AppResult};
|
|
|
|
#[derive(Clone)]
|
|
pub struct NatsQueue {
|
|
inner: Arc<NatsQueueInner>,
|
|
}
|
|
|
|
struct NatsQueueInner {
|
|
js: Context,
|
|
client: async_nats::Client,
|
|
stream_prefix: String,
|
|
ack_wait: Duration,
|
|
max_deliver: i64,
|
|
}
|
|
|
|
pub struct StreamHandle {
|
|
pub name: String,
|
|
pub subjects: Vec<String>,
|
|
}
|
|
|
|
impl NatsQueue {
|
|
pub async fn connect(config: &AppConfig) -> AppResult<Self> {
|
|
let url = config.nats_url()?;
|
|
let timeout = config.nats_connection_timeout_secs()?;
|
|
let ping_interval = config.nats_ping_interval_secs()?;
|
|
let reconnect_delay = config.nats_reconnect_delay_secs()?;
|
|
let max_reconnects = config.nats_max_reconnects()?;
|
|
|
|
let mut opts = async_nats::ConnectOptions::new()
|
|
.connection_timeout(Duration::from_secs(timeout))
|
|
.ping_interval(Duration::from_secs(ping_interval))
|
|
.retry_on_initial_connect()
|
|
.max_reconnects(Some(max_reconnects))
|
|
.reconnect_delay_callback(move |_| Duration::from_secs(reconnect_delay))
|
|
.event_callback(|event| async move {
|
|
match event {
|
|
async_nats::Event::Disconnected => {
|
|
tracing::warn!("nats disconnected");
|
|
}
|
|
async_nats::Event::Connected => {
|
|
tracing::info!("nats connected");
|
|
}
|
|
async_nats::Event::LameDuckMode => {
|
|
tracing::warn!("nats server entering lame duck mode");
|
|
}
|
|
_ => {}
|
|
}
|
|
});
|
|
|
|
if let (Some(user), Some(pass)) = (config.nats_username()?, config.nats_password()?) {
|
|
opts = opts.user_and_password(user, pass);
|
|
} else if let Some(token) = config.nats_token()? {
|
|
opts = opts.token(token);
|
|
}
|
|
|
|
let client = async_nats::connect_with_options(&url, opts)
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("nats connect failed: {e}")))?;
|
|
|
|
let js = async_nats::jetstream::new(client.clone());
|
|
let stream_prefix = config.nats_stream_prefix()?;
|
|
let ack_wait = config.nats_default_ack_wait_secs()?;
|
|
let max_deliver = config.nats_default_max_deliver()?;
|
|
|
|
tracing::info!(url = %url, "nats connected");
|
|
|
|
Ok(Self {
|
|
inner: Arc::new(NatsQueueInner {
|
|
js,
|
|
client,
|
|
stream_prefix,
|
|
ack_wait: Duration::from_secs(ack_wait),
|
|
max_deliver,
|
|
}),
|
|
})
|
|
}
|
|
|
|
pub fn client(&self) -> &async_nats::Client {
|
|
&self.inner.client
|
|
}
|
|
|
|
pub fn js(&self) -> &Context {
|
|
&self.inner.js
|
|
}
|
|
|
|
pub fn stream_name(&self, name: &str) -> String {
|
|
format!("{}_{}", self.inner.stream_prefix, name)
|
|
}
|
|
|
|
pub async fn ensure_stream(
|
|
&self,
|
|
name: &str,
|
|
subjects: Vec<String>,
|
|
) -> AppResult<StreamHandle> {
|
|
let stream_name = self.stream_name(name);
|
|
let config = StreamConfig {
|
|
name: stream_name.clone(),
|
|
subjects,
|
|
retention: RetentionPolicy::Limits,
|
|
max_messages: 100_000,
|
|
max_age: Duration::from_secs(7 * 24 * 3600),
|
|
duplicate_window: Duration::from_secs(120),
|
|
storage: async_nats::jetstream::stream::StorageType::File,
|
|
..Default::default()
|
|
};
|
|
|
|
self.inner
|
|
.js
|
|
.get_or_create_stream(config)
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("ensure stream {stream_name} failed: {e}")))?;
|
|
|
|
let subjects = self
|
|
.inner
|
|
.js
|
|
.get_stream(&stream_name)
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("get stream {stream_name} failed: {e}")))?
|
|
.info()
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("stream info {stream_name} failed: {e}")))?
|
|
.config
|
|
.subjects
|
|
.clone();
|
|
|
|
tracing::info!(stream = %stream_name, subjects = ?subjects, "stream ready");
|
|
|
|
Ok(StreamHandle {
|
|
name: stream_name,
|
|
subjects,
|
|
})
|
|
}
|
|
|
|
pub async fn ensure_ephemeral_stream(
|
|
&self,
|
|
name: &str,
|
|
subjects: Vec<String>,
|
|
max_age_secs: u64,
|
|
) -> AppResult<StreamHandle> {
|
|
let stream_name = self.stream_name(name);
|
|
let config = StreamConfig {
|
|
name: stream_name.clone(),
|
|
subjects,
|
|
retention: RetentionPolicy::Limits,
|
|
max_messages: 10_000,
|
|
max_age: Duration::from_secs(max_age_secs),
|
|
duplicate_window: Duration::from_secs(60),
|
|
storage: async_nats::jetstream::stream::StorageType::Memory,
|
|
..Default::default()
|
|
};
|
|
|
|
self.inner
|
|
.js
|
|
.get_or_create_stream(config)
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("ensure stream {stream_name} failed: {e}")))?;
|
|
|
|
Ok(StreamHandle {
|
|
name: stream_name,
|
|
subjects: vec![],
|
|
})
|
|
}
|
|
|
|
pub async fn delete_stream(&self, name: &str) -> AppResult<()> {
|
|
let stream_name = self.stream_name(name);
|
|
self.inner
|
|
.js
|
|
.delete_stream(&stream_name)
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("delete stream {stream_name} failed: {e}")))?;
|
|
Ok(())
|
|
}
|
|
}
|