diff --git a/Cargo.lock b/Cargo.lock index a1783ebc24a..24cbb1a6e96 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1128,7 +1128,6 @@ dependencies = [ "hex", "http", "lru", - "pollster", "rs-dapi-client", "sanitize-filename", "serde", @@ -1140,6 +1139,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "zeroize", ] [[package]] @@ -3406,12 +3406,6 @@ dependencies = [ "plotters-backend", ] -[[package]] -name = "pollster" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22686f4785f02a4fcc856d3b3bb19bf6c8160d103f7a99cc258bddd0251dc7f2" - [[package]] name = "portable-atomic" version = "1.7.0" @@ -5669,6 +5663,21 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +dependencies = [ + "serde", + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] [[package]] name = "zip" diff --git a/packages/rs-drive-proof-verifier/src/error.rs b/packages/rs-drive-proof-verifier/src/error.rs index 02752a4b128..3203eb73174 100644 --- a/packages/rs-drive-proof-verifier/src/error.rs +++ b/packages/rs-drive-proof-verifier/src/error.rs @@ -106,6 +106,10 @@ pub enum ContextProviderError { /// Core Fork Error #[error("activation fork error: {0}")] ActivationForkError(String), + + /// Async error, eg. when tokio runtime fails + #[error("async error: {0}")] + AsyncError(String), } impl From for Error { diff --git a/packages/rs-sdk/Cargo.toml b/packages/rs-sdk/Cargo.toml index 43333af6ff1..788bd8a5932 100644 --- a/packages/rs-sdk/Cargo.toml +++ b/packages/rs-sdk/Cargo.toml @@ -18,7 +18,7 @@ drive-proof-verifier = { path = "../rs-drive-proof-verifier" } dapi-grpc-macros = { path = "../rs-dapi-grpc-macros" } http = { version = "1.1" } thiserror = "1.0.64" -tokio = { version = "1.40", features = ["macros"] } +tokio = { version = "1.40", features = ["macros", "rt-multi-thread"] } tokio-util = { version = "0.7.12" } async-trait = { version = "0.1.83" } ciborium = { git = "https://github.com/qrayven/ciborium", branch = "feat-ser-null-as-undefined" } @@ -36,7 +36,7 @@ derive_more = { version = "1.0", features = ["from"] } dashcore-rpc = { git = "https://github.com/dashpay/rust-dashcore-rpc", tag = "v0.15.4" } lru = { version = "0.12.3", optional = true } bip37-bloom-filter = { git = "https://github.com/dashpay/rs-bip37-bloom-filter", branch = "develop" } -pollster = { version = "0.3.0" } +zeroize = { version = "1.8", features = ["derive"] } [dev-dependencies] tokio = { version = "1.40", features = ["macros", "rt-multi-thread"] } @@ -71,6 +71,7 @@ mocks = [ "dep:dotenvy", "dep:envy", "dep:lru", + "zeroize/serde", ] # Run integration tests using test vectors from `tests/vectors/` instead of connecting to live Dash Platform. diff --git a/packages/rs-sdk/examples/read_contract.rs b/packages/rs-sdk/examples/read_contract.rs index ca37f1cfa7d..7ac2cc333db 100644 --- a/packages/rs-sdk/examples/read_contract.rs +++ b/packages/rs-sdk/examples/read_contract.rs @@ -4,6 +4,7 @@ use clap::Parser; use dash_sdk::{mock::provider::GrpcContextProvider, platform::Fetch, Sdk, SdkBuilder}; use dpp::prelude::{DataContract, Identifier}; use rs_dapi_client::AddressList; +use zeroize::Zeroizing; #[derive(clap::Parser, Debug)] #[command(version)] @@ -22,7 +23,7 @@ pub struct Config { // Dash Core RPC password #[arg(short = 'p', long)] - pub core_password: String, + pub core_password: Zeroizing, /// Dash Platform DAPI port #[arg(short = 'd', long)] @@ -86,7 +87,7 @@ fn setup_sdk(config: &Config) -> Sdk { .expect("parse uri"); // Now, we create the Sdk with the wallet and context provider. - let mut sdk = SdkBuilder::new(AddressList::from_iter([uri])) + let sdk = SdkBuilder::new(AddressList::from_iter([uri])) .build() .expect("cannot build sdk"); diff --git a/packages/rs-sdk/src/core_client.rs b/packages/rs-sdk/src/core/dash_core_client.rs similarity index 89% rename from packages/rs-sdk/src/core_client.rs rename to packages/rs-sdk/src/core/dash_core_client.rs index e68bb6166d3..d59af4207c5 100644 --- a/packages/rs-sdk/src/core_client.rs +++ b/packages/rs-sdk/src/core/dash_core_client.rs @@ -14,20 +14,33 @@ use dpp::dashcore::ProTxHash; use dpp::prelude::CoreBlockHeight; use drive_proof_verifier::error::ContextProviderError; use std::{fmt::Debug, sync::Mutex}; +use zeroize::Zeroizing; /// Core RPC client that can be used to retrieve quorum keys from core. /// -/// Implements [`ContextProvider`] trait. -/// /// TODO: This is a temporary implementation, effective until we integrate SPV. -pub struct CoreClient { +pub struct LowLevelDashCoreClient { core: Mutex, server_address: String, core_user: String, + core_password: Zeroizing, core_port: u16, } -impl Debug for CoreClient { +impl Clone for LowLevelDashCoreClient { + // As Client does not implement Clone, we just create a new instance of CoreClient here. + fn clone(&self) -> Self { + LowLevelDashCoreClient::new( + &self.server_address, + self.core_port, + &self.core_user, + &self.core_password, + ) + .expect("Failed to clone CoreClient when cloning, this should not happen") + } +} + +impl Debug for LowLevelDashCoreClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("CoreClient") .field("server_address", &self.server_address) @@ -37,7 +50,7 @@ impl Debug for CoreClient { } } -impl CoreClient { +impl LowLevelDashCoreClient { /// Create new Dash Core client. /// /// # Arguments @@ -63,13 +76,14 @@ impl CoreClient { core: Mutex::new(core), server_address: server_address.to_string(), core_user: core_user.to_string(), + core_password: core_password.to_string().into(), core_port, }) } } // Wallet functions -impl CoreClient { +impl LowLevelDashCoreClient { /// List unspent transactions /// /// ## Arguments diff --git a/packages/rs-sdk/src/core/mod.rs b/packages/rs-sdk/src/core/mod.rs index ed7d4726cfd..f642f3b26f6 100644 --- a/packages/rs-sdk/src/core/mod.rs +++ b/packages/rs-sdk/src/core/mod.rs @@ -1,4 +1,8 @@ //! Dash Core SDK implementation. //! //! TODO: This is work in progress. +#[cfg(feature = "mocks")] +mod dash_core_client; mod transaction; +#[cfg(feature = "mocks")] +pub use dash_core_client::LowLevelDashCoreClient; diff --git a/packages/rs-sdk/src/lib.rs b/packages/rs-sdk/src/lib.rs index 7f181746108..1f928ab6db9 100644 --- a/packages/rs-sdk/src/lib.rs +++ b/packages/rs-sdk/src/lib.rs @@ -62,8 +62,6 @@ #![allow(rustdoc::private_intra_doc_links)] pub mod core; -#[cfg(feature = "mocks")] -mod core_client; pub mod error; mod internal_cache; pub mod mock; @@ -78,6 +76,7 @@ pub use dpp; pub use drive; pub use drive_proof_verifier::types as query_types; pub use rs_dapi_client as dapi_client; +pub mod sync; /// Version of the SDK pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/packages/rs-sdk/src/mock/provider.rs b/packages/rs-sdk/src/mock/provider.rs index 8c0297bf475..879c4137ebe 100644 --- a/packages/rs-sdk/src/mock/provider.rs +++ b/packages/rs-sdk/src/mock/provider.rs @@ -1,13 +1,13 @@ //! Example ContextProvider that uses the Core gRPC API to fetch data from Platform. -use crate::core_client::CoreClient; +use crate::core::LowLevelDashCoreClient; use crate::platform::Fetch; +use crate::sync::block_on; use crate::{Error, Sdk}; use arc_swap::ArcSwapAny; use dpp::prelude::{CoreBlockHeight, DataContract, Identifier}; use drive_proof_verifier::error::ContextProviderError; use drive_proof_verifier::ContextProvider; -use pollster::FutureExt; use std::hash::Hash; use std::num::NonZeroUsize; use std::sync::Arc; @@ -17,7 +17,7 @@ use std::sync::Arc; /// Example [ContextProvider] used by the Sdk for testing purposes. pub struct GrpcContextProvider { /// Core client - core: CoreClient, + core: LowLevelDashCoreClient, /// [Sdk] to use when fetching data from Platform /// /// Note that if the `sdk` is `None`, the context provider will not be able to fetch data itself and will rely on @@ -62,7 +62,8 @@ impl GrpcContextProvider { data_contracts_cache_size: NonZeroUsize, quorum_public_keys_cache_size: NonZeroUsize, ) -> Result { - let core_client = CoreClient::new(core_ip, core_port, core_user, core_password)?; + let core_client = + LowLevelDashCoreClient::new(core_ip, core_port, core_user, core_password)?; Ok(Self { core: core_client, sdk: ArcSwapAny::new(Arc::new(sdk)), @@ -197,9 +198,9 @@ impl ContextProvider for GrpcContextProvider { let sdk_cloned = sdk.clone(); - let data_contract: Option = DataContract::fetch(&sdk_cloned, contract_id) - .block_on() - .map_err(|e| ContextProviderError::DataContractFailure(e.to_string()))?; + let data_contract: Option = + block_on(async move { DataContract::fetch(&sdk_cloned, contract_id).await })? + .map_err(|e| ContextProviderError::DataContractFailure(e.to_string()))?; if let Some(ref dc) = data_contract { self.data_contracts_cache.put(*data_contract_id, dc.clone()); diff --git a/packages/rs-sdk/src/mock/sdk.rs b/packages/rs-sdk/src/mock/sdk.rs index 02258c0cd13..bc9c3927716 100644 --- a/packages/rs-sdk/src/mock/sdk.rs +++ b/packages/rs-sdk/src/mock/sdk.rs @@ -6,6 +6,7 @@ use crate::{ types::{evonode::EvoNode, identity::IdentityRequest}, DocumentQuery, Fetch, FetchMany, Query, }, + sync::block_on, Error, Sdk, }; use arc_swap::ArcSwapOption; @@ -24,7 +25,7 @@ use rs_dapi_client::{ DapiClient, DumpData, }; use std::{collections::BTreeMap, path::PathBuf, sync::Arc}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, OwnedMutexGuard}; use super::MockResponse; @@ -82,6 +83,17 @@ impl MockDashPlatformSdk { self.platform_version } + /// Load all expectations from files in a directory asynchronously. + /// + /// See [MockDashPlatformSdk::load_expectations_sync()] for more details. + #[deprecated(since = "1.4.0", note = "use load_expectations_sync")] + pub async fn load_expectations + Send + 'static>( + &mut self, + dir: P, + ) -> Result<&mut Self, Error> { + self.load_expectations_sync(dir) + } + /// Load all expectations from files in a directory. /// /// @@ -89,7 +101,7 @@ impl MockDashPlatformSdk { /// This function can be used to load expectations after the Sdk is created, or use alternative location. /// Expectation files must be prefixed with [DapiClient::DUMP_FILE_PREFIX] and /// have `.json` extension. - pub async fn load_expectations>( + pub fn load_expectations_sync>( &mut self, dir: P, ) -> Result<&mut Self, Error> { @@ -114,97 +126,80 @@ impl MockDashPlatformSdk { .map(|f| f.path()) .collect(); + let mut dapi = block_on(self.dapi.clone().lock_owned())?; + for filename in &files { let basename = filename.file_name().unwrap().to_str().unwrap(); let request_type = basename.split('_').nth(1).unwrap_or_default(); match request_type { - "DocumentQuery" => self.load_expectation::(filename).await?, + "DocumentQuery" => load_expectation::(&mut dapi, filename)?, "GetEpochsInfoRequest" => { - self.load_expectation::(filename) - .await? + load_expectation::(&mut dapi, filename)? } "GetDataContractRequest" => { - self.load_expectation::(filename) - .await? + load_expectation::(&mut dapi, filename)? } "GetDataContractsRequest" => { - self.load_expectation::(filename) - .await? + load_expectation::(&mut dapi, filename)? } "GetDataContractHistoryRequest" => { - self.load_expectation::(filename) - .await? + load_expectation::(&mut dapi, filename)? } - "IdentityRequest" => self.load_expectation::(filename).await?, + "IdentityRequest" => load_expectation::(&mut dapi, filename)?, "GetIdentityRequest" => { - self.load_expectation::(filename) - .await? + load_expectation::(&mut dapi, filename)? } "GetIdentityBalanceRequest" => { - self.load_expectation::(filename) - .await? + load_expectation::(&mut dapi, filename)? } "GetIdentityContractNonceRequest" => { - self.load_expectation::(filename) - .await? - } - "GetIdentityBalanceAndRevisionRequest" => { - self.load_expectation::(filename) - .await? + load_expectation::(&mut dapi, filename)? } + "GetIdentityBalanceAndRevisionRequest" => load_expectation::< + proto::GetIdentityBalanceAndRevisionRequest, + >(&mut dapi, filename)?, "GetIdentityKeysRequest" => { - self.load_expectation::(filename) - .await? - } - "GetProtocolVersionUpgradeStateRequest" => { - self.load_expectation::(filename) - .await? + load_expectation::(&mut dapi, filename)? } + "GetProtocolVersionUpgradeStateRequest" => load_expectation::< + proto::GetProtocolVersionUpgradeStateRequest, + >(&mut dapi, filename)?, "GetProtocolVersionUpgradeVoteStatusRequest" => { - self.load_expectation::( - filename, - ) - .await? + load_expectation::( + &mut dapi, filename, + )? } "GetContestedResourcesRequest" => { - self.load_expectation::(filename) - .await? - } - "GetContestedResourceVoteStateRequest" => { - self.load_expectation::(filename) - .await? + load_expectation::(&mut dapi, filename)? } + "GetContestedResourceVoteStateRequest" => load_expectation::< + proto::GetContestedResourceVoteStateRequest, + >(&mut dapi, filename)?, "GetContestedResourceVotersForIdentityRequest" => { - self.load_expectation::( - filename, - ) - .await? + load_expectation::( + &mut dapi, filename, + )? } "GetContestedResourceIdentityVotesRequest" => { - self.load_expectation::( - filename, - ) - .await? + load_expectation::( + &mut dapi, filename, + )? } "GetVotePollsByEndDateRequest" => { - self.load_expectation::(filename) - .await? - } - "GetPrefundedSpecializedBalanceRequest" => { - self.load_expectation::(filename) - .await? + load_expectation::(&mut dapi, filename)? } + "GetPrefundedSpecializedBalanceRequest" => load_expectation::< + proto::GetPrefundedSpecializedBalanceRequest, + >(&mut dapi, filename)?, "GetPathElementsRequest" => { - self.load_expectation::(filename) - .await? - } - "GetTotalCreditsInPlatformRequest" => { - self.load_expectation::(filename) - .await? + load_expectation::(&mut dapi, filename)? } - "EvoNode" => self.load_expectation::(filename).await?, + "GetTotalCreditsInPlatformRequest" => load_expectation::< + proto::GetTotalCreditsInPlatformRequest, + >(&mut dapi, filename)?, + "EvoNode" => load_expectation::(&mut dapi, filename)?, _ => { return Err(Error::Config(format!( "unknown request type {} in {}, missing match arm in load_expectations?", @@ -218,21 +213,6 @@ impl MockDashPlatformSdk { Ok(self) } - async fn load_expectation(&mut self, path: &PathBuf) -> Result<(), Error> { - let data = DumpData::::load(path) - .map_err(|e| { - Error::Config(format!( - "cannot load mock expectations from {}: {}", - path.display(), - e - )) - })? - .deserialize(); - - self.dapi.lock().await.expect(&data.0, &data.1)?; - Ok(()) - } - /// Expect a [Fetch] request and return provided object. /// /// This method is used to define mock expectations for [Fetch] requests. @@ -304,7 +284,7 @@ impl MockDashPlatformSdk { /// ## Generic Parameters /// /// - `O`: Type of the object that will be returned in response to the query. - /// Must implement [FetchMany]. `Vec` must implement [MockResponse]. + /// Must implement [FetchMany]. `Vec` must implement [MockResponse]. /// - `Q`: Type of the query that will be sent to Platform. Must implement [Query] and [Mockable]. /// /// ## Arguments @@ -330,20 +310,23 @@ impl MockDashPlatformSdk { K: Ord, O: FetchMany, Q: Query<>::Request>, - R: FromIterator<(K, Option)> + MockResponse + Send + Default, + R, >( &mut self, query: Q, objects: Option, ) -> Result<&mut Self, Error> where - R: MockResponse, - <>::Request as TransportRequest>::Response: Default, - R: FromProof< + R: FromIterator<(K, Option)> + + MockResponse + + FromProof< >::Request, Request = >::Request, Response = <>::Request as TransportRequest>::Response, - > + Sync, + > + Sync + + Send + + Default, + <>::Request as TransportRequest>::Response: Default, { let grpc_request = query.query(self.prove()).expect("query must be correct"); self.expect(grpc_request, objects).await?; @@ -431,3 +414,25 @@ impl MockDashPlatformSdk { } } } + +/// Load expectation from file and save it to `dapi_guard` mock Dapi client. +/// +/// This function is used to load expectations from files in a directory. +/// It is implemented without reference to the `MockDashPlatformSdk` object +/// to make it easier to use in async context. +fn load_expectation( + dapi_guard: &mut OwnedMutexGuard, + path: &PathBuf, +) -> Result<(), Error> { + let data = DumpData::::load(path) + .map_err(|e| { + Error::Config(format!( + "cannot load mock expectations from {}: {}", + path.display(), + e + )) + })? + .deserialize(); + dapi_guard.expect(&data.0, &data.1)?; + Ok(()) +} diff --git a/packages/rs-sdk/src/platform/block_info_from_metadata.rs b/packages/rs-sdk/src/platform/block_info_from_metadata.rs index 713e1e176d4..5e96e136550 100644 --- a/packages/rs-sdk/src/platform/block_info_from_metadata.rs +++ b/packages/rs-sdk/src/platform/block_info_from_metadata.rs @@ -12,8 +12,8 @@ use drive::error::proof::ProofError; /// /// # Parameters /// - `response_metadata`: A reference to `ResponseMetadata` obtained from a platform response. -/// This metadata includes various block-related information such as time in milliseconds, -/// height, core chain locked height, and epoch. +/// This metadata includes various block-related information such as time in milliseconds, +/// height, core chain locked height, and epoch. /// /// # Returns /// If successful, returns `Ok(BlockInfo)` where `BlockInfo` contains: diff --git a/packages/rs-sdk/src/platform/transition/withdraw_from_identity.rs b/packages/rs-sdk/src/platform/transition/withdraw_from_identity.rs index ce6d32e0716..1d72c86e07c 100644 --- a/packages/rs-sdk/src/platform/transition/withdraw_from_identity.rs +++ b/packages/rs-sdk/src/platform/transition/withdraw_from_identity.rs @@ -4,7 +4,6 @@ use dpp::identity::accessors::IdentityGettersV0; use dpp::identity::core_script::CoreScript; use dpp::identity::signer::Signer; use dpp::identity::{Identity, IdentityPublicKey}; -use dpp::prelude::UserFeeIncrease; use crate::platform::transition::broadcast::BroadcastStateTransition; use crate::platform::transition::put_settings::PutSettings; @@ -47,9 +46,7 @@ impl WithdrawFromIdentity for Identity { ) -> Result { let new_identity_nonce = sdk.get_identity_nonce(self.id(), true, settings).await?; let script = address.map(|address| CoreScript::new(address.script_pubkey())); - let user_fee_increase = settings - .map(|settings| settings.user_fee_increase) - .flatten(); + let user_fee_increase = settings.and_then(|settings| settings.user_fee_increase); let state_transition = IdentityCreditWithdrawalTransition::try_from_identity( self, script, diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index abd02e184cc..f7f938703d7 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -7,6 +7,7 @@ use crate::mock::MockResponse; use crate::mock::{provider::GrpcContextProvider, MockDashPlatformSdk}; use crate::platform::transition::put_settings::PutSettings; use crate::platform::{Fetch, Identifier}; +use arc_swap::{ArcSwapAny, ArcSwapOption}; use dapi_grpc::mock::Mockable; use dapi_grpc::platform::v0::{Proof, ResponseMetadata}; use dpp::bincode; @@ -40,6 +41,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; #[cfg(feature = "mocks")] use tokio::sync::{Mutex, MutexGuard}; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; +use zeroize::Zeroizing; /// How many data contracts fit in the cache. pub const DEFAULT_CONTRACT_CACHE_SIZE: usize = 100; @@ -79,7 +81,6 @@ pub type LastQueryTimestamp = u64; /// ## Examples /// /// See tests/ for examples of using the SDK. -#[derive(Clone)] pub struct Sdk { /// The network that the sdk is configured for (Dash (mainnet), Testnet, Devnet, Regtest) pub network: Network, @@ -97,7 +98,7 @@ pub struct Sdk { /// ## Panics /// /// Note that setting this to None can panic. - context_provider: Option>>, + context_provider: ArcSwapOption>, /// Cancellation token; once cancelled, all pending requests should be aborted. pub(crate) cancel_token: CancellationToken, @@ -105,6 +106,20 @@ pub struct Sdk { #[cfg(feature = "mocks")] dump_dir: Option, } +impl Clone for Sdk { + fn clone(&self) -> Self { + Self { + network: self.network, + inner: self.inner.clone(), + proofs: self.proofs, + internal_cache: Arc::clone(&self.internal_cache), + context_provider: ArcSwapOption::new(self.context_provider.load_full()), + cancel_token: self.cancel_token.clone(), + #[cfg(feature = "mocks")] + dump_dir: self.dump_dir.clone(), + } + } +} impl Debug for Sdk { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -203,8 +218,7 @@ impl Sdk { O::Request: Mockable, { let provider = self - .context_provider - .as_ref() + .context_provider() .ok_or(drive_proof_verifier::Error::ContextProviderNotSet)?; match self.inner { @@ -243,8 +257,7 @@ impl Sdk { O::Request: Mockable, { let provider = self - .context_provider - .as_ref() + .context_provider() .ok_or(drive_proof_verifier::Error::ContextProviderNotSet)?; match self.inner { @@ -262,8 +275,13 @@ impl Sdk { } } } + + /// Return [ContextProvider] used by the SDK. pub fn context_provider(&self) -> Option { - self.context_provider.as_ref().map(Arc::clone) + let provider_guard = self.context_provider.load(); + let provider = provider_guard.as_ref().map(Arc::clone); + + provider } /// Returns a mutable reference to the `MockDashPlatformSdk` instance. @@ -493,9 +511,9 @@ impl Sdk { /// [ContextProvider] is used to access state information, like data contracts and quorum public keys. /// /// Note that this will overwrite any previous context provider. - pub fn set_context_provider(&mut self, context_provider: C) { + pub fn set_context_provider(&self, context_provider: C) { self.context_provider - .replace(Arc::new(Box::new(context_provider))); + .swap(Some(Arc::new(Box::new(context_provider)))); } /// Returns a future that resolves when the Sdk is cancelled (eg. shutdown was requested). @@ -568,7 +586,7 @@ pub struct SdkBuilder { core_ip: String, core_port: u16, core_user: String, - core_password: String, + core_password: Zeroizing, /// If true, request and verify proofs of the responses. proofs: bool, @@ -604,7 +622,7 @@ impl Default for SdkBuilder { network: Network::Dash, core_ip: "".to_string(), core_port: 0, - core_password: "".to_string(), + core_password: "".to_string().into(), core_user: "".to_string(), proofs: true, @@ -727,7 +745,7 @@ impl SdkBuilder { self.core_ip = ip.to_string(); self.core_port = port; self.core_user = user.to_string(); - self.core_password = password.to_string(); + self.core_password = Zeroizing::from(password.to_string()); self } @@ -772,16 +790,18 @@ impl SdkBuilder { network: self.network, inner:SdkInstance::Dapi { dapi, version:self.version }, proofs:self.proofs, - context_provider: self.context_provider.map(Arc::new), + context_provider: ArcSwapOption::new( self.context_provider.map(Arc::new)), cancel_token: self.cancel_token, #[cfg(feature = "mocks")] dump_dir: self.dump_dir, internal_cache: Default::default(), }; // if context provider is not set correctly (is None), it means we need to fallback to core wallet - if sdk.context_provider.is_none() { + if sdk.context_provider.load().is_none() { #[cfg(feature = "mocks")] if !self.core_ip.is_empty() { + tracing::warn!( + "ContextProvider not set, falling back to a mock one; use SdkBuilder::with_context_provider() to set it up"); let mut context_provider = GrpcContextProvider::new(None, &self.core_ip, self.core_port, &self.core_user, &self.core_password, self.data_contract_cache_size, self.quorum_public_keys_cache_size)?; @@ -792,15 +812,19 @@ impl SdkBuilder { // We have cyclical dependency Sdk <-> GrpcContextProvider, so we just do some // workaround using additional Arc. let context_provider= Arc::new(context_provider); - sdk.context_provider.replace(Arc::new(Box::new(context_provider.clone()))); + sdk.context_provider.swap(Some(Arc::new(Box::new(context_provider.clone())))); context_provider.set_sdk(Some(sdk.clone())); } else{ - tracing::warn!( - "Configure ContextProvider with Sdk::with_context_provider(); otherwise Sdk will fail"); + return Err(Error::Config(concat!( + "context provider is not set, configure it with SdkBuilder::with_context_provider() ", + "or configure Core access with SdkBuilder::with_core() to use mock context provider") + .to_string())); } #[cfg(not(feature = "mocks"))] - tracing::warn!( - "Configure ContextProvider with Sdk::with_context_provider(); otherwise Sdk will fail"); + return Err(Error::Config(concat!( + "context provider is not set, configure it with SdkBuilder::with_context_provider() ", + "or enable `mocks` feature to use mock context provider") + .to_string())); }; sdk @@ -831,13 +855,13 @@ impl SdkBuilder { dump_dir: self.dump_dir.clone(), proofs:self.proofs, internal_cache: Default::default(), - context_provider:Some(Arc::new(context_provider)), + context_provider:ArcSwapAny::new( Some(Arc::new(context_provider))), cancel_token: self.cancel_token, }; let mut guard = mock_sdk.try_lock().expect("mock sdk is in use by another thread and connot be reconfigured"); guard.set_sdk(sdk.clone()); if let Some(ref dump_dir) = self.dump_dir { - pollster::block_on( guard.load_expectations(dump_dir))?; + guard.load_expectations_sync(dump_dir)?; }; sdk diff --git a/packages/rs-sdk/src/sync.rs b/packages/rs-sdk/src/sync.rs new file mode 100644 index 00000000000..d3c066e8cb5 --- /dev/null +++ b/packages/rs-sdk/src/sync.rs @@ -0,0 +1,171 @@ +//! Handle async calls from sync code. +//! +//! This is a workaround for an issue in tokio, where you cannot call `block_on` from sync call that is called +//! inside a tokio runtime. This module spawns async futures in active tokio runtime, and retrieves the result +//! using a channel. +use drive_proof_verifier::error::ContextProviderError; +use std::{future::Future, sync::mpsc::SendError}; +use tokio::runtime::TryCurrentError; + +#[derive(Debug, thiserror::Error)] +pub enum AsyncError { + /// Not running inside tokio runtime + #[error("not running inside tokio runtime: {0}")] + NotInTokioRuntime(#[from] TryCurrentError), + + /// Cannot receive response from async function + #[error("cannot receive response from async function: {0}")] + RecvError(#[from] std::sync::mpsc::RecvError), + + /// Cannot send response from async function + #[error("cannot send response from async function: {0}")] + SendError(String), + + #[error("asynchronous call from synchronous context failed: {0}")] + #[allow(unused)] + Generic(String), +} + +impl From> for AsyncError { + fn from(error: SendError) -> Self { + Self::SendError(error.to_string()) + } +} + +impl From for ContextProviderError { + fn from(error: AsyncError) -> Self { + ContextProviderError::AsyncError(error.to_string()) + } +} + +impl From for crate::Error { + fn from(error: AsyncError) -> Self { + Self::ContextProviderError(error.into()) + } +} + +/// Blocks on the provided future and returns the result. +/// +/// This function is used to call async functions from sync code. +/// Requires the current thread to be running in a tokio runtime. +/// +/// Due to limitations of tokio runtime, we cannot use `tokio::runtime::Runtime::block_on` if we are already inside a tokio runtime. +/// This function is a workaround for that limitation. +pub fn block_on(fut: F) -> Result +where + F: Future + Send + 'static, + F::Output: Send, +{ + tracing::trace!("block_on: running async function from sync code"); + let rt = tokio::runtime::Handle::try_current()?; + let (tx, rx) = std::sync::mpsc::channel(); + tracing::trace!("block_on: Spawning worker"); + let hdl = rt.spawn(worker(fut, tx)); + tracing::trace!("block_on: Worker spawned"); + let resp = tokio::task::block_in_place(|| rx.recv())?; + + tracing::trace!("Response received"); + if !hdl.is_finished() { + tracing::debug!("async-sync worker future is not finished, aborting; this should not happen, but it's fine"); + hdl.abort(); // cleanup the worker future + } + + Ok(resp) +} + +/// Worker function that runs the provided future and sends the result back to the caller using oneshot channel. +async fn worker( + fut: F, + // response: oneshot::Sender, + response: std::sync::mpsc::Sender, +) -> Result<(), AsyncError> { + tracing::trace!("Worker start"); + let result = fut.await; + tracing::trace!("Worker async function completed, sending response"); + response.send(result)?; + tracing::trace!("Worker response sent"); + + Ok(()) +} + +#[cfg(test)] +mod test { + use super::*; + use std::future::Future; + use tokio::{ + runtime::Builder, + sync::mpsc::{self, Receiver}, + }; + + /// Test for block_on with async code that calls sync code, which then calls async code again. + /// + /// Given: An async function that calls a sync function, which then calls another async function. + /// When: The async function is executed using block_on. + /// Then: Other threads can still do some work + #[test] + fn test_block_on_nested_async_sync() { + let rt = Builder::new_multi_thread() + .worker_threads(1) // we should be good with 1 worker thread + .max_blocking_threads(1) // we should be good with 1 blocking thread + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + // we repeat this test a few times, to make sure it's stable + for _repeat in 0..5 { + // Create a Tokio runtime; we use the current thread runtime for this test. + + const MSGS: usize = 10; + let (tx, rx) = mpsc::channel::(1); + + // Spawn new worker that will just count. + let worker = async move { + for count in 0..MSGS { + tx.send(count).await.unwrap(); + } + }; + let worker_join = rt.spawn(worker); + // Define the number of levels of execution + let levels = 4; + + // Define the innermost async function + async fn innermost_async_function( + mut rx: Receiver, + ) -> Result { + for i in 0..MSGS { + let count = rx.recv().await.unwrap(); + assert_eq!(count, i); + } + + Ok(String::from("Success")) + } + + // Define the nested sync function + fn nested_sync_function(fut: F) -> Result + where + F: Future> + Send + 'static, + F::Output: Send, + { + block_on(fut)?.map_err(|e| ContextProviderError::Generic(e.to_string())) + } + + // Define the outer async function + async fn outer_async_function( + levels: usize, + rx: Receiver, + ) -> Result { + let mut result = innermost_async_function(rx).await; + for _ in 0..levels { + result = nested_sync_function(async { result }); + } + result + } + + // Run the outer async function using block_on + let result = rt.block_on(outer_async_function(levels, rx)); + + rt.block_on(worker_join).unwrap(); + // Assert the result + assert_eq!(result.unwrap(), "Success"); + } + } +} diff --git a/packages/rs-sdk/tests/fetch/config.rs b/packages/rs-sdk/tests/fetch/config.rs index 445904795d6..1b6efbc615e 100644 --- a/packages/rs-sdk/tests/fetch/config.rs +++ b/packages/rs-sdk/tests/fetch/config.rs @@ -11,6 +11,7 @@ use dpp::{ use rs_dapi_client::AddressList; use serde::Deserialize; use std::{path::PathBuf, str::FromStr}; +use zeroize::Zeroizing; /// Existing document ID /// @@ -43,7 +44,7 @@ pub struct Config { pub core_user: String, /// Password for Dash Core RPC interface #[serde(default)] - pub core_password: String, + pub core_password: Zeroizing, /// When true, use SSL for the Dash Platform node grpc interface #[serde(default)] pub platform_ssl: bool, @@ -71,6 +72,7 @@ pub struct Config { /// ID of document of the type [`existing_document_type_name`](Config::existing_document_type_name) /// in [`existing_data_contract_id`](Config::existing_data_contract_id). #[serde(default = "Config::default_document_id")] + #[allow(unused)] pub existing_document_id: Identifier, // Hex-encoded ProTxHash of the existing HP masternode #[serde(default = "Config::default_protxhash")] @@ -141,14 +143,14 @@ impl Config { /// ## Feature flags /// /// * `offline-testing` is not set - connect to Platform and generate - /// new test vectors during execution + /// new test vectors during execution /// * `offline-testing` is set - use mock implementation and - /// load existing test vectors from disk + /// load existing test vectors from disk /// /// ## Arguments /// /// * namespace - namespace to use when storing mock expectations; this is used to separate - /// expectations from different tests. + /// expectations from different tests. /// /// When empty string is provided, expectations are stored in the root of the dump directory. pub async fn setup_api(&self, namespace: &str) -> dash_sdk::Sdk { diff --git a/packages/rs-sdk/tests/fetch/contested_resource.rs b/packages/rs-sdk/tests/fetch/contested_resource.rs index 950e1416016..643396d495a 100644 --- a/packages/rs-sdk/tests/fetch/contested_resource.rs +++ b/packages/rs-sdk/tests/fetch/contested_resource.rs @@ -19,7 +19,6 @@ use drive::query::{ vote_polls_by_document_type_query::VotePollsByDocumentTypeQuery, }; use drive_proof_verifier::types::ContestedResource; -use std::panic::catch_unwind; /// Test that we can fetch contested resources /// @@ -304,33 +303,35 @@ async fn contested_resources_fields( tracing::debug!(?expect, "Running test case"); // handle panics to not stop other test cases from running - let unwinded = catch_unwind(|| { - { - pollster::block_on(async { - let mut query = base_query(&cfg); - query_mut_fn(&mut query); - - let (test_case_id, sdk) = - setup_sdk_for_test_case(cfg, query.clone(), "contested_resources_fields").await; - tracing::debug!(test_case_id, ?query, "Executing query"); - - ContestedResource::fetch_many(&sdk, query).await - }) - } - }); - let result = match unwinded { + let join_handle = tokio::task::spawn(async move { + let mut query = base_query(&cfg); + query_mut_fn(&mut query); + + let (test_case_id, sdk) = + setup_sdk_for_test_case(cfg, query.clone(), "contested_resources_fields").await; + tracing::debug!(test_case_id, ?query, "Executing query"); + + ContestedResource::fetch_many(&sdk, query).await + }) + .await; + let result = match join_handle { Ok(r) => r, Err(e) => { - let msg = if let Some(s) = e.downcast_ref::<&str>() { - s.to_string() - } else if let Some(s) = e.downcast_ref::() { - s.to_string() - } else { - format!("unknown panic type: {:?}", std::any::type_name_of_val(&e)) - }; + if e.is_panic() { + let e = e.into_panic(); + let msg = if let Some(s) = e.downcast_ref::<&str>() { + s.to_string() + } else if let Some(s) = e.downcast_ref::() { + s.to_string() + } else { + format!("unknown panic type: {:?}", std::any::type_name_of_val(&e)) + }; - tracing::error!("PANIC: {}", msg); - Err(Error::Generic(msg)) + tracing::error!("PANIC: {}", msg); + Err(Error::Generic(msg)) + } else { + Err(Error::Generic(format!("JoinError: {:?}", e))) + } } }; diff --git a/packages/rs-sdk/tests/fetch/contested_resource_vote_state.rs b/packages/rs-sdk/tests/fetch/contested_resource_vote_state.rs index 69b66e0e486..fef6138902b 100644 --- a/packages/rs-sdk/tests/fetch/contested_resource_vote_state.rs +++ b/packages/rs-sdk/tests/fetch/contested_resource_vote_state.rs @@ -276,7 +276,7 @@ async fn contested_resource_vote_states_with_limit_PLAN_674() { type MutFn = fn(&mut ContestedDocumentVotePollDriveQuery); #[test_case(|q| q.limit = Some(0), Err("limit 0 out of bounds of [1, 100]"); "limit 0")] -#[test_case(|q| q.limit = Some(std::u16::MAX), Err("limit 65535 out of bounds of [1, 100]"); "limit std::u16::MAX")] +#[test_case(|q| q.limit = Some(u16::MAX), Err("limit 65535 out of bounds of [1, 100]"); "limit u16::MAX")] #[test_case(|q| q.start_at = Some(([0x11; 32], true)), Ok("Contenders { winner: None, contenders: {Identifier("); "start_at does not exist should return next contenders")] #[test_case(|q| q.start_at = Some(([0xff; 32], true)), Ok("Contenders { winner: None, contenders: {}, abstain_vote_tally: None, lock_vote_tally: None }"); "start_at 0xff;32 should return zero contenders")] #[test_case(|q| q.vote_poll.document_type_name = "nx doctype".to_string(), Err(r#"code: InvalidArgument, message: "document type nx doctype not found"#); "non existing document type returns InvalidArgument")] diff --git a/packages/rs-sdk/tests/fetch/mock_fetch.rs b/packages/rs-sdk/tests/fetch/mock_fetch.rs index a5d49160772..1b96614ecd4 100644 --- a/packages/rs-sdk/tests/fetch/mock_fetch.rs +++ b/packages/rs-sdk/tests/fetch/mock_fetch.rs @@ -1,5 +1,4 @@ -//! -//! +//! Tests of mocked Fetch trait implementations. use super::common::{mock_data_contract, mock_document_type}; use dash_sdk::{