Skip to content

Commit

Permalink
Cancel async task on StreamResponseSubscriber drop
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed May 13, 2024
1 parent e127680 commit 045d809
Showing 1 changed file with 19 additions and 13 deletions.
32 changes: 19 additions & 13 deletions crates/subspace-farmer/src/cluster/nats_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
//! * notifications (typically targeting a particular instance of an app) and corresponding subscriptions (for example solution notification)
//! * broadcasts and corresponding subscriptions (for example slot info broadcast)
use crate::utils::AsyncJoinOnDrop;
use async_nats::{
Client, ConnectOptions, HeaderMap, HeaderValue, PublishError, RequestError, RequestErrorKind,
Subject, SubscribeError, Subscriber, ToServerAddrs,
Expand Down Expand Up @@ -184,6 +185,7 @@ pub struct StreamResponseSubscriber<Response> {
buffered_responses: Option<GenericStreamResponses<Response>>,
next_index: u32,
acknowledgement_sender: mpsc::UnboundedSender<(String, u32)>,
_background_task: AsyncJoinOnDrop<()>,
_phantom: PhantomData<Response>,
}

Expand Down Expand Up @@ -269,26 +271,30 @@ impl<Response> StreamResponseSubscriber<Response> {
let (acknowledgement_sender, mut acknowledgement_receiver) =
mpsc::unbounded::<(String, u32)>();

tokio::spawn(async move {
// Make sure to use the same exact NATS connection for all acknowledgements in order to
// ensure consistent ordering
let client = &*nats_client;
while let Some((subject, index)) = acknowledgement_receiver.next().await {
if let Err(error) = client
.publish(subject.clone(), index.to_le_bytes().to_vec().into())
.await
{
warn!(%error, %subject, %index, "Failed to send acknowledgement");
return;
let background_task = AsyncJoinOnDrop::new(
tokio::spawn(async move {
// Make sure to use the same exact NATS connection for all acknowledgements in order to
// ensure consistent ordering
let client = &*nats_client;
while let Some((subject, index)) = acknowledgement_receiver.next().await {
if let Err(error) = client
.publish(subject.clone(), index.to_le_bytes().to_vec().into())
.await
{
warn!(%error, %subject, %index, "Failed to send acknowledgement");
return;
}
}
}
});
}),
true,
);

Self {
subscriber,
buffered_responses: None,
next_index: 0,
acknowledgement_sender,
_background_task: background_task,
_phantom: PhantomData,
}
}
Expand Down

0 comments on commit 045d809

Please sign in to comment.