Skip to content

Commit

Permalink
Merge pull request #3154 from autonomys/farmer-fixes
Browse files Browse the repository at this point in the history
Farmer fixes
  • Loading branch information
nazar-pc authored Oct 21, 2024
2 parents bfd967e + 5826692 commit 6693a10
Show file tree
Hide file tree
Showing 5 changed files with 52 additions and 47 deletions.
4 changes: 2 additions & 2 deletions crates/subspace-farmer/src/cluster/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ impl PieceGetter for ClusterPieceGetter {
let mut getting_from_piece_cache = cached_pieces_by_cache_id
.into_iter()
.map(|(piece_cache_id, offsets)| {
let piece_indices_to_download = &piece_indices_to_get;
let piece_indices_to_get = &piece_indices_to_get;

async move {
let mut pieces_stream = match self
Expand Down Expand Up @@ -348,7 +348,7 @@ impl PieceGetter for ClusterPieceGetter {
};

if let Some((piece_index, piece)) = maybe_piece {
piece_indices_to_download.lock().remove(&piece_index);
piece_indices_to_get.lock().remove(&piece_index);

tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
"This future isn't polled after receiver is dropped; qed",
Expand Down
65 changes: 35 additions & 30 deletions crates/subspace-farmer/src/disk_piece_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use futures::channel::mpsc;
use futures::{stream, SinkExt, Stream, StreamExt};
use parking_lot::Mutex;
use prometheus_client::registry::Registry;
use std::ops::Deref;
use std::path::Path;
use std::sync::atomic::{AtomicU8, Ordering};
use std::sync::Arc;
Expand Down Expand Up @@ -66,15 +65,6 @@ struct FilePool {
cursor: AtomicU8,
}

impl Deref for FilePool {
type Target = DirectIoFile;

fn deref(&self) -> &Self::Target {
let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
&self.files[position % PIECES_READING_CONCURRENCY]
}
}

impl FilePool {
fn open(path: &Path) -> io::Result<Self> {
let files = (0..PIECES_READING_CONCURRENCY)
Expand All @@ -87,12 +77,23 @@ impl FilePool {
cursor: AtomicU8::new(0),
})
}

fn read(&self) -> &DirectIoFile {
let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
&self.files[position % PIECES_READING_CONCURRENCY]
}

fn write(&self) -> &DirectIoFile {
// Always the same file or else overlapping writes will be corrupted due to
// read/modify/write internals, which are in turn caused by alignment requirements
&self.files[0]
}
}

#[derive(Debug)]
struct Inner {
id: PieceCacheId,
file: FilePool,
files: FilePool,
max_num_elements: u32,
metrics: Option<DiskPieceCacheMetrics>,
}
Expand Down Expand Up @@ -242,19 +243,22 @@ impl DiskPieceCache {
return Err(DiskPieceCacheError::ZeroCapacity);
}

let file = FilePool::open(&directory.join(Self::FILE_NAME))?;
let files = FilePool::open(&directory.join(Self::FILE_NAME))?;

let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
// Align plot file size for disk sector size
let expected_size =
expected_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
if file.size()? != expected_size {
// Allocating the whole file (`set_len` below can create a sparse file, which will cause
// writes to fail later)
file.preallocate(expected_size)
.map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
// Truncating file (if necessary)
file.set_len(expected_size)?;
{
let file = files.write();
if file.size()? != expected_size {
// Allocating the whole file (`set_len` below can create a sparse file, which will cause
// writes to fail later)
file.preallocate(expected_size)
.map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
// Truncating file (if necessary)
file.set_len(expected_size)?;
}
}

// ID for cache is ephemeral unless provided explicitly
Expand All @@ -264,7 +268,7 @@ impl DiskPieceCache {
Ok(Self {
inner: Arc::new(Inner {
id,
file,
files,
max_num_elements: capacity,
metrics,
}),
Expand Down Expand Up @@ -353,16 +357,16 @@ impl DiskPieceCache {
let element_offset = u64::from(offset) * u64::from(Self::element_size());

let piece_index_bytes = piece_index.to_bytes();
// File writes are read/write/modify internally, so combine all data here for more efficient
// write
let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
bytes.extend_from_slice(&piece_index_bytes);
bytes.extend_from_slice(piece.as_ref());
bytes.extend_from_slice(blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref());
self.inner
.file
.write_all_at(&piece_index_bytes, element_offset)?;
self.inner
.file
.write_all_at(piece.as_ref(), element_offset + PieceIndex::SIZE as u64)?;
self.inner.file.write_all_at(
blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
element_offset + PieceIndex::SIZE as u64 + Piece::SIZE as u64,
)?;
.files
.write()
.write_all_at(&bytes, element_offset)?;

Ok(())
}
Expand Down Expand Up @@ -432,7 +436,8 @@ impl DiskPieceCache {
element: &mut [u8],
) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
self.inner
.file
.files
.read()
.read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;

let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
Expand Down
6 changes: 3 additions & 3 deletions crates/subspace-farmer/src/farmer_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1188,7 +1188,7 @@ where
let pieces_to_read_from_piece_cache = {
let caches = self.piece_caches.read().await;
// Pieces to read from piece cache grouped by backend for efficiency reasons
let mut reading_from_piece_cache =
let mut pieces_to_read_from_piece_cache =
HashMap::<CacheIndex, (CacheBackend, HashMap<_, _>)>::new();

for piece_index in piece_indices {
Expand All @@ -1208,7 +1208,7 @@ where
let cache_index = offset.cache_index;
let piece_offset = offset.piece_offset;

match reading_from_piece_cache.entry(cache_index) {
match pieces_to_read_from_piece_cache.entry(cache_index) {
Entry::Occupied(mut entry) => {
let (_backend, pieces) = entry.get_mut();
pieces.insert(piece_offset, (piece_index, key));
Expand All @@ -1227,7 +1227,7 @@ where
}
}

reading_from_piece_cache
pieces_to_read_from_piece_cache
};

let (tx, mut rx) = mpsc::unbounded();
Expand Down
22 changes: 11 additions & 11 deletions crates/subspace-farmer/src/single_disk_farm/plot_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,18 +220,18 @@ impl DiskPlotCache {
return Ok(false);
};

let piece_index_bytes = piece_index.to_bytes();
let write_fut = tokio::task::spawn_blocking({
let piece = piece.clone();

move || {
file.write_all_at(&piece_index_bytes, element_offset)?;
file.write_all_at(piece.as_ref(), element_offset + PieceIndex::SIZE as u64)?;
file.write_all_at(
blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
element_offset + PieceIndex::SIZE as u64 + Piece::SIZE as u64,
)
}
let piece_index_bytes = piece_index.to_bytes();
// File writes are read/write/modify internally, so combine all data here for more
// efficient write
let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
bytes.extend_from_slice(&piece_index_bytes);
bytes.extend_from_slice(piece.as_ref());
bytes.extend_from_slice(
blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
);

move || file.write_all_at(&bytes, element_offset)
});

AsyncJoinOnDrop::new(write_fut, false).await??;
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-networking/src/utils/piece_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,7 +674,7 @@ where
cached_pieces,
not_cached_pieces,
);
downloading_stream.insert(piece_index, Box::pin(fut.into_stream()));
downloading_stream.insert(piece_index_to_download_next, Box::pin(fut.into_stream()));
}

if pieces_to_download.len() == num_pieces {
Expand Down

0 comments on commit 6693a10

Please sign in to comment.