Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new sink): Add possibility to use nats jetstream in nats sink #20834

Merged
Prev Previous commit
Next Next commit
flush core messages
  • Loading branch information
whatcouldbepizza committed Jul 23, 2024
commit 602ba0d42287f2548a15cb066370ccff85dea104
7 changes: 7 additions & 0 deletions src/sinks/nats/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,4 +190,11 @@ impl NatsPublisher {
}
}
}

pub(super) async fn flush(&self) -> Result<(), ()> {
if let NatsPublisher::Core(client) = self {
return client.flush().map_err(|_| ()).await;
}
Ok(())
}
}
7 changes: 5 additions & 2 deletions src/sinks/nats/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl NatsSink {
publisher: Arc::clone(&self.publisher),
});

input
let result = input
.filter_map(|event| std::future::ready(self.make_nats_event(event)))
.request_builder(default_request_builder_concurrency_limit(), request_builder)
.filter_map(|request| async move {
Expand All @@ -89,7 +89,10 @@ impl NatsSink {
.into_driver(service)
.protocol("nats")
.run()
.await
.await;

self.publisher.flush().await?;
return result;
}
}

Expand Down
Loading