diff --git a/Cargo.lock b/Cargo.lock index 10818cb057..54b53bccca 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12546,6 +12546,7 @@ dependencies = [ name = "subspace-farmer-components" version = "0.1.0" dependencies = [ + "anyhow", "async-lock 3.4.0", "async-trait", "backoff", diff --git a/crates/subspace-farmer-components/Cargo.toml b/crates/subspace-farmer-components/Cargo.toml index 6fa1335404..8edc69aa4e 100644 --- a/crates/subspace-farmer-components/Cargo.toml +++ b/crates/subspace-farmer-components/Cargo.toml @@ -16,6 +16,7 @@ include = [ bench = false [dependencies] +anyhow = "1.0.89" async-lock = "3.4.0" async-trait = "0.1.83" backoff = { version = "0.4.0", features = ["futures", "tokio"] } diff --git a/crates/subspace-farmer-components/src/lib.rs b/crates/subspace-farmer-components/src/lib.rs index 6fd0c97119..c8ca4a5ea4 100644 --- a/crates/subspace-farmer-components/src/lib.rs +++ b/crates/subspace-farmer-components/src/lib.rs @@ -28,7 +28,6 @@ use async_trait::async_trait; use parity_scale_codec::{Decode, Encode}; use serde::{Deserialize, Serialize}; use static_assertions::const_assert; -use std::error::Error; use std::fs::File; use std::future::Future; use std::io; @@ -40,10 +39,7 @@ use subspace_core_primitives::segments::{ArchivedHistorySegment, HistorySize}; #[async_trait] pub trait PieceGetter { /// Get piece by index - async fn get_piece( - &self, - piece_index: PieceIndex, - ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>>; + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>; } #[async_trait] @@ -51,20 +47,14 @@ impl<T> PieceGetter for Arc<T> where T: PieceGetter + Send + Sync, { - async fn get_piece( - &self, - piece_index: PieceIndex, - ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> { self.as_ref().get_piece(piece_index).await } } #[async_trait] impl PieceGetter for ArchivedHistorySegment { - async fn get_piece( - &self, - piece_index: PieceIndex, - ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> { let position = usize::try_from(u64::from(piece_index))?; Ok(self.pieces().nth(position)) diff --git a/crates/subspace-farmer-components/src/plotting.rs b/crates/subspace-farmer-components/src/plotting.rs index 3349636192..c97261e9c4 100644 --- a/crates/subspace-farmer-components/src/plotting.rs +++ b/crates/subspace-farmer-components/src/plotting.rs @@ -69,7 +69,7 @@ pub enum PlottingError { #[error("Records encoder error: {error}")] RecordsEncoderError { /// Lower-level error - error: Box<dyn std::error::Error + Send + Sync + 'static>, + error: anyhow::Error, }, /// Bad sector output size #[error("Bad sector output size: provided {provided}, expected {expected}")] @@ -97,7 +97,7 @@ pub enum PlottingError { /// Piece index piece_index: PieceIndex, /// Lower-level error - error: Box<dyn std::error::Error + Send + Sync + 'static>, + error: anyhow::Error, }, /// Failed to acquire permit #[error("Failed to acquire permit: {error}")] @@ -338,7 +338,7 @@ pub trait RecordsEncoder { sector_id: &SectorId, records: &mut [Record], abort_early: &AtomicBool, - ) -> Result<SectorContentsMap, Box<dyn std::error::Error + Send + Sync + 'static>>; + ) -> anyhow::Result<SectorContentsMap>; } /// CPU implementation of [`RecordsEncoder`] @@ -361,24 +361,23 @@ where sector_id: &SectorId, records: &mut [Record], abort_early: &AtomicBool, - ) -> Result<SectorContentsMap, Box<dyn std::error::Error + Send + Sync + 'static>> { + ) -> anyhow::Result<SectorContentsMap> { if self.erasure_coding.max_shards() < Record::NUM_S_BUCKETS { - return Err(format!( + return Err(anyhow::anyhow!( "Invalid erasure coding instance: {} shards needed, {} supported", Record::NUM_S_BUCKETS, self.erasure_coding.max_shards() - ) - .into()); + )); } if self.table_generators.is_empty() { - return Err("No table generators".into()); + return Err(anyhow::anyhow!("No table generators")); } let pieces_in_sector = records .len() .try_into() - .map_err(|error| format!("Failed to convert pieces in sector: {error}"))?; + .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?; let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); { @@ -751,7 +750,7 @@ async fn download_sector_internal<PG: PieceGetter>( let _permit = match recovery_semaphore.acquire().await { Ok(permit) => permit, Err(error) => { - let error = format!("Recovery semaphore was closed: {error}").into(); + let error = anyhow::anyhow!("Recovery semaphore was closed: {error}"); return Err(PlottingError::FailedToRetrievePiece { piece_index, error }); } }; diff --git a/crates/subspace-farmer/src/cluster/controller.rs b/crates/subspace-farmer/src/cluster/controller.rs index 4bfd786657..ec9acde162 100644 --- a/crates/subspace-farmer/src/cluster/controller.rs +++ b/crates/subspace-farmer/src/cluster/controller.rs @@ -12,7 +12,7 @@ use crate::cluster::nats_client::{ }; use crate::farm::{PieceCacheId, PieceCacheOffset}; use crate::farmer_cache::FarmerCache; -use crate::node_client::{Error as NodeClientError, NodeClient}; +use crate::node_client::NodeClient; use anyhow::anyhow; use async_lock::Semaphore; use async_nats::HeaderValue; @@ -20,7 +20,6 @@ use async_trait::async_trait; use futures::{select, FutureExt, Stream, StreamExt}; use parity_scale_codec::{Decode, Encode}; use parking_lot::Mutex; -use std::error::Error; use std::num::NonZeroUsize; use std::pin::Pin; use std::sync::Arc; @@ -171,10 +170,7 @@ pub struct ClusterPieceGetter { #[async_trait] impl PieceGetter for ClusterPieceGetter { - async fn get_piece( - &self, - piece_index: PieceIndex, - ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> { let _guard = self.request_semaphore.acquire().await; if let Some((piece_cache_id, piece_cache_offset)) = self @@ -286,16 +282,17 @@ impl ClusterNodeClient { #[async_trait] impl NodeClient for ClusterNodeClient { - async fn farmer_app_info(&self) -> Result<FarmerAppInfo, NodeClientError> { + async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> { Ok(self .nats_client .request(&ClusterControllerFarmerAppInfoRequest, None) - .await??) + .await? + .map_err(anyhow::Error::msg)?) } async fn subscribe_slot_info( &self, - ) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, NodeClientError> { + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> { let subscription = self .nats_client .subscribe_to_broadcasts::<ClusterControllerSlotInfoBroadcast>(None, None) @@ -328,7 +325,7 @@ impl NodeClient for ClusterNodeClient { async fn submit_solution_response( &self, solution_response: SolutionResponse, - ) -> Result<(), NodeClientError> { + ) -> anyhow::Result<()> { let last_slot_info_instance = self.last_slot_info_instance.lock().clone(); Ok(self .nats_client @@ -341,8 +338,7 @@ impl NodeClient for ClusterNodeClient { async fn subscribe_reward_signing( &self, - ) -> Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>, NodeClientError> - { + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> { let subscription = self .nats_client .subscribe_to_broadcasts::<ClusterControllerRewardSigningBroadcast>(None, None) @@ -356,7 +352,7 @@ impl NodeClient for ClusterNodeClient { async fn submit_reward_signature( &self, reward_signature: RewardSignatureResponse, - ) -> Result<(), NodeClientError> { + ) -> anyhow::Result<()> { Ok(self .nats_client .notification( @@ -368,7 +364,7 @@ impl NodeClient for ClusterNodeClient { async fn subscribe_archived_segment_headers( &self, - ) -> Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>, NodeClientError> { + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> { let subscription = self .nats_client .subscribe_to_broadcasts::<ClusterControllerArchivedSegmentHeaderBroadcast>(None, None) @@ -401,7 +397,7 @@ impl NodeClient for ClusterNodeClient { async fn segment_headers( &self, segment_indices: Vec<SegmentIndex>, - ) -> Result<Vec<Option<SegmentHeader>>, NodeClientError> { + ) -> anyhow::Result<Vec<Option<SegmentHeader>>> { Ok(self .nats_client .request( @@ -411,7 +407,7 @@ impl NodeClient for ClusterNodeClient { .await?) } - async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, NodeClientError> { + async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> { Ok(self .nats_client .request(&ClusterControllerPieceRequest { piece_index }, None) @@ -421,7 +417,7 @@ impl NodeClient for ClusterNodeClient { async fn acknowledge_archived_segment_header( &self, _segment_index: SegmentIndex, - ) -> Result<(), NodeClientError> { + ) -> anyhow::Result<()> { // Acknowledgement is unnecessary/unsupported Ok(()) } diff --git a/crates/subspace-farmer/src/farm.rs b/crates/subspace-farmer/src/farm.rs index f80f51bf07..fa205a7cf3 100644 --- a/crates/subspace-farmer/src/farm.rs +++ b/crates/subspace-farmer/src/farm.rs @@ -7,7 +7,6 @@ //! way). This crate provides a few of such implementations, but more can be created externally as //! well if needed without modifying the library itself. -use crate::node_client; use async_trait::async_trait; use derive_more::{Display, From}; use futures::Stream; @@ -278,13 +277,13 @@ pub enum FarmingError { #[error("Failed to subscribe to slot info notifications: {error}")] FailedToSubscribeSlotInfo { /// Lower-level error - error: node_client::Error, + error: anyhow::Error, }, /// Failed to retrieve farmer info #[error("Failed to retrieve farmer info: {error}")] FailedToGetFarmerInfo { /// Lower-level error - error: node_client::Error, + error: anyhow::Error, }, /// Slot info notification stream ended #[error("Slot info notification stream ended")] diff --git a/crates/subspace-farmer/src/farmer_cache/tests.rs b/crates/subspace-farmer/src/farmer_cache/tests.rs index 9b964efd96..d25ce1d4d8 100644 --- a/crates/subspace-farmer/src/farmer_cache/tests.rs +++ b/crates/subspace-farmer/src/farmer_cache/tests.rs @@ -6,7 +6,7 @@ use crate::disk_piece_cache::DiskPieceCache; use crate::farmer_cache::{decode_piece_index_from_record_key, FarmerCache}; -use crate::node_client::{Error, NodeClient}; +use crate::node_client::NodeClient; use async_trait::async_trait; use futures::channel::{mpsc, oneshot}; use futures::{SinkExt, Stream, StreamExt}; @@ -44,7 +44,7 @@ struct MockNodeClient { #[async_trait] impl NodeClient for MockNodeClient { - async fn farmer_app_info(&self) -> Result<FarmerAppInfo, Error> { + async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> { // Most of these values make no sense, but they are not used by piece cache anyway Ok(FarmerAppInfo { genesis_hash: [0; 32], @@ -68,33 +68,33 @@ impl NodeClient for MockNodeClient { async fn subscribe_slot_info( &self, - ) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, Error> { + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> { unimplemented!() } async fn submit_solution_response( &self, _solution_response: SolutionResponse, - ) -> Result<(), Error> { + ) -> anyhow::Result<()> { unimplemented!() } async fn subscribe_reward_signing( &self, - ) -> Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>, Error> { + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> { unimplemented!() } async fn submit_reward_signature( &self, _reward_signature: RewardSignatureResponse, - ) -> Result<(), Error> { + ) -> anyhow::Result<()> { unimplemented!() } async fn subscribe_archived_segment_headers( &self, - ) -> Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>, Error> { + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> { let (tx, rx) = oneshot::channel(); self.archived_segment_headers_stream_request_sender .clone() @@ -109,11 +109,11 @@ impl NodeClient for MockNodeClient { async fn segment_headers( &self, _segment_indexes: Vec<SegmentIndex>, - ) -> Result<Vec<Option<SegmentHeader>>, Error> { + ) -> anyhow::Result<Vec<Option<SegmentHeader>>> { unimplemented!() } - async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error> { + async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> { Ok(Some( self.pieces .lock() @@ -130,7 +130,7 @@ impl NodeClient for MockNodeClient { async fn acknowledge_archived_segment_header( &self, segment_index: SegmentIndex, - ) -> Result<(), Error> { + ) -> anyhow::Result<()> { self.acknowledge_archived_segment_header_sender .clone() .send(segment_index) @@ -147,10 +147,7 @@ struct MockPieceGetter { #[async_trait] impl PieceGetter for MockPieceGetter { - async fn get_piece( - &self, - piece_index: PieceIndex, - ) -> Result<Option<Piece>, Box<dyn std::error::Error + Send + Sync + 'static>> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> { Ok(Some( self.pieces .lock() diff --git a/crates/subspace-farmer/src/farmer_piece_getter.rs b/crates/subspace-farmer/src/farmer_piece_getter.rs index 24306f07fa..475782718e 100644 --- a/crates/subspace-farmer/src/farmer_piece_getter.rs +++ b/crates/subspace-farmer/src/farmer_piece_getter.rs @@ -12,7 +12,6 @@ use backoff::future::retry; use backoff::ExponentialBackoff; use parking_lot::Mutex; use std::collections::HashMap; -use std::error::Error; use std::fmt; use std::hash::Hash; use std::num::NonZeroUsize; @@ -383,10 +382,7 @@ where PV: PieceValidator + Send + 'static, NC: NodeClient, { - async fn get_piece( - &self, - piece_index: PieceIndex, - ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> { let _guard = self.inner.request_semaphore.acquire().await; match InProgressPiece::new(piece_index, &self.inner.in_progress_pieces) { @@ -453,10 +449,7 @@ where PV: PieceValidator + Send + 'static, NC: NodeClient, { - async fn get_piece( - &self, - piece_index: PieceIndex, - ) -> Result<Option<Piece>, Box<dyn Error + Send + Sync + 'static>> { + async fn get_piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> { let Some(piece_getter) = self.upgrade() else { debug!("Farmer piece getter upgrade didn't succeed"); return Ok(None); diff --git a/crates/subspace-farmer/src/node_client.rs b/crates/subspace-farmer/src/node_client.rs index a68bd52f1f..38c327cbc6 100644 --- a/crates/subspace-farmer/src/node_client.rs +++ b/crates/subspace-farmer/src/node_client.rs @@ -21,61 +21,58 @@ use subspace_rpc_primitives::{ FarmerAppInfo, RewardSignatureResponse, RewardSigningInfo, SlotInfo, SolutionResponse, }; -/// Erased error type -pub type Error = Box<dyn std::error::Error + Send + Sync + 'static>; - /// Abstraction of the Node Client #[async_trait] pub trait NodeClient: fmt::Debug + Send + Sync + 'static { /// Get farmer app info - async fn farmer_app_info(&self) -> Result<FarmerAppInfo, Error>; + async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo>; /// Subscribe to slot async fn subscribe_slot_info( &self, - ) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, Error>; + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>>; /// Submit a slot solution async fn submit_solution_response( &self, solution_response: SolutionResponse, - ) -> Result<(), Error>; + ) -> anyhow::Result<()>; /// Subscribe to block signing request async fn subscribe_reward_signing( &self, - ) -> Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>, Error>; + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>>; /// Submit a block signature async fn submit_reward_signature( &self, reward_signature: RewardSignatureResponse, - ) -> Result<(), Error>; + ) -> anyhow::Result<()>; /// Subscribe to archived segment headers async fn subscribe_archived_segment_headers( &self, - ) -> Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>, Error>; + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>>; /// Get segment headers for the segments async fn segment_headers( &self, segment_indices: Vec<SegmentIndex>, - ) -> Result<Vec<Option<SegmentHeader>>, Error>; + ) -> anyhow::Result<Vec<Option<SegmentHeader>>>; /// Get piece by index. - async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, Error>; + async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>>; /// Acknowledge segment header. async fn acknowledge_archived_segment_header( &self, segment_index: SegmentIndex, - ) -> Result<(), Error>; + ) -> anyhow::Result<()>; } /// Node Client extension methods that are not necessary for farmer as a library, but might be useful for an app #[async_trait] pub trait NodeClientExt: NodeClient { /// Get the last segment headers. - async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error>; + async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>>; } diff --git a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs index b9f4ab40a8..6bb6a99862 100644 --- a/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs +++ b/crates/subspace-farmer/src/node_client/caching_proxy_node_client.rs @@ -1,7 +1,7 @@ //! Node client wrapper around another node client that caches some data for better performance and //! proxies other requests through -use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt}; +use crate::node_client::{NodeClient, NodeClientExt}; use crate::utils::AsyncJoinOnDrop; use async_lock::{Mutex as AsyncMutex, RwLock as AsyncRwLock}; use async_trait::async_trait; @@ -58,7 +58,7 @@ impl SegmentHeaders { .collect() } - async fn sync<NC>(&mut self, client: &NC) -> Result<(), Error> + async fn sync<NC>(&mut self, client: &NC) -> anyhow::Result<()> where NC: NodeClient, { @@ -81,7 +81,9 @@ impl SegmentHeaders { .segment_headers((from..to).collect::<Vec<_>>()) .await .map_err(|error| { - format!("Failed to download segment headers {from}..{to} from node: {error}") + anyhow::anyhow!( + "Failed to download segment headers {from}..{to} from node: {error}" + ) })? { let Some(segment_header) = maybe_segment_header else { @@ -122,7 +124,7 @@ where NC: NodeClient + Clone, { /// Create a new instance - pub async fn new(client: NC) -> Result<Self, Error> { + pub async fn new(client: NC) -> anyhow::Result<Self> { let mut segment_headers = SegmentHeaders::default(); let mut archived_segments_notifications = client.subscribe_archived_segment_headers().await?; @@ -219,7 +221,7 @@ where let farmer_app_info = client .farmer_app_info() .await - .map_err(|error| format!("Failed to get farmer app info: {error}"))?; + .map_err(|error| anyhow::anyhow!("Failed to get farmer app info: {error}"))?; let last_farmer_app_info = Arc::new(AsyncMutex::new((farmer_app_info, Instant::now()))); let background_task = tokio::spawn(async move { @@ -249,7 +251,7 @@ impl<NC> NodeClient for CachingProxyNodeClient<NC> where NC: NodeClient, { - async fn farmer_app_info(&self) -> Result<FarmerAppInfo, Error> { + async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> { let (last_farmer_app_info, last_farmer_app_info_request) = &mut *self.last_farmer_app_info.lock().await; @@ -265,7 +267,7 @@ where async fn subscribe_slot_info( &self, - ) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, RpcError> { + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> { Ok(Box::pin( WatchStream::new(self.slot_info_receiver.clone()) .filter_map(|maybe_slot_info| async move { maybe_slot_info }), @@ -275,13 +277,13 @@ where async fn submit_solution_response( &self, solution_response: SolutionResponse, - ) -> Result<(), RpcError> { + ) -> anyhow::Result<()> { self.inner.submit_solution_response(solution_response).await } async fn subscribe_reward_signing( &self, - ) -> Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>, RpcError> { + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> { Ok(Box::pin( WatchStream::new(self.reward_signing_receiver.clone()) .filter_map(|maybe_reward_signing_info| async move { maybe_reward_signing_info }), @@ -292,13 +294,13 @@ where async fn submit_reward_signature( &self, reward_signature: RewardSignatureResponse, - ) -> Result<(), RpcError> { + ) -> anyhow::Result<()> { self.inner.submit_reward_signature(reward_signature).await } async fn subscribe_archived_segment_headers( &self, - ) -> Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>, RpcError> { + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> { Ok(Box::pin( WatchStream::new(self.archived_segment_headers_receiver.clone()) .filter_map(|maybe_segment_header| async move { maybe_segment_header }), @@ -308,7 +310,7 @@ where async fn segment_headers( &self, segment_indices: Vec<SegmentIndex>, - ) -> Result<Vec<Option<SegmentHeader>>, RpcError> { + ) -> anyhow::Result<Vec<Option<SegmentHeader>>> { let retrieved_segment_headers = self .segment_headers .read() @@ -326,14 +328,14 @@ where } } - async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, RpcError> { + async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> { self.inner.piece(piece_index).await } async fn acknowledge_archived_segment_header( &self, _segment_index: SegmentIndex, - ) -> Result<(), Error> { + ) -> anyhow::Result<()> { // Not supported Ok(()) } @@ -344,7 +346,7 @@ impl<NC> NodeClientExt for CachingProxyNodeClient<NC> where NC: NodeClientExt, { - async fn last_segment_headers(&self, limit: u32) -> Result<Vec<Option<SegmentHeader>>, Error> { + async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> { Ok(self .segment_headers .read() diff --git a/crates/subspace-farmer/src/node_client/rpc_node_client.rs b/crates/subspace-farmer/src/node_client/rpc_node_client.rs index b4eb430ee6..ded1fbc53e 100644 --- a/crates/subspace-farmer/src/node_client/rpc_node_client.rs +++ b/crates/subspace-farmer/src/node_client/rpc_node_client.rs @@ -1,6 +1,6 @@ //! Node client implementation that connects to node via RPC (WebSockets) -use crate::node_client::{Error as RpcError, Error, NodeClient, NodeClientExt}; +use crate::node_client::{NodeClient, NodeClientExt}; use async_lock::Semaphore; use async_trait::async_trait; use futures::{Stream, StreamExt}; @@ -51,7 +51,7 @@ impl RpcNodeClient { #[async_trait] impl NodeClient for RpcNodeClient { - async fn farmer_app_info(&self) -> Result<FarmerAppInfo, Error> { + async fn farmer_app_info(&self) -> anyhow::Result<FarmerAppInfo> { Ok(self .client .request("subspace_getFarmerAppInfo", rpc_params![]) @@ -60,7 +60,7 @@ impl NodeClient for RpcNodeClient { async fn subscribe_slot_info( &self, - ) -> Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>, RpcError> { + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SlotInfo> + Send + 'static>>> { let subscription = self .client .subscribe( @@ -78,7 +78,7 @@ impl NodeClient for RpcNodeClient { async fn submit_solution_response( &self, solution_response: SolutionResponse, - ) -> Result<(), RpcError> { + ) -> anyhow::Result<()> { Ok(self .client .request( @@ -90,7 +90,7 @@ impl NodeClient for RpcNodeClient { async fn subscribe_reward_signing( &self, - ) -> Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>, RpcError> { + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = RewardSigningInfo> + Send + 'static>>> { let subscription = self .client .subscribe( @@ -109,7 +109,7 @@ impl NodeClient for RpcNodeClient { async fn submit_reward_signature( &self, reward_signature: RewardSignatureResponse, - ) -> Result<(), RpcError> { + ) -> anyhow::Result<()> { Ok(self .client .request( @@ -121,7 +121,7 @@ impl NodeClient for RpcNodeClient { async fn subscribe_archived_segment_headers( &self, - ) -> Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>, RpcError> { + ) -> anyhow::Result<Pin<Box<dyn Stream<Item = SegmentHeader> + Send + 'static>>> { let subscription = self .client .subscribe( @@ -139,14 +139,14 @@ impl NodeClient for RpcNodeClient { async fn segment_headers( &self, segment_indices: Vec<SegmentIndex>, - ) -> Result<Vec<Option<SegmentHeader>>, RpcError> { + ) -> anyhow::Result<Vec<Option<SegmentHeader>>> { Ok(self .client .request("subspace_segmentHeaders", rpc_params![&segment_indices]) .await?) } - async fn piece(&self, piece_index: PieceIndex) -> Result<Option<Piece>, RpcError> { + async fn piece(&self, piece_index: PieceIndex) -> anyhow::Result<Option<Piece>> { let _permit = self.piece_request_semaphore.acquire().await; let client = Arc::clone(&self.client); // Spawn a separate task to improve concurrency due to slow-ish JSON decoding that causes @@ -162,7 +162,7 @@ impl NodeClient for RpcNodeClient { async fn acknowledge_archived_segment_header( &self, segment_index: SegmentIndex, - ) -> Result<(), Error> { + ) -> anyhow::Result<()> { Ok(self .client .request( @@ -175,10 +175,7 @@ impl NodeClient for RpcNodeClient { #[async_trait] impl NodeClientExt for RpcNodeClient { - async fn last_segment_headers( - &self, - limit: u32, - ) -> Result<Vec<Option<SegmentHeader>>, RpcError> { + async fn last_segment_headers(&self, limit: u32) -> anyhow::Result<Vec<Option<SegmentHeader>>> { Ok(self .client .request("subspace_lastSegmentHeaders", rpc_params![limit]) diff --git a/crates/subspace-farmer/src/plotter/gpu/cuda.rs b/crates/subspace-farmer/src/plotter/gpu/cuda.rs index 1c7e3de118..d53467203a 100644 --- a/crates/subspace-farmer/src/plotter/gpu/cuda.rs +++ b/crates/subspace-farmer/src/plotter/gpu/cuda.rs @@ -30,11 +30,11 @@ impl RecordsEncoder for CudaRecordsEncoder { sector_id: &SectorId, records: &mut [Record], abort_early: &AtomicBool, - ) -> Result<SectorContentsMap, Box<dyn std::error::Error + Send + Sync + 'static>> { + ) -> anyhow::Result<SectorContentsMap> { let pieces_in_sector = records .len() .try_into() - .map_err(|error| format!("Failed to convert pieces in sector: {error}"))?; + .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?; let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); self.thread_pool.install(|| { @@ -76,7 +76,7 @@ impl RecordsEncoder for CudaRecordsEncoder { let plotting_error = plotting_error.lock().take(); if let Some(error) = plotting_error { - return Err(error); + return Err(anyhow::Error::msg(error)); } Ok(()) diff --git a/crates/subspace-farmer/src/plotter/gpu/rocm.rs b/crates/subspace-farmer/src/plotter/gpu/rocm.rs index d5257d98b3..c86161f47a 100644 --- a/crates/subspace-farmer/src/plotter/gpu/rocm.rs +++ b/crates/subspace-farmer/src/plotter/gpu/rocm.rs @@ -30,11 +30,11 @@ impl RecordsEncoder for RocmRecordsEncoder { sector_id: &SectorId, records: &mut [Record], abort_early: &AtomicBool, - ) -> Result<SectorContentsMap, Box<dyn std::error::Error + Send + Sync + 'static>> { + ) -> anyhow::Result<SectorContentsMap> { let pieces_in_sector = records .len() .try_into() - .map_err(|error| format!("Failed to convert pieces in sector: {error}"))?; + .map_err(|error| anyhow::anyhow!("Failed to convert pieces in sector: {error}"))?; let mut sector_contents_map = SectorContentsMap::new(pieces_in_sector); self.thread_pool.install(|| { @@ -76,7 +76,7 @@ impl RecordsEncoder for RocmRecordsEncoder { let plotting_error = plotting_error.lock().take(); if let Some(error) = plotting_error { - return Err(error); + return Err(anyhow::Error::msg(error)); } Ok(()) diff --git a/crates/subspace-farmer/src/single_disk_farm.rs b/crates/subspace-farmer/src/single_disk_farm.rs index f4fec958d0..a1ba385b97 100644 --- a/crates/subspace-farmer/src/single_disk_farm.rs +++ b/crates/subspace-farmer/src/single_disk_farm.rs @@ -55,7 +55,6 @@ use rayon::{ThreadPoolBuildError, ThreadPoolBuilder}; use serde::{Deserialize, Serialize}; use static_assertions::const_assert; use std::collections::HashSet; -use std::error::Error; use std::fs::{File, OpenOptions}; use std::future::Future; use std::io::Write; @@ -550,7 +549,7 @@ pub enum BackgroundTaskError { Farming(#[from] FarmingError), /// Reward signing #[error(transparent)] - RewardSigning(#[from] Box<dyn Error + Send + Sync + 'static>), + RewardSigning(#[from] anyhow::Error), /// Background task panicked #[error("Background task {task} panicked")] BackgroundTaskPanicked { @@ -1227,10 +1226,9 @@ impl SingleDiskFarm { reward_signing_fut.await; } Err(error) => { - return Err(BackgroundTaskError::RewardSigning( - format!("Failed to subscribe to reward signing notifications: {error}") - .into(), - )); + return Err(BackgroundTaskError::RewardSigning(anyhow::anyhow!( + "Failed to subscribe to reward signing notifications: {error}" + ))); } } diff --git a/crates/subspace-farmer/src/single_disk_farm/plotting.rs b/crates/subspace-farmer/src/single_disk_farm/plotting.rs index 29930c0223..56d3778520 100644 --- a/crates/subspace-farmer/src/single_disk_farm/plotting.rs +++ b/crates/subspace-farmer/src/single_disk_farm/plotting.rs @@ -1,5 +1,5 @@ use crate::farm::{SectorExpirationDetails, SectorPlottingDetails, SectorUpdate}; -use crate::node_client::{Error as NodeClientError, NodeClient}; +use crate::node_client::NodeClient; use crate::plotter::{Plotter, SectorPlottingProgress}; use crate::single_disk_farm::direct_io_file::DirectIoFile; use crate::single_disk_farm::metrics::{SectorState, SingleDiskFarmMetrics}; @@ -50,13 +50,13 @@ pub enum PlottingError { #[error("Failed to retrieve farmer info: {error}")] FailedToGetFarmerInfo { /// Lower-level error - error: NodeClientError, + error: anyhow::Error, }, /// Failed to get segment header #[error("Failed to get segment header: {error}")] FailedToGetSegmentHeader { /// Lower-level error - error: NodeClientError, + error: anyhow::Error, }, /// Missing archived segment header #[error("Missing archived segment header: {segment_index}")] @@ -68,7 +68,7 @@ pub enum PlottingError { #[error("Failed to subscribe to archived segments: {error}")] FailedToSubscribeArchivedSegments { /// Lower-level error - error: NodeClientError, + error: anyhow::Error, }, /// Low-level plotting error #[error("Low-level plotting error: {0}")] diff --git a/crates/subspace-farmer/src/single_disk_farm/reward_signing.rs b/crates/subspace-farmer/src/single_disk_farm/reward_signing.rs index d9352770c2..305071afc8 100644 --- a/crates/subspace-farmer/src/single_disk_farm/reward_signing.rs +++ b/crates/subspace-farmer/src/single_disk_farm/reward_signing.rs @@ -8,7 +8,7 @@ use tracing::{info, warn}; pub(super) async fn reward_signing<NC>( node_client: NC, identity: Identity, -) -> Result<impl Future<Output = ()>, Box<dyn std::error::Error + Send + Sync>> +) -> anyhow::Result<impl Future<Output = ()>> where NC: NodeClient, { diff --git a/crates/subspace-networking/src/utils/piece_provider.rs b/crates/subspace-networking/src/utils/piece_provider.rs index 960180c2f6..5a32620cbd 100644 --- a/crates/subspace-networking/src/utils/piece_provider.rs +++ b/crates/subspace-networking/src/utils/piece_provider.rs @@ -78,7 +78,7 @@ where } } - /// Get a pieces with provided indices from cache + /// Get pieces with provided indices from cache pub async fn get_from_cache<'a, PieceIndices>( &'a self, piece_indices: PieceIndices,