203 lines
6.1 KiB
Rust
203 lines
6.1 KiB
Rust
use std::time::Duration;
|
|
|
|
use async_nats::jetstream::consumer::AckPolicy;
|
|
use async_nats::jetstream::consumer::pull::Config as PullConfig;
|
|
use async_nats::jetstream::consumer::push::Config as PushConfig;
|
|
use async_nats::jetstream::consumer::push::Messages;
|
|
use futures_util::StreamExt;
|
|
use serde::de::DeserializeOwned;
|
|
|
|
use crate::error::{AppError, AppResult};
|
|
|
|
use super::NatsQueue;
|
|
|
|
pub struct NatsMessage {
|
|
inner: async_nats::jetstream::Message,
|
|
}
|
|
|
|
impl NatsMessage {
|
|
pub fn subject(&self) -> &str {
|
|
self.inner.message.subject.as_str()
|
|
}
|
|
|
|
pub fn payload(&self) -> &[u8] {
|
|
&self.inner.message.payload
|
|
}
|
|
|
|
pub fn payload_json<T: DeserializeOwned>(&self) -> AppResult<T> {
|
|
serde_json::from_slice(&self.inner.message.payload).map_err(AppError::from)
|
|
}
|
|
|
|
pub fn headers(&self) -> Option<&async_nats::HeaderMap> {
|
|
self.inner.message.headers.as_ref()
|
|
}
|
|
|
|
pub async fn ack(self) -> AppResult<()> {
|
|
self.inner
|
|
.ack()
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("ack failed: {e}")))
|
|
}
|
|
|
|
pub async fn nack(self) -> AppResult<()> {
|
|
self.inner
|
|
.ack_with(async_nats::jetstream::AckKind::Nak(None))
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("nack failed: {e}")))
|
|
}
|
|
|
|
pub async fn nack_with_delay(self, delay: Duration) -> AppResult<()> {
|
|
self.inner
|
|
.ack_with(async_nats::jetstream::AckKind::Nak(Some(delay)))
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("nack with delay failed: {e}")))
|
|
}
|
|
|
|
pub async fn ack_in_progress(&self) -> AppResult<()> {
|
|
self.inner
|
|
.ack_with(async_nats::jetstream::AckKind::Progress)
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("ack in progress failed: {e}")))
|
|
}
|
|
|
|
pub async fn term(self) -> AppResult<()> {
|
|
self.inner
|
|
.ack_with(async_nats::jetstream::AckKind::Term)
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("term failed: {e}")))
|
|
}
|
|
}
|
|
|
|
pub struct PullSubscription {
|
|
stream: async_nats::jetstream::stream::Stream,
|
|
consumer_name: String,
|
|
}
|
|
|
|
impl NatsQueue {
|
|
pub async fn subscribe_broadcast(
|
|
&self,
|
|
stream_name: &str,
|
|
consumer_name: &str,
|
|
filter_subject: Option<&str>,
|
|
) -> AppResult<Messages> {
|
|
let full_stream = self.stream_name(stream_name);
|
|
let stream = self
|
|
.inner
|
|
.js
|
|
.get_stream(&full_stream)
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("get stream {full_stream} failed: {e}")))?;
|
|
|
|
let deliver_subject = format!(
|
|
"deliver.{}.{}.{}",
|
|
self.inner.stream_prefix, stream_name, consumer_name
|
|
);
|
|
|
|
let config = PushConfig {
|
|
durable_name: Some(consumer_name.to_string()),
|
|
deliver_subject,
|
|
deliver_group: Some(consumer_name.to_string()),
|
|
ack_policy: AckPolicy::Explicit,
|
|
ack_wait: self.inner.ack_wait,
|
|
max_deliver: self.inner.max_deliver,
|
|
filter_subject: filter_subject.unwrap_or(">").to_string(),
|
|
..Default::default()
|
|
};
|
|
|
|
let consumer = stream
|
|
.get_or_create_consumer(consumer_name, config)
|
|
.await
|
|
.map_err(|e| {
|
|
AppError::Config(format!("create push consumer {consumer_name} failed: {e}"))
|
|
})?;
|
|
|
|
let messages = consumer.messages().await.map_err(|e| {
|
|
AppError::Config(format!("subscribe broadcast {consumer_name} failed: {e}"))
|
|
})?;
|
|
|
|
Ok(messages)
|
|
}
|
|
|
|
pub async fn create_pull_consumer(
|
|
&self,
|
|
stream_name: &str,
|
|
consumer_name: &str,
|
|
filter_subject: Option<&str>,
|
|
max_batch: usize,
|
|
) -> AppResult<PullSubscription> {
|
|
let full_stream = self.stream_name(stream_name);
|
|
let stream = self
|
|
.inner
|
|
.js
|
|
.get_stream(&full_stream)
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("get stream {full_stream} failed: {e}")))?;
|
|
|
|
let mut config = PullConfig {
|
|
durable_name: Some(consumer_name.to_string()),
|
|
ack_policy: AckPolicy::Explicit,
|
|
ack_wait: self.inner.ack_wait,
|
|
max_deliver: self.inner.max_deliver,
|
|
max_ack_pending: max_batch as i64,
|
|
..Default::default()
|
|
};
|
|
|
|
if let Some(subject) = filter_subject {
|
|
config.filter_subject = subject.to_string();
|
|
}
|
|
|
|
stream
|
|
.get_or_create_consumer(consumer_name, config)
|
|
.await
|
|
.map_err(|e| {
|
|
AppError::Config(format!("create pull consumer {consumer_name} failed: {e}"))
|
|
})?;
|
|
|
|
Ok(PullSubscription {
|
|
stream,
|
|
consumer_name: consumer_name.to_string(),
|
|
})
|
|
}
|
|
|
|
pub async fn fetch(
|
|
&self,
|
|
subscription: &PullSubscription,
|
|
batch_size: usize,
|
|
) -> AppResult<Vec<NatsMessage>> {
|
|
let consumer = subscription
|
|
.stream
|
|
.get_consumer(&subscription.consumer_name)
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("get consumer failed: {e}")))?;
|
|
|
|
let mut messages = consumer
|
|
.fetch()
|
|
.max_messages(batch_size)
|
|
.messages()
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("fetch failed: {e}")))?;
|
|
|
|
let mut result = Vec::with_capacity(batch_size);
|
|
while let Some(msg) = messages.next().await {
|
|
match msg {
|
|
Ok(m) => result.push(NatsMessage { inner: m }),
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "fetch message error");
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
Ok(result)
|
|
}
|
|
|
|
pub async fn subscribe_ephemeral(&self, subject: String) -> AppResult<async_nats::Subscriber> {
|
|
let sub = self
|
|
.inner
|
|
.client
|
|
.subscribe(subject.clone())
|
|
.await
|
|
.map_err(|e| AppError::Config(format!("subscribe ephemeral {subject} failed: {e}")))?;
|
|
Ok(sub)
|
|
}
|
|
}
|