From 4fd2d6c6f93655bf7fe30972109eae21347f26b2 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 20 Nov 2024 15:10:25 +0200 Subject: [PATCH 1/3] Revert "Simplify GPU plotting logic" This reverts commit 21362c4ff1dd47fcbb5ec59ea7979ba87b2e25f8. --- .../commands/cluster/plotter.rs | 10 +- .../src/bin/subspace-farmer/commands/farm.rs | 10 +- .../subspace-farmer/src/plotter/gpu/cuda.rs | 91 +++++++++++++++---- .../subspace-farmer/src/plotter/gpu/rocm.rs | 73 +++++++++++---- 4 files changed, 148 insertions(+), 36 deletions(-) diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs index 49af09578d..26927d8262 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/cluster/plotter.rs @@ -398,7 +398,10 @@ where cuda_devices .into_iter() .map(|cuda_device| CudaRecordsEncoder::new(cuda_device, Arc::clone(&global_mutex))) - .collect(), + .collect::>() + .map_err(|error| { + anyhow::anyhow!("Failed to create CUDA records encoder: {error}") + })?, global_mutex, kzg, erasure_coding, @@ -477,7 +480,10 @@ where rocm_devices .into_iter() .map(|rocm_device| RocmRecordsEncoder::new(rocm_device, Arc::clone(&global_mutex))) - .collect(), + .collect::>() + .map_err(|error| { + anyhow::anyhow!("Failed to create ROCm records encoder: {error}") + })?, global_mutex, kzg, erasure_coding, diff --git a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs index a6736ed127..1379f42025 100644 --- a/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs +++ b/crates/subspace-farmer/src/bin/subspace-farmer/commands/farm.rs @@ -1072,7 +1072,10 @@ where cuda_devices .into_iter() .map(|cuda_device| CudaRecordsEncoder::new(cuda_device, Arc::clone(&global_mutex))) - .collect(), + .collect::>() + .map_err(|error| { + anyhow::anyhow!("Failed to create CUDA records encoder: {error}") + })?, global_mutex, kzg, erasure_coding, @@ -1151,7 +1154,10 @@ where rocm_devices .into_iter() .map(|rocm_device| RocmRecordsEncoder::new(rocm_device, Arc::clone(&global_mutex))) - .collect(), + .collect::>() + .map_err(|error| { + anyhow::anyhow!("Failed to create ROCm records encoder: {error}") + })?, global_mutex, kzg, erasure_coding, diff --git a/crates/subspace-farmer/src/plotter/gpu/cuda.rs b/crates/subspace-farmer/src/plotter/gpu/cuda.rs index bbbedff731..370439edf5 100644 --- a/crates/subspace-farmer/src/plotter/gpu/cuda.rs +++ b/crates/subspace-farmer/src/plotter/gpu/cuda.rs @@ -2,6 +2,9 @@ use crate::plotter::gpu::GpuRecordsEncoder; use async_lock::Mutex as AsyncMutex; +use parking_lot::Mutex; +use rayon::{current_thread_index, ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder}; +use std::process::exit; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use subspace_core_primitives::pieces::{PieceOffset, Record}; @@ -14,6 +17,7 @@ use subspace_proof_of_space_gpu::cuda::CudaDevice; #[derive(Debug)] pub struct CudaRecordsEncoder { cuda_device: CudaDevice, + thread_pool: ThreadPool, global_mutex: Arc>, } @@ -34,23 +38,50 @@ impl RecordsEncoder for CudaRecordsEncoder { .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?; let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); - for ((piece_offset, record), mut encoded_chunks_used) in (PieceOffset::ZERO..) - .zip(records.iter_mut()) - .zip(sector_contents_map.iter_record_bitfields_mut()) - { - // Take mutex briefly to make sure encoding is allowed right now - self.global_mutex.lock_blocking(); + self.thread_pool.install(|| { + let iter = Mutex::new( + (PieceOffset::ZERO..) + .zip(records.iter_mut()) + .zip(sector_contents_map.iter_record_bitfields_mut()), + ); + let plotting_error = Mutex::new(None::); - let pos_seed = sector_id.derive_evaluation_seed(piece_offset); + rayon::scope(|scope| { + scope.spawn_broadcast(|_scope, _ctx| loop { + // Take mutex briefly to make sure encoding is allowed right now + self.global_mutex.lock_blocking(); - self.cuda_device - .generate_and_encode_pospace(&pos_seed, record, encoded_chunks_used.iter_mut()) - .map_err(anyhow::Error::msg)?; + // This instead of `while` above because otherwise mutex will be held for the + // duration of the loop and will limit concurrency to 1 record + let Some(((piece_offset, record), mut encoded_chunks_used)) = + iter.lock().next() + else { + return; + }; + let pos_seed = sector_id.derive_evaluation_seed(piece_offset); - if abort_early.load(Ordering::Relaxed) { - break; + if let Err(error) = self.cuda_device.generate_and_encode_pospace( + &pos_seed, + record, + encoded_chunks_used.iter_mut(), + ) { + plotting_error.lock().replace(error); + return; + } + + if abort_early.load(Ordering::Relaxed) { + return; + } + }); + }); + + let plotting_error = plotting_error.lock().take(); + if let Some(error) = plotting_error { + return Err(anyhow::Error::msg(error)); } - } + + Ok(()) + })?; Ok(sector_contents_map) } @@ -58,10 +89,38 @@ impl RecordsEncoder for CudaRecordsEncoder { impl CudaRecordsEncoder { /// Create new instance - pub fn new(cuda_device: CudaDevice, global_mutex: Arc>) -> Self { - Self { + pub fn new( + cuda_device: CudaDevice, + global_mutex: Arc>, + ) -> Result { + let id = cuda_device.id(); + let thread_name = move |thread_index| format!("cuda-{id}.{thread_index}"); + // TODO: remove this panic handler when rayon logs panic_info + // https://github.com/rayon-rs/rayon/issues/1208 + let panic_handler = move |panic_info| { + if let Some(index) = current_thread_index() { + eprintln!("panic on thread {}: {:?}", thread_name(index), panic_info); + } else { + // We want to guarantee exit, rather than panicking in a panic handler. + eprintln!( + "rayon panic handler called on non-rayon thread: {:?}", + panic_info + ); + } + exit(1); + }; + + let thread_pool = ThreadPoolBuilder::new() + .thread_name(thread_name) + .panic_handler(panic_handler) + // Make sure there is overlap between records, so GPU is almost always busy + .num_threads(2) + .build()?; + + Ok(Self { cuda_device, + thread_pool, global_mutex, - } + }) } } diff --git a/crates/subspace-farmer/src/plotter/gpu/rocm.rs b/crates/subspace-farmer/src/plotter/gpu/rocm.rs index 2aef4c482a..c86161f47a 100644 --- a/crates/subspace-farmer/src/plotter/gpu/rocm.rs +++ b/crates/subspace-farmer/src/plotter/gpu/rocm.rs @@ -2,6 +2,8 @@ use crate::plotter::gpu::GpuRecordsEncoder; use async_lock::Mutex as AsyncMutex; +use parking_lot::Mutex; +use rayon::{ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use subspace_core_primitives::pieces::{PieceOffset, Record}; @@ -14,6 +16,7 @@ use subspace_proof_of_space_gpu::rocm::RocmDevice; #[derive(Debug)] pub struct RocmRecordsEncoder { rocm_device: RocmDevice, + thread_pool: ThreadPool, global_mutex: Arc>, } @@ -34,23 +37,50 @@ impl RecordsEncoder for RocmRecordsEncoder { .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?; let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); - for ((piece_offset, record), mut encoded_chunks_used) in (PieceOffset::ZERO..) - .zip(records.iter_mut()) - .zip(sector_contents_map.iter_record_bitfields_mut()) - { - // Take mutex briefly to make sure encoding is allowed right now - self.global_mutex.lock_blocking(); + self.thread_pool.install(|| { + let iter = Mutex::new( + (PieceOffset::ZERO..) + .zip(records.iter_mut()) + .zip(sector_contents_map.iter_record_bitfields_mut()), + ); + let plotting_error = Mutex::new(None::); - let pos_seed = sector_id.derive_evaluation_seed(piece_offset); + rayon::scope(|scope| { + scope.spawn_broadcast(|_scope, _ctx| loop { + // Take mutex briefly to make sure encoding is allowed right now + self.global_mutex.lock_blocking(); - self.rocm_device - .generate_and_encode_pospace(&pos_seed, record, encoded_chunks_used.iter_mut()) - .map_err(anyhow::Error::msg)?; + // This instead of `while` above because otherwise mutex will be held for the + // duration of the loop and will limit concurrency to 1 record + let Some(((piece_offset, record), mut encoded_chunks_used)) = + iter.lock().next() + else { + return; + }; + let pos_seed = sector_id.derive_evaluation_seed(piece_offset); - if abort_early.load(Ordering::Relaxed) { - break; + if let Err(error) = self.rocm_device.generate_and_encode_pospace( + &pos_seed, + record, + encoded_chunks_used.iter_mut(), + ) { + plotting_error.lock().replace(error); + return; + } + + if abort_early.load(Ordering::Relaxed) { + return; + } + }); + }); + + let plotting_error = plotting_error.lock().take(); + if let Some(error) = plotting_error { + return Err(anyhow::Error::msg(error)); } - } + + Ok(()) + })?; Ok(sector_contents_map) } @@ -58,10 +88,21 @@ impl RecordsEncoder for RocmRecordsEncoder { impl RocmRecordsEncoder { /// Create new instance - pub fn new(rocm_device: RocmDevice, global_mutex: Arc>) -> Self { - Self { + pub fn new( + rocm_device: RocmDevice, + global_mutex: Arc>, + ) -> Result { + let id = rocm_device.id(); + let thread_pool = ThreadPoolBuilder::new() + .thread_name(move |thread_index| format!("rocm-{id}.{thread_index}")) + // Make sure there is overlap between records, so GPU is almost always busy + .num_threads(2) + .build()?; + + Ok(Self { rocm_device, + thread_pool, global_mutex, - } + }) } } From ff1995dd6310c9783e6ae9813729e9d5f69fea7a Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 20 Nov 2024 15:13:12 +0200 Subject: [PATCH 2/3] Add the same panic handling in ROCm code as in CUDA --- .../subspace-farmer/src/plotter/gpu/rocm.rs | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/crates/subspace-farmer/src/plotter/gpu/rocm.rs b/crates/subspace-farmer/src/plotter/gpu/rocm.rs index c86161f47a..86a250cf93 100644 --- a/crates/subspace-farmer/src/plotter/gpu/rocm.rs +++ b/crates/subspace-farmer/src/plotter/gpu/rocm.rs @@ -3,7 +3,8 @@ use crate::plotter::gpu::GpuRecordsEncoder; use async_lock::Mutex as AsyncMutex; use parking_lot::Mutex; -use rayon::{ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder}; +use rayon::{current_thread_index, ThreadPool, ThreadPoolBuildError, ThreadPoolBuilder}; +use std::process::exit; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use subspace_core_primitives::pieces::{PieceOffset, Record}; @@ -93,8 +94,25 @@ impl RocmRecordsEncoder { global_mutex: Arc>, ) -> Result { let id = rocm_device.id(); + let thread_name = move |thread_index| format!("rocm-{id}.{thread_index}"); + // TODO: remove this panic handler when rayon logs panic_info + // https://github.com/rayon-rs/rayon/issues/1208 + let panic_handler = move |panic_info| { + if let Some(index) = current_thread_index() { + eprintln!("panic on thread {}: {:?}", thread_name(index), panic_info); + } else { + // We want to guarantee exit, rather than panicking in a panic handler. + eprintln!( + "rayon panic handler called on non-rayon thread: {:?}", + panic_info + ); + } + exit(1); + }; + let thread_pool = ThreadPoolBuilder::new() - .thread_name(move |thread_index| format!("rocm-{id}.{thread_index}")) + .thread_name(thread_name) + .panic_handler(panic_handler) // Make sure there is overlap between records, so GPU is almost always busy .num_threads(2) .build()?; From 29b30d02c80f973c5e881ffa58440500f6298b57 Mon Sep 17 00:00:00 2001 From: Nazar Mokrynskyi Date: Wed, 20 Nov 2024 15:58:57 +0200 Subject: [PATCH 3/3] Tiny simplification --- crates/subspace-farmer/src/plotter/gpu/cuda.rs | 8 +++----- crates/subspace-farmer/src/plotter/gpu/rocm.rs | 8 +++----- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/crates/subspace-farmer/src/plotter/gpu/cuda.rs b/crates/subspace-farmer/src/plotter/gpu/cuda.rs index 370439edf5..84af098f9d 100644 --- a/crates/subspace-farmer/src/plotter/gpu/cuda.rs +++ b/crates/subspace-farmer/src/plotter/gpu/cuda.rs @@ -38,7 +38,7 @@ impl RecordsEncoder for CudaRecordsEncoder { .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?; let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); - self.thread_pool.install(|| { + { let iter = Mutex::new( (PieceOffset::ZERO..) .zip(records.iter_mut()) @@ -46,7 +46,7 @@ impl RecordsEncoder for CudaRecordsEncoder { ); let plotting_error = Mutex::new(None::); - rayon::scope(|scope| { + self.thread_pool.scope(|scope| { scope.spawn_broadcast(|_scope, _ctx| loop { // Take mutex briefly to make sure encoding is allowed right now self.global_mutex.lock_blocking(); @@ -79,9 +79,7 @@ impl RecordsEncoder for CudaRecordsEncoder { if let Some(error) = plotting_error { return Err(anyhow::Error::msg(error)); } - - Ok(()) - })?; + } Ok(sector_contents_map) } diff --git a/crates/subspace-farmer/src/plotter/gpu/rocm.rs b/crates/subspace-farmer/src/plotter/gpu/rocm.rs index 86a250cf93..db875b83e1 100644 --- a/crates/subspace-farmer/src/plotter/gpu/rocm.rs +++ b/crates/subspace-farmer/src/plotter/gpu/rocm.rs @@ -38,7 +38,7 @@ impl RecordsEncoder for RocmRecordsEncoder { .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?; let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); - self.thread_pool.install(|| { + { let iter = Mutex::new( (PieceOffset::ZERO..) .zip(records.iter_mut()) @@ -46,7 +46,7 @@ impl RecordsEncoder for RocmRecordsEncoder { ); let plotting_error = Mutex::new(None::); - rayon::scope(|scope| { + self.thread_pool.scope(|scope| { scope.spawn_broadcast(|_scope, _ctx| loop { // Take mutex briefly to make sure encoding is allowed right now self.global_mutex.lock_blocking(); @@ -79,9 +79,7 @@ impl RecordsEncoder for RocmRecordsEncoder { if let Some(error) = plotting_error { return Err(anyhow::Error::msg(error)); } - - Ok(()) - })?; + } Ok(sector_contents_map) }