Skip to content

Commit

Permalink
Switch remaining key farmer file operations to UnbufferedIoFileWindows
Browse files Browse the repository at this point in the history
  • Loading branch information
nazar-pc committed Mar 6, 2024
1 parent 78f4d16 commit 43e3f62
Show file tree
Hide file tree
Showing 12 changed files with 199 additions and 70 deletions.
16 changes: 8 additions & 8 deletions crates/subspace-farmer-components/src/file_ext.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
//! File extension trait
use std::fs::{File, OpenOptions};
use std::io::Result;
use std::io::{Result, Seek, SeekFrom};

/// Extension convenience trait that allows setting some file opening options in cross-platform way
pub trait OpenOptionsExt {
Expand Down Expand Up @@ -78,11 +78,11 @@ impl OpenOptionsExt for OpenOptions {
/// Extension convenience trait that allows pre-allocating files, suggesting random access pattern
/// and doing cross-platform exact reads/writes
pub trait FileExt {
/// Get allocated file size
fn allocated_size(&self) -> Result<u64>;
/// Get file size
fn size(&mut self) -> Result<u64>;

/// Make sure file has specified number of bytes allocated for it
fn preallocate(&self, len: u64) -> Result<()>;
fn preallocate(&mut self, len: u64) -> Result<()>;

/// Advise OS/file system that file will use random access and read-ahead behavior is
/// undesirable, on Windows this can only be set when file is opened, see [`OpenOptionsExt`]
Expand All @@ -100,13 +100,13 @@ pub trait FileExt {
}

impl FileExt for File {
fn allocated_size(&self) -> Result<u64> {
fs4::FileExt::allocated_size(self)
fn size(&mut self) -> Result<u64> {
self.seek(SeekFrom::End(0))
}

fn preallocate(&self, len: u64) -> Result<()> {
fn preallocate(&mut self, len: u64) -> Result<()> {
// TODO: Hack due to bugs on Windows: https://github.com/al8n/fs4-rs/issues/13
if fs4::FileExt::allocated_size(self)? == len {
if self.size()? == len {
return Ok(());
}
fs4::FileExt::allocate(self, len)
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer-components/src/proving.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ pub enum ProvingError {
#[error("Failed to decode sector contents map: {0}")]
FailedToDecodeSectorContentsMap(#[from] SectorContentsMapFromBytesError),
/// I/O error occurred
#[error("I/O error: {0}")]
#[error("Proving I/O error: {0}")]
Io(#[from] io::Error),
/// Record reading error
#[error("Record reading error: {0}")]
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer-components/src/reading.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ pub enum ReadingError {
#[error("Failed to decode sector contents map: {0}")]
FailedToDecodeSectorContentsMap(#[from] SectorContentsMapFromBytesError),
/// I/O error occurred
#[error("I/O error: {0}")]
#[error("Reading I/O error: {0}")]
Io(#[from] io::Error),
/// Checksum mismatch
#[error("Checksum mismatch")]
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/identity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ fn keypair_from_entropy(entropy: &[u8]) -> Keypair {
#[derive(Debug, Error)]
pub enum IdentityError {
/// I/O error occurred
#[error("I/O error: {0}")]
#[error("Identity I/O error: {0}")]
Io(#[from] io::Error),
/// Decoding error
#[error("Decoding error: {0}")]
Expand Down
61 changes: 40 additions & 21 deletions crates/subspace-farmer/src/single_disk_farm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ use static_assertions::const_assert;
use std::error::Error;
use std::fs::{File, OpenOptions};
use std::future::Future;
use std::io::{Seek, SeekFrom};
use std::num::{NonZeroU8, NonZeroUsize};
use std::path::{Path, PathBuf};
use std::pin::Pin;
Expand All @@ -58,7 +57,9 @@ use subspace_core_primitives::{
SegmentIndex,
};
use subspace_erasure_coding::ErasureCoding;
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::file_ext::FileExt;
#[cfg(not(windows))]
use subspace_farmer_components::file_ext::OpenOptionsExt;
use subspace_farmer_components::plotting::PlottedSector;
use subspace_farmer_components::sector::{sector_size, SectorMetadata, SectorMetadataChecksummed};
use subspace_farmer_components::{FarmerProtocolInfo, PieceGetter};
Expand Down Expand Up @@ -307,7 +308,7 @@ pub enum SingleDiskFarmError {
LikelyAlreadyInUse(io::Error),
// TODO: Make more variants out of this generic one
/// I/O error occurred
#[error("I/O error: {0}")]
#[error("Single disk farm I/O error: {0}")]
Io(#[from] io::Error),
/// Piece cache error
#[error("Piece cache error: {0}")]
Expand Down Expand Up @@ -748,10 +749,10 @@ impl SingleDiskFarm {
allocated_space,
});
}
let plot_file_size = target_sector_count * sector_size as u64;
// Align plot file size for disk sector size
let plot_file_size = (target_sector_count * sector_size as u64)
.div_ceil(DISK_SECTOR_SIZE as u64)
* DISK_SECTOR_SIZE as u64;
let plot_file_size =
plot_file_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;

// Remaining space will be used for caching purposes
let cache_capacity = {
Expand Down Expand Up @@ -779,18 +780,26 @@ impl SingleDiskFarm {
};

let metadata_file_path = directory.join(Self::METADATA_FILE);
#[cfg(not(windows))]
let mut metadata_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(&metadata_file_path)?;

#[cfg(not(windows))]
metadata_file.advise_random_access()?;

let metadata_size = metadata_file.seek(SeekFrom::End(0))?;
#[cfg(windows)]
let mut metadata_file = UnbufferedIoFileWindows::open(&metadata_file_path)?;

let metadata_size = metadata_file.size()?;
let expected_metadata_size =
RESERVED_PLOT_METADATA + sector_metadata_size as u64 * u64::from(target_sector_count);
// Align plot file size for disk sector size
let expected_metadata_size =
expected_metadata_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
let metadata_header = if metadata_size == 0 {
let metadata_header = PlotMetadataHeader {
version: 0,
Expand Down Expand Up @@ -874,25 +883,28 @@ impl SingleDiskFarm {
Arc::new(RwLock::new(sectors_metadata))
};

let plot_file = Arc::new(
OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(directory.join(Self::PLOT_FILE))?,
);
#[cfg(not(windows))]
let mut plot_file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(directory.join(Self::PLOT_FILE))?;

#[cfg(not(windows))]
plot_file.advise_random_access()?;

if plot_file.allocated_size()? != plot_file_size {
#[cfg(windows)]
let mut plot_file = UnbufferedIoFileWindows::open(&directory.join(Self::PLOT_FILE))?;

if plot_file.size()? != plot_file_size {
// Allocating the whole file (`set_len` below can create a sparse file, which will cause
// writes to fail later)
plot_file
.preallocate(plot_file_size)
.map_err(SingleDiskFarmError::CantPreallocatePlotFile)?;
// Truncating file (if necessary)
plot_file.set_len(sector_size as u64 * u64::from(target_sector_count))?;
plot_file.set_len(plot_file_size)?;

// TODO: Hack due to Windows bugs:
// https://learn.microsoft.com/en-us/answers/questions/1608540/getfileinformationbyhandle-followed-by-read-with-f
Expand All @@ -901,6 +913,8 @@ impl SingleDiskFarm {
}
}

let plot_file = Arc::new(plot_file);

let piece_cache = DiskPieceCache::open(&directory, cache_capacity)?;
let plot_cache = DiskPlotCache::new(
&plot_file,
Expand Down Expand Up @@ -1248,11 +1262,16 @@ impl SingleDiskFarm {
pub fn read_all_sectors_metadata(
directory: &Path,
) -> io::Result<Vec<SectorMetadataChecksummed>> {
#[cfg(not(windows))]
let mut metadata_file = OpenOptions::new()
.read(true)
.open(directory.join(Self::METADATA_FILE))?;

let metadata_size = metadata_file.seek(SeekFrom::End(0))?;
#[cfg(windows)]
let mut metadata_file =
UnbufferedIoFileWindows::open(&directory.join(Self::METADATA_FILE))?;

let metadata_size = metadata_file.size()?;
let sector_metadata_size = SectorMetadataChecksummed::encoded_size();

let mut metadata_header_bytes = vec![0; PlotMetadataHeader::encoded_size()];
Expand Down Expand Up @@ -1539,7 +1558,7 @@ impl SingleDiskFarm {
// Error doesn't matter here
let _ = metadata_file.advise_sequential_access();

let metadata_size = match metadata_file.seek(SeekFrom::End(0)) {
let metadata_size = match metadata_file.size() {
Ok(metadata_size) => metadata_size,
Err(error) => {
return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
Expand Down Expand Up @@ -1640,7 +1659,7 @@ impl SingleDiskFarm {
// Error doesn't matter here
let _ = plot_file.advise_sequential_access();

let plot_size = match plot_file.seek(SeekFrom::End(0)) {
let plot_size = match plot_file.size() {
Ok(metadata_size) => metadata_size,
Err(error) => {
return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
Expand Down Expand Up @@ -1907,7 +1926,7 @@ impl SingleDiskFarm {
// Error doesn't matter here
let _ = cache_file.advise_sequential_access();

let cache_size = match cache_file.seek(SeekFrom::End(0)) {
let cache_size = match cache_file.size() {
Ok(metadata_size) => metadata_size,
Err(error) => {
return Err(SingleDiskFarmScrubError::FailedToDetermineFileSize {
Expand Down
2 changes: 1 addition & 1 deletion crates/subspace-farmer/src/single_disk_farm/farming.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ pub enum FarmingError {
#[error("Low-level proving error: {0}")]
LowLevelProving(#[from] ProvingError),
/// I/O error occurred
#[error("I/O error: {0}")]
#[error("Farming I/O error: {0}")]
Io(#[from] io::Error),
/// Failed to create thread pool
#[error("Failed to create thread pool: {0}")]
Expand Down
25 changes: 21 additions & 4 deletions crates/subspace-farmer/src/single_disk_farm/piece_cache.rs
Original file line number Diff line number Diff line change
@@ -1,22 +1,28 @@
#[cfg(test)]
mod tests;

#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use crate::single_disk_farm::unbuffered_io_file_windows::DISK_SECTOR_SIZE;
use derive_more::Display;
#[cfg(not(windows))]
use std::fs::{File, OpenOptions};
use std::path::Path;
use std::sync::Arc;
use std::{fs, io, mem};
use subspace_core_primitives::crypto::blake3_hash_list;
use subspace_core_primitives::{Blake3Hash, Piece, PieceIndex};
use subspace_farmer_components::file_ext::{FileExt, OpenOptionsExt};
use subspace_farmer_components::file_ext::FileExt;
#[cfg(not(windows))]
use subspace_farmer_components::file_ext::OpenOptionsExt;
use thiserror::Error;
use tracing::{debug, info, warn};

/// Disk piece cache open error
#[derive(Debug, Error)]
pub enum DiskPieceCacheError {
/// I/O error occurred
#[error("I/O error: {0}")]
#[error("Disk piece cache I/O error: {0}")]
Io(#[from] io::Error),
/// Can't preallocate cache file, probably not enough space on disk
#[error("Can't preallocate cache file, probably not enough space on disk: {0}")]
Expand Down Expand Up @@ -44,7 +50,10 @@ pub struct Offset(u32);

#[derive(Debug)]
struct Inner {
#[cfg(not(windows))]
file: File,
#[cfg(windows)]
file: UnbufferedIoFileWindows,
num_elements: u32,
}

Expand All @@ -63,17 +72,25 @@ impl DiskPieceCache {
return Err(DiskPieceCacheError::ZeroCapacity);
}

let file = OpenOptions::new()
#[cfg(not(windows))]
let mut file = OpenOptions::new()
.read(true)
.write(true)
.create(true)
.advise_random_access()
.open(directory.join(Self::FILE_NAME))?;

#[cfg(not(windows))]
file.advise_random_access()?;

#[cfg(windows)]
let mut file = UnbufferedIoFileWindows::open(&directory.join(Self::FILE_NAME))?;

let expected_size = u64::from(Self::element_size()) * u64::from(capacity);
if file.allocated_size()? != expected_size {
// Align plot file size for disk sector size
let expected_size =
expected_size.div_ceil(DISK_SECTOR_SIZE as u64) * DISK_SECTOR_SIZE as u64;
if file.size()? != expected_size {
// Allocating the whole file (`set_len` below can create a sparse file, which will cause
// writes to fail later)
file.preallocate(expected_size)
Expand Down
32 changes: 20 additions & 12 deletions crates/subspace-farmer/src/single_disk_farm/piece_reader.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
#[cfg(windows)]
use crate::single_disk_farm::unbuffered_io_file_windows::UnbufferedIoFileWindows;
use async_lock::RwLock;
use futures::channel::{mpsc, oneshot};
use futures::{SinkExt, StreamExt};
#[cfg(not(windows))]
use std::fs::File;
use std::future::Future;
use std::sync::Arc;
Expand Down Expand Up @@ -32,7 +35,8 @@ impl PieceReader {
pub(super) fn new<PosTable>(
public_key: PublicKey,
pieces_in_sector: u16,
plot_file: Arc<File>,
#[cfg(not(windows))] plot_file: Arc<File>,
#[cfg(windows)] plot_file: Arc<UnbufferedIoFileWindows>,
sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
erasure_coding: ErasureCoding,
modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
Expand All @@ -42,15 +46,18 @@ impl PieceReader {
{
let (read_piece_sender, read_piece_receiver) = mpsc::channel(10);

let reading_fut = read_pieces::<PosTable>(
public_key,
pieces_in_sector,
plot_file,
sectors_metadata,
erasure_coding,
modifying_sector_index,
read_piece_receiver,
);
let reading_fut = async move {
read_pieces::<PosTable, _>(
public_key,
pieces_in_sector,
&*plot_file,
sectors_metadata,
erasure_coding,
modifying_sector_index,
read_piece_receiver,
)
.await
};

(Self { read_piece_sender }, reading_fut)
}
Expand Down Expand Up @@ -80,16 +87,17 @@ impl PieceReader {
}

#[allow(clippy::too_many_arguments)]
async fn read_pieces<PosTable>(
async fn read_pieces<PosTable, S>(
public_key: PublicKey,
pieces_in_sector: u16,
plot_file: Arc<File>,
plot_file: S,
sectors_metadata: Arc<RwLock<Vec<SectorMetadataChecksummed>>>,
erasure_coding: ErasureCoding,
modifying_sector_index: Arc<RwLock<Option<SectorIndex>>>,
mut read_piece_receiver: mpsc::Receiver<ReadPieceRequest>,
) where
PosTable: Table,
S: ReadAtSync,
{
let mut table_generator = PosTable::generator();

Expand Down
Loading

0 comments on commit 43e3f62

Please sign in to comment.