From be49c7472ef2a1f3849474863a6e14a8cfbd18bd Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Sat, 9 Mar 2024 08:22:07 +0200 Subject: [PATCH] Pause plotting, farming and piece reading while proving is happening to give it the highest chance of success --- .../src/plotting.rs | 9 ++++++++ .../src/bin/subspace-farmer/commands/farm.rs | 2 ++ .../subspace-farmer/src/single_disk_farm.rs | 18 ++++++++++++---- .../src/single_disk_farm/farming.rs | 21 +++++++++++++++---- .../src/single_disk_farm/piece_reader.rs | 17 ++++++++++----- .../src/single_disk_farm/plotting.rs | 19 ++++++++++++----- 6 files changed, 68 insertions(+), 18 deletions(-) diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index 727d3ea4e1a..0ae5ef1d065 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -209,6 +209,7 @@ where sector_metadata_output, table_generators, abort_early, + global_mutex: &Default::default(), }, ) } @@ -345,6 +346,10 @@ where pub table_generators: &'a mut [PosTable::Generator], /// Whether encoding should be aborted early pub abort_early: &'a AtomicBool, + /// Global mutex that can restrict concurrency of resource-intensive operations and make sure + /// that those operations that are very sensitive (like proving) have all the resources + /// available to them for the highest probability of success + pub global_mutex: &'a AsyncMutex<()>, } pub fn encode_sector( @@ -368,6 +373,7 @@ where sector_metadata_output, table_generators, abort_early, + global_mutex, } = encoding_options; if erasure_coding.max_shards() < Record::NUM_S_BUCKETS { @@ -410,6 +416,9 @@ where let mut chunks_scratch = Vec::with_capacity(Record::NUM_S_BUCKETS); loop { + // Take mutex briefly to make sure encoding is allowed right now + global_mutex.lock_blocking(); + // This instead of `while` above because otherwise mutex will be held for // the duration of the loop and will 
limit concurrency to 1 table generator let Some(((piece_offset, record), encoded_chunks_used)) = diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index e2239cb5d95..0eb948e1589 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -630,6 +630,7 @@ where let (single_disk_farms, plotting_delay_senders) = tokio::task::block_in_place(|| { let handle = Handle::current(); + let global_mutex = Arc::default(); let faster_read_sector_record_chunks_mode_concurrency = &Semaphore::new(1); let (plotting_delay_senders, plotting_delay_receivers) = (0..disk_farms.len()) .map(|_| oneshot::channel()) @@ -664,6 +665,7 @@ where farming_thread_pool_size, plotting_thread_pool_manager: plotting_thread_pool_manager.clone(), plotting_delay: Some(plotting_delay_receiver), + global_mutex: Arc::clone(&global_mutex), disable_farm_locking, faster_read_sector_record_chunks_mode_concurrency, }, diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index fafd197bfcf..830dfb4bb57 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -28,7 +28,7 @@ use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE; use crate::thread_pool_manager::PlottingThreadPoolManager; use crate::utils::{tokio_rayon_spawn_handler, AsyncJoinOnDrop}; use crate::KNOWN_PEERS_CACHE_SIZE; -use async_lock::RwLock; +use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use derive_more::{Display, From}; use event_listener_primitives::{Bag, HandlerId}; use futures::channel::{mpsc, oneshot}; @@ -298,6 +298,10 @@ pub struct SingleDiskFarmOptions<'a, NC, PG> { /// Notification for plotter to start, can be used to delay plotting until some initialization /// has happened externally pub plotting_delay: Option>, 
+ /// Global mutex that can restrict concurrency of resource-intensive operations and make sure + /// that those operations that are very sensitive (like proving) have all the resources + /// available to them for the highest probability of success + pub global_mutex: Arc>, /// Disable farm locking, for example if file system doesn't support it pub disable_farm_locking: bool, /// Limit concurrency of internal benchmarking between different farms @@ -570,7 +574,7 @@ pub struct SingleDiskFarm { farmer_protocol_info: FarmerProtocolInfo, single_disk_farm_info: SingleDiskFarmInfo, /// Metadata of all sectors plotted so far - sectors_metadata: Arc>>, + sectors_metadata: Arc>>, pieces_in_sector: u16, total_sectors_count: SectorIndex, span: Span, @@ -630,6 +634,7 @@ impl SingleDiskFarm { plotting_thread_pool_manager, plotting_delay, farm_during_initial_plotting, + global_mutex, disable_farm_locking, faster_read_sector_record_chunks_mode_concurrency, } = options; @@ -889,7 +894,7 @@ impl SingleDiskFarm { sectors_metadata.push(sector_metadata); } - Arc::new(RwLock::new(sectors_metadata)) + Arc::new(AsyncRwLock::new(sectors_metadata)) }; #[cfg(not(windows))] @@ -956,7 +961,7 @@ impl SingleDiskFarm { let handlers = Arc::::default(); let (start_sender, mut start_receiver) = broadcast::channel::<()>(1); let (stop_sender, mut stop_receiver) = broadcast::channel::<()>(1); - let modifying_sector_index = Arc::>>::default(); + let modifying_sector_index = Arc::>>::default(); let (sectors_to_plot_sender, sectors_to_plot_receiver) = mpsc::channel(1); // Some sectors may already be plotted, skip them let sectors_indices_left_to_plot = @@ -979,6 +984,7 @@ impl SingleDiskFarm { let plot_file = Arc::clone(&plot_file); let error_sender = Arc::clone(&error_sender); let span = span.clone(); + let global_mutex = Arc::clone(&global_mutex); move || { let _span_guard = span.enter(); @@ -1003,6 +1009,7 @@ impl SingleDiskFarm { record_encoding_concurrency, plotting_thread_pool_manager, 
stop_receiver: stop_receiver.resubscribe(), + global_mutex: &global_mutex, }; let plotting_fut = async { @@ -1088,6 +1095,7 @@ impl SingleDiskFarm { let mut stop_receiver = stop_sender.subscribe(); let node_client = node_client.clone(); let span = span.clone(); + let global_mutex = Arc::clone(&global_mutex); move || { let _span_guard = span.enter(); @@ -1155,6 +1163,7 @@ impl SingleDiskFarm { slot_info_notifications: slot_info_forwarder_receiver, thread_pool, read_sector_record_chunks_mode, + global_mutex, }; farming::(farming_options).await }; @@ -1198,6 +1207,7 @@ impl SingleDiskFarm { erasure_coding, modifying_sector_index, read_sector_record_chunks_mode, + global_mutex, ); let reading_join_handle = tokio::task::spawn_blocking({ diff --git a/crates/subspace-farmer/src/single_disk_farm/farming.rs b/crates/subspace-farmer/src/single_disk_farm/farming.rs index 5b5ce0d0c39..7de69253935 100644 --- a/crates/subspace-farmer/src/single_disk_farm/farming.rs +++ b/crates/subspace-farmer/src/single_disk_farm/farming.rs @@ -3,7 +3,7 @@ pub mod rayon_files; use crate::node_client; use crate::node_client::NodeClient; use crate::single_disk_farm::Handlers; -use async_lock::RwLock; +use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use futures::channel::mpsc; use futures::StreamExt; use parity_scale_codec::{Decode, Encode, Error, Input, Output}; @@ -329,14 +329,15 @@ pub(super) struct FarmingOptions { pub(super) reward_address: PublicKey, pub(super) node_client: NC, pub(super) plot_audit: PlotAudit, - pub(super) sectors_metadata: Arc>>, + pub(super) sectors_metadata: Arc>>, pub(super) kzg: Kzg, pub(super) erasure_coding: ErasureCoding, pub(super) handlers: Arc, - pub(super) modifying_sector_index: Arc>>, + pub(super) modifying_sector_index: Arc>>, pub(super) slot_info_notifications: mpsc::Receiver, pub(super) thread_pool: ThreadPool, pub(super) read_sector_record_chunks_mode: ReadSectorRecordChunksMode, + pub(super) global_mutex: Arc>, } /// Starts farming process. 
@@ -364,6 +365,7 @@ where mut slot_info_notifications, thread_pool, read_sector_record_chunks_mode, + global_mutex, } = farming_options; let farmer_app_info = node_client @@ -378,9 +380,16 @@ where let span = Span::current(); while let Some(slot_info) = slot_info_notifications.next().await { + let slot = slot_info.slot_number; + + // Mutex is held while proving: `try_lock()` returning `None` means skip this slot + if global_mutex.try_lock().is_none() { + debug!(%slot, "Auditing skipped during concurrent proving"); + continue; + } + let result: Result<(), FarmingError> = try { let start = Instant::now(); - let slot = slot_info.slot_number; let sectors_metadata = sectors_metadata.read().await; debug!(%slot, sector_count = %sectors_metadata.len(), "Reading sectors"); @@ -424,6 +433,10 @@ where time: start.elapsed(), })); + // Take mutex and hold until proving end to make sure nothing else major happens at the + // same time + let _proving_guard = global_mutex.lock().await; + 'solutions_processing: while let Some((sector_index, sector_solutions)) = thread_pool .install(|| { let _span_guard = span.enter(); diff --git a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs index 2edc3d5bb44..e294d2c2d68 100644 --- a/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs +++ b/crates/subspace-farmer/src/single_disk_farm/piece_reader.rs @@ -1,6 +1,6 @@ #[cfg(windows)] use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows; -use async_lock::RwLock; +use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use futures::channel::{mpsc, oneshot}; use futures::{SinkExt, StreamExt}; #[cfg(not(windows))] @@ -33,15 +33,17 @@ impl PieceReader { /// /// NOTE: Background future is async, but does blocking operations and should be running in /// dedicated thread. 
+ #[allow(clippy::too_many_arguments)] pub(super) fn new( public_key: PublicKey, pieces_in_sector: u16, #[cfg(not(windows))] plot_file: Arc, #[cfg(windows)] plot_file: Arc, - sectors_metadata: Arc>>, + sectors_metadata: Arc>>, erasure_coding: ErasureCoding, - modifying_sector_index: Arc>>, + modifying_sector_index: Arc>>, read_sector_record_chunks_mode: ReadSectorRecordChunksMode, + global_mutex: Arc>, ) -> (Self, impl Future) where PosTable: Table, @@ -58,6 +60,7 @@ impl PieceReader { modifying_sector_index, read_piece_receiver, read_sector_record_chunks_mode, + global_mutex, ) .await }; @@ -94,11 +97,12 @@ async fn read_pieces( public_key: PublicKey, pieces_in_sector: u16, plot_file: S, - sectors_metadata: Arc>>, + sectors_metadata: Arc>>, erasure_coding: ErasureCoding, - modifying_sector_index: Arc>>, + modifying_sector_index: Arc>>, mut read_piece_receiver: mpsc::Receiver, mode: ReadSectorRecordChunksMode, + global_mutex: Arc>, ) where PosTable: Table, S: ReadAtSync, @@ -171,6 +175,9 @@ async fn read_pieces( let sector_size = sector_size(pieces_in_sector); let sector = plot_file.offset(u64::from(sector_index) * sector_size as u64); + // Take mutex briefly to make sure piece reading is allowed right now + global_mutex.lock().await; + let maybe_piece = read_piece::( &public_key, piece_offset, diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 0e6e79829a5..82a5c3aee02 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -6,7 +6,7 @@ use crate::single_disk_farm::{ use crate::thread_pool_manager::PlottingThreadPoolManager; use crate::utils::AsyncJoinOnDrop; use crate::{node_client, NodeClient}; -use async_lock::RwLock; +use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use atomic::Atomic; use futures::channel::{mpsc, oneshot}; use futures::{select, FutureExt, SinkExt, StreamExt}; @@ -156,12 
+156,12 @@ pub(super) struct PlottingOptions<'a, NC, PG> { pub(super) metadata_file: File, #[cfg(windows)] pub(super) metadata_file: UnbufferedIoFileWindows, - pub(super) sectors_metadata: Arc>>, + pub(super) sectors_metadata: Arc>>, pub(super) piece_getter: &'a PG, pub(super) kzg: &'a Kzg, pub(super) erasure_coding: &'a ErasureCoding, pub(super) handlers: Arc, - pub(super) modifying_sector_index: Arc>>, + pub(super) modifying_sector_index: Arc>>, pub(super) sectors_to_plot_receiver: mpsc::Receiver, /// Semaphore for part of the plotting when farmer downloads new sector, allows to limit memory /// usage of the plotting process, permit will be held until the end of the plotting process @@ -169,6 +169,7 @@ pub(super) struct PlottingOptions<'a, NC, PG> { pub(crate) record_encoding_concurrency: NonZeroUsize, pub(super) plotting_thread_pool_manager: PlottingThreadPoolManager, pub(super) stop_receiver: broadcast::Receiver<()>, + pub(super) global_mutex: &'a AsyncMutex<()>, } /// Starts plotting process. 
@@ -203,6 +204,7 @@ where record_encoding_concurrency, plotting_thread_pool_manager, mut stop_receiver, + global_mutex, } = plotting_options; let abort_early = Arc::new(AtomicBool::new(false)); @@ -292,6 +294,9 @@ where break farmer_app_info; }; + // Take mutex briefly to make sure plotting is allowed right now + global_mutex.lock().await; + let (_downloading_permit, downloaded_sector) = if let Some(downloaded_sector_fut) = maybe_next_downloaded_sector_fut.take() { downloaded_sector_fut @@ -404,6 +409,7 @@ where sector_metadata_output: &mut sector_metadata, table_generators: &mut table_generators, abort_early: &abort_early, + global_mutex, }, )?; @@ -442,6 +448,9 @@ where modifying_sector_index.write().await.replace(sector_index); { + // Take mutex briefly to make sure writing is allowed right now + global_mutex.lock().await; + handlers.sector_update.call_simple(&( sector_index, SectorUpdate::Plotting(SectorPlottingDetails::Writing), @@ -538,7 +547,7 @@ pub(super) struct PlottingSchedulerOptions { pub(super) min_sector_lifetime: HistorySize, pub(super) node_client: NC, pub(super) handlers: Arc, - pub(super) sectors_metadata: Arc>>, + pub(super) sectors_metadata: Arc>>, pub(super) sectors_to_plot_sender: mpsc::Sender, pub(super) initial_plotting_finished: Option>, // Delay between segment header being acknowledged by farmer and potentially triggering @@ -672,7 +681,7 @@ async fn send_plotting_notifications( min_sector_lifetime: HistorySize, node_client: &NC, handlers: &Handlers, - sectors_metadata: Arc>>, + sectors_metadata: Arc>>, last_archived_segment: &Atomic, mut archived_segments_receiver: mpsc::Receiver<()>, mut sectors_to_plot_sender: mpsc::Sender,