Remove piece getter concurrency option #3150

Merged: 1 commit, Oct 18, 2024
@@ -43,11 +43,6 @@ const GET_PIECE_MAX_INTERVAL: Duration = Duration::from_secs(40);
/// Arguments for controller
#[derive(Debug, Parser)]
pub(super) struct ControllerArgs {
/// Piece getter concurrency.
///
/// Increasing this value will cause higher memory usage.
#[arg(long, default_value = "128")]
piece_getter_concurrency: NonZeroUsize,
/// Base path where to store P2P network identity
#[arg(long, value_hint = ValueHint::DirPath)]
base_path: Option<PathBuf>,
@@ -88,7 +83,6 @@ pub(super) async fn controller(
controller_args: ControllerArgs,
) -> anyhow::Result<Pin<Box<dyn Future<Output = anyhow::Result<()>>>>> {
let ControllerArgs {
piece_getter_concurrency,
base_path,
node_rpc_url,
cache_group,
@@ -186,7 +180,6 @@ pub(super) async fn controller(
..ExponentialBackoff::default()
},
},
piece_getter_concurrency,
);

let farmer_cache_worker_fut = run_future_in_dedicated_thread(
@@ -123,12 +123,6 @@ struct RocmPlottingOptions {
/// Arguments for plotter
#[derive(Debug, Parser)]
pub(super) struct PlotterArgs {
/// Piece getter concurrency.
///
/// Increasing this value can cause NATS communication issues if too many messages arrive via
/// NATS, but are not processed quickly enough.
#[arg(long, default_value = "32")]
piece_getter_concurrency: NonZeroUsize,
/// Plotting options only used by CPU plotter
#[clap(flatten)]
cpu_plotting_options: CpuPlottingOptions,
@@ -154,7 +148,6 @@ where
PosTable: Table,
{
let PlotterArgs {
piece_getter_concurrency,
cpu_plotting_options,
#[cfg(feature = "cuda")]
cuda_plotting_options,
@@ -169,7 +162,7 @@ where
.expect("Not zero; qed"),
)
.map_err(|error| anyhow!("Failed to instantiate erasure coding: {error}"))?;
let piece_getter = ClusterPieceGetter::new(nats_client.clone(), piece_getter_concurrency);
let piece_getter = ClusterPieceGetter::new(nats_client.clone());

let global_mutex = Arc::default();

@@ -235,11 +235,6 @@ pub(crate) struct FarmingArgs {
/// one specified endpoint. Format: 127.0.0.1:8080
#[arg(long, aliases = ["metrics-endpoint", "metrics-endpoints"])]
prometheus_listen_on: Vec<SocketAddr>,
/// Piece getter concurrency.
///
/// Increasing this value will cause higher memory usage.
#[arg(long, default_value = "128")]
piece_getter_concurrency: NonZeroUsize,
/// Size of PER FARM thread pool used for farming (mostly for blocking I/O, but also for some
/// compute-intensive operations during proving). Defaults to the number of logical CPUs
/// on UMA systems, or the number of logical CPUs in first NUMA node on NUMA systems, but
@@ -310,7 +305,6 @@ where
tmp,
mut disk_farms,
prometheus_listen_on,
piece_getter_concurrency,
farming_thread_pool_size,
cpu_plotting_options,
#[cfg(feature = "cuda")]
@@ -457,7 +451,6 @@ where
..ExponentialBackoff::default()
},
},
piece_getter_concurrency,
);

let farmer_cache_worker_fut = run_future_in_dedicated_thread(
19 changes: 2 additions & 17 deletions crates/subspace-farmer/src/cluster/controller.rs
@@ -16,7 +16,6 @@ use crate::farm::{PieceCacheId, PieceCacheOffset};
use crate::farmer_cache::FarmerCache;
use crate::node_client::NodeClient;
use anyhow::anyhow;
use async_lock::Semaphore;
use async_nats::HeaderValue;
use async_trait::async_trait;
use futures::channel::mpsc;
@@ -26,7 +25,6 @@ use futures::{select, stream, FutureExt, Stream, StreamExt};
use parity_scale_codec::{Decode, Encode};
use parking_lot::Mutex;
use std::collections::{HashMap, HashSet};
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::Arc;
use std::task::Poll;
@@ -196,14 +194,11 @@ impl GenericStreamRequest for ClusterControllerPiecesRequest {
#[derive(Debug, Clone)]
pub struct ClusterPieceGetter {
nats_client: NatsClient,
request_semaphore: Arc<Semaphore>,
}

#[async_trait]
impl PieceGetter for ClusterPieceGetter {
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let _guard = self.request_semaphore.acquire().await;

if let Some((piece_cache_id, piece_cache_offset)) = self
.nats_client
.request(
@@ -296,8 +291,6 @@ impl PieceGetter for ClusterPieceGetter {
let mut cached_pieces_by_cache_id = HashMap::<PieceCacheId, Vec<PieceCacheOffset>>::new();

{
let _guard = self.request_semaphore.acquire().await;

let mut cached_pieces = self
.nats_client
.stream_request(
@@ -325,8 +318,6 @@
let piece_indices_to_download = &piece_indices_to_get;

async move {
let _guard = self.request_semaphore.acquire().await;

let mut pieces_stream = match self
.nats_client
.stream_request(
@@ -388,8 +379,6 @@
return;
}

let _guard = self.request_semaphore.acquire().await;

let mut pieces_from_controller = match self
.nats_client
.stream_request(
@@ -448,12 +437,8 @@
impl ClusterPieceGetter {
/// Create new instance
#[inline]
pub fn new(nats_client: NatsClient, request_concurrency: NonZeroUsize) -> Self {
let request_semaphore = Arc::new(Semaphore::new(request_concurrency.get()));
Self {
nats_client,
request_semaphore,
}
pub fn new(nats_client: NatsClient) -> Self {
Self { nats_client }
}
}

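The `request_semaphore` removed above previously capped how many piece requests a `ClusterPieceGetter` kept in flight at once. A caller that still wants such a cap can impose it outside the getter. Below is a minimal sketch of that pattern using `async_lock::Semaphore`; `fetch_piece` and `LimitedPieceGetter` are hypothetical names standing in for the real NATS-backed request, not part of this crate's API.

```rust
use async_lock::Semaphore;
use std::sync::Arc;

/// Hypothetical stand-in for the real NATS-backed piece request.
async fn fetch_piece(piece_index: u64) -> Option<Vec<u8>> {
    let _ = piece_index;
    None
}

/// Caps how many `fetch_piece` calls run concurrently, mirroring the
/// `request_semaphore` that this PR removes from `ClusterPieceGetter`.
struct LimitedPieceGetter {
    semaphore: Arc<Semaphore>,
}

impl LimitedPieceGetter {
    fn new(max_concurrent: usize) -> Self {
        Self {
            semaphore: Arc::new(Semaphore::new(max_concurrent)),
        }
    }

    async fn get_piece(&self, piece_index: u64) -> Option<Vec<u8>> {
        // Wait for a permit; it is released when `_guard` is dropped at the
        // end of this function, so at most `max_concurrent` requests run at once.
        let _guard = self.semaphore.acquire().await;
        fetch_piece(piece_index).await
    }
}
```

Dropping the guard releases the permit, which is the same behaviour the removed `--piece-getter-concurrency` option used to configure inside the getter itself.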
13 changes: 1 addition & 12 deletions crates/subspace-farmer/src/farmer_piece_getter.rs
@@ -3,7 +3,7 @@
use crate::farm::plotted_pieces::PlottedPieces;
use crate::farmer_cache::FarmerCache;
use crate::node_client::NodeClient;
use async_lock::{RwLock as AsyncRwLock, Semaphore};
use async_lock::RwLock as AsyncRwLock;
use async_trait::async_trait;
use backoff::backoff::Backoff;
use backoff::future::retry;
@@ -14,7 +14,6 @@ use futures::stream::FuturesUnordered;
use futures::{stream, FutureExt, Stream, StreamExt};
use std::fmt;
use std::hash::Hash;
use std::num::NonZeroUsize;
use std::pin::Pin;
use std::sync::atomic::{AtomicU32, Ordering};
use std::sync::{Arc, Weak};
@@ -44,7 +43,6 @@ struct Inner<FarmIndex, CacheIndex, PV, NC> {
node_client: NC,
plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
dsn_cache_retry_policy: DsnCacheRetryPolicy,
request_semaphore: Arc<Semaphore>,
}

/// Farmer-specific piece getter.
@@ -89,25 +87,20 @@ where
node_client: NC,
plotted_pieces: Arc<AsyncRwLock<PlottedPieces<FarmIndex>>>,
dsn_cache_retry_policy: DsnCacheRetryPolicy,
request_concurrency: NonZeroUsize,
) -> Self {
let request_semaphore = Arc::new(Semaphore::new(request_concurrency.get()));
Self {
inner: Arc::new(Inner {
piece_provider,
farmer_cache,
node_client,
plotted_pieces,
dsn_cache_retry_policy,
request_semaphore,
}),
}
}

/// Fast way to get piece using various caches
pub async fn get_piece_fast(&self, piece_index: PieceIndex) -> Option<Piece> {
let _guard = self.inner.request_semaphore.acquire().await;

self.get_piece_fast_internal(piece_index).await
}

@@ -163,8 +156,6 @@

/// Slow way to get piece using archival storage
pub async fn get_piece_slow(&self, piece_index: PieceIndex) -> Option<Piece> {
let _guard = self.inner.request_semaphore.acquire().await;

self.get_piece_slow_internal(piece_index).await
}

@@ -230,8 +221,6 @@ where
NC: NodeClient,
{
async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> {
let _guard = self.inner.request_semaphore.acquire().await;

{
let retries = AtomicU32::new(0);
let max_retries = u32::from(self.inner.dsn_cache_retry_policy.max_retries);
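For context on the surrounding code: the `get_piece` implementation in `farmer_piece_getter.rs` retries its cache/DSN lookups using the `backoff` crate, as hinted by the `dsn_cache_retry_policy` and `ExponentialBackoff` fragments in the hunks above. A rough, self-contained sketch of that retry pattern follows; `try_fetch` and `fetch_with_retry` are hypothetical names, not the crate's real internals.

```rust
use backoff::future::retry;
use backoff::{Error as BackoffError, ExponentialBackoff};

/// Hypothetical fallible fetch, standing in for a cache or DSN lookup.
async fn try_fetch(piece_index: u64) -> Result<Vec<u8>, std::io::Error> {
    let _ = piece_index;
    Err(std::io::Error::new(
        std::io::ErrorKind::Other,
        "piece not available yet",
    ))
}

/// Retries `try_fetch` with exponential backoff until it succeeds or the
/// backoff policy gives up.
async fn fetch_with_retry(piece_index: u64) -> Result<Vec<u8>, std::io::Error> {
    retry(ExponentialBackoff::default(), || async {
        // Treat every failure as transient so the next attempt is delayed
        // according to the exponential backoff schedule.
        try_fetch(piece_index).await.map_err(BackoffError::transient)
    })
    .await
}
```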