Merge pull request #2882 from subspace/skip-duplicate-node-client-notifications

Skip duplicate node client notifications
nazar-pc authored Jul 3, 2024
2 parents 12642a9 + db239f3 commit 0d990c2
Showing 7 changed files with 160 additions and 41 deletions.
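Every file below applies the same fix: remember the newest slot number or segment index already delivered and drop any notification that is not strictly newer. Here is that pattern in isolation, using plain integers; a minimal sketch assuming the `futures` and `tokio` crates, not the PR's exact code:

```rust
use futures::{stream, StreamExt};

#[tokio::main]
async fn main() {
    // Notifications as they might arrive from a controller, duplicates included.
    let notifications = stream::iter([1u64, 2, 2, 3, 1, 4]);

    let deduplicated = notifications.filter_map({
        let mut last_seen = None;
        move |number| {
            let keep = match last_seen {
                // Not strictly newer than what we already forwarded: drop it.
                Some(last) if last >= number => None,
                _ => {
                    last_seen = Some(number);
                    Some(number)
                }
            };
            // futures' StreamExt::filter_map takes an async predicate, hence the future.
            async move { keep }
        }
    });

    assert_eq!(deduplicated.collect::<Vec<_>>().await, vec![1, 2, 3, 4]);
}
```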
3 changes: 3 additions & 0 deletions crates/subspace-core-primitives/src/segments.rs
@@ -239,6 +239,9 @@ impl From<SegmentIndex> for HistorySize {
 }
 
 impl HistorySize {
+    /// History size of one
+    pub const ONE: Self = Self(NonZeroU64::new(1).expect("Not zero; qed"));
+
     /// Create new instance.
     pub const fn new(value: NonZeroU64) -> Self {
         Self(value)
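`HistorySize::ONE` builds a `NonZeroU64` in `const` context; the `.expect()` form above relies on the project's toolchain allowing it in `const`. A stable-Rust sketch of the same idea, with a hypothetical stand-in for the real type:

```rust
use std::num::NonZeroU64;

// Hypothetical stand-in for the real HistorySize wrapper.
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
struct HistorySize(NonZeroU64);

impl HistorySize {
    // Const-friendly construction on stable Rust: match on the Option
    // instead of calling expect() in const context.
    const ONE: Self = match NonZeroU64::new(1) {
        Some(value) => Self(value),
        None => panic!("1 is not zero; qed"),
    };
}

fn main() {
    assert_eq!(HistorySize::ONE.0.get(), 1);
}
```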
45 changes: 40 additions & 5 deletions crates/subspace-farmer/src/cluster/controller.rs
@@ -295,15 +295,30 @@ impl NodeClient for ClusterNodeClient {
     async fn subscribe_slot_info(
         &self,
     ) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, NodeClientError> {
-        let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
         let subscription = self
             .nats_client
             .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None)
             .await?
-            .map(move |broadcast| {
-                *last_slot_info_instance.lock() = broadcast.instance;
+            .filter_map({
+                let mut last_slot_number = None;
+                let last_slot_info_instance = Arc::clone(&self.last_slot_info_instance);
 
-                broadcast.slot_info
+                move |broadcast| {
+                    let slot_info = broadcast.slot_info;
+
+                    let maybe_slot_info = if let Some(last_slot_number) = last_slot_number
+                        && last_slot_number >= slot_info.slot_number
+                    {
+                        None
+                    } else {
+                        last_slot_number.replace(slot_info.slot_number);
+                        *last_slot_info_instance.lock() = broadcast.instance;
+
+                        Some(slot_info)
+                    };
+
+                    async move { maybe_slot_info }
+                }
             });
 
         Ok(Box::pin(subscription))
@@ -357,7 +372,27 @@ impl NodeClient for ClusterNodeClient {
             .nats_client
             .subscribe_to_broadcasts::<ClusterControllerArchivedSegmentHeaderBroadcast>(None, None)
             .await?
-            .map(|broadcast| broadcast.archived_segment_header);
+            .filter_map({
+                let mut last_archived_segment_index = None;
+
+                move |broadcast| {
+                    let archived_segment_header = broadcast.archived_segment_header;
+                    let segment_index = archived_segment_header.segment_index();
+
+                    let maybe_archived_segment_header = if let Some(last_archived_segment_index) =
+                        last_archived_segment_index
+                        && last_archived_segment_index >= segment_index
+                    {
+                        None
+                    } else {
+                        last_archived_segment_index.replace(segment_index);
+
+                        Some(archived_segment_header)
+                    };
+
+                    async move { maybe_archived_segment_header }
+                }
+            });
 
         Ok(Box::pin(subscription))
     }
61 changes: 46 additions & 15 deletions crates/subspace-farmer/src/cluster/nats_client.rs
@@ -182,6 +182,7 @@ pub struct StreamResponseSubscriber<Response> {
     #[deref]
     #[deref_mut]
     subscriber: Subscriber,
+    response_subject: String,
     buffered_responses: Option<GenericStreamResponses<Response>>,
     next_index: u32,
     acknowledgement_sender: mpsc::UnboundedSender<(String, u32)>,
@@ -217,6 +218,7 @@
                     actual_index = %responses.index(),
                     expected_index = %*projected.next_index,
                     message_type = %type_name::<Response>(),
+                    response_subject = %projected.response_subject,
                     "Received unexpected response stream index, aborting stream"
                 );
 
@@ -235,6 +237,7 @@
                         %error,
                         %index,
                         message_type = %type_name::<Response>(),
+                        response_subject = %projected.response_subject,
                         %ack_subject,
                         "Failed to send acknowledgement for stream response"
                     );
@@ -252,6 +255,7 @@
                 warn!(
                     %error,
                     response_type = %type_name::<Response>(),
+                    response_subject = %projected.response_subject,
                     message = %hex::encode(message.payload),
                     "Failed to decode stream response"
                 );
@@ -267,26 +271,45 @@
 }
 
 impl<Response> StreamResponseSubscriber<Response> {
-    fn new(subscriber: Subscriber, nats_client: NatsClient) -> Self {
+    fn new(subscriber: Subscriber, response_subject: String, nats_client: NatsClient) -> Self {
        let (acknowledgement_sender, mut acknowledgement_receiver) =
             mpsc::unbounded::<(String, u32)>();
 
         let background_task = AsyncJoinOnDrop::new(
-            tokio::spawn(async move {
-                while let Some((subject, index)) = acknowledgement_receiver.next().await {
-                    if let Err(error) = nats_client
-                        .publish(subject.clone(), index.to_le_bytes().to_vec().into())
-                        .await
-                    {
-                        warn!(%error, %subject, %index, "Failed to send acknowledgement");
-                        return;
+            tokio::spawn({
+                let response_subject = response_subject.clone();
+
+                async move {
+                    while let Some((subject, index)) = acknowledgement_receiver.next().await {
+                        trace!(
+                            %subject,
+                            %index,
+                            %response_subject,
+                            %index,
+                            "Sending stream response acknowledgement"
+                        );
+                        if let Err(error) = nats_client
+                            .publish(subject.clone(), index.to_le_bytes().to_vec().into())
+                            .await
+                        {
+                            warn!(
+                                %error,
+                                %subject,
+                                %index,
+                                %response_subject,
+                                %index,
+                                "Failed to send stream response acknowledgement"
+                            );
+                            return;
+                        }
                     }
                 }
             }),
             true,
         );
 
         Self {
+            response_subject,
             subscriber,
             buffered_responses: None,
             next_index: 0,
@@ -631,17 +654,25 @@ impl NatsClient {
             .client
             .subscribe(stream_request.response_subject.clone())
             .await?;
-        debug!(request_type = %type_name::<Request>(), ?subscriber, "Stream request subscription");
+
+        let stream_request_subject = subject_with_instance(Request::SUBJECT, instance);
+        debug!(
+            request_type = %type_name::<Request>(),
+            %stream_request_subject,
+            ?subscriber,
+            "Stream request subscription"
+        );
 
         self.inner
             .client
-            .publish(
-                subject_with_instance(Request::SUBJECT, instance),
-                stream_request.encode().into(),
-            )
+            .publish(stream_request_subject, stream_request.encode().into())
             .await?;
 
-        Ok(StreamResponseSubscriber::new(subscriber, self.clone()))
+        Ok(StreamResponseSubscriber::new(
+            subscriber,
+            stream_request.response_subject,
+            self.clone(),
+        ))
     }
 
     /// Helper method to send responses to requests initiated with [`Self::stream_request`]
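The nats_client.rs changes are purely observability: the response subject is threaded into the subscriber so every log line about a response stream carries it. A minimal sketch of such structured logging with the `tracing` crate (the subject value and subscriber setup here are ours, not the PR's):

```rust
use tracing::warn;

fn main() {
    // Any subscriber works; fmt() prints structured fields inline.
    tracing_subscriber::fmt().init();

    let response_subject = "stream-response.example".to_string();
    let index = 7u32;

    // The `%` sigil records the variable as a named field via Display,
    // so all lines for one stream can be correlated by response_subject.
    warn!(
        %response_subject,
        %index,
        "Failed to send stream response acknowledgement"
    );
}
```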
6 changes: 6 additions & 0 deletions crates/subspace-farmer/src/cluster/plotter.rs
@@ -533,6 +533,12 @@
     PS: Sink<SectorPlottingProgress> + Unpin + Send + 'static,
     PS::Error: Error,
 {
+    if !matches!(response, ClusterSectorPlottingProgress::SectorChunk(_)) {
+        trace!(?response, "Processing plotting response notification");
+    } else {
+        trace!("Processing plotting response notification (sector chunk)");
+    }
+
     match response {
         ClusterSectorPlottingProgress::Occupied => {
             debug!(%free_instance, "Instance was occupied, retrying #2");
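The plotter change only affects logging: bulky `SectorChunk` payloads are kept out of trace output. The same `matches!` guard in a self-contained sketch (enum and messages are hypothetical):

```rust
#[derive(Debug)]
enum Progress {
    Occupied,
    // Large payload that should not be dumped into logs verbatim.
    SectorChunk(Vec<u8>),
    Finished,
}

fn log_response(response: &Progress) {
    if !matches!(response, Progress::SectorChunk(_)) {
        println!("Processing plotting response notification: {response:?}");
    } else {
        println!("Processing plotting response notification (sector chunk)");
    }
}

fn main() {
    log_response(&Progress::Occupied);
    log_response(&Progress::SectorChunk(vec![0; 1024]));
    log_response(&Progress::Finished);
}
```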
25 changes: 20 additions & 5 deletions (file path not shown)
@@ -140,7 +140,15 @@ where
         let mut slot_info_subscription = client.subscribe_slot_info().await?;
 
         async move {
+            let mut last_slot_number = None;
             while let Some(slot_info) = slot_info_subscription.next().await {
+                if let Some(last_slot_number) = last_slot_number
+                    && last_slot_number >= slot_info.slot_number
+                {
+                    continue;
+                }
+                last_slot_number.replace(slot_info.slot_number);
+
                 if let Err(error) = slot_info_sender.send(Some(slot_info)) {
                     warn!(%error, "Failed to proxy slot info notification");
                     return;
@@ -156,20 +164,18 @@
             let segment_headers = Arc::clone(&segment_headers);
 
             async move {
+                let mut last_archived_segment_index = None;
                 while let Some(archived_segment_header) =
                     archived_segments_notifications.next().await
                 {
+                    let segment_index = archived_segment_header.segment_index();
                     trace!(
                         ?archived_segment_header,
                         "New archived archived segment header notification"
                     );
 
-                    segment_headers.write().await.push(archived_segment_header);
-
                     while let Err(error) = client
-                        .acknowledge_archived_segment_header(
-                            archived_segment_header.segment_index(),
-                        )
+                        .acknowledge_archived_segment_header(segment_index)
                         .await
                     {
                         warn!(
@@ -178,6 +184,15 @@ where
                         );
                     }
 
+                    if let Some(last_archived_segment_index) = last_archived_segment_index
+                        && last_archived_segment_index >= segment_index
+                    {
+                        continue;
+                    }
+                    last_archived_segment_index.replace(segment_index);
+
+                    segment_headers.write().await.push(archived_segment_header);
+
                     if let Err(error) =
                         archived_segment_headers_sender.send(Some(archived_segment_header))
                     {
59 changes: 44 additions & 15 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
@@ -156,8 +156,17 @@
         loop {
             select! {
                 sector_plotting_init_result = sector_plotting_init_fut => {
+                    let sector_plotting_fut = match sector_plotting_init_result {
+                        PlotSingleSectorResult::Scheduled(future) => future,
+                        PlotSingleSectorResult::Skipped => {
+                            break;
+                        }
+                        PlotSingleSectorResult::FatalError(error) => {
+                            return Err(error);
+                        }
+                    };
                     sectors_being_plotted.push_back(
-                        sector_plotting_init_result?.instrument(info_span!("", %sector_index))
+                        sector_plotting_fut.instrument(info_span!("", %sector_index))
                     );
                     break;
                 }
@@ -251,6 +260,12 @@
     }
 }
 
+enum PlotSingleSectorResult<F> {
+    Scheduled(F),
+    Skipped,
+    FatalError(PlottingError),
+}
+
 struct SectorPlottingResult {
     sector_index: SectorIndex,
     sector_metadata: SectorMetadataChecksummed,
@@ -263,7 +278,7 @@ async fn plot_single_sector<'a, NC>(
     sector_plotting_options: &'a SectorPlottingOptions<'a, NC>,
     sectors_metadata: &'a AsyncRwLock<Vec<SectorMetadataChecksummed>>,
     sectors_being_modified: &'a AsyncRwLock<HashSet<SectorIndex>>,
-) -> Result<impl Future<Output = Result<SectorPlottingResult, PlottingError>> + 'a, PlottingError>
+) -> PlotSingleSectorResult<impl Future<Output = Result<SectorPlottingResult, PlottingError>> + 'a>
 where
     NC: NodeClient,
 {
@@ -311,21 +326,35 @@
     // farmer app info to return old data, meaning we're replotting exactly the same sector that
     // just expired.
     let farmer_app_info = loop {
-        let farmer_app_info = node_client
-            .farmer_app_info()
-            .await
-            .map_err(|error| PlottingError::FailedToGetFarmerInfo { error })?;
+        let farmer_app_info = match node_client.farmer_app_info().await {
+            Ok(farmer_app_info) => farmer_app_info,
+            Err(error) => {
+                return PlotSingleSectorResult::FatalError(PlottingError::FailedToGetFarmerInfo {
+                    error,
+                });
+            }
+        };
 
         if let Some(old_sector_metadata) = &maybe_old_sector_metadata {
             if farmer_app_info.protocol_info.history_size <= old_sector_metadata.history_size {
-                debug!(
-                    current_history_size = %farmer_app_info.protocol_info.history_size,
-                    old_sector_history_size = %old_sector_metadata.history_size,
-                    "Latest protocol history size is not yet newer than old sector history \
-                    size, wait for a bit and try again"
-                );
-                tokio::time::sleep(FARMER_APP_INFO_RETRY_INTERVAL).await;
-                continue;
+                if farmer_app_info.protocol_info.min_sector_lifetime == HistorySize::ONE {
+                    debug!(
+                        current_history_size = %farmer_app_info.protocol_info.history_size,
+                        old_sector_history_size = %old_sector_metadata.history_size,
+                        "Latest protocol history size is not yet newer than old sector history \
+                        size, wait for a bit and try again"
+                    );
+                    tokio::time::sleep(FARMER_APP_INFO_RETRY_INTERVAL).await;
+                    continue;
+                } else {
+                    debug!(
+                        current_history_size = %farmer_app_info.protocol_info.history_size,
+                        old_sector_history_size = %old_sector_metadata.history_size,
+                        "Skipped sector plotting, likely redundant due to redundant archived \
+                        segment notification"
+                    );
+                    return PlotSingleSectorResult::Skipped;
+                }
             }
         }
 
@@ -352,7 +381,7 @@
         info!("Plotting sector ({progress:.2}% complete)");
     }
 
-    Ok(async move {
+    PlotSingleSectorResult::Scheduled(async move {
         let plotted_sector = loop {
             match plot_single_sector_internal(
                 sector_index,
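`plot_single_sector` previously returned `Result`, so a redundant notification could only surface as an error. The new `PlotSingleSectorResult` gives the caller three distinct outcomes. A simplified sketch of that control flow (`String` stands in for the real future and error types):

```rust
enum PlotSingleSectorResult<F> {
    // Plotting was scheduled; the caller queues this future.
    Scheduled(F),
    // Redundant notification; nothing to do and nothing went wrong.
    Skipped,
    // Unrecoverable failure; the plotting loop must abort.
    FatalError(String),
}

fn handle<F>(result: PlotSingleSectorResult<F>, queue: &mut Vec<F>) -> Result<(), String> {
    match result {
        PlotSingleSectorResult::Scheduled(future) => {
            queue.push(future);
            Ok(())
        }
        PlotSingleSectorResult::Skipped => Ok(()),
        PlotSingleSectorResult::FatalError(error) => Err(error),
    }
}

fn main() {
    let mut queue = Vec::new();
    handle(PlotSingleSectorResult::Scheduled(()), &mut queue).unwrap();
    handle(PlotSingleSectorResult::Skipped, &mut queue).unwrap();
    let fatal = handle(
        PlotSingleSectorResult::FatalError("node unreachable".into()),
        &mut queue,
    );
    assert!(fatal.is_err());
    assert_eq!(queue.len(), 1);
}
```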
2 changes: 1 addition & 1 deletion crates/subspace-rpc-primitives/src/lib.rs
@@ -107,7 +107,7 @@ impl Decode for FarmerAppInfo {
 }
 
 /// Information about new slot that just arrived
-#[derive(Debug, Copy, Clone, Encode, Decode, Serialize, Deserialize)]
+#[derive(Debug, Copy, Clone, Eq, PartialEq, Encode, Decode, Serialize, Deserialize)]
 #[serde(rename_all = "camelCase")]
 pub struct SlotInfo {
     /// Slot number
