Farmer fixes #3154

Merged 4 commits on Oct 21, 2024

Changes from all commits:
crates/subspace-farmer/src/cluster/controller.rs (2 additions & 2 deletions)

@@ -315,7 +315,7 @@ impl PieceGetter for ClusterPieceGetter {
         let mut getting_from_piece_cache = cached_pieces_by_cache_id
             .into_iter()
             .map(|(piece_cache_id, offsets)| {
-                let piece_indices_to_download = &piece_indices_to_get;
+                let piece_indices_to_get = &piece_indices_to_get;

                 async move {
                     let mut pieces_stream = match self
@@ -348,7 +348,7 @@ impl PieceGetter for ClusterPieceGetter {
                     };

                     if let Some((piece_index, piece)) = maybe_piece {
-                        piece_indices_to_download.lock().remove(&piece_index);
+                        piece_indices_to_get.lock().remove(&piece_index);

                         tx.unbounded_send((piece_index, Ok(Some(piece)))).expect(
                             "This future isn't polled after receiver is dropped; qed",
crates/subspace-farmer/src/disk_piece_cache.rs (35 additions & 30 deletions)

@@ -15,7 +15,6 @@ use futures::channel::mpsc;
 use futures::{stream, SinkExt, Stream, StreamExt};
 use parking_lot::Mutex;
 use prometheus_client::registry::Registry;
-use std::ops::Deref;
 use std::path::Path;
 use std::sync::atomic::{AtomicU8, Ordering};
 use std::sync::Arc;
@@ -66,15 +65,6 @@ struct FilePool {
     cursor: AtomicU8,
 }

-impl Deref for FilePool {
-    type Target = DirectIoFile;
-
-    fn deref(&self) -> &Self::Target {
-        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
-        &self.files[position % PIECES_READING_CONCURRENCY]
-    }
-}

 impl FilePool {
     fn open(path: &Path) -> io::Result<Self> {
         let files = (0..PIECES_READING_CONCURRENCY)

@@ -87,12 +77,23 @@ impl FilePool {
             cursor: AtomicU8::new(0),
         })
     }
+
+    fn read(&self) -> &DirectIoFile {
+        let position = usize::from(self.cursor.fetch_add(1, Ordering::Relaxed));
+        &self.files[position % PIECES_READING_CONCURRENCY]
+    }
+
+    fn write(&self) -> &DirectIoFile {
+        // Always the same file or else overlapping writes will be corrupted due to
+        // read/modify/write internals, which are in turn caused by alignment requirements
+        &self.files[0]
+    }
 }

Review thread on fn write:

Contributor: This is rather confusing despite the comment. Can you rephrase it to explain the single-file writing API and the multi-file reading API?

Member (author): Reading concurrently is fine, but due to alignment requirements, when we do not write chunks of 512 bytes at a multiple-of-512-bytes offset, we first need to read 512 bytes, replace the part we want to modify with the new contents, and write the result back to disk. We can't just write arbitrary sizes at arbitrary offsets with direct I/O; see the DirectIoFile implementation.
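The author's explanation is the classic read/modify/write pattern for unaligned writes under direct I/O. Below is a minimal sketch of the idea, not the actual DirectIoFile code: the 512-byte sector size, plain std::fs::File handle, and preallocated-file assumption are illustrative simplifications, and a real O_DIRECT implementation would additionally need a sector-aligned memory buffer.

```rust
use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt;

const SECTOR_SIZE: u64 = 512;

/// Write `data` at an arbitrary `offset` of a file opened for direct I/O.
/// Direct I/O only accepts sector-aligned offsets and lengths, so the covering
/// sectors are read back, patched in memory, and written out again. Assumes
/// the file is preallocated, so the covering range always exists on disk.
fn write_unaligned(file: &File, data: &[u8], offset: u64) -> io::Result<()> {
    // Expand [offset, offset + data.len()) to whole sectors.
    let start = offset / SECTOR_SIZE * SECTOR_SIZE;
    let end = (offset + data.len() as u64).div_ceil(SECTOR_SIZE) * SECTOR_SIZE;
    let mut sectors = vec![0u8; (end - start) as usize];

    // Read the covering range, splice in the new bytes, and write it back.
    // Two concurrent callers whose ranges share a sector would race between
    // the read and the write and corrupt each other's data, which is why
    // FilePool routes every write through the same handle, files[0].
    file.read_exact_at(&mut sectors, start)?;
    let within = (offset - start) as usize;
    sectors[within..within + data.len()].copy_from_slice(data);
    file.write_all_at(&sectors, start)
}
```

Reads never write anything back, so they have no such hazard, which is why read() is free to rotate across all the handles in the pool.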

 #[derive(Debug)]
 struct Inner {
     id: PieceCacheId,
-    file: FilePool,
+    files: FilePool,
     max_num_elements: u32,
     metrics: Option<DiskPieceCacheMetrics>,
 }
@@ -242,19 +243,22 @@ impl DiskPieceCache {
             return Err(DiskPieceCacheError::ZeroCapacity);
         }

-        let file = FilePool::open(&directory.join(Self::FILE_NAME))?;
+        let files = FilePool::open(&directory.join(Self::FILE_NAME))?;

         let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
         // Align plot file size for disk sector size
         let expected_size =
             expected_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
-        if file.size()? != expected_size {
-            // Allocating the whole file (`set_len` below can create a sparse file, which will cause
-            // writes to fail later)
-            file.preallocate(expected_size)
-                .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
-            // Truncating file (if necessary)
-            file.set_len(expected_size)?;
+        {
+            let file = files.write();
+            if file.size()? != expected_size {
+                // Allocating the whole file (`set_len` below can create a sparse file, which will
+                // cause writes to fail later)
+                file.preallocate(expected_size)
+                    .map_err(DiskPieceCacheError::CantPreallocateCacheFile)?;
+                // Truncating file (if necessary)
+                file.set_len(expected_size)?;
+            }
         }

         // ID for cache is ephemeral unless provided explicitly

@@ -264,7 +268,7 @@
         Ok(Self {
             inner: Arc::new(Inner {
                 id,
-                file,
+                files,
                 max_num_elements: capacity,
                 metrics,
             }),
@@ -353,16 +357,16 @@ impl DiskPieceCache {
         let element_offset = u64::from(offset) * u64::from(Self::element_size());

         let piece_index_bytes = piece_index.to_bytes();
+        // File writes are read/write/modify internally, so combine all data here for more efficient
+        // write
+        let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
+        bytes.extend_from_slice(&piece_index_bytes);
+        bytes.extend_from_slice(piece.as_ref());
+        bytes.extend_from_slice(blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref());
         self.inner
-            .file
-            .write_all_at(&piece_index_bytes, element_offset)?;
-        self.inner
-            .file
-            .write_all_at(piece.as_ref(), element_offset + PieceIndex::SIZE as u64)?;
-        self.inner.file.write_all_at(
-            blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
-            element_offset + PieceIndex::SIZE as u64 + Piece::SIZE as u64,
-        )?;
+            .files
+            .write()
+            .write_all_at(&bytes, element_offset)?;

         Ok(())
     }
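Why does combining the three fields into one buffer help? Each unaligned write pays the read/modify/write toll on its edge sectors, and a sector shared between two adjacent fields gets patched twice. A rough sector-count comparison follows (a self-contained sketch; the piece and index sizes are assumptions for illustration, not the crate's real constants):

```rust
const SECTOR: u64 = 512;
const PIECE_INDEX_SIZE: u64 = 8; // assumed size of a serialized PieceIndex
const PIECE_SIZE: u64 = 1_048_576; // assumed piece size, for illustration only
const HASH_SIZE: u64 = 32; // BLAKE3 hash length

/// Number of disk sectors overlapped by a write of `len` bytes at `offset`.
fn sectors_touched(offset: u64, len: u64) -> u64 {
    (offset + len).div_ceil(SECTOR) - offset / SECTOR
}

fn main() {
    let offset = 12_345; // some unaligned element offset
    // Three separate writes: sectors straddling field boundaries are read,
    // patched, and written twice, once per neighboring field.
    let separate = sectors_touched(offset, PIECE_INDEX_SIZE)
        + sectors_touched(offset + PIECE_INDEX_SIZE, PIECE_SIZE)
        + sectors_touched(offset + PIECE_INDEX_SIZE + PIECE_SIZE, HASH_SIZE);
    // One combined write touches every covering sector exactly once.
    let combined = sectors_touched(offset, PIECE_INDEX_SIZE + PIECE_SIZE + HASH_SIZE);
    println!("separate: {separate} sector writes, combined: {combined}");
}
```

On top of the duplicate sector writes, each separate unaligned write also costs an extra read-back of its edge sectors, so the combined buffer saves both reads and writes per element.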
@@ -432,7 +436,8 @@ impl DiskPieceCache {
         element: &mut [u8],
     ) -> Result<Option<PieceIndex>, DiskPieceCacheError> {
         self.inner
-            .file
+            .files
+            .read()
            .read_exact_at(element, u64::from(offset) * u64::from(Self::element_size()))?;

         let (piece_index_bytes, remaining_bytes) = element.split_at(PieceIndex::SIZE);
crates/subspace-farmer/src/farmer_cache.rs (3 additions & 3 deletions)

@@ -1188,7 +1188,7 @@ where
         let pieces_to_read_from_piece_cache = {
             let caches = self.piece_caches.read().await;
             // Pieces to read from piece cache grouped by backend for efficiency reasons
-            let mut reading_from_piece_cache =
+            let mut pieces_to_read_from_piece_cache =
                 HashMap::<CacheIndex, (CacheBackend, HashMap<_, _>)>::new();

             for piece_index in piece_indices {

@@ -1208,7 +1208,7 @@
                 let cache_index = offset.cache_index;
                 let piece_offset = offset.piece_offset;

-                match reading_from_piece_cache.entry(cache_index) {
+                match pieces_to_read_from_piece_cache.entry(cache_index) {
                     Entry::Occupied(mut entry) => {
                         let (_backend, pieces) = entry.get_mut();
                         pieces.insert(piece_offset, (piece_index, key));
@@ -1227,7 +1227,7 @@
                 }
             }

-            reading_from_piece_cache
+            pieces_to_read_from_piece_cache
         };

         let (tx, mut rx) = mpsc::unbounded();
crates/subspace-farmer/src/single_disk_farm/plot_cache.rs (11 additions & 11 deletions)

@@ -220,18 +220,18 @@ impl DiskPlotCache {
             return Ok(false);
         };

-        let piece_index_bytes = piece_index.to_bytes();
         let write_fut = tokio::task::spawn_blocking({
             let piece = piece.clone();

-            move || {
-                file.write_all_at(&piece_index_bytes, element_offset)?;
-                file.write_all_at(piece.as_ref(), element_offset + PieceIndex::SIZE as u64)?;
-                file.write_all_at(
-                    blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
-                    element_offset + PieceIndex::SIZE as u64 + Piece::SIZE as u64,
-                )
-            }
+            let piece_index_bytes = piece_index.to_bytes();
+            // File writes are read/write/modify internally, so combine all data here for more
+            // efficient write
+            let mut bytes = Vec::with_capacity(PieceIndex::SIZE + piece.len() + Blake3Hash::SIZE);
+            bytes.extend_from_slice(&piece_index_bytes);
+            bytes.extend_from_slice(piece.as_ref());
+            bytes.extend_from_slice(
+                blake3_hash_list(&[&piece_index_bytes, piece.as_ref()]).as_ref(),
+            );
+
+            move || file.write_all_at(&bytes, element_offset)
         });

         AsyncJoinOnDrop::new(write_fut, false).await??;
crates/subspace-networking/src/utils/piece_provider.rs (1 addition & 1 deletion)

@@ -674,7 +674,7 @@ where
                 cached_pieces,
                 not_cached_pieces,
             );
-            downloading_stream.insert(piece_index, Box::pin(fut.into_stream()));
+            downloading_stream.insert(piece_index_to_download_next, Box::pin(fut.into_stream()));
         }

         if pieces_to_download.len() == num_pieces {