Skip to content

Commit f59718a

Browse files
feat(new sink): Add possibility to use nats jetstream in nats sink (#20834)
* use nats jetstream as an option * no need to manually flush messages * fix jetstream option annotation + prettify code * generate component docs * add more precise field description + generate docs * check nats stream existence in healthcheck * do not check stream existance * add new field to struct in tests * add changelog * add author to changelog * remove example config from changelog
1 parent 3b6a738 commit f59718a

File tree

7 files changed

+94
-10
lines changed

7 files changed

+94
-10
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
Add possibility to use NATS JetStream in NATS sink. Can be turned on/off via `jetstream` option (default is false).
2+
3+
authors: whatcouldbepizza

src/sinks/nats/config.rs

+56
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use bytes::Bytes;
12
use futures_util::TryFutureExt;
23
use snafu::ResultExt;
34
use vector_lib::codecs::JsonSerializerConfig;
@@ -76,6 +77,14 @@ pub struct NatsSinkConfig {
7677
#[configurable(derived)]
7778
#[serde(default)]
7879
pub(super) request: TowerRequestConfig<NatsTowerRequestConfigDefaults>,
80+
81+
/// Send messages using [Jetstream][jetstream].
82+
///
83+
/// If set, the `subject` must belong to an existing JetStream stream.
84+
///
85+
/// [jetstream]: https://docs.nats.io/nats-concepts/jetstream
86+
#[serde(default)]
87+
pub(super) jetstream: bool,
7988
}
8089

8190
fn default_name() -> String {
@@ -93,6 +102,7 @@ impl GenerateConfig for NatsSinkConfig {
93102
tls: None,
94103
url: "nats://127.0.0.1:4222".into(),
95104
request: Default::default(),
105+
jetstream: Default::default(),
96106
})
97107
.unwrap()
98108
}
@@ -130,8 +140,54 @@ impl NatsSinkConfig {
130140

131141
options.connect(&self.url).await.context(ConnectSnafu)
132142
}
143+
144+
pub(super) async fn publisher(&self) -> Result<NatsPublisher, NatsError> {
145+
let connection = self.connect().await?;
146+
147+
if self.jetstream {
148+
Ok(NatsPublisher::JetStream(async_nats::jetstream::new(
149+
connection,
150+
)))
151+
} else {
152+
Ok(NatsPublisher::Core(connection))
153+
}
154+
}
133155
}
134156

135157
async fn healthcheck(config: NatsSinkConfig) -> crate::Result<()> {
136158
config.connect().map_ok(|_| ()).map_err(|e| e.into()).await
137159
}
160+
161+
pub enum NatsPublisher {
162+
Core(async_nats::Client),
163+
JetStream(async_nats::jetstream::Context),
164+
}
165+
166+
impl NatsPublisher {
167+
pub(super) async fn publish<S: async_nats::subject::ToSubject>(
168+
&self,
169+
subject: S,
170+
payload: Bytes,
171+
) -> Result<(), NatsError> {
172+
match self {
173+
NatsPublisher::Core(client) => {
174+
client
175+
.publish(subject, payload)
176+
.await
177+
.map_err(|e| NatsError::PublishError {
178+
source: Box::new(e),
179+
})
180+
}
181+
NatsPublisher::JetStream(jetstream) => {
182+
let ack = jetstream.publish(subject, payload).await.map_err(|e| {
183+
NatsError::PublishError {
184+
source: Box::new(e),
185+
}
186+
})?;
187+
ack.await.map(|_| ()).map_err(|e| NatsError::PublishError {
188+
source: Box::new(e),
189+
})
190+
}
191+
}
192+
}
193+
}

src/sinks/nats/integration_tests.rs

+13
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@ async fn nats_no_auth() {
7878
tls: None,
7979
auth: None,
8080
request: Default::default(),
81+
jetstream: false,
8182
};
8283

8384
let r = publish_and_check(conf).await;
@@ -110,6 +111,7 @@ async fn nats_userpass_auth_valid() {
110111
},
111112
}),
112113
request: Default::default(),
114+
jetstream: false,
113115
};
114116

115117
publish_and_check(conf)
@@ -139,6 +141,7 @@ async fn nats_userpass_auth_invalid() {
139141
},
140142
}),
141143
request: Default::default(),
144+
jetstream: false,
142145
};
143146

144147
let r = publish_and_check(conf).await;
@@ -170,6 +173,7 @@ async fn nats_token_auth_valid() {
170173
},
171174
}),
172175
request: Default::default(),
176+
jetstream: false,
173177
};
174178

175179
let r = publish_and_check(conf).await;
@@ -201,6 +205,7 @@ async fn nats_token_auth_invalid() {
201205
},
202206
}),
203207
request: Default::default(),
208+
jetstream: false,
204209
};
205210

206211
let r = publish_and_check(conf).await;
@@ -233,6 +238,7 @@ async fn nats_nkey_auth_valid() {
233238
},
234239
}),
235240
request: Default::default(),
241+
jetstream: false,
236242
};
237243

238244
let r = publish_and_check(conf).await;
@@ -265,6 +271,7 @@ async fn nats_nkey_auth_invalid() {
265271
},
266272
}),
267273
request: Default::default(),
274+
jetstream: false,
268275
};
269276

270277
let r = publish_and_check(conf).await;
@@ -298,6 +305,7 @@ async fn nats_tls_valid() {
298305
}),
299306
auth: None,
300307
request: Default::default(),
308+
jetstream: false,
301309
};
302310

303311
let r = publish_and_check(conf).await;
@@ -325,6 +333,7 @@ async fn nats_tls_invalid() {
325333
tls: None,
326334
auth: None,
327335
request: Default::default(),
336+
jetstream: false,
328337
};
329338

330339
let r = publish_and_check(conf).await;
@@ -360,6 +369,7 @@ async fn nats_tls_client_cert_valid() {
360369
}),
361370
auth: None,
362371
request: Default::default(),
372+
jetstream: false,
363373
};
364374

365375
let r = publish_and_check(conf).await;
@@ -393,6 +403,7 @@ async fn nats_tls_client_cert_invalid() {
393403
}),
394404
auth: None,
395405
request: Default::default(),
406+
jetstream: false,
396407
};
397408

398409
let r = publish_and_check(conf).await;
@@ -430,6 +441,7 @@ async fn nats_tls_jwt_auth_valid() {
430441
},
431442
}),
432443
request: Default::default(),
444+
jetstream: false,
433445
};
434446

435447
let r = publish_and_check(conf).await;
@@ -467,6 +479,7 @@ async fn nats_tls_jwt_auth_invalid() {
467479
},
468480
}),
469481
request: Default::default(),
482+
jetstream: false,
470483
};
471484

472485
let r = publish_and_check(conf).await;

src/sinks/nats/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -27,4 +27,6 @@ enum NatsError {
2727
Connect { source: async_nats::ConnectError },
2828
#[snafu(display("NATS Server Error: {}", source))]
2929
ServerError { source: async_nats::Error },
30+
#[snafu(display("NATS Publish Error: {}", source))]
31+
PublishError { source: async_nats::Error },
3032
}

src/sinks/nats/service.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ use futures_util::TryFutureExt;
77

88
use crate::sinks::prelude::*;
99

10-
use super::{request_builder::NatsRequest, NatsError};
10+
use super::{config::NatsPublisher, request_builder::NatsRequest, NatsError};
1111

1212
#[derive(Clone)]
1313
pub(super) struct NatsService {
14-
pub(super) connection: Arc<async_nats::Client>,
14+
pub(super) publisher: Arc<NatsPublisher>,
1515
}
1616

1717
pub(super) struct NatsResponse {
@@ -44,13 +44,12 @@ impl Service<NatsRequest> for NatsService {
4444
}
4545

4646
fn call(&mut self, req: NatsRequest) -> Self::Future {
47-
let connection = Arc::clone(&self.connection);
47+
let publisher = Arc::clone(&self.publisher);
4848

4949
Box::pin(async move {
50-
match connection
50+
match publisher
5151
.publish(req.subject, req.bytes)
5252
.map_err(async_nats::Error::from)
53-
.and_then(|_| connection.flush().map_err(Into::into))
5453
.await
5554
{
5655
Err(error) => Err(NatsError::ServerError { source: error }),

src/sinks/nats/sink.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use snafu::ResultExt;
55
use crate::sinks::prelude::*;
66

77
use super::{
8-
config::{NatsSinkConfig, NatsTowerRequestConfigDefaults},
8+
config::{NatsPublisher, NatsSinkConfig, NatsTowerRequestConfigDefaults},
99
request_builder::{NatsEncoder, NatsRequestBuilder},
1010
service::{NatsResponse, NatsService},
1111
EncodingSnafu, NatsError,
@@ -20,7 +20,7 @@ pub(super) struct NatsSink {
2020
request: TowerRequestConfig<NatsTowerRequestConfigDefaults>,
2121
transformer: Transformer,
2222
encoder: Encoder<()>,
23-
connection: Arc<async_nats::Client>,
23+
publisher: Arc<NatsPublisher>,
2424
subject: Template,
2525
}
2626

@@ -42,7 +42,7 @@ impl NatsSink {
4242
}
4343

4444
pub(super) async fn new(config: NatsSinkConfig) -> Result<Self, NatsError> {
45-
let connection = Arc::new(config.connect().await?);
45+
let publisher = Arc::new(config.publisher().await?);
4646
let transformer = config.encoding.transformer();
4747
let serializer = config.encoding.build().context(EncodingSnafu)?;
4848
let encoder = Encoder::<()>::new(serializer);
@@ -51,9 +51,9 @@ impl NatsSink {
5151

5252
Ok(NatsSink {
5353
request,
54-
connection,
5554
transformer,
5655
encoder,
56+
publisher,
5757
subject,
5858
})
5959
}
@@ -71,7 +71,7 @@ impl NatsSink {
7171
let service = ServiceBuilder::new()
7272
.settings(request, NatsRetryLogic)
7373
.service(NatsService {
74-
connection: Arc::clone(&self.connection),
74+
publisher: Arc::clone(&self.publisher),
7575
});
7676

7777
input

website/cue/reference/components/sinks/base/nats.cue

+11
Original file line numberDiff line numberDiff line change
@@ -383,6 +383,17 @@ base: components: sinks: nats: configuration: {
383383
}
384384
}
385385
}
386+
jetstream: {
387+
description: """
388+
Send messages using [Jetstream][jetstream].
389+
390+
If set, the `subject` must belong to an existing JetStream stream.
391+
392+
[jetstream]: https://docs.nats.io/nats-concepts/jetstream
393+
"""
394+
required: false
395+
type: bool: default: false
396+
}
386397
request: {
387398
description: """
388399
Middleware settings for outbound requests.

0 commit comments

Comments
 (0)