diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs index 4d4cecf8ac..8cfa727031 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared.rs @@ -26,6 +26,7 @@ pub(in super::super) enum PlottingThreadPriority { impl FromStr for PlottingThreadPriority { type Err = String; + #[inline] fn from_str(s: &str) -> anyhow::Result { match s { "min" => Ok(Self::Min), @@ -37,6 +38,7 @@ impl FromStr for PlottingThreadPriority { } impl fmt::Display for PlottingThreadPriority { + #[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(match self { Self::Min => "min", @@ -47,6 +49,7 @@ impl fmt::Display for PlottingThreadPriority { } impl From for Option { + #[inline] fn from(value: PlottingThreadPriority) -> Self { match value { PlottingThreadPriority::Min => Some(ThreadPriority::Min), @@ -69,6 +72,7 @@ pub(in super::super) struct DiskFarm { impl FromStr for DiskFarm { type Err = String; + #[inline] fn from_str(s: &str) -> anyhow::Result { let parts = s.split(',').collect::>(); if parts.len() < 2 { diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/metrics.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/metrics.rs index 7df702ad0d..2cc6caa718 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/metrics.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/shared/metrics.rs @@ -18,6 +18,7 @@ pub(in super::super) enum SectorState { } impl fmt::Display for SectorState { + #[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(match self { Self::NotPlotted => "NotPlotted", diff --git a/crates/subspace-farmer/src/cluster/nats_client.rs b/crates/subspace-farmer/src/cluster/nats_client.rs index b73099a0aa..06522279e2 100644 --- a/crates/subspace-farmer/src/cluster/nats_client.rs +++ b/crates/subspace-farmer/src/cluster/nats_client.rs @@ -13,6 +13,7 @@ //! * notifications (typically targeting a particular instance of an app) and corresponding subscriptions (for example solution notification) //! * broadcasts and corresponding subscriptions (for example slot info broadcast) +use crate::utils::AsyncJoinOnDrop; use async_nats::{ Client, ConnectOptions, HeaderMap, HeaderValue, PublishError, RequestError, RequestErrorKind, Subject, SubscribeError, Subscriber, ToServerAddrs, @@ -20,21 +21,30 @@ use async_nats::{ use backoff::backoff::Backoff; use backoff::ExponentialBackoff; use derive_more::{Deref, DerefMut}; -use futures::{Stream, StreamExt}; +use futures::channel::mpsc; +use futures::stream::FuturesUnordered; +use futures::{FutureExt, Stream, StreamExt}; use parity_scale_codec::{Decode, Encode}; use std::any::type_name; use std::collections::VecDeque; -use std::fmt; use std::marker::PhantomData; +use std::num::NonZeroUsize; use std::ops::Deref; use std::pin::Pin; +use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::task::{Context, Poll}; +use std::time::Duration; +use std::{fmt, mem}; use thiserror::Error; -use tracing::{debug, warn}; +use tracing::{debug, trace, warn}; use ulid::Ulid; const EXPECTED_MESSAGE_SIZE: usize = 2 * 1024 * 1024; +const ACKNOWLEDGEMENT_TIMEOUT: Duration = Duration::from_mins(1); +/// Requests should time out eventually, but we should set a larger timeout to allow for spikes in +/// load to be absorbed gracefully +const REQUEST_TIMEOUT: Duration = Duration::from_mins(5); /// Generic request with associated response. /// @@ -48,12 +58,11 @@ pub trait GenericRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static { type Response: Encode + Decode + fmt::Debug + Send + Sync + 'static; } -/// Generic stream request where response is streamed using [`GenericStreamResponses`]. +/// Generic stream request where response is streamed using [`NatsClient::stream_response`]. /// /// Used for cases where a large payload that doesn't fit into NATS message needs to be sent or /// there is a very large number of messages to send. For simple request/response patten /// [`GenericRequest`] can be used instead. -// TODO: Sequence numbers for streams, backpressure with acknowledgement pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'static { /// Request subject with optional `*` in place of application instance to receive the request const SUBJECT: &'static str; @@ -68,16 +77,30 @@ pub trait GenericStreamRequest: Encode + Decode + fmt::Debug + Send + Sync + 'st #[derive(Debug, Encode, Decode)] pub enum GenericStreamResponses { /// Some responses, but the stream didn't end yet - Continue(VecDeque), + Continue { + /// Monotonically increasing index of responses in a stream + index: u32, + /// Individual responses + responses: VecDeque, + /// Subject where to send acknowledgement of received stream response indices, which acts as + /// a backpressure mechanism + ack_subject: String, + }, /// Remaining responses and this is the end of the stream. - Last(VecDeque), + Last { + /// Monotonically increasing index of responses in a stream + index: u32, + /// Individual responses + responses: VecDeque, + }, } impl From> for VecDeque { + #[inline] fn from(value: GenericStreamResponses) -> Self { match value { - GenericStreamResponses::Continue(responses) => responses, - GenericStreamResponses::Last(responses) => responses, + GenericStreamResponses::Continue { responses, .. } => responses, + GenericStreamResponses::Last { responses, .. } => responses, } } } @@ -85,13 +108,28 @@ impl From> for VecDeque { impl GenericStreamResponses { fn next(&mut self) -> Option { match self { - GenericStreamResponses::Continue(responses) => responses.pop_front(), - GenericStreamResponses::Last(responses) => responses.pop_front(), + GenericStreamResponses::Continue { responses, .. } => responses.pop_front(), + GenericStreamResponses::Last { responses, .. } => responses.pop_front(), + } + } + + fn index(&self) -> u32 { + match self { + GenericStreamResponses::Continue { index, .. } => *index, + GenericStreamResponses::Last { index, .. } => *index, + } + } + + fn ack_subject(&self) -> Option<&str> { + if let GenericStreamResponses::Continue { ack_subject, .. } = self { + Some(ack_subject) + } else { + None } } fn is_last(&self) -> bool { - matches!(self, Self::Last(_)) + matches!(self, Self::Last { .. }) } } @@ -144,7 +182,10 @@ pub struct StreamResponseSubscriber { #[deref] #[deref_mut] subscriber: Subscriber, - buffered_responses: GenericStreamResponses, + buffered_responses: Option>, + next_index: u32, + acknowledgement_sender: mpsc::UnboundedSender<(String, u32)>, + _background_task: AsyncJoinOnDrop<()>, _phantom: PhantomData, } @@ -155,10 +196,15 @@ where type Item = Response; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Some(response) = self.buffered_responses.next() { - return Poll::Ready(Some(response)); - } else if self.buffered_responses.is_last() { - return Poll::Ready(None); + if let Some(buffered_responses) = self.buffered_responses.as_mut() { + if let Some(response) = buffered_responses.next() { + return Poll::Ready(Some(response)); + } else if buffered_responses.is_last() { + return Poll::Ready(None); + } + + self.buffered_responses.take(); + self.next_index += 1; } let mut projected = self.project(); @@ -166,8 +212,37 @@ where Poll::Ready(Some(message)) => { match GenericStreamResponses::::decode(&mut message.payload.as_ref()) { Ok(mut responses) => { + if responses.index() != *projected.next_index { + warn!( + actual_index = %responses.index(), + expected_index = %*projected.next_index, + message_type = %type_name::(), + "Received unexpected response stream index, aborting stream" + ); + + return Poll::Ready(None); + } + + if let Some(ack_subject) = responses.ack_subject() { + let index = responses.index(); + let ack_subject = ack_subject.to_string(); + + if let Err(error) = projected + .acknowledgement_sender + .unbounded_send((ack_subject.clone(), index)) + { + warn!( + %error, + %index, + message_type = %type_name::(), + %ack_subject, + "Failed to send acknowledgement for stream response" + ); + } + } + if let Some(response) = responses.next() { - *projected.buffered_responses = responses; + *projected.buffered_responses = Some(responses); Poll::Ready(Some(response)) } else { Poll::Ready(None) @@ -176,7 +251,7 @@ where Err(error) => { warn!( %error, - message_type = %type_name::(), + response_type = %type_name::(), message = %hex::encode(message.payload), "Failed to decode stream response" ); @@ -191,6 +266,40 @@ where } } +impl StreamResponseSubscriber { + fn new(subscriber: Subscriber, nats_client: NatsClient) -> Self { + let (acknowledgement_sender, mut acknowledgement_receiver) = + mpsc::unbounded::<(String, u32)>(); + + let background_task = AsyncJoinOnDrop::new( + tokio::spawn(async move { + // Make sure to use the same exact NATS connection for all acknowledgements in order to + // ensure consistent ordering + let client = &*nats_client; + while let Some((subject, index)) = acknowledgement_receiver.next().await { + if let Err(error) = client + .publish(subject.clone(), index.to_le_bytes().to_vec().into()) + .await + { + warn!(%error, %subject, %index, "Failed to send acknowledgement"); + return; + } + } + }), + true, + ); + + Self { + subscriber, + buffered_responses: None, + next_index: 0, + acknowledgement_sender, + _background_task: background_task, + _phantom: PhantomData, + } + } +} + /// Generic one-off notification pub trait GenericNotification: Encode + Decode + fmt::Debug + Send + Sync + 'static { /// Notification subject with optional `*` in place of application instance receiving the @@ -254,7 +363,8 @@ where #[derive(Debug)] struct Inner { - client: Client, + clients: Vec, + next_client: AtomicUsize, request_retry_backoff_policy: ExponentialBackoff, approximate_max_message_size: usize, } @@ -268,8 +378,9 @@ pub struct NatsClient { impl Deref for NatsClient { type Target = Client; + #[inline] fn deref(&self) -> &Self::Target { - &self.inner.client + self.client() } } @@ -278,23 +389,37 @@ impl NatsClient { pub async fn new( addrs: A, request_retry_backoff_policy: ExponentialBackoff, + nats_pool_size: NonZeroUsize, ) -> Result { - Self::from_client( - async_nats::connect_with_options( - addrs, - ConnectOptions::default().request_timeout(None), - ) - .await?, + let servers = addrs.to_server_addrs()?.collect::>(); + Self::from_clients( + (0..nats_pool_size.get()) + .map(|_| async { + async_nats::connect_with_options( + &servers, + ConnectOptions::default().request_timeout(Some(REQUEST_TIMEOUT)), + ) + .await + }) + .collect::>() + .collect::>() + .await + .into_iter() + .collect::, _>>()?, request_retry_backoff_policy, ) } /// Create new client from existing NATS instance - pub fn from_client( - client: Client, + pub fn from_clients( + clients: Vec, request_retry_backoff_policy: ExponentialBackoff, ) -> Result { - let max_payload = client.server_info().max_payload; + let max_payload = clients + .first() + .ok_or("Empty list of NATS clients is not supported; qed")? + .server_info() + .max_payload; if max_payload < EXPECTED_MESSAGE_SIZE { return Err(format!( "Max payload {max_payload} is smaller than expected {EXPECTED_MESSAGE_SIZE}, \ @@ -304,7 +429,8 @@ impl NatsClient { } let inner = Inner { - client, + clients, + next_client: AtomicUsize::default(), request_retry_backoff_policy, // Allow up to 90%, the rest will be wrapper data structures, etc. approximate_max_message_size: max_payload * 9 / 10, @@ -334,8 +460,7 @@ impl NatsClient { let mut maybe_retry_backoff = None; let message = loop { match self - .inner - .client + .client() .request(subject.clone(), request.encode().into()) .await { @@ -404,24 +529,205 @@ impl NatsClient { let stream_request = StreamRequest::new(request); let subscriber = self - .inner - .client + .client() .subscribe(stream_request.response_subject.clone()) .await?; - self.inner - .client + self.client() .publish( subject_with_instance(Request::SUBJECT, instance), stream_request.encode().into(), ) .await?; - Ok(StreamResponseSubscriber { - subscriber, - buffered_responses: GenericStreamResponses::Continue(VecDeque::new()), - _phantom: PhantomData, - }) + Ok(StreamResponseSubscriber::new(subscriber, self.clone())) + } + + /// Helper method to send responses to requests initiated with [`Self::stream_request`] + pub async fn stream_response(&self, response_subject: String, response_stream: S) + where + Request: GenericStreamRequest, + S: Stream + Unpin, + { + type Response = + GenericStreamResponses<::Response>; + + let mut response_stream = response_stream.fuse(); + // Make sure to use the same exact NATS connection for all acknowledgements in order to + // ensure consistent ordering + let client = &**self; + + // Pull the first element to measure response size + let first_element = match response_stream.next().await { + Some(first_element) => first_element, + None => { + if let Err(error) = client + .publish( + response_subject.clone(), + Response::::Last { + index: 0, + responses: VecDeque::new(), + } + .encode() + .into(), + ) + .await + { + warn!( + %error, + request_type = %type_name::(), + response_type = %type_name::(), + "Failed to send stream response" + ); + } + + return; + } + }; + let max_responses_per_message = + self.approximate_max_message_size() / first_element.encoded_size(); + + // Initialize buffer that will be reused for responses + let mut buffer = VecDeque::with_capacity(max_responses_per_message); + buffer.push_back(first_element); + + let ack_subject = format!("stream-response-ack.{}", Ulid::new()); + let mut ack_subscription = match self.subscribe(ack_subject.clone()).await { + Ok(ack_subscription) => ack_subscription, + Err(error) => { + warn!( + %error, + request_type = %type_name::(), + response_type = %type_name::(), + "Failed to subscribe to ack subject" + ); + return; + } + }; + let mut index = 0; + + loop { + // Try to fill the buffer + let mut local_response_stream = response_stream + .by_ref() + .take(max_responses_per_message - buffer.len()); + if buffer.is_empty() { + if let Some(element) = local_response_stream.next().await { + buffer.push_back(element); + } + } + while let Some(element) = local_response_stream.next().now_or_never().flatten() { + buffer.push_back(element); + } + + let is_done = response_stream.is_done(); + debug!( + %response_subject, + num_messages = buffer.len(), + %index, + %is_done, + "Publishing stream response messages", + ); + let response = if is_done { + Response::::Last { + index, + responses: buffer, + } + } else { + Response::::Continue { + index, + responses: buffer, + ack_subject: ack_subject.clone(), + } + }; + + if let Err(error) = client + .publish(response_subject.clone(), response.encode().into()) + .await + { + warn!( + %error, + request_type = %type_name::(), + response_type = %type_name::(), + "Failed to send stream response" + ); + return; + } + + if is_done { + return; + } else { + buffer = response.into(); + buffer.clear(); + } + + if index >= 1 { + // Acknowledgements are received with delay + let expected_index = index - 1; + + trace!( + %response_subject, + %expected_index, + "Waiting for acknowledgement" + ); + match tokio::time::timeout(ACKNOWLEDGEMENT_TIMEOUT, ack_subscription.next()).await { + Ok(Some(message)) => { + if let Some(received_index) = message + .payload + .split_at_checked(mem::size_of::()) + .map(|(bytes, _)| { + u32::from_le_bytes( + bytes.try_into().expect("Correctly chunked slice; qed"), + ) + }) + { + debug!( + %response_subject, + %received_index, + "Received acknowledgement" + ); + if received_index != expected_index { + warn!( + %received_index, + %expected_index, + request_type = %type_name::(), + response_type = %type_name::(), + message = %hex::encode(message.payload), + "Unexpected acknowledgement index" + ); + return; + } + } else { + warn!( + request_type = %type_name::(), + response_type = %type_name::(), + message = %hex::encode(message.payload), + "Unexpected acknowledgement message" + ); + return; + } + } + Ok(None) => { + warn!( + request_type = %type_name::(), + response_type = %type_name::(), + "Acknowledgement stream ended unexpectedly" + ); + return; + } + Err(_error) => { + warn!( + request_type = %type_name::(), + response_type = %type_name::(), + "Acknowledgement wait timed out" + ); + return; + } + } + } + + index += 1; + } } /// Make notification without waiting for response @@ -433,8 +739,7 @@ impl NatsClient { where Notification: GenericNotification, { - self.inner - .client + self.client() .publish( subject_with_instance(Notification::SUBJECT, instance), notification.encode().into(), @@ -451,8 +756,7 @@ impl NatsClient { where Broadcast: GenericBroadcast, { - self.inner - .client + self.client() .publish_with_headers( Broadcast::SUBJECT.replace('*', instance), { @@ -467,6 +771,20 @@ impl NatsClient { .await } + /// Simple subscription that will produce decoded stream requests, while skipping messages that + /// fail to decode + pub async fn subscribe_to_stream_requests( + &self, + instance: Option<&str>, + queue_group: Option, + ) -> Result>, SubscribeError> + where + Request: GenericStreamRequest, + { + self.simple_subscribe(Request::SUBJECT, instance, queue_group) + .await + } + /// Simple subscription that will produce decoded notifications, while skipping messages that /// fail to decode pub async fn subscribe_to_notifications( @@ -495,6 +813,12 @@ impl NatsClient { .await } + /// Get NATS client from a pool + fn client(&self) -> &Client { + let client = self.inner.next_client.fetch_add(1, Ordering::Relaxed); + &self.inner.clients[client % self.inner.clients.len()] + } + /// Simple subscription that will produce decoded messages, while skipping messages that fail to /// decode async fn simple_subscribe( @@ -508,13 +832,11 @@ impl NatsClient { { Ok(SubscriberWrapper { subscriber: if let Some(queue_group) = queue_group { - self.inner - .client + self.client() .queue_subscribe(subject_with_instance(subject, instance), queue_group) .await? } else { - self.inner - .client + self.client() .subscribe(subject_with_instance(subject, instance)) .await? }, diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs index d727ec8a29..399c7f11d5 100644 --- a/crates/subspace-farmer/src/farm.rs +++ b/crates/subspace-farmer/src/farm.rs @@ -146,6 +146,7 @@ pub enum ProvingResult { } impl fmt::Display for ProvingResult { + #[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(match self { ProvingResult::Success => "Success", @@ -174,6 +175,7 @@ pub struct DecodedFarmingError { } impl fmt::Display for DecodedFarmingError { + #[inline] fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { self.error.fmt(f) } @@ -212,6 +214,7 @@ pub enum FarmingError { } impl Encode for FarmingError { + #[inline] fn encode_to(&self, dest: &mut O) { let error = DecodedFarmingError { error: self.to_string(), @@ -223,6 +226,7 @@ impl Encode for FarmingError { } impl Decode for FarmingError { + #[inline] fn decode(input: &mut I) -> Result { DecodedFarmingError::decode(input).map(FarmingError::Decoded) } @@ -230,6 +234,7 @@ impl Decode for FarmingError { impl FarmingError { /// String variant of the error, primarily for monitoring purposes + #[inline] pub fn str_variant(&self) -> &str { match self { FarmingError::FailedToSubscribeSlotInfo { .. } => "FailedToSubscribeSlotInfo", @@ -346,6 +351,7 @@ pub trait HandlerId: Send + Sync + fmt::Debug { } impl HandlerId for event_listener_primitives::HandlerId { + #[inline] fn detach(&self) { self.detach(); } @@ -362,12 +368,15 @@ pub enum FarmId { } impl Encode for FarmId { + #[inline] fn size_hint(&self) -> usize { 1_usize + match self { FarmId::Ulid(ulid) => 0_usize.saturating_add(Encode::size_hint(&ulid.0)), } } + + #[inline] fn encode_to(&self, output: &mut O) { match self { FarmId::Ulid(ulid) => { @@ -381,6 +390,7 @@ impl Encode for FarmId { impl EncodeLike for FarmId {} impl Decode for FarmId { + #[inline] fn decode(input: &mut I) -> Result { match input .read_byte() @@ -397,6 +407,7 @@ impl Decode for FarmId { #[allow(clippy::new_without_default)] impl FarmId { /// Creates new ID + #[inline] pub fn new() -> Self { Self::Ulid(Ulid::new()) } @@ -447,30 +458,37 @@ impl Farm for Box where T: Farm + ?Sized, { + #[inline] fn id(&self) -> &FarmId { self.as_ref().id() } + #[inline] fn total_sectors_count(&self) -> SectorIndex { self.as_ref().total_sectors_count() } + #[inline] fn plotted_sectors(&self) -> Arc { self.as_ref().plotted_sectors() } + #[inline] fn piece_cache(&self) -> Arc { self.as_ref().piece_cache() } + #[inline] fn plot_cache(&self) -> Arc { self.as_ref().plot_cache() } + #[inline] fn piece_reader(&self) -> Arc { self.as_ref().piece_reader() } + #[inline] fn on_sector_update( &self, callback: HandlerFn<(SectorIndex, SectorUpdate)>, @@ -478,6 +496,7 @@ where self.as_ref().on_sector_update(callback) } + #[inline] fn on_farming_notification( &self, callback: HandlerFn, @@ -485,10 +504,12 @@ where self.as_ref().on_farming_notification(callback) } + #[inline] fn on_solution(&self, callback: HandlerFn) -> Box { self.as_ref().on_solution(callback) } + #[inline] fn run(self: Box) -> Pin> + Send>> { (*self).run() } diff --git a/crates/subspace-farmer/src/lib.rs b/crates/subspace-farmer/src/lib.rs index f6da817ec7..1f42a740eb 100644 --- a/crates/subspace-farmer/src/lib.rs +++ b/crates/subspace-farmer/src/lib.rs @@ -13,6 +13,7 @@ let_chains, never_type, slice_flatten, + split_at_checked, trait_alias, try_blocks, type_alias_impl_trait, diff --git a/crates/subspace-farmer/src/piece_cache.rs b/crates/subspace-farmer/src/piece_cache.rs index 40711409da..1761ff4082 100644 --- a/crates/subspace-farmer/src/piece_cache.rs +++ b/crates/subspace-farmer/src/piece_cache.rs @@ -74,6 +74,7 @@ pub struct PieceCache { #[async_trait] impl farm::PieceCache for PieceCache { + #[inline] fn max_num_elements(&self) -> u32 { self.inner.max_num_elements } @@ -123,18 +124,34 @@ impl farm::PieceCache for PieceCache { piece_index: PieceIndex, piece: &Piece, ) -> Result<(), FarmError> { - Ok(self.write_piece(offset, piece_index, piece)?) + let piece = piece.clone(); + let piece_cache = self.clone(); + Ok(AsyncJoinOnDrop::new( + task::spawn_blocking(move || piece_cache.write_piece(offset, piece_index, &piece)), + false, + ) + .await??) } async fn read_piece_index( &self, offset: PieceCacheOffset, ) -> Result, FarmError> { - Ok(self.read_piece_index(offset)?) + let piece_cache = self.clone(); + Ok(AsyncJoinOnDrop::new( + task::spawn_blocking(move || piece_cache.read_piece_index(offset)), + false, + ) + .await??) } async fn read_piece(&self, offset: PieceCacheOffset) -> Result, FarmError> { - Ok(self.read_piece(offset)?) + let piece_cache = self.clone(); + Ok(AsyncJoinOnDrop::new( + task::spawn_blocking(move || piece_cache.read_piece(offset)), + false, + ) + .await??) } } diff --git a/crates/subspace-farmer/src/plotter.rs b/crates/subspace-farmer/src/plotter.rs index dc8ac01bfd..58d8e9cc36 100644 --- a/crates/subspace-farmer/src/plotter.rs +++ b/crates/subspace-farmer/src/plotter.rs @@ -111,10 +111,12 @@ impl

Plotter for Arc

where P: Plotter + Send + Sync, { + #[inline] async fn has_free_capacity(&self) -> Result { self.as_ref().has_free_capacity().await } + #[inline] async fn plot_sector( &self, public_key: PublicKey, @@ -139,6 +141,7 @@ where .await } + #[inline] async fn try_plot_sector( &self, public_key: PublicKey, diff --git a/crates/subspace-farmer/src/plotter/cpu.rs b/crates/subspace-farmer/src/plotter/cpu.rs index cdbfa213f0..ef8a6e146f 100644 --- a/crates/subspace-farmer/src/plotter/cpu.rs +++ b/crates/subspace-farmer/src/plotter/cpu.rs @@ -52,6 +52,7 @@ pub struct CpuPlotter { } impl Drop for CpuPlotter { + #[inline] fn drop(&mut self) { self.abort_early.store(true, Ordering::Release); self.tasks_sender.close_channel(); diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index ab2cb0aaf1..fb5bfdd332 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -599,6 +599,7 @@ pub struct SingleDiskFarm { } impl Drop for SingleDiskFarm { + #[inline] fn drop(&mut self) { self.piece_reader.close_all_readers(); // Make background threads that are waiting to do something exit immediately diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index 9dcea9a07c..237d9b82c1 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -88,6 +88,7 @@ impl<'a, 'b, PosTable> Clone for PlotAuditOptions<'a, 'b, PosTable> where PosTable: Table, { + #[inline] fn clone(&self) -> Self { *self } diff --git a/crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs b/crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs index 570455d773..55da0c950d 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming/rayon_files.rs @@ -28,6 +28,7 @@ impl ReadAtSync for &RayonFiles where File: ReadAtSync, { + #[inline] fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> { (*self).read_at(buf, offset) } diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs index e1e3604ddc..840f5c9731 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs @@ -33,6 +33,7 @@ pub struct DiskPieceReader { #[async_trait] impl PieceReader for DiskPieceReader { + #[inline] async fn read_piece( &self, sector_index: SectorIndex, diff --git a/crates/subspace-farmer/src/single_disk_farm/unbuffered_io_file_windows.rs b/crates/subspace-farmer/src/single_disk_farm/unbuffered_io_file_windows.rs index 321681a9bf..0957ae5f41 100644 --- a/crates/subspace-farmer/src/single_disk_farm/unbuffered_io_file_windows.rs +++ b/crates/subspace-farmer/src/single_disk_farm/unbuffered_io_file_windows.rs @@ -25,12 +25,14 @@ pub struct UnbufferedIoFileWindows { } impl ReadAtSync for UnbufferedIoFileWindows { + #[inline] fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> { self.read_exact_at(buf, offset) } } impl ReadAtSync for &UnbufferedIoFileWindows { + #[inline] fn read_at(&self, buf: &mut [u8], offset: u64) -> io::Result<()> { (*self).read_at(buf, offset) } diff --git a/crates/subspace-farmer/src/thread_pool_manager.rs b/crates/subspace-farmer/src/thread_pool_manager.rs index 71479eda01..62cce30552 100644 --- a/crates/subspace-farmer/src/thread_pool_manager.rs +++ b/crates/subspace-farmer/src/thread_pool_manager.rs @@ -29,6 +29,7 @@ pub struct PlottingThreadPoolsGuard { impl Deref for PlottingThreadPoolsGuard { type Target = PlottingThreadPoolPair; + #[inline] fn deref(&self) -> &Self::Target { self.thread_pool_pair .as_ref() @@ -37,6 +38,7 @@ impl Deref for PlottingThreadPoolsGuard { } impl Drop for PlottingThreadPoolsGuard { + #[inline] fn drop(&mut self) { let (mutex, event) = &*self.inner; mutex.lock().thread_pool_pairs.push( diff --git a/crates/subspace-farmer/src/utils.rs b/crates/subspace-farmer/src/utils.rs index 181f74a19d..24a1106b6f 100644 --- a/crates/subspace-farmer/src/utils.rs +++ b/crates/subspace-farmer/src/utils.rs @@ -32,6 +32,7 @@ pub struct AsyncJoinOnDrop { } impl Drop for AsyncJoinOnDrop { + #[inline] fn drop(&mut self) { if let Some(handle) = self.handle.take() { if self.abort_on_drop { @@ -49,6 +50,7 @@ impl Drop for AsyncJoinOnDrop { impl AsyncJoinOnDrop { /// Create new instance. + #[inline] pub fn new(handle: task::JoinHandle, abort_on_drop: bool) -> Self { Self { handle: Some(handle), @@ -60,6 +62,7 @@ impl AsyncJoinOnDrop { impl Future for AsyncJoinOnDrop { type Output = Result; + #[inline] fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { Pin::new( self.handle @@ -74,6 +77,7 @@ impl Future for AsyncJoinOnDrop { pub(crate) struct JoinOnDrop(Option>); impl Drop for JoinOnDrop { + #[inline] fn drop(&mut self) { self.0 .take() @@ -85,6 +89,7 @@ impl Drop for JoinOnDrop { impl JoinOnDrop { // Create new instance + #[inline] pub(crate) fn new(handle: thread::JoinHandle<()>) -> Self { Self(Some(handle)) } diff --git a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs index ef424140f6..616856da95 100644 --- a/crates/subspace-farmer/src/utils/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/utils/farmer_piece_getter.rs @@ -51,6 +51,7 @@ impl fmt::Debug for FarmerPieceGetter { } impl Clone for FarmerPieceGetter { + #[inline] fn clone(&self) -> Self { Self { inner: Arc::clone(&self.inner), @@ -368,6 +369,7 @@ impl fmt::Debug for WeakFarmerPieceGetter } impl Clone for WeakFarmerPieceGetter { + #[inline] fn clone(&self) -> Self { Self { inner: self.inner.clone(), diff --git a/crates/subspace-farmer/src/utils/plotted_pieces.rs b/crates/subspace-farmer/src/utils/plotted_pieces.rs index 3bc23d7b45..97aed2de06 100644 --- a/crates/subspace-farmer/src/utils/plotted_pieces.rs +++ b/crates/subspace-farmer/src/utils/plotted_pieces.rs @@ -17,6 +17,7 @@ struct DummyReader; #[async_trait] impl PieceReader for DummyReader { + #[inline] async fn read_piece( &self, _sector_index: SectorIndex,