diff --git a/crates/subspace-farmer/src/cluster/controller.rs b/crates/subspace-farmer/src/cluster/controller.rs
index 56a4dc0fa8..9e9c0fc8ad 100644
--- a/crates/subspace-farmer/src/cluster/controller.rs
+++ b/crates/subspace-farmer/src/cluster/controller.rs
@@ -315,7 +315,7 @@ impl PieceGetter for ClusterPieceGetter {
         let mut getting_from_piece_cache = cached_pieces_by_cache_id
             .into_iter()
             .map(|(piece_cache_id, offsets)| {
-                let piece_indices_to_download = &piece_indices_to_get;
+                let piece_indices_to_get = &piece_indices_to_get;

                 async move {
                     let mut pieces_stream = match self
@@ -348,7 +348,7 @@ impl PieceGetter for ClusterPieceGetter {
                         };

                         if let Some((piece_index, piece)) = maybe_piece {
-                            piece_indices_to_download.lock().remove(&piece_index);
+                            piece_indices_to_get.lock().remove(&piece_index);

                             tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
                                 "This future isn't polled after receiver is dropped; qed",
diff --git a/crates/subspace-farmer/src/disk_piece_cache.rs b/crates/subspace-farmer/src/disk_piece_cache.rs
index 529908e3a0..811a170d4e 100644
--- a/crates/subspace-farmer/src/disk_piece_cache.rs
+++ b/crates/subspace-farmer/src/disk_piece_cache.rs
@@ -15,7 +15,6 @@ use futures::channel::mpsc;
 use futures::{stream, SinkExt, Stream, StreamExt};
 use parking_lot::Mutex;
 use prometheus_client::registry::Registry;
-use std::ops::Deref;
 use std::path::Path;
 use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
@@ -66,15 +65,6 @@ struct FilePool {
     cursor: AtomicU8,
 }

-impl Deref for FilePool {
-    type Target = DirectIoFile;
-
-    fn deref(&self) -> &Self::Target {
-        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
-        &self.files[position % PIECES_READING_CONCURRENCY]
-    }
-}
-
 impl FilePool {
     fn open(path: &Path) -> io::Result<Self> {
         let files = (0..PIECES_READING_CONCURRENCY)
@@ -87,12 +77,23 @@ impl FilePool {
             cursor: AtomicU8::new(0),
         })
     }
+
+    fn read(&self) -> &DirectIoFile {
+        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
+        &self.files[position % PIECES_READING_CONCURRENCY]
+    }
+
+    fn write(&self) -> &DirectIoFile {
+        // Always the same file or else overlapping writes will be corrupted due to
+        // read/modify/write internals, which are in turn caused by alignment requirements
+        &self.files[0]
+    }
 }

 #[derive(Debug)]
 struct Inner {
     id: PieceCacheId,
-    file: FilePool,
+    files: FilePool,
     max_num_elements: u32,
     metrics: Option<DiskPieceCacheMetrics>,
 }
@@ -242,19 +243,22 @@ impl DiskPieceCache {
             return Err(DiskPieceCacheError::ZeroCapacity);
         }

-        let file = FilePool::open(&directory.join(Self::FILE_NAME))?;
+        let files = FilePool::open(&directory.join(Self::FILE_NAME))?;

         let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
         // Align plot file size for disk sector size
         let expected_size =
             expected_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
-        if file.size()? != expected_size {
-            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
-            // writes to fail later)
-            file.preallocate(expected_size)
-                .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
-            // Truncating file (if necessary)
-            file.set_len(expected_size)?;
+        {
+            let file = files.write();
+            if file.size()? != expected_size {
+                // Allocating the whole file (`set_len` below can create a sparse file, which will cause
+                // writes to fail later)
+                file.preallocate(expected_size)
+                    .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
+                // Truncating file (if necessary)
+                file.set_len(expected_size)?;
+            }
         }

         // ID for cache is ephemeral unless provided explicitly
@@ -264,7 +268,7 @@ impl DiskPieceCache {
         Ok(Self {
             inner: Arc::new(Inner {
                 id,
-                file,
+                files,
                 max_num_elements: capacity,
                 metrics,
             }),
@@ -353,16 +357,16 @@ impl DiskPieceCache {
         let element_offset = u64::from(offset) * u64::from(Self::element_size());

         let piece_index_bytes = piece_index.to_bytes();
+        // File writes are read/write/modify internally, so combine all data here for more efficient
+        // write
+        let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
+        bytes.extend_from_slice(&piece_index_bytes);
+        bytes.extend_from_slice(piece.as_ref());
+        bytes.extend_from_slice(blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref());
         self.inner
-            .file
-            .write_all_at(&piece_index_bytes, element_offset)?;
-        self.inner
-            .file
-            .write_all_at(piece.as_ref(), element_offset + PieceIndex::SIZE as u64)?;
-        self.inner.file.write_all_at(
-            blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
-            element_offset + PieceIndex::SIZE as u64 + Piece::SIZE as u64,
-        )?;
+            .files
+            .write()
+            .write_all_at(&bytes, element_offset)?;

         Ok(())
     }
@@ -432,7 +436,8 @@ impl DiskPieceCache {
         element: &mut [u8],
     ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
         self.inner
-            .file
+            .files
+            .read()
             .read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;

         let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
diff --git a/crates/subspace-farmer/src/farmer_cache.rs b/crates/subspace-farmer/src/farmer_cache.rs
index 539d79bae5..12644e87cd 100644
--- a/crates/subspace-farmer/src/farmer_cache.rs
+++ b/crates/subspace-farmer/src/farmer_cache.rs
@@ -1188,7 +1188,7 @@ where
         let pieces_to_read_from_piece_cache = {
             let caches = self.piece_caches.read().await;
             // Pieces to read from piece cache grouped by backend for efficiency reasons
-            let mut reading_from_piece_cache =
+            let mut pieces_to_read_from_piece_cache =
                 HashMap::<CacheIndex, (CacheBackend, HashMap<PieceCacheOffset, (PieceIndex, RecordKey)>)>::new();

             for piece_index in piece_indices {
@@ -1208,7 +1208,7 @@ where
                 let cache_index = offset.cache_index;
                 let piece_offset = offset.piece_offset;
-                match reading_from_piece_cache.entry(cache_index) {
+                match pieces_to_read_from_piece_cache.entry(cache_index) {
                     Entry::Occupied(mut entry) => {
                         let (_backend, pieces) = entry.get_mut();
                         pieces.insert(piece_offset, (piece_index, key));
                     }
@@ -1227,7 +1227,7 @@ where
                 }
             }

-            reading_from_piece_cache
+            pieces_to_read_from_piece_cache
         };

         let (tx, mut rx) = mpsc::unbounded();
diff --git a/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs b/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs
index c86de3e924..1c2b53a846 100644
--- a/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs
+++ b/crates/subspace-farmer/src/single_disk_farm/plot_cache.rs
@@ -220,18 +220,18 @@ impl DiskPlotCache {
             return Ok(false);
         };

-        let piece_index_bytes = piece_index.to_bytes();
         let write_fut = tokio::task::spawn_blocking({
-            let piece = piece.clone();
-
-            move || {
-                file.write_all_at(&piece_index_bytes, element_offset)?;
-                file.write_all_at(piece.as_ref(), element_offset + PieceIndex::SIZE as u64)?;
-                file.write_all_at(
-                    blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
-                    element_offset + PieceIndex::SIZE as u64 + Piece::SIZE as u64,
-                )
-            }
+            let piece_index_bytes = piece_index.to_bytes();
+            // File writes are read/write/modify internally, so combine all data here for more
+            // efficient write
+            let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
+            bytes.extend_from_slice(&piece_index_bytes);
+            bytes.extend_from_slice(piece.as_ref());
+            bytes.extend_from_slice(
+                blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
+            );
+
+            move || file.write_all_at(&bytes, element_offset)
         });

         AsyncJoinOnDrop::new(write_fut, false).await??;
diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs
index 06ff7f8bc5..bd20905d99 100644
--- a/crates/subspace-networking/src/utils/piece_provider.rs
+++ b/crates/subspace-networking/src/utils/piece_provider.rs
@@ -674,7 +674,7 @@ where
             cached_pieces,
             not_cached_pieces,
         );
-        downloading_stream.insert(piece_index, Box::pin(fut.into_stream()));
+        downloading_stream.insert(piece_index_to_download_next, Box::pin(fut.into_stream()));
     }

     if pieces_to_download.len() == num_pieces {
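
The pattern behind the `FilePool` change can be read in isolation from the rest of the crate: several handles to the same file are rotated for concurrent reads, all writes go through a single dedicated handle, and each cache element is assembled into one buffer so it lands on disk with a single positional write. Below is a minimal, self-contained sketch of that pattern, not the crate's actual implementation: it substitutes plain `std::fs::File` with the Unix `FileExt` positional-I/O methods for the crate's `DirectIoFile`, and the names and values (`READ_CONCURRENCY`, the sketch's `FilePool`, the dummy checksum) are illustrative assumptions.

```rust
use std::fs::{File, OpenOptions};
use std::io;
use std::os::unix::fs::FileExt;
use std::path::Path;
use std::sync::atomic::{AtomicU8, Ordering};

/// Number of handles opened for concurrent positional reads (illustrative value).
const READ_CONCURRENCY: usize = 4;

/// A small pool of handles to the same file: reads rotate across handles,
/// writes always go through a single dedicated handle.
struct FilePool {
    files: [File; READ_CONCURRENCY],
    cursor: AtomicU8,
}

impl FilePool {
    fn open(path: &Path) -> io::Result<Self> {
        let mut files = Vec::with_capacity(READ_CONCURRENCY);
        for _ in 0..READ_CONCURRENCY {
            files.push(
                OpenOptions::new()
                    .read(true)
                    .write(true)
                    .create(true)
                    .open(path)?,
            );
        }
        Ok(Self {
            files: files
                .try_into()
                .expect("opened exactly READ_CONCURRENCY files; qed"),
            cursor: AtomicU8::new(0),
        })
    }

    /// Pick the next handle in round-robin order; fine for reads because they
    /// never modify the file contents.
    fn read(&self) -> &File {
        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
        &self.files[position % READ_CONCURRENCY]
    }

    /// Always the same handle: with read/modify/write style direct I/O,
    /// interleaving writes across handles could corrupt overlapping sectors.
    fn write(&self) -> &File {
        &self.files[0]
    }
}

fn main() -> io::Result<()> {
    let pool = FilePool::open(Path::new("/tmp/file_pool_sketch.bin"))?;

    // Combine header, payload and checksum into one buffer so the element is
    // written with a single positional write instead of three smaller ones.
    let index = 42u64.to_le_bytes();
    let payload = [0xABu8; 32];
    let checksum = [0u8; 32]; // stand-in for a real hash such as BLAKE3
    let mut bytes = Vec::with_capacity(index.len() + payload.len() + checksum.len());
    bytes.extend_from_slice(&index);
    bytes.extend_from_slice(&payload);
    bytes.extend_from_slice(&checksum);
    pool.write().write_all_at(&bytes, 0)?;

    // Read the element back through one of the rotating read handles.
    let mut element = vec![0u8; bytes.len()];
    pool.read().read_exact_at(&mut element, 0)?;
    assert_eq!(element, bytes);
    Ok(())
}
```

The single write handle mirrors the comment in `FilePool::write()` above: with alignment-constrained direct I/O, writes are internally read/modify/write cycles, so overlapping writes issued through different handles could corrupt each other, while combining index, payload, and checksum into one buffer also collapses three positional writes into one.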