Skip to content

Commit 273b63f

Browse files
committed
Made Subject a Template in config
Signed-off-by: Stephen Wakely <[email protected]>
1 parent dfcc055 commit 273b63f

File tree

4 files changed

+20
-21
lines changed

4 files changed

+20
-21
lines changed

src/sinks/nats/config.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ pub struct NatsSinkConfig {
4848
docs::examples = "time.>",
4949
docs::examples = ">"
5050
))]
51-
pub(super) subject: String,
51+
pub(super) subject: Template,
5252

5353
/// The NATS [URL][nats_url] to connect to.
5454
///
@@ -82,7 +82,7 @@ impl GenerateConfig for NatsSinkConfig {
8282
auth: None,
8383
connection_name: "vector".into(),
8484
encoding: JsonSerializerConfig::default().into(),
85-
subject: "from.vector".into(),
85+
subject: Template::try_from("from.vector").unwrap(),
8686
tls: None,
8787
url: "nats://127.0.0.1:4222".into(),
8888
request: Default::default(),

src/sinks/nats/integration_tests.rs

+14-14
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ async fn publish_and_check(conf: NatsSinkConfig) -> Result<(), NatsError> {
3232
.await
3333
.expect("failed to connect with test consumer");
3434
let mut sub = consumer
35-
.subscribe(subject)
35+
.subscribe(subject.to_string())
3636
.await
3737
.expect("failed to subscribe with test consumer");
3838
consumer
@@ -73,7 +73,7 @@ async fn nats_no_auth() {
7373
acknowledgements: Default::default(),
7474
encoding: TextSerializerConfig::default().into(),
7575
connection_name: "".to_owned(),
76-
subject: subject.clone(),
76+
subject: Template::try_from(subject.as_str()).unwrap(),
7777
url,
7878
tls: None,
7979
auth: None,
@@ -100,7 +100,7 @@ async fn nats_userpass_auth_valid() {
100100
acknowledgements: Default::default(),
101101
encoding: TextSerializerConfig::default().into(),
102102
connection_name: "".to_owned(),
103-
subject: subject.clone(),
103+
subject: Template::try_from(subject.as_str()).unwrap(),
104104
url,
105105
tls: None,
106106
auth: Some(NatsAuthConfig::UserPassword {
@@ -129,7 +129,7 @@ async fn nats_userpass_auth_invalid() {
129129
acknowledgements: Default::default(),
130130
encoding: TextSerializerConfig::default().into(),
131131
connection_name: "".to_owned(),
132-
subject: subject.clone(),
132+
subject: Template::try_from(subject.as_str()).unwrap(),
133133
url,
134134
tls: None,
135135
auth: Some(NatsAuthConfig::UserPassword {
@@ -161,7 +161,7 @@ async fn nats_token_auth_valid() {
161161
acknowledgements: Default::default(),
162162
encoding: TextSerializerConfig::default().into(),
163163
connection_name: "".to_owned(),
164-
subject: subject.clone(),
164+
subject: Template::try_from(subject.as_str()).unwrap(),
165165
url,
166166
tls: None,
167167
auth: Some(NatsAuthConfig::Token {
@@ -192,7 +192,7 @@ async fn nats_token_auth_invalid() {
192192
acknowledgements: Default::default(),
193193
encoding: TextSerializerConfig::default().into(),
194194
connection_name: "".to_owned(),
195-
subject: subject.clone(),
195+
subject: Template::try_from(subject.as_str()).unwrap(),
196196
url,
197197
tls: None,
198198
auth: Some(NatsAuthConfig::Token {
@@ -223,7 +223,7 @@ async fn nats_nkey_auth_valid() {
223223
acknowledgements: Default::default(),
224224
encoding: TextSerializerConfig::default().into(),
225225
connection_name: "".to_owned(),
226-
subject: subject.clone(),
226+
subject: Template::try_from(subject.as_str()).unwrap(),
227227
url,
228228
tls: None,
229229
auth: Some(NatsAuthConfig::Nkey {
@@ -255,7 +255,7 @@ async fn nats_nkey_auth_invalid() {
255255
acknowledgements: Default::default(),
256256
encoding: TextSerializerConfig::default().into(),
257257
connection_name: "".to_owned(),
258-
subject: subject.clone(),
258+
subject: Template::try_from(subject.as_str()).unwrap(),
259259
url,
260260
tls: None,
261261
auth: Some(NatsAuthConfig::Nkey {
@@ -287,7 +287,7 @@ async fn nats_tls_valid() {
287287
acknowledgements: Default::default(),
288288
encoding: TextSerializerConfig::default().into(),
289289
connection_name: "".to_owned(),
290-
subject: subject.clone(),
290+
subject: Template::try_from(subject.as_str()).unwrap(),
291291
url,
292292
tls: Some(TlsEnableableConfig {
293293
enabled: Some(true),
@@ -320,7 +320,7 @@ async fn nats_tls_invalid() {
320320
acknowledgements: Default::default(),
321321
encoding: TextSerializerConfig::default().into(),
322322
connection_name: "".to_owned(),
323-
subject: subject.clone(),
323+
subject: Template::try_from(subject.as_str()).unwrap(),
324324
url,
325325
tls: None,
326326
auth: None,
@@ -347,7 +347,7 @@ async fn nats_tls_client_cert_valid() {
347347
acknowledgements: Default::default(),
348348
encoding: TextSerializerConfig::default().into(),
349349
connection_name: "".to_owned(),
350-
subject: subject.clone(),
350+
subject: Template::try_from(subject.as_str()).unwrap(),
351351
url,
352352
tls: Some(TlsEnableableConfig {
353353
enabled: Some(true),
@@ -382,7 +382,7 @@ async fn nats_tls_client_cert_invalid() {
382382
acknowledgements: Default::default(),
383383
encoding: TextSerializerConfig::default().into(),
384384
connection_name: "".to_owned(),
385-
subject: subject.clone(),
385+
subject: Template::try_from(subject.as_str()).unwrap(),
386386
url,
387387
tls: Some(TlsEnableableConfig {
388388
enabled: Some(true),
@@ -415,7 +415,7 @@ async fn nats_tls_jwt_auth_valid() {
415415
acknowledgements: Default::default(),
416416
encoding: TextSerializerConfig::default().into(),
417417
connection_name: "".to_owned(),
418-
subject: subject.clone(),
418+
subject: Template::try_from(subject.as_str()).unwrap(),
419419
url,
420420
tls: Some(TlsEnableableConfig {
421421
enabled: Some(true),
@@ -452,7 +452,7 @@ async fn nats_tls_jwt_auth_invalid() {
452452
acknowledgements: Default::default(),
453453
encoding: TextSerializerConfig::default().into(),
454454
connection_name: "".to_owned(),
455-
subject: subject.clone(),
455+
subject: Template::try_from(subject.as_str()).unwrap(),
456456
url,
457457
tls: Some(TlsEnableableConfig {
458458
enabled: Some(true),

src/sinks/nats/mod.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
44
use snafu::Snafu;
55

6-
use crate::{nats::NatsConfigError, sinks::prelude::*};
6+
use crate::nats::NatsConfigError;
77

88
mod config;
99
#[cfg(feature = "nats-integration-tests")]
@@ -21,8 +21,6 @@ enum NatsError {
2121
Encoding {
2222
source: codecs::encoding::BuildError,
2323
},
24-
#[snafu(display("invalid subject template: {}", source))]
25-
SubjectTemplate { source: TemplateParseError },
2624
#[snafu(display("NATS Config Error: {}", source))]
2725
Config { source: NatsConfigError },
2826
#[snafu(display("NATS Connect Error: {}", source))]

src/sinks/nats/sink.rs

+3-2
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use super::{
88
config::NatsSinkConfig,
99
request_builder::{NatsEncoder, NatsRequestBuilder},
1010
service::{NatsResponse, NatsService},
11-
EncodingSnafu, NatsError, SubjectTemplateSnafu,
11+
EncodingSnafu, NatsError,
1212
};
1313

1414
pub(super) struct NatsEvent {
@@ -47,13 +47,14 @@ impl NatsSink {
4747
let serializer = config.encoding.build().context(EncodingSnafu)?;
4848
let encoder = Encoder::<()>::new(serializer);
4949
let request = config.request;
50+
let subject = config.subject;
5051

5152
Ok(NatsSink {
5253
request,
5354
connection,
5455
transformer,
5556
encoder,
56-
subject: Template::try_from(config.subject).context(SubjectTemplateSnafu)?,
57+
subject,
5758
})
5859
}
5960

0 commit comments

Comments
 (0)