use std::time::Duration; use async_nats::jetstream::consumer::AckPolicy; use async_nats::jetstream::consumer::pull::Config as PullConfig; use async_nats::jetstream::consumer::push::Config as PushConfig; use async_nats::jetstream::consumer::push::Messages; use futures_util::StreamExt; use serde::de::DeserializeOwned; use crate::error::{AppError, AppResult}; use super::NatsQueue; pub struct NatsMessage { inner: async_nats::jetstream::Message, } impl NatsMessage { pub fn subject(&self) -> &str { self.inner.message.subject.as_str() } pub fn payload(&self) -> &[u8] { &self.inner.message.payload } pub fn payload_json(&self) -> AppResult { serde_json::from_slice(&self.inner.message.payload).map_err(AppError::from) } pub fn headers(&self) -> Option<&async_nats::HeaderMap> { self.inner.message.headers.as_ref() } pub async fn ack(self) -> AppResult<()> { self.inner .ack() .await .map_err(|e| AppError::Config(format!("ack failed: {e}"))) } pub async fn nack(self) -> AppResult<()> { self.inner .ack_with(async_nats::jetstream::AckKind::Nak(None)) .await .map_err(|e| AppError::Config(format!("nack failed: {e}"))) } pub async fn nack_with_delay(self, delay: Duration) -> AppResult<()> { self.inner .ack_with(async_nats::jetstream::AckKind::Nak(Some(delay))) .await .map_err(|e| AppError::Config(format!("nack with delay failed: {e}"))) } pub async fn ack_in_progress(&self) -> AppResult<()> { self.inner .ack_with(async_nats::jetstream::AckKind::Progress) .await .map_err(|e| AppError::Config(format!("ack in progress failed: {e}"))) } pub async fn term(self) -> AppResult<()> { self.inner .ack_with(async_nats::jetstream::AckKind::Term) .await .map_err(|e| AppError::Config(format!("term failed: {e}"))) } } pub struct PullSubscription { stream: async_nats::jetstream::stream::Stream, consumer_name: String, } impl NatsQueue { pub async fn subscribe_broadcast( &self, stream_name: &str, consumer_name: &str, filter_subject: Option<&str>, ) -> AppResult { let full_stream = self.stream_name(stream_name); let stream = self .inner .js .get_stream(&full_stream) .await .map_err(|e| AppError::Config(format!("get stream {full_stream} failed: {e}")))?; let deliver_subject = format!( "deliver.{}.{}.{}", self.inner.stream_prefix, stream_name, consumer_name ); let config = PushConfig { durable_name: Some(consumer_name.to_string()), deliver_subject, deliver_group: Some(consumer_name.to_string()), ack_policy: AckPolicy::Explicit, ack_wait: self.inner.ack_wait, max_deliver: self.inner.max_deliver, filter_subject: filter_subject.unwrap_or(">").to_string(), ..Default::default() }; let consumer = stream .get_or_create_consumer(consumer_name, config) .await .map_err(|e| { AppError::Config(format!("create push consumer {consumer_name} failed: {e}")) })?; let messages = consumer.messages().await.map_err(|e| { AppError::Config(format!("subscribe broadcast {consumer_name} failed: {e}")) })?; Ok(messages) } pub async fn create_pull_consumer( &self, stream_name: &str, consumer_name: &str, filter_subject: Option<&str>, max_batch: usize, ) -> AppResult { let full_stream = self.stream_name(stream_name); let stream = self .inner .js .get_stream(&full_stream) .await .map_err(|e| AppError::Config(format!("get stream {full_stream} failed: {e}")))?; let mut config = PullConfig { durable_name: Some(consumer_name.to_string()), ack_policy: AckPolicy::Explicit, ack_wait: self.inner.ack_wait, max_deliver: self.inner.max_deliver, max_ack_pending: max_batch as i64, ..Default::default() }; if let Some(subject) = filter_subject { config.filter_subject = subject.to_string(); } stream .get_or_create_consumer(consumer_name, config) .await .map_err(|e| { AppError::Config(format!("create pull consumer {consumer_name} failed: {e}")) })?; Ok(PullSubscription { stream, consumer_name: consumer_name.to_string(), }) } pub async fn fetch( &self, subscription: &PullSubscription, batch_size: usize, ) -> AppResult> { let consumer = subscription .stream .get_consumer(&subscription.consumer_name) .await .map_err(|e| AppError::Config(format!("get consumer failed: {e}")))?; let mut messages = consumer .fetch() .max_messages(batch_size) .messages() .await .map_err(|e| AppError::Config(format!("fetch failed: {e}")))?; let mut result = Vec::with_capacity(batch_size); while let Some(msg) = messages.next().await { match msg { Ok(m) => result.push(NatsMessage { inner: m }), Err(e) => { tracing::warn!(error = %e, "fetch message error"); break; } } } Ok(result) } pub async fn subscribe_ephemeral(&self, subject: String) -> AppResult { let sub = self .inner .client .subscribe(subject.clone()) .await .map_err(|e| AppError::Config(format!("subscribe ephemeral {subject} failed: {e}")))?; Ok(sub) } }