Pause plotting, farming and piece reading while proving is happening to give it the highest chance of success
nazar-pc committed Mar 9, 2024
1 parent 916f772 commit be49c74
Showing 6 changed files with 68 additions and 18 deletions.
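
The change threads one shared AsyncMutex (an `async_lock::Mutex<()>` behind an `Arc`) through plotting, farming, piece reading and proving: background work takes the mutex briefly and drops the guard right away, while proving holds the guard for its whole duration, so every other resource-intensive task pauses at its next lock attempt. A minimal standalone sketch of that pattern, assuming `tokio` and `async_lock` as dependencies (illustrative names only, not the farmer's actual code):

use std::sync::Arc;
use std::time::Duration;

use async_lock::Mutex as AsyncMutex;

#[tokio::main]
async fn main() {
    let global_mutex = Arc::new(AsyncMutex::new(()));

    // Background task standing in for plotting / piece reading.
    let background = tokio::spawn({
        let global_mutex = Arc::clone(&global_mutex);
        async move {
            for step in 0..20 {
                // Take the mutex briefly: if proving holds it, this await parks
                // the task until proving finishes; the guard is dropped at the
                // end of the statement, so this never delays proving itself.
                global_mutex.lock().await;
                println!("background step {step}");
                tokio::time::sleep(Duration::from_millis(10)).await;
            }
        }
    });

    tokio::time::sleep(Duration::from_millis(30)).await;

    // Proving stand-in: hold the guard for the whole critical window so all
    // other resource-intensive work pauses at its next lock attempt.
    {
        let _proving_guard = global_mutex.lock().await;
        println!("proving started, background is paused");
        tokio::time::sleep(Duration::from_millis(100)).await;
        println!("proving finished, background resumes");
    }

    background.await.unwrap();
}
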
9 changes: 9 additions & 0 deletions crates/subspace-farmer-components/src/plotting.rs
@@ -209,6 +209,7 @@ where
sector_metadata_output,
table_generators,
abort_early,
global_mutex: &Default::default(),
},
)
}
@@ -345,6 +346,10 @@ where
pub table_generators: &'a mut [PosTable::Generator],
/// Whether encoding should be aborted early
pub abort_early: &'a AtomicBool,
/// Global mutex that can restrict concurrency of resource-intensive operations and make sure
/// that those operations that are very sensitive (like proving) have all the resources
/// available to them for the highest probability of success
pub global_mutex: &'a AsyncMutex<()>,
}

pub fn encode_sector<PosTable>(
@@ -368,6 +373,7 @@ where
sector_metadata_output,
table_generators,
abort_early,
global_mutex,
} = encoding_options;

if erasure_coding.max_shards() < Record::NUM_S_BUCKETS {
@@ -410,6 +416,9 @@ where
let mut chunks_scratch = Vec::with_capacity(Record::NUM_S_BUCKETS);

loop {
// Take mutex briefly to make sure encoding is allowed right now
global_mutex.lock_blocking();

// This instead of `while` above because otherwise mutex will be held for
// the duration of the loop and will limit concurrency to 1 table generator
let Some(((piece_offset, record), encoded_chunks_used)) =
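
The encoding loop above runs on rayon worker threads rather than on an async executor, so it uses `lock_blocking()`: the thread blocks until proving releases the mutex, then immediately drops the guard and encodes one more record. A self-contained sketch of that per-iteration check, using a hypothetical helper rather than repo code:

use async_lock::Mutex as AsyncMutex;

fn encode_records(global_mutex: &AsyncMutex<()>, records: &mut [u8]) {
    for record in records.iter_mut() {
        // Block this (rayon/OS) thread while proving holds the mutex, then
        // release it immediately so proving is never delayed by encoding.
        drop(global_mutex.lock_blocking());

        // Stand-in for the actual record encoding work.
        *record = record.wrapping_add(1);
    }
}

fn main() {
    let global_mutex = AsyncMutex::new(());
    let mut records = vec![1u8, 2, 3];
    encode_records(&global_mutex, &mut records);
    assert_eq!(records, vec![2, 3, 4]);
}
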
@@ -630,6 +630,7 @@ where

let (single_disk_farms, plotting_delay_senders) = tokio::task::block_in_place(|| {
let handle = Handle::current();
let global_mutex = Arc::default();
let faster_read_sector_record_chunks_mode_concurrency = &Semaphore::new(1);
let (plotting_delay_senders, plotting_delay_receivers) = (0..disk_farms.len())
.map(|_| oneshot::channel())
@@ -664,6 +665,7 @@ where
farming_thread_pool_size,
plotting_thread_pool_manager: plotting_thread_pool_manager.clone(),
plotting_delay: Some(plotting_delay_receiver),
global_mutex: Arc::clone(&global_mutex),
disable_farm_locking,
faster_read_sector_record_chunks_mode_concurrency,
},
18 changes: 14 additions & 4 deletions crates/subspace-farmer/src/single_disk_farm.rs
@@ -28,7 +28,7 @@ use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE;
use crate::thread_pool_manager::PlottingThreadPoolManager;
use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop};
use crate::KNOWN_PEERS_CACHE_SIZE;
use async_lock::RwLock;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use derive_more::{Display, From};
use event_listener_primitives::{Bag, HandlerId};
use futures::channel::{mpsc, oneshot};
@@ -298,6 +298,10 @@ pub struct SingleDiskFarmOptions<'a, NC, PG> {
/// Notification for plotter to start, can be used to delay plotting until some initialization
/// has happened externally
pub plotting_delay: Option<oneshot::Receiver<()>>,
/// Global mutex that can restrict concurrency of resource-intensive operations and make sure
/// that those operations that are very sensitive (like proving) have all the resources
/// available to them for the highest probability of success
pub global_mutex: Arc<AsyncMutex<()>>,
/// Disable farm locking, for example if file system doesn't support it
pub disable_farm_locking: bool,
/// Limit concurrency of internal benchmarking between different farms
@@ -570,7 +574,7 @@ pub struct SingleDiskFarm {
farmer_protocol_info: FarmerProtocolInfo,
single_disk_farm_info: SingleDiskFarmInfo,
/// Metadata of all sectors plotted so far
sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
pieces_in_sector: u16,
total_sectors_count: SectorIndex,
span: Span,
@@ -630,6 +634,7 @@ impl SingleDiskFarm {
plotting_thread_pool_manager,
plotting_delay,
farm_during_initial_plotting,
global_mutex,
disable_farm_locking,
faster_read_sector_record_chunks_mode_concurrency,
} = options;
@@ -889,7 +894,7 @@ impl SingleDiskFarm {
sectors_metadata.push(sector_metadata);
}

Arc::new(RwLock::new(sectors_metadata))
Arc::new(AsyncRwLock::new(sectors_metadata))
};

#[cfg(not(windows))]
@@ -956,7 +961,7 @@ impl SingleDiskFarm {
let handlers = Arc::<Handlers>::default();
let (start_sender, mut start_receiver) = broadcast::channel::<()>(1);
let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1);
let modifying_sector_index = Arc::<RwLock<Option<SectorIndex>>>::default();
let modifying_sector_index = Arc::<AsyncRwLock<Option<SectorIndex>>>::default();
let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1);
// Some sectors may already be plotted, skip them
let sectors_indices_left_to_plot =
@@ -979,6 +984,7 @@ impl SingleDiskFarm {
let plot_file = Arc::clone(&plot_file);
let error_sender = Arc::clone(&error_sender);
let span = span.clone();
let global_mutex = Arc::clone(&global_mutex);

move || {
let _span_guard = span.enter();
@@ -1003,6 +1009,7 @@ impl SingleDiskFarm {
record_encoding_concurrency,
plotting_thread_pool_manager,
stop_receiver: stop_receiver.resubscribe(),
global_mutex: &global_mutex,
};

let plotting_fut = async {
@@ -1088,6 +1095,7 @@ impl SingleDiskFarm {
let mut stop_receiver = stop_sender.subscribe();
let node_client = node_client.clone();
let span = span.clone();
let global_mutex = Arc::clone(&global_mutex);

move || {
let _span_guard = span.enter();
@@ -1155,6 +1163,7 @@ impl SingleDiskFarm {
slot_info_notifications: slot_info_forwarder_receiver,
thread_pool,
read_sector_record_chunks_mode,
global_mutex,
};
farming::<PosTable, _, _>(farming_options).await
};
Expand Down Expand Up @@ -1198,6 +1207,7 @@ impl SingleDiskFarm {
erasure_coding,
modifying_sector_index,
read_sector_record_chunks_mode,
global_mutex,
);

let reading_join_handle = tokio::task::spawn_blocking({
21 changes: 17 additions & 4 deletions crates/subspace-farmer/src/single_disk_farm/farming.rs
@@ -3,7 +3,7 @@ pub mod rayon_files;
use crate::node_client;
use crate::node_client::NodeClient;
use crate::single_disk_farm::Handlers;
use async_lock::RwLock;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use futures::channel::mpsc;
use futures::StreamExt;
use parity_scale_codec::{Decode, Encode, Error, Input, Output};
@@ -329,14 +329,15 @@ pub(super) struct FarmingOptions<NC, PlotAudit> {
pub(super) reward_address: PublicKey,
pub(super) node_client: NC,
pub(super) plot_audit: PlotAudit,
pub(super) sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
pub(super) kzg: Kzg,
pub(super) erasure_coding: ErasureCoding,
pub(super) handlers: Arc<Handlers>,
pub(super) modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
pub(super) modifying_sector_index: Arc<AsyncRwLock<Option<SectorIndex>>>,
pub(super) slot_info_notifications: mpsc::Receiver<SlotInfo>,
pub(super) thread_pool: ThreadPool,
pub(super) read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
pub(super) global_mutex: Arc<AsyncMutex<()>>,
}

/// Starts farming process.
@@ -364,6 +365,7 @@ where
mut slot_info_notifications,
thread_pool,
read_sector_record_chunks_mode,
global_mutex,
} = farming_options;

let farmer_app_info = node_client
@@ -378,9 +380,16 @@ where
let span = Span::current();

while let Some(slot_info) = slot_info_notifications.next().await {
let slot = slot_info.slot_number;

// Take mutex briefly to make sure farming is allowed right now
if global_mutex.try_lock().is_none() {
debug!(%slot, "Auditing skipped during concurrent proving");
continue;
}

let result: Result<(), FarmingError> = try {
let start = Instant::now();
let slot = slot_info.slot_number;
let sectors_metadata = sectors_metadata.read().await;

debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors");
@@ -424,6 +433,10 @@ where
time: start.elapsed(),
}));

// Take mutex and hold until proving end to make sure nothing else major happens at the
// same time
let _proving_guard = global_mutex.lock().await;

'solutions_processing: while let Some((sector_index, sector_solutions)) = thread_pool
.install(|| {
let _span_guard = span.enter();
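
For reference, `async_lock::Mutex::try_lock` returns `Some(guard)` only when the mutex is currently free, which is why the audit above is skipped when it returns `None` (proving holds the lock), and why the `_proving_guard` taken around solutions processing keeps everything else paused. A tiny standalone check of those semantics, for illustration only:

use async_lock::Mutex as AsyncMutex;

fn main() {
    let global_mutex = AsyncMutex::new(());

    // Mutex free: try_lock succeeds, so auditing for this slot may proceed
    // (the temporary guard is dropped immediately).
    assert!(global_mutex.try_lock().is_some());

    // Simulate proving holding the mutex: try_lock now fails, so the audit
    // for this slot would be skipped.
    let _proving_guard = global_mutex.try_lock().unwrap();
    assert!(global_mutex.try_lock().is_none());
}
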
17 changes: 12 additions & 5 deletions crates/subspace-farmer/src/single_disk_farm/piece_reader.rs
@@ -1,6 +1,6 @@
#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use async_lock::RwLock;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
#[cfg(not(windows))]
@@ -33,15 +33,17 @@ impl PieceReader {
///
/// NOTE: Background future is async, but does blocking operations and should be running in
/// dedicated thread.
#[allow(clippy::too_many_arguments)]
pub(super) fn new<PosTable>(
public_key: PublicKey,
pieces_in_sector: u16,
#[cfg(not(windows))] plot_file: Arc<File>,
#[cfg(windows)] plot_file: Arc<UnbufferedIoFileWindows>,
sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
erasure_coding: ErasureCoding,
modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
modifying_sector_index: Arc<AsyncRwLock<Option<SectorIndex>>>,
read_sector_record_chunks_mode: ReadSectorRecordChunksMode,
global_mutex: Arc<AsyncMutex<()>>,
) -> (Self, impl Future<Output = ()>)
where
PosTable: Table,
@@ -58,6 +60,7 @@ impl PieceReader {
modifying_sector_index,
read_piece_receiver,
read_sector_record_chunks_mode,
global_mutex,
)
.await
};
@@ -94,11 +97,12 @@ async fn read_pieces<PosTable, S>(
public_key: PublicKey,
pieces_in_sector: u16,
plot_file: S,
sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
erasure_coding: ErasureCoding,
modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
modifying_sector_index: Arc<AsyncRwLock<Option<SectorIndex>>>,
mut read_piece_receiver: mpsc::Receiver<ReadPieceRequest>,
mode: ReadSectorRecordChunksMode,
global_mutex: Arc<AsyncMutex<()>>,
) where
PosTable: Table,
S: ReadAtSync,
@@ -171,6 +175,9 @@ async fn read_pieces<PosTable, S>(
let sector_size = sector_size(pieces_in_sector);
let sector = plot_file.offset(u64::from(sector_index) * sector_size as u64);

// Take mutex briefly to make sure piece reading is allowed right now
global_mutex.lock().await;

let maybe_piece = read_piece::<PosTable, _, _>(
&public_key,
piece_offset,
19 changes: 14 additions & 5 deletions crates/subspace-farmer/src/single_disk_farm/plotting.rs
@@ -6,7 +6,7 @@ use crate::single_disk_farm::{
use crate::thread_pool_manager::PlottingThreadPoolManager;
use crate::utils::AsyncJoinOnDrop;
use crate::{node_client, NodeClient};
use async_lock::RwLock;
use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock};
use atomic::Atomic;
use futures::channel::{mpsc, oneshot};
use futures::{select, FutureExt, SinkExt, StreamExt};
@@ -156,19 +156,20 @@ pub(super) struct PlottingOptions<'a, NC, PG> {
pub(super) metadata_file: File,
#[cfg(windows)]
pub(super) metadata_file: UnbufferedIoFileWindows,
pub(super) sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
pub(super) piece_getter: &'a PG,
pub(super) kzg: &'a Kzg,
pub(super) erasure_coding: &'a ErasureCoding,
pub(super) handlers: Arc<Handlers>,
pub(super) modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
pub(super) modifying_sector_index: Arc<AsyncRwLock<Option<SectorIndex>>>,
pub(super) sectors_to_plot_receiver: mpsc::Receiver<SectorToPlot>,
/// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory
/// usage of the plotting process, permit will be held until the end of the plotting process
pub(crate) downloading_semaphore: Arc<Semaphore>,
pub(crate) record_encoding_concurrency: NonZeroUsize,
pub(super) plotting_thread_pool_manager: PlottingThreadPoolManager,
pub(super) stop_receiver: broadcast::Receiver<()>,
pub(super) global_mutex: &'a AsyncMutex<()>,
}

/// Starts plotting process.
@@ -203,6 +204,7 @@ where
record_encoding_concurrency,
plotting_thread_pool_manager,
mut stop_receiver,
global_mutex,
} = plotting_options;

let abort_early = Arc::new(AtomicBool::new(false));
@@ -292,6 +294,9 @@ where
break farmer_app_info;
};

// Take mutex briefly to make sure plotting is allowed right now
global_mutex.lock().await;

let (_downloading_permit, downloaded_sector) =
if let Some(downloaded_sector_fut) = maybe_next_downloaded_sector_fut.take() {
downloaded_sector_fut
@@ -404,6 +409,7 @@ where
sector_metadata_output: &mut sector_metadata,
table_generators: &mut table_generators,
abort_early: &abort_early,
global_mutex,
},
)?;

@@ -442,6 +448,9 @@ where
modifying_sector_index.write().await.replace(sector_index);

{
// Take mutex briefly to make sure writing is allowed right now
global_mutex.lock().await;

handlers.sector_update.call_simple(&(
sector_index,
SectorUpdate::Plotting(SectorPlottingDetails::Writing),
@@ -538,7 +547,7 @@ pub(super) struct PlottingSchedulerOptions<NC> {
pub(super) min_sector_lifetime: HistorySize,
pub(super) node_client: NC,
pub(super) handlers: Arc<Handlers>,
pub(super) sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
pub(super) sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
pub(super) sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,
pub(super) initial_plotting_finished: Option<oneshot::Sender<()>>,
// Delay between segment header being acknowledged by farmer and potentially triggering
@@ -672,7 +681,7 @@ async fn send_plotting_notifications<NC>(
min_sector_lifetime: HistorySize,
node_client: &NC,
handlers: &Handlers,
sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
sectors_metadata: Arc<AsyncRwLock<Vec<SectorMetadataChecksummed>>>,
last_archived_segment: &Atomic<SegmentHeader>,
mut archived_segments_receiver: mpsc::Receiver<()>,
mut sectors_to_plot_sender: mpsc::Sender<SectorToPlot>,