From 3ee0e8df50f7098322b89d5c0aa92d31f0f4be10 Mon Sep 17 00:00:00 2001 From: Quantum Explorer <quantum@dash.org> Date: Tue, 8 Oct 2024 06:37:42 +0700 Subject: [PATCH 01/18] changed contested resource type to a struct --- packages/rs-drive-proof-verifier/src/proof.rs | 2 +- packages/rs-drive-proof-verifier/src/types.rs | 23 ++++++------------- .../rs-sdk/tests/fetch/contested_resource.rs | 4 ++-- 3 files changed, 10 insertions(+), 19 deletions(-) diff --git a/packages/rs-drive-proof-verifier/src/proof.rs b/packages/rs-drive-proof-verifier/src/proof.rs index 89fef813601..0d7b23e05d3 100644 --- a/packages/rs-drive-proof-verifier/src/proof.rs +++ b/packages/rs-drive-proof-verifier/src/proof.rs @@ -1379,7 +1379,7 @@ impl FromProof<platform::GetContestedResourcesRequest> for ContestedResources { verify_tenderdash_proof(proof, mtd, &root_hash, provider)?; let resources: ContestedResources = - items.into_iter().map(ContestedResource::Value).collect(); + items.into_iter().map(|v| ContestedResource(v)).collect(); Ok((resources.into_option(), mtd.clone(), proof.clone())) } diff --git a/packages/rs-drive-proof-verifier/src/types.rs b/packages/rs-drive-proof-verifier/src/types.rs index d49301720b3..a40fefb98cc 100644 --- a/packages/rs-drive-proof-verifier/src/types.rs +++ b/packages/rs-drive-proof-verifier/src/types.rs @@ -224,11 +224,8 @@ pub struct ElementFetchRequestItem(pub Element); pub type IdentityBalanceAndRevision = (u64, Revision); /// Contested resource values. -#[derive(Debug, derive_more::From, Clone, PartialEq)] -pub enum ContestedResource { - /// Generic [Value] - Value(Value), -} +#[derive(Debug, Clone, PartialEq)] +pub struct ContestedResource(pub Value); impl ContestedResource { /// Get the value. @@ -244,13 +241,9 @@ impl ContestedResource { } } -impl TryInto<Value> for ContestedResource { - type Error = crate::Error; - - fn try_into(self) -> Result<Value, Self::Error> { - match self { - ContestedResource::Value(value) => Ok(value), - } +impl Into<Value> for ContestedResource { + fn into(self) -> Value { + self.0 } } @@ -261,9 +254,7 @@ impl PlatformVersionEncode for ContestedResource { encoder: &mut E, _platform_version: &platform_version::PlatformVersion, ) -> Result<(), bincode::error::EncodeError> { - match self { - ContestedResource::Value(value) => value.encode(encoder), - } + self.0.encode(encoder) } } @@ -273,7 +264,7 @@ impl PlatformVersionedDecode for ContestedResource { decoder: &mut D, _platform_version: &platform_version::PlatformVersion, ) -> Result<Self, bincode::error::DecodeError> { - Ok(ContestedResource::Value(Value::decode(decoder)?)) + Ok(ContestedResource(Value::decode(decoder)?)) } } diff --git a/packages/rs-sdk/tests/fetch/contested_resource.rs b/packages/rs-sdk/tests/fetch/contested_resource.rs index 4f899c21261..3839e220b89 100644 --- a/packages/rs-sdk/tests/fetch/contested_resource.rs +++ b/packages/rs-sdk/tests/fetch/contested_resource.rs @@ -105,7 +105,7 @@ async fn contested_resources_start_at_value() { for inclusive in [true, false] { // when I set start_at_value to some value, for (i, start) in all.0.iter().enumerate() { - let ContestedResource::Value(start_value) = start.clone(); + let ContestedResource(start_value) = start.clone(); let query = VotePollsByDocumentTypeQuery { start_at_value: Some((start_value, inclusive)), @@ -217,7 +217,7 @@ async fn contested_resources_limit_PLAN_656() { ); } - let ContestedResource::Value(last) = + let ContestedResource(last) = rss.0.into_iter().last().expect("last contested resource"); start_at_value = Some((last, false)); From fa72ece802458c30251b367adfc3fc3709563c92 Mon Sep 17 00:00:00 2001 From: Quantum Explorer <quantum@dash.org> Date: Tue, 8 Oct 2024 06:40:27 +0700 Subject: [PATCH 02/18] expose drive_proof_verifier types --- packages/rs-sdk/src/lib.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/rs-sdk/src/lib.rs b/packages/rs-sdk/src/lib.rs index 62bd9fea4c4..4b66058cc51 100644 --- a/packages/rs-sdk/src/lib.rs +++ b/packages/rs-sdk/src/lib.rs @@ -77,6 +77,7 @@ pub use dashcore_rpc; pub use dpp; pub use drive; pub use rs_dapi_client as dapi_client; +pub use drive_proof_verifier::types as query_types; /// Version of the SDK pub const VERSION: &str = env!("CARGO_PKG_VERSION"); From 4b4a62d103a2927d862d11d0b46c4cd64ef64680 Mon Sep 17 00:00:00 2001 From: Quantum Explorer <quantum@dash.org> Date: Tue, 8 Oct 2024 08:47:27 +0700 Subject: [PATCH 03/18] updated to newer backon crate --- Cargo.lock | 18 ++++++++++++++++-- packages/rs-dapi-client/Cargo.toml | 3 ++- packages/rs-dapi-client/src/dapi_client.rs | 2 +- packages/rs-sdk/Cargo.toml | 1 + packages/rs-sdk/src/lib.rs | 2 +- 5 files changed, 21 insertions(+), 5 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4af5a747995..59824d66a48 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -309,11 +309,13 @@ dependencies = [ [[package]] name = "backon" -version = "0.5.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33e5b65cc81d81fbb8488f36458ab4771be35a722967bbc959df28b47397e3ff" +checksum = "e4fa97bb310c33c811334143cf64c5bb2b7b3c06e453db6b095d7061eff8f113" dependencies = [ "fastrand", + "gloo-timers", + "tokio", ] [[package]] @@ -1954,6 +1956,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "grovedb" version = "2.1.0" diff --git a/packages/rs-dapi-client/Cargo.toml b/packages/rs-dapi-client/Cargo.toml index e5dc9615744..dca68069367 100644 --- a/packages/rs-dapi-client/Cargo.toml +++ b/packages/rs-dapi-client/Cargo.toml @@ -5,6 +5,7 @@ edition = "2021" [features] default = ["mocks", "offline-testing"] +tokio-sleep = ["backon/tokio-sleep"] mocks = [ "dep:sha2", "dep:hex", @@ -19,7 +20,7 @@ dump = ["mocks"] offline-testing = [] [dependencies] -backon = "0.5" +backon = { version = "1.2.0"} dapi-grpc = { path = "../dapi-grpc" } futures = "0.3.28" http-serde = { version = "2.1", optional = true } diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 17748ab2b02..a2e6fea0cbf 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -250,7 +250,7 @@ impl DapiRequestExecutor for DapiClient { // Start the routine with retry policy applied: // We allow let_and_return because `result` is used later if dump feature is enabled let result = routine - .retry(&retry_settings) + .retry(retry_settings) .notify(|error, duration| { tracing::warn!( ?error, diff --git a/packages/rs-sdk/Cargo.toml b/packages/rs-sdk/Cargo.toml index aa41ed04ae7..6dc1736a3fb 100644 --- a/packages/rs-sdk/Cargo.toml +++ b/packages/rs-sdk/Cargo.toml @@ -57,6 +57,7 @@ test-case = { version = "3.3.1" } [features] default = ["mocks", "offline-testing"] +tokio-sleep = ["rs-dapi-client/tokio-sleep"] mocks = [ "dep:serde", diff --git a/packages/rs-sdk/src/lib.rs b/packages/rs-sdk/src/lib.rs index 4b66058cc51..7f181746108 100644 --- a/packages/rs-sdk/src/lib.rs +++ b/packages/rs-sdk/src/lib.rs @@ -76,8 +76,8 @@ pub use sdk::{RequestSettings, Sdk, SdkBuilder}; pub use dashcore_rpc; pub use dpp; pub use drive; -pub use rs_dapi_client as dapi_client; pub use drive_proof_verifier::types as query_types; +pub use rs_dapi_client as dapi_client; /// Version of the SDK pub const VERSION: &str = env!("CARGO_PKG_VERSION"); From ab91e076c17c61ffe0b1f79c3dbce83d8a6235cd Mon Sep 17 00:00:00 2001 From: Quantum Explorer <quantum@dash.org> Date: Tue, 8 Oct 2024 12:06:31 +0700 Subject: [PATCH 04/18] fix: fixed pools to add tls --- packages/rs-dapi-client/Cargo.toml | 2 +- .../rs-dapi-client/src/connection_pool.rs | 12 +-- packages/rs-dapi-client/src/dapi_client.rs | 8 +- packages/rs-dapi-client/src/transport.rs | 4 +- packages/rs-dapi-client/src/transport/grpc.rs | 85 +++++++++++++------ packages/rs-sdk/src/platform/types/evonode.rs | 13 ++- 6 files changed, 89 insertions(+), 35 deletions(-) diff --git a/packages/rs-dapi-client/Cargo.toml b/packages/rs-dapi-client/Cargo.toml index e5dc9615744..c50332feb41 100644 --- a/packages/rs-dapi-client/Cargo.toml +++ b/packages/rs-dapi-client/Cargo.toml @@ -20,7 +20,7 @@ offline-testing = [] [dependencies] backon = "0.5" -dapi-grpc = { path = "../dapi-grpc" } +dapi-grpc = { path = "../dapi-grpc", features = ["client"], default-features = false } futures = "0.3.28" http-serde = { version = "2.1", optional = true } rand = { version = "0.8.5", features = ["small_rng"] } diff --git a/packages/rs-dapi-client/src/connection_pool.rs b/packages/rs-dapi-client/src/connection_pool.rs index 16d8cf030db..a6ca2964946 100644 --- a/packages/rs-dapi-client/src/connection_pool.rs +++ b/packages/rs-dapi-client/src/connection_pool.rs @@ -67,19 +67,21 @@ impl ConnectionPool { /// * `prefix` - Prefix for the item in the pool. Used to distinguish between Core and Platform clients. /// * `uri` - URI of the node. /// * `settings` - Applied request settings. - pub fn get_or_create( + pub fn get_or_create<T>( &self, prefix: PoolPrefix, uri: &Uri, settings: Option<&AppliedRequestSettings>, - create: impl FnOnce() -> PoolItem, - ) -> PoolItem { + create: impl FnOnce() -> Result<PoolItem, T>, + ) -> Result<PoolItem, T> { if let Some(cli) = self.get(prefix, uri, settings) { - return cli; + return Ok(cli); } let cli = create(); - self.put(uri, settings, cli.clone()); + if let Ok(cli) = &cli { + self.put(uri, settings, cli.clone()); + } cli } diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 17748ab2b02..5b8eef44e53 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -199,7 +199,13 @@ impl DapiRequestExecutor for DapiClient { address.uri().clone(), &applied_settings, &pool, - ); + ) + .map_err(|e| { + DapiClientError::<<R::Client as TransportClient>::Error>::Transport( + e, + address.clone(), + ) + })?; let response = transport_request .execute_transport(&mut transport_client, &applied_settings) diff --git a/packages/rs-dapi-client/src/transport.rs b/packages/rs-dapi-client/src/transport.rs index a5459834e1b..600189fc2fe 100644 --- a/packages/rs-dapi-client/src/transport.rs +++ b/packages/rs-dapi-client/src/transport.rs @@ -51,12 +51,12 @@ pub trait TransportClient: Send + Sized { type Error: CanRetry + Send + Debug + Mockable; /// Build client using node's url. - fn with_uri(uri: Uri, pool: &ConnectionPool) -> Self; + fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, Self::Error>; /// Build client using node's url and [AppliedRequestSettings]. fn with_uri_and_settings( uri: Uri, settings: &AppliedRequestSettings, pool: &ConnectionPool, - ) -> Self; + ) -> Result<Self, Self::Error>; } diff --git a/packages/rs-dapi-client/src/transport/grpc.rs b/packages/rs-dapi-client/src/transport/grpc.rs index 98976ed08e8..37e5eb5852a 100644 --- a/packages/rs-dapi-client/src/transport/grpc.rs +++ b/packages/rs-dapi-client/src/transport/grpc.rs @@ -8,7 +8,7 @@ use crate::{request_settings::AppliedRequestSettings, RequestSettings}; use dapi_grpc::core::v0::core_client::CoreClient; use dapi_grpc::core::v0::{self as core_proto}; use dapi_grpc::platform::v0::{self as platform_proto, platform_client::PlatformClient}; -use dapi_grpc::tonic::transport::Uri; +use dapi_grpc::tonic::transport::{ClientTlsConfig, Uri}; use dapi_grpc::tonic::Streaming; use dapi_grpc::tonic::{transport::Channel, IntoRequest}; use futures::{future::BoxFuture, FutureExt, TryFutureExt}; @@ -18,8 +18,11 @@ pub type PlatformGrpcClient = PlatformClient<Channel>; /// Core Client using gRPC transport. pub type CoreGrpcClient = CoreClient<Channel>; -fn create_channel(uri: Uri, settings: Option<&AppliedRequestSettings>) -> Channel { - let mut builder = Channel::builder(uri); +fn create_channel( + uri: Uri, + settings: Option<&AppliedRequestSettings>, +) -> Result<Channel, dapi_grpc::tonic::transport::Error> { + let mut builder = Channel::builder(uri).tls_config(ClientTlsConfig::new())?; if let Some(settings) = settings { if let Some(timeout) = settings.connect_timeout { @@ -27,50 +30,84 @@ fn create_channel(uri: Uri, settings: Option<&AppliedRequestSettings>) -> Channe } } - builder.connect_lazy() + Ok(builder.connect_lazy()) } impl TransportClient for PlatformGrpcClient { type Error = dapi_grpc::tonic::Status; - fn with_uri(uri: Uri, pool: &ConnectionPool) -> Self { - pool.get_or_create(PoolPrefix::Platform, &uri, None, || { - Self::new(create_channel(uri.clone(), None)).into() - }) - .into() + fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, Self::Error> { + Ok(pool + .get_or_create(PoolPrefix::Platform, &uri, None, || { + match create_channel(uri.clone(), None) { + Ok(channel) => Ok(Self::new(channel).into()), + Err(e) => Err(dapi_grpc::tonic::Status::failed_precondition(format!( + "Channel creation failed: {}", + e + ))), + } + })? + .into()) } fn with_uri_and_settings( uri: Uri, settings: &AppliedRequestSettings, pool: &ConnectionPool, - ) -> Self { - pool.get_or_create(PoolPrefix::Platform, &uri, Some(settings), || { - Self::new(create_channel(uri.clone(), Some(settings))).into() - }) - .into() + ) -> Result<Self, Self::Error> { + Ok(pool + .get_or_create( + PoolPrefix::Platform, + &uri, + Some(settings), + || match create_channel(uri.clone(), Some(settings)) { + Ok(channel) => Ok(Self::new(channel).into()), + Err(e) => Err(dapi_grpc::tonic::Status::failed_precondition(format!( + "Channel creation failed: {}", + e + ))), + }, + )? + .into()) } } impl TransportClient for CoreGrpcClient { type Error = dapi_grpc::tonic::Status; - fn with_uri(uri: Uri, pool: &ConnectionPool) -> Self { - pool.get_or_create(PoolPrefix::Core, &uri, None, || { - Self::new(create_channel(uri.clone(), None)).into() - }) - .into() + fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, Self::Error> { + Ok(pool + .get_or_create(PoolPrefix::Core, &uri, None, || { + match create_channel(uri.clone(), None) { + Ok(channel) => Ok(Self::new(channel).into()), + Err(e) => Err(dapi_grpc::tonic::Status::failed_precondition(format!( + "Channel creation failed: {}", + e + ))), + } + })? + .into()) } fn with_uri_and_settings( uri: Uri, settings: &AppliedRequestSettings, pool: &ConnectionPool, - ) -> Self { - pool.get_or_create(PoolPrefix::Core, &uri, Some(settings), || { - Self::new(create_channel(uri.clone(), Some(settings))).into() - }) - .into() + ) -> Result<Self, Self::Error> { + Ok(pool + .get_or_create( + PoolPrefix::Core, + &uri, + Some(settings), + || match create_channel(uri.clone(), Some(settings)) { + Ok(channel) => Ok(Self::new(channel).into()), + Err(e) => Err(dapi_grpc::tonic::Status::failed_precondition(format!( + "Channel creation failed: {}", + e + ))), + }, + )? + .into()) } } diff --git a/packages/rs-sdk/src/platform/types/evonode.rs b/packages/rs-sdk/src/platform/types/evonode.rs index 01c0630b493..77e893dfd54 100644 --- a/packages/rs-sdk/src/platform/types/evonode.rs +++ b/packages/rs-sdk/src/platform/types/evonode.rs @@ -10,7 +10,7 @@ use futures::{FutureExt, TryFutureExt}; use rs_dapi_client::transport::{ AppliedRequestSettings, PlatformGrpcClient, TransportClient, TransportRequest, }; -use rs_dapi_client::{Address, ConnectionPool, RequestSettings}; +use rs_dapi_client::{Address, ConnectionPool, DapiClientError, RequestSettings}; #[cfg(feature = "mocks")] use serde::{Deserialize, Serialize}; use std::fmt::Debug; @@ -74,7 +74,16 @@ impl TransportRequest for EvoNode { // We also create a new client to use with this request, so that the user does not need to // reconfigure SDK to use a single node. let pool = ConnectionPool::new(1); - let mut client = Self::Client::with_uri_and_settings(uri.clone(), settings, &pool); + // We create a new client with the given URI and settings + let client_result = Self::Client::with_uri_and_settings(uri.clone(), settings, &pool); + + // Handle the result manually to create a proper error response + let mut client = match client_result { + Ok(client) => client, + Err(e) => { + return async { Err(e) }.boxed(); + } + }; let mut grpc_request = GetStatusRequest { version: Some(get_status_request::Version::V0(GetStatusRequestV0 {})), } From 2f60720fc4abec478fdb8b3881fc3d433b86f0f5 Mon Sep 17 00:00:00 2001 From: Quantum Explorer <quantum@dash.org> Date: Tue, 8 Oct 2024 12:23:41 +0700 Subject: [PATCH 05/18] added root certificates --- packages/rs-dapi-client/src/transport/grpc.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/packages/rs-dapi-client/src/transport/grpc.rs b/packages/rs-dapi-client/src/transport/grpc.rs index 37e5eb5852a..da8d9f37e5e 100644 --- a/packages/rs-dapi-client/src/transport/grpc.rs +++ b/packages/rs-dapi-client/src/transport/grpc.rs @@ -22,7 +22,11 @@ fn create_channel( uri: Uri, settings: Option<&AppliedRequestSettings>, ) -> Result<Channel, dapi_grpc::tonic::transport::Error> { - let mut builder = Channel::builder(uri).tls_config(ClientTlsConfig::new())?; + let mut builder = Channel::builder(uri).tls_config( + ClientTlsConfig::new() + .with_native_roots() + .with_webpki_roots(), + )?; if let Some(settings) = settings { if let Some(timeout) = settings.connect_timeout { From 01282fb34b81328897975039019989bbb7677cdb Mon Sep 17 00:00:00 2001 From: Quantum Explorer <quantum@dash.org> Date: Tue, 8 Oct 2024 12:41:45 +0700 Subject: [PATCH 06/18] assume http2 --- packages/rs-dapi-client/src/transport/grpc.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/rs-dapi-client/src/transport/grpc.rs b/packages/rs-dapi-client/src/transport/grpc.rs index da8d9f37e5e..d5180099d0a 100644 --- a/packages/rs-dapi-client/src/transport/grpc.rs +++ b/packages/rs-dapi-client/src/transport/grpc.rs @@ -25,7 +25,8 @@ fn create_channel( let mut builder = Channel::builder(uri).tls_config( ClientTlsConfig::new() .with_native_roots() - .with_webpki_roots(), + .with_webpki_roots() + .assume_http2(true), )?; if let Some(settings) = settings { From c22bafc6f1550e7025ff73100714ff80b168b31c Mon Sep 17 00:00:00 2001 From: Quantum Explorer <quantum@dash.org> Date: Tue, 8 Oct 2024 16:22:04 +0700 Subject: [PATCH 07/18] changed T to E --- packages/rs-dapi-client/src/connection_pool.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/rs-dapi-client/src/connection_pool.rs b/packages/rs-dapi-client/src/connection_pool.rs index a6ca2964946..97dd991d509 100644 --- a/packages/rs-dapi-client/src/connection_pool.rs +++ b/packages/rs-dapi-client/src/connection_pool.rs @@ -67,13 +67,13 @@ impl ConnectionPool { /// * `prefix` - Prefix for the item in the pool. Used to distinguish between Core and Platform clients. /// * `uri` - URI of the node. /// * `settings` - Applied request settings. - pub fn get_or_create<T>( + pub fn get_or_create<E>( &self, prefix: PoolPrefix, uri: &Uri, settings: Option<&AppliedRequestSettings>, - create: impl FnOnce() -> Result<PoolItem, T>, - ) -> Result<PoolItem, T> { + create: impl FnOnce() -> Result<PoolItem, E>, + ) -> Result<PoolItem, E> { if let Some(cli) = self.get(prefix, uri, settings) { return Ok(cli); } From 99216183539ca990526775fd253fee6af4b1e514 Mon Sep 17 00:00:00 2001 From: Quantum Explorer <quantum@dash.org> Date: Tue, 8 Oct 2024 16:23:21 +0700 Subject: [PATCH 08/18] another fix --- Cargo.lock | 13 +++++++++++++ packages/rs-dapi-client/Cargo.toml | 4 +--- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 92ff215c4b9..3ddb838738c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -314,6 +314,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4fa97bb310c33c811334143cf64c5bb2b7b3c06e453db6b095d7061eff8f113" dependencies = [ "fastrand", + "gloo-timers", "tokio", ] @@ -1955,6 +1956,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "grovedb" version = "2.1.0" diff --git a/packages/rs-dapi-client/Cargo.toml b/packages/rs-dapi-client/Cargo.toml index a66bb10240b..2851a19acfa 100644 --- a/packages/rs-dapi-client/Cargo.toml +++ b/packages/rs-dapi-client/Cargo.toml @@ -20,9 +20,7 @@ dump = ["mocks"] offline-testing = [] [dependencies] -backon = { version = "1.2", default-features = false, features = [ - "tokio-sleep", -] } +backon = { version = "1.2"} dapi-grpc = { path = "../dapi-grpc", features = [ "core", "platform", From f5aa584169dee518d787d088d42ab7c0f3e84afc Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Wed, 9 Oct 2024 15:25:32 +0200 Subject: [PATCH 09/18] refactor(sdk): improve context provider async processing --- Cargo.lock | 20 --- packages/rs-dapi-client/Cargo.toml | 5 +- packages/rs-drive-proof-verifier/src/error.rs | 4 + packages/rs-sdk/Cargo.toml | 2 - .../dash_core_client.rs} | 25 ++- packages/rs-sdk/src/core/mod.rs | 4 + packages/rs-sdk/src/internal/mod.rs | 3 + packages/rs-sdk/src/internal/sync.rs | 32 ++++ packages/rs-sdk/src/lib.rs | 3 +- packages/rs-sdk/src/mock/provider.rs | 15 +- packages/rs-sdk/src/mock/sdk.rs | 156 +++++++++--------- packages/rs-sdk/src/sdk.rs | 44 +++-- .../rs-sdk/tests/fetch/contested_resource.rs | 4 +- 13 files changed, 185 insertions(+), 132 deletions(-) rename packages/rs-sdk/src/{core_client.rs => core/dash_core_client.rs} (89%) create mode 100644 packages/rs-sdk/src/internal/mod.rs create mode 100644 packages/rs-sdk/src/internal/sync.rs diff --git a/Cargo.lock b/Cargo.lock index 3ddb838738c..60c0e39c181 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -314,7 +314,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4fa97bb310c33c811334143cf64c5bb2b7b3c06e453db6b095d7061eff8f113" dependencies = [ "fastrand", - "gloo-timers", "tokio", ] @@ -1128,7 +1127,6 @@ dependencies = [ "hex", "http", "lru", - "pollster", "rs-dapi-client", "sanitize-filename", "serde", @@ -1956,18 +1954,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" -[[package]] -name = "gloo-timers" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" -dependencies = [ - "futures-channel", - "futures-core", - "js-sys", - "wasm-bindgen", -] - [[package]] name = "grovedb" version = "2.1.0" @@ -3406,12 +3392,6 @@ dependencies = [ "plotters-backend", ] -[[package]] -name = "pollster" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "22686f4785f02a4fcc856d3b3bb19bf6c8160d103f7a99cc258bddd0251dc7f2" - [[package]] name = "portable-atomic" version = "1.7.0" diff --git a/packages/rs-dapi-client/Cargo.toml b/packages/rs-dapi-client/Cargo.toml index 2851a19acfa..fb06b6abdcd 100644 --- a/packages/rs-dapi-client/Cargo.toml +++ b/packages/rs-dapi-client/Cargo.toml @@ -5,7 +5,6 @@ edition = "2021" [features] default = ["mocks", "offline-testing"] -tokio-sleep = ["backon/tokio-sleep"] mocks = [ "dep:sha2", "dep:hex", @@ -20,7 +19,9 @@ dump = ["mocks"] offline-testing = [] [dependencies] -backon = { version = "1.2"} +backon = { version = "1.2", default-features = false, features = [ + "tokio-sleep", +] } dapi-grpc = { path = "../dapi-grpc", features = [ "core", "platform", diff --git a/packages/rs-drive-proof-verifier/src/error.rs b/packages/rs-drive-proof-verifier/src/error.rs index 02752a4b128..3203eb73174 100644 --- a/packages/rs-drive-proof-verifier/src/error.rs +++ b/packages/rs-drive-proof-verifier/src/error.rs @@ -106,6 +106,10 @@ pub enum ContextProviderError { /// Core Fork Error #[error("activation fork error: {0}")] ActivationForkError(String), + + /// Async error, eg. when tokio runtime fails + #[error("async error: {0}")] + AsyncError(String), } impl From<drive::error::Error> for Error { diff --git a/packages/rs-sdk/Cargo.toml b/packages/rs-sdk/Cargo.toml index 6dc1736a3fb..fe5a429389f 100644 --- a/packages/rs-sdk/Cargo.toml +++ b/packages/rs-sdk/Cargo.toml @@ -36,7 +36,6 @@ derive_more = { version = "1.0", features = ["from"] } dashcore-rpc = { git = "https://github.com/dashpay/rust-dashcore-rpc", tag = "v0.15.4" } lru = { version = "0.12.3", optional = true } bip37-bloom-filter = { git = "https://github.com/dashpay/rs-bip37-bloom-filter", branch = "develop" } -pollster = { version = "0.3.0" } [dev-dependencies] tokio = { version = "1.40", features = ["macros", "rt-multi-thread"] } @@ -57,7 +56,6 @@ test-case = { version = "3.3.1" } [features] default = ["mocks", "offline-testing"] -tokio-sleep = ["rs-dapi-client/tokio-sleep"] mocks = [ "dep:serde", diff --git a/packages/rs-sdk/src/core_client.rs b/packages/rs-sdk/src/core/dash_core_client.rs similarity index 89% rename from packages/rs-sdk/src/core_client.rs rename to packages/rs-sdk/src/core/dash_core_client.rs index e68bb6166d3..3b398a29cb8 100644 --- a/packages/rs-sdk/src/core_client.rs +++ b/packages/rs-sdk/src/core/dash_core_client.rs @@ -17,17 +17,29 @@ use std::{fmt::Debug, sync::Mutex}; /// Core RPC client that can be used to retrieve quorum keys from core. /// -/// Implements [`ContextProvider`] trait. -/// /// TODO: This is a temporary implementation, effective until we integrate SPV. -pub struct CoreClient { +pub struct LowLevelDashCoreClient { core: Mutex<Client>, server_address: String, core_user: String, + core_password: String, core_port: u16, } -impl Debug for CoreClient { +impl Clone for LowLevelDashCoreClient { + // As Client does not implement Clone, we just create a new instance of CoreClient here. + fn clone(&self) -> Self { + LowLevelDashCoreClient::new( + &self.server_address, + self.core_port, + &self.core_user, + &self.core_password, + ) + .expect("Failed to clone CoreClient when cloning, this should not happen") + } +} + +impl Debug for LowLevelDashCoreClient { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("CoreClient") .field("server_address", &self.server_address) @@ -37,7 +49,7 @@ impl Debug for CoreClient { } } -impl CoreClient { +impl LowLevelDashCoreClient { /// Create new Dash Core client. /// /// # Arguments @@ -63,13 +75,14 @@ impl CoreClient { core: Mutex::new(core), server_address: server_address.to_string(), core_user: core_user.to_string(), + core_password: core_password.to_string(), core_port, }) } } // Wallet functions -impl CoreClient { +impl LowLevelDashCoreClient { /// List unspent transactions /// /// ## Arguments diff --git a/packages/rs-sdk/src/core/mod.rs b/packages/rs-sdk/src/core/mod.rs index ed7d4726cfd..f642f3b26f6 100644 --- a/packages/rs-sdk/src/core/mod.rs +++ b/packages/rs-sdk/src/core/mod.rs @@ -1,4 +1,8 @@ //! Dash Core SDK implementation. //! //! TODO: This is work in progress. +#[cfg(feature = "mocks")] +mod dash_core_client; mod transaction; +#[cfg(feature = "mocks")] +pub use dash_core_client::LowLevelDashCoreClient; diff --git a/packages/rs-sdk/src/internal/mod.rs b/packages/rs-sdk/src/internal/mod.rs new file mode 100644 index 00000000000..81012bb2824 --- /dev/null +++ b/packages/rs-sdk/src/internal/mod.rs @@ -0,0 +1,3 @@ +//! Various internal utilities used by the SDK. + +pub(crate) mod sync; diff --git a/packages/rs-sdk/src/internal/sync.rs b/packages/rs-sdk/src/internal/sync.rs new file mode 100644 index 00000000000..f88e6367fa5 --- /dev/null +++ b/packages/rs-sdk/src/internal/sync.rs @@ -0,0 +1,32 @@ +//! futures-related utilities to handle async code from sync code. + +use std::future::Future; + +use drive_proof_verifier::error::ContextProviderError; + +#[derive(Debug, thiserror::Error)] +pub(crate) enum AsyncError { + #[error("asynchronous call from synchronous context failed: {0}")] + #[allow(unused)] + Generic(String), +} + +impl From<AsyncError> for ContextProviderError { + fn from(error: AsyncError) -> Self { + ContextProviderError::AsyncError(error.to_string()) + } +} + +impl From<AsyncError> for crate::Error { + fn from(error: AsyncError) -> Self { + Self::ContextProviderError(error.into()) + } +} + +/// Block on the provided future and return the result. +pub(crate) fn block_on<F: Future + Send + 'static>(fut: F) -> Result<F::Output, AsyncError> +where + F::Output: Send, +{ + Ok(futures::executor::block_on(fut)) +} diff --git a/packages/rs-sdk/src/lib.rs b/packages/rs-sdk/src/lib.rs index 7f181746108..63f73272159 100644 --- a/packages/rs-sdk/src/lib.rs +++ b/packages/rs-sdk/src/lib.rs @@ -62,9 +62,8 @@ #![allow(rustdoc::private_intra_doc_links)] pub mod core; -#[cfg(feature = "mocks")] -mod core_client; pub mod error; +mod internal; mod internal_cache; pub mod mock; pub mod platform; diff --git a/packages/rs-sdk/src/mock/provider.rs b/packages/rs-sdk/src/mock/provider.rs index 8c0297bf475..0a097f3ea85 100644 --- a/packages/rs-sdk/src/mock/provider.rs +++ b/packages/rs-sdk/src/mock/provider.rs @@ -1,13 +1,13 @@ //! Example ContextProvider that uses the Core gRPC API to fetch data from Platform. -use crate::core_client::CoreClient; +use crate::core::LowLevelDashCoreClient; +use crate::internal::sync::block_on; use crate::platform::Fetch; use crate::{Error, Sdk}; use arc_swap::ArcSwapAny; use dpp::prelude::{CoreBlockHeight, DataContract, Identifier}; use drive_proof_verifier::error::ContextProviderError; use drive_proof_verifier::ContextProvider; -use pollster::FutureExt; use std::hash::Hash; use std::num::NonZeroUsize; use std::sync::Arc; @@ -17,7 +17,7 @@ use std::sync::Arc; /// Example [ContextProvider] used by the Sdk for testing purposes. pub struct GrpcContextProvider { /// Core client - core: CoreClient, + core: LowLevelDashCoreClient, /// [Sdk] to use when fetching data from Platform /// /// Note that if the `sdk` is `None`, the context provider will not be able to fetch data itself and will rely on @@ -62,7 +62,8 @@ impl GrpcContextProvider { data_contracts_cache_size: NonZeroUsize, quorum_public_keys_cache_size: NonZeroUsize, ) -> Result<Self, Error> { - let core_client = CoreClient::new(core_ip, core_port, core_user, core_password)?; + let core_client = + LowLevelDashCoreClient::new(core_ip, core_port, core_user, core_password)?; Ok(Self { core: core_client, sdk: ArcSwapAny::new(Arc::new(sdk)), @@ -197,9 +198,9 @@ impl ContextProvider for GrpcContextProvider { let sdk_cloned = sdk.clone(); - let data_contract: Option<DataContract> = DataContract::fetch(&sdk_cloned, contract_id) - .block_on() - .map_err(|e| ContextProviderError::DataContractFailure(e.to_string()))?; + let data_contract: Option<DataContract> = + block_on(async move { DataContract::fetch(&sdk_cloned, contract_id).await })? + .map_err(|e| ContextProviderError::DataContractFailure(e.to_string()))?; if let Some(ref dc) = data_contract { self.data_contracts_cache.put(*data_contract_id, dc.clone()); diff --git a/packages/rs-sdk/src/mock/sdk.rs b/packages/rs-sdk/src/mock/sdk.rs index 02258c0cd13..f89c806cf22 100644 --- a/packages/rs-sdk/src/mock/sdk.rs +++ b/packages/rs-sdk/src/mock/sdk.rs @@ -2,6 +2,7 @@ //! //! See [MockDashPlatformSdk] for more details. use crate::{ + internal::sync::block_on, platform::{ types::{evonode::EvoNode, identity::IdentityRequest}, DocumentQuery, Fetch, FetchMany, Query, @@ -24,7 +25,7 @@ use rs_dapi_client::{ DapiClient, DumpData, }; use std::{collections::BTreeMap, path::PathBuf, sync::Arc}; -use tokio::sync::Mutex; +use tokio::sync::{Mutex, OwnedMutexGuard}; use super::MockResponse; @@ -82,6 +83,17 @@ impl MockDashPlatformSdk { self.platform_version } + /// Load all expectations from files in a directory asynchronously. + /// + /// See [MockDashPlatformSdk::load_expectations_sync()] for more details. + #[deprecated(note = "use load_expectations_sync")] + pub async fn load_expectations<P: AsRef<std::path::Path> + Send + 'static>( + &mut self, + dir: P, + ) -> Result<&mut Self, Error> { + self.load_expectations_sync(dir) + } + /// Load all expectations from files in a directory. /// /// @@ -89,7 +101,7 @@ impl MockDashPlatformSdk { /// This function can be used to load expectations after the Sdk is created, or use alternative location. /// Expectation files must be prefixed with [DapiClient::DUMP_FILE_PREFIX] and /// have `.json` extension. - pub async fn load_expectations<P: AsRef<std::path::Path>>( + pub fn load_expectations_sync<P: AsRef<std::path::Path>>( &mut self, dir: P, ) -> Result<&mut Self, Error> { @@ -114,97 +126,80 @@ impl MockDashPlatformSdk { .map(|f| f.path()) .collect(); + let mut dapi = block_on(self.dapi.clone().lock_owned())?; + for filename in &files { let basename = filename.file_name().unwrap().to_str().unwrap(); let request_type = basename.split('_').nth(1).unwrap_or_default(); match request_type { - "DocumentQuery" => self.load_expectation::<DocumentQuery>(filename).await?, + "DocumentQuery" => load_expectation::<DocumentQuery>(&mut dapi, filename)?, "GetEpochsInfoRequest" => { - self.load_expectation::<proto::GetEpochsInfoRequest>(filename) - .await? + load_expectation::<proto::GetEpochsInfoRequest>(&mut dapi, filename)? } "GetDataContractRequest" => { - self.load_expectation::<proto::GetDataContractRequest>(filename) - .await? + load_expectation::<proto::GetDataContractRequest>(&mut dapi, filename)? } "GetDataContractsRequest" => { - self.load_expectation::<proto::GetDataContractsRequest>(filename) - .await? + load_expectation::<proto::GetDataContractsRequest>(&mut dapi, filename)? } "GetDataContractHistoryRequest" => { - self.load_expectation::<proto::GetDataContractHistoryRequest>(filename) - .await? + load_expectation::<proto::GetDataContractHistoryRequest>(&mut dapi, filename)? } - "IdentityRequest" => self.load_expectation::<IdentityRequest>(filename).await?, + "IdentityRequest" => load_expectation::<IdentityRequest>(&mut dapi, filename)?, "GetIdentityRequest" => { - self.load_expectation::<proto::GetIdentityRequest>(filename) - .await? + load_expectation::<proto::GetIdentityRequest>(&mut dapi, filename)? } "GetIdentityBalanceRequest" => { - self.load_expectation::<proto::GetIdentityBalanceRequest>(filename) - .await? + load_expectation::<proto::GetIdentityBalanceRequest>(&mut dapi, filename)? } "GetIdentityContractNonceRequest" => { - self.load_expectation::<proto::GetIdentityContractNonceRequest>(filename) - .await? - } - "GetIdentityBalanceAndRevisionRequest" => { - self.load_expectation::<proto::GetIdentityBalanceAndRevisionRequest>(filename) - .await? + load_expectation::<proto::GetIdentityContractNonceRequest>(&mut dapi, filename)? } + "GetIdentityBalanceAndRevisionRequest" => load_expectation::< + proto::GetIdentityBalanceAndRevisionRequest, + >(&mut dapi, filename)?, "GetIdentityKeysRequest" => { - self.load_expectation::<proto::GetIdentityKeysRequest>(filename) - .await? - } - "GetProtocolVersionUpgradeStateRequest" => { - self.load_expectation::<proto::GetProtocolVersionUpgradeStateRequest>(filename) - .await? + load_expectation::<proto::GetIdentityKeysRequest>(&mut dapi, filename)? } + "GetProtocolVersionUpgradeStateRequest" => load_expectation::< + proto::GetProtocolVersionUpgradeStateRequest, + >(&mut dapi, filename)?, "GetProtocolVersionUpgradeVoteStatusRequest" => { - self.load_expectation::<proto::GetProtocolVersionUpgradeVoteStatusRequest>( - filename, - ) - .await? + load_expectation::<proto::GetProtocolVersionUpgradeVoteStatusRequest>( + &mut dapi, filename, + )? } "GetContestedResourcesRequest" => { - self.load_expectation::<proto::GetContestedResourcesRequest>(filename) - .await? - } - "GetContestedResourceVoteStateRequest" => { - self.load_expectation::<proto::GetContestedResourceVoteStateRequest>(filename) - .await? + load_expectation::<proto::GetContestedResourcesRequest>(&mut dapi, filename)? } + "GetContestedResourceVoteStateRequest" => load_expectation::< + proto::GetContestedResourceVoteStateRequest, + >(&mut dapi, filename)?, "GetContestedResourceVotersForIdentityRequest" => { - self.load_expectation::<proto::GetContestedResourceVotersForIdentityRequest>( - filename, - ) - .await? + load_expectation::<proto::GetContestedResourceVotersForIdentityRequest>( + &mut dapi, filename, + )? } "GetContestedResourceIdentityVotesRequest" => { - self.load_expectation::<proto::GetContestedResourceIdentityVotesRequest>( - filename, - ) - .await? + load_expectation::<proto::GetContestedResourceIdentityVotesRequest>( + &mut dapi, filename, + )? } "GetVotePollsByEndDateRequest" => { - self.load_expectation::<proto::GetVotePollsByEndDateRequest>(filename) - .await? - } - "GetPrefundedSpecializedBalanceRequest" => { - self.load_expectation::<proto::GetPrefundedSpecializedBalanceRequest>(filename) - .await? + load_expectation::<proto::GetVotePollsByEndDateRequest>(&mut dapi, filename)? } + "GetPrefundedSpecializedBalanceRequest" => load_expectation::< + proto::GetPrefundedSpecializedBalanceRequest, + >(&mut dapi, filename)?, "GetPathElementsRequest" => { - self.load_expectation::<proto::GetPathElementsRequest>(filename) - .await? - } - "GetTotalCreditsInPlatformRequest" => { - self.load_expectation::<proto::GetTotalCreditsInPlatformRequest>(filename) - .await? + load_expectation::<proto::GetPathElementsRequest>(&mut dapi, filename)? } - "EvoNode" => self.load_expectation::<EvoNode>(filename).await?, + "GetTotalCreditsInPlatformRequest" => load_expectation::< + proto::GetTotalCreditsInPlatformRequest, + >(&mut dapi, filename)?, + "EvoNode" => load_expectation::<EvoNode>(&mut dapi, filename)?, _ => { return Err(Error::Config(format!( "unknown request type {} in {}, missing match arm in load_expectations?", @@ -218,21 +213,6 @@ impl MockDashPlatformSdk { Ok(self) } - async fn load_expectation<T: TransportRequest>(&mut self, path: &PathBuf) -> Result<(), Error> { - let data = DumpData::<T>::load(path) - .map_err(|e| { - Error::Config(format!( - "cannot load mock expectations from {}: {}", - path.display(), - e - )) - })? - .deserialize(); - - self.dapi.lock().await.expect(&data.0, &data.1)?; - Ok(()) - } - /// Expect a [Fetch] request and return provided object. /// /// This method is used to define mock expectations for [Fetch] requests. @@ -304,7 +284,7 @@ impl MockDashPlatformSdk { /// ## Generic Parameters /// /// - `O`: Type of the object that will be returned in response to the query. - /// Must implement [FetchMany]. `Vec<O>` must implement [MockResponse]. + /// Must implement [FetchMany]. `Vec<O>` must implement [MockResponse]. /// - `Q`: Type of the query that will be sent to Platform. Must implement [Query] and [Mockable]. /// /// ## Arguments @@ -330,14 +310,14 @@ impl MockDashPlatformSdk { K: Ord, O: FetchMany<K, R>, Q: Query<<O as FetchMany<K, R>>::Request>, - R: FromIterator<(K, Option<O>)> + MockResponse + Send + Default, + R, >( &mut self, query: Q, objects: Option<R>, ) -> Result<&mut Self, Error> where - R: MockResponse, + R: FromIterator<(K, Option<O>)> + MockResponse + Send + Default + MockResponse, <<O as FetchMany<K, R>>::Request as TransportRequest>::Response: Default, R: FromProof< <O as FetchMany<K, R>>::Request, @@ -431,3 +411,25 @@ impl MockDashPlatformSdk { } } } + +/// Load expectation from file and save it to `dapi_guard` mock Dapi client. +/// +/// This function is used to load expectations from files in a directory. +/// It is implemented without reference to the `MockDashPlatformSdk` object +/// to make it easier to use in async context. +fn load_expectation<T: TransportRequest>( + dapi_guard: &mut OwnedMutexGuard<MockDapiClient>, + path: &PathBuf, +) -> Result<(), Error> { + let data = DumpData::<T>::load(path) + .map_err(|e| { + Error::Config(format!( + "cannot load mock expectations from {}: {}", + path.display(), + e + )) + })? + .deserialize(); + dapi_guard.expect(&data.0, &data.1)?; + Ok(()) +} diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index abd02e184cc..5a033f9e414 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -7,6 +7,7 @@ use crate::mock::MockResponse; use crate::mock::{provider::GrpcContextProvider, MockDashPlatformSdk}; use crate::platform::transition::put_settings::PutSettings; use crate::platform::{Fetch, Identifier}; +use arc_swap::{ArcSwapAny, ArcSwapOption}; use dapi_grpc::mock::Mockable; use dapi_grpc::platform::v0::{Proof, ResponseMetadata}; use dpp::bincode; @@ -79,7 +80,6 @@ pub type LastQueryTimestamp = u64; /// ## Examples /// /// See tests/ for examples of using the SDK. -#[derive(Clone)] pub struct Sdk { /// The network that the sdk is configured for (Dash (mainnet), Testnet, Devnet, Regtest) pub network: Network, @@ -97,7 +97,7 @@ pub struct Sdk { /// ## Panics /// /// Note that setting this to None can panic. - context_provider: Option<Arc<Box<dyn ContextProvider>>>, + context_provider: ArcSwapOption<Box<dyn ContextProvider>>, /// Cancellation token; once cancelled, all pending requests should be aborted. pub(crate) cancel_token: CancellationToken, @@ -105,6 +105,20 @@ pub struct Sdk { #[cfg(feature = "mocks")] dump_dir: Option<PathBuf>, } +impl Clone for Sdk { + fn clone(&self) -> Self { + Self { + network: self.network, + inner: self.inner.clone(), + proofs: self.proofs, + internal_cache: Arc::clone(&self.internal_cache), + context_provider: ArcSwapOption::new(self.context_provider.load_full()), + cancel_token: self.cancel_token.clone(), + #[cfg(feature = "mocks")] + dump_dir: self.dump_dir.clone(), + } + } +} impl Debug for Sdk { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -202,8 +216,8 @@ impl Sdk { where O::Request: Mockable, { - let provider = self - .context_provider + let provider_guard = self.context_provider.load(); + let provider = provider_guard .as_ref() .ok_or(drive_proof_verifier::Error::ContextProviderNotSet)?; @@ -242,8 +256,8 @@ impl Sdk { where O::Request: Mockable, { - let provider = self - .context_provider + let provider_guard = self.context_provider.load(); + let provider = provider_guard .as_ref() .ok_or(drive_proof_verifier::Error::ContextProviderNotSet)?; @@ -263,7 +277,9 @@ impl Sdk { } } pub fn context_provider(&self) -> Option<impl ContextProvider> { - self.context_provider.as_ref().map(Arc::clone) + let provider_guard = self.context_provider.load(); + let provider = provider_guard.as_ref().map(Arc::clone); + provider } /// Returns a mutable reference to the `MockDashPlatformSdk` instance. @@ -493,9 +509,9 @@ impl Sdk { /// [ContextProvider] is used to access state information, like data contracts and quorum public keys. /// /// Note that this will overwrite any previous context provider. - pub fn set_context_provider<C: ContextProvider + 'static>(&mut self, context_provider: C) { + pub fn set_context_provider<C: ContextProvider + 'static>(&self, context_provider: C) { self.context_provider - .replace(Arc::new(Box::new(context_provider))); + .swap(Some(Arc::new(Box::new(context_provider)))); } /// Returns a future that resolves when the Sdk is cancelled (eg. shutdown was requested). @@ -772,14 +788,14 @@ impl SdkBuilder { network: self.network, inner:SdkInstance::Dapi { dapi, version:self.version }, proofs:self.proofs, - context_provider: self.context_provider.map(Arc::new), + context_provider: ArcSwapOption::new( self.context_provider.map(Arc::new)), cancel_token: self.cancel_token, #[cfg(feature = "mocks")] dump_dir: self.dump_dir, internal_cache: Default::default(), }; // if context provider is not set correctly (is None), it means we need to fallback to core wallet - if sdk.context_provider.is_none() { + if sdk.context_provider.load().is_none() { #[cfg(feature = "mocks")] if !self.core_ip.is_empty() { let mut context_provider = GrpcContextProvider::new(None, @@ -792,7 +808,7 @@ impl SdkBuilder { // We have cyclical dependency Sdk <-> GrpcContextProvider, so we just do some // workaround using additional Arc. let context_provider= Arc::new(context_provider); - sdk.context_provider.replace(Arc::new(Box::new(context_provider.clone()))); + sdk.context_provider.swap(Some(Arc::new(Box::new(context_provider.clone())))); context_provider.set_sdk(Some(sdk.clone())); } else{ tracing::warn!( @@ -831,13 +847,13 @@ impl SdkBuilder { dump_dir: self.dump_dir.clone(), proofs:self.proofs, internal_cache: Default::default(), - context_provider:Some(Arc::new(context_provider)), + context_provider:ArcSwapAny::new( Some(Arc::new(context_provider))), cancel_token: self.cancel_token, }; let mut guard = mock_sdk.try_lock().expect("mock sdk is in use by another thread and connot be reconfigured"); guard.set_sdk(sdk.clone()); if let Some(ref dump_dir) = self.dump_dir { - pollster::block_on( guard.load_expectations(dump_dir))?; + guard.load_expectations_sync(dump_dir)?; }; sdk diff --git a/packages/rs-sdk/tests/fetch/contested_resource.rs b/packages/rs-sdk/tests/fetch/contested_resource.rs index 3839e220b89..8510791764c 100644 --- a/packages/rs-sdk/tests/fetch/contested_resource.rs +++ b/packages/rs-sdk/tests/fetch/contested_resource.rs @@ -278,7 +278,7 @@ async fn contested_resources_limit_PLAN_656() { }, Ok("ContestedResources([])".into()); "Non-existing end_index_values returns error")] #[test_case::test_case(|q| q.end_index_values = vec![Value::Array(vec![0.into(), 1.into()])], Err("incorrect index values error: too many end index values were provided"); "wrong type of end_index_values should return InvalidArgument")] #[test_case::test_case(|q| q.limit = Some(0), Err(r#"code: InvalidArgument"#); "limit 0 returns InvalidArgument")] -#[test_case::test_case(|q| q.limit = Some(std::u16::MAX), Err(r#"code: InvalidArgument"#); "limit std::u16::MAX returns InvalidArgument")] +#[test_case::test_case(|q| q.limit = Some(u16::MAX), Err(r#"code: InvalidArgument"#); "limit u16::MAX returns InvalidArgument")] // Disabled due to bug PLAN-656 // #[test_case::test_case(|q| { // q.start_index_values = vec![Value::Text("dash".to_string())]; @@ -306,7 +306,7 @@ async fn contested_resources_fields( // handle panics to not stop other test cases from running let unwinded = catch_unwind(|| { { - pollster::block_on(async { + futures::executor::block_on(async { let mut query = base_query(&cfg); query_mut_fn(&mut query); From f58e08cc79acf395a2923ed0c8c9fdc554d541db Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Wed, 9 Oct 2024 16:36:46 +0200 Subject: [PATCH 10/18] refactor(sdk): store passwords in Zeroizing<String> --- Cargo.lock | 16 ++++++++++++++++ packages/rs-sdk/Cargo.toml | 2 ++ packages/rs-sdk/examples/read_contract.rs | 5 +++-- packages/rs-sdk/src/core/dash_core_client.rs | 5 +++-- packages/rs-sdk/src/sdk.rs | 7 ++++--- packages/rs-sdk/tests/fetch/config.rs | 9 +++++---- 6 files changed, 33 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 60c0e39c181..725a80f57a7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1138,6 +1138,7 @@ dependencies = [ "tokio-util", "tracing", "tracing-subscriber", + "zeroize", ] [[package]] @@ -5649,6 +5650,21 @@ name = "zeroize" version = "1.8.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ced3678a2879b30306d323f4542626697a464a97c0a07c9aebf7ebca65cd4dde" +dependencies = [ + "serde", + "zeroize_derive", +] + +[[package]] +name = "zeroize_derive" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.75", +] [[package]] name = "zip" diff --git a/packages/rs-sdk/Cargo.toml b/packages/rs-sdk/Cargo.toml index fe5a429389f..7d7a4bb25a9 100644 --- a/packages/rs-sdk/Cargo.toml +++ b/packages/rs-sdk/Cargo.toml @@ -36,6 +36,7 @@ derive_more = { version = "1.0", features = ["from"] } dashcore-rpc = { git = "https://github.com/dashpay/rust-dashcore-rpc", tag = "v0.15.4" } lru = { version = "0.12.3", optional = true } bip37-bloom-filter = { git = "https://github.com/dashpay/rs-bip37-bloom-filter", branch = "develop" } +zeroize = { version = "1.8", features = ["derive"] } [dev-dependencies] tokio = { version = "1.40", features = ["macros", "rt-multi-thread"] } @@ -69,6 +70,7 @@ mocks = [ "dep:dotenvy", "dep:envy", "dep:lru", + "zeroize/serde", ] # Run integration tests using test vectors from `tests/vectors/` instead of connecting to live Dash Platform. diff --git a/packages/rs-sdk/examples/read_contract.rs b/packages/rs-sdk/examples/read_contract.rs index ca37f1cfa7d..7ac2cc333db 100644 --- a/packages/rs-sdk/examples/read_contract.rs +++ b/packages/rs-sdk/examples/read_contract.rs @@ -4,6 +4,7 @@ use clap::Parser; use dash_sdk::{mock::provider::GrpcContextProvider, platform::Fetch, Sdk, SdkBuilder}; use dpp::prelude::{DataContract, Identifier}; use rs_dapi_client::AddressList; +use zeroize::Zeroizing; #[derive(clap::Parser, Debug)] #[command(version)] @@ -22,7 +23,7 @@ pub struct Config { // Dash Core RPC password #[arg(short = 'p', long)] - pub core_password: String, + pub core_password: Zeroizing<String>, /// Dash Platform DAPI port #[arg(short = 'd', long)] @@ -86,7 +87,7 @@ fn setup_sdk(config: &Config) -> Sdk { .expect("parse uri"); // Now, we create the Sdk with the wallet and context provider. - let mut sdk = SdkBuilder::new(AddressList::from_iter([uri])) + let sdk = SdkBuilder::new(AddressList::from_iter([uri])) .build() .expect("cannot build sdk"); diff --git a/packages/rs-sdk/src/core/dash_core_client.rs b/packages/rs-sdk/src/core/dash_core_client.rs index 3b398a29cb8..d59af4207c5 100644 --- a/packages/rs-sdk/src/core/dash_core_client.rs +++ b/packages/rs-sdk/src/core/dash_core_client.rs @@ -14,6 +14,7 @@ use dpp::dashcore::ProTxHash; use dpp::prelude::CoreBlockHeight; use drive_proof_verifier::error::ContextProviderError; use std::{fmt::Debug, sync::Mutex}; +use zeroize::Zeroizing; /// Core RPC client that can be used to retrieve quorum keys from core. /// @@ -22,7 +23,7 @@ pub struct LowLevelDashCoreClient { core: Mutex<Client>, server_address: String, core_user: String, - core_password: String, + core_password: Zeroizing<String>, core_port: u16, } @@ -75,7 +76,7 @@ impl LowLevelDashCoreClient { core: Mutex::new(core), server_address: server_address.to_string(), core_user: core_user.to_string(), - core_password: core_password.to_string(), + core_password: core_password.to_string().into(), core_port, }) } diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index 5a033f9e414..b4a36d4fc3c 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -41,6 +41,7 @@ use std::time::{SystemTime, UNIX_EPOCH}; #[cfg(feature = "mocks")] use tokio::sync::{Mutex, MutexGuard}; use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; +use zeroize::Zeroizing; /// How many data contracts fit in the cache. pub const DEFAULT_CONTRACT_CACHE_SIZE: usize = 100; @@ -584,7 +585,7 @@ pub struct SdkBuilder { core_ip: String, core_port: u16, core_user: String, - core_password: String, + core_password: Zeroizing<String>, /// If true, request and verify proofs of the responses. proofs: bool, @@ -620,7 +621,7 @@ impl Default for SdkBuilder { network: Network::Dash, core_ip: "".to_string(), core_port: 0, - core_password: "".to_string(), + core_password: "".to_string().into(), core_user: "".to_string(), proofs: true, @@ -743,7 +744,7 @@ impl SdkBuilder { self.core_ip = ip.to_string(); self.core_port = port; self.core_user = user.to_string(); - self.core_password = password.to_string(); + self.core_password = Zeroizing::from(password.to_string()); self } diff --git a/packages/rs-sdk/tests/fetch/config.rs b/packages/rs-sdk/tests/fetch/config.rs index 445904795d6..c05b9cef2d5 100644 --- a/packages/rs-sdk/tests/fetch/config.rs +++ b/packages/rs-sdk/tests/fetch/config.rs @@ -11,6 +11,7 @@ use dpp::{ use rs_dapi_client::AddressList; use serde::Deserialize; use std::{path::PathBuf, str::FromStr}; +use zeroize::Zeroizing; /// Existing document ID /// @@ -43,7 +44,7 @@ pub struct Config { pub core_user: String, /// Password for Dash Core RPC interface #[serde(default)] - pub core_password: String, + pub core_password: Zeroizing<String>, /// When true, use SSL for the Dash Platform node grpc interface #[serde(default)] pub platform_ssl: bool, @@ -141,14 +142,14 @@ impl Config { /// ## Feature flags /// /// * `offline-testing` is not set - connect to Platform and generate - /// new test vectors during execution + /// new test vectors during execution /// * `offline-testing` is set - use mock implementation and - /// load existing test vectors from disk + /// load existing test vectors from disk /// /// ## Arguments /// /// * namespace - namespace to use when storing mock expectations; this is used to separate - /// expectations from different tests. + /// expectations from different tests. /// /// When empty string is provided, expectations are stored in the root of the dump directory. pub async fn setup_api(&self, namespace: &str) -> dash_sdk::Sdk { From 4c004a7165dae960c42ab2611afe4839982a4bc4 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Wed, 9 Oct 2024 16:45:00 +0200 Subject: [PATCH 11/18] chore: rabbit's feedback --- packages/rs-sdk/src/mock/sdk.rs | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/packages/rs-sdk/src/mock/sdk.rs b/packages/rs-sdk/src/mock/sdk.rs index f89c806cf22..4f1c228ef5e 100644 --- a/packages/rs-sdk/src/mock/sdk.rs +++ b/packages/rs-sdk/src/mock/sdk.rs @@ -86,7 +86,7 @@ impl MockDashPlatformSdk { /// Load all expectations from files in a directory asynchronously. /// /// See [MockDashPlatformSdk::load_expectations_sync()] for more details. - #[deprecated(note = "use load_expectations_sync")] + #[deprecated(since = "1.4.0", note = "use load_expectations_sync")] pub async fn load_expectations<P: AsRef<std::path::Path> + Send + 'static>( &mut self, dir: P, @@ -317,13 +317,16 @@ impl MockDashPlatformSdk { objects: Option<R>, ) -> Result<&mut Self, Error> where - R: FromIterator<(K, Option<O>)> + MockResponse + Send + Default + MockResponse, - <<O as FetchMany<K, R>>::Request as TransportRequest>::Response: Default, - R: FromProof< + R: FromIterator<(K, Option<O>)> + + MockResponse + + FromProof< <O as FetchMany<K, R>>::Request, Request = <O as FetchMany<K, R>>::Request, Response = <<O as FetchMany<K, R>>::Request as TransportRequest>::Response, - > + Sync, + > + Sync + + Send + + Default, + <<O as FetchMany<K, R>>::Request as TransportRequest>::Response: Default, { let grpc_request = query.query(self.prove()).expect("query must be correct"); self.expect(grpc_request, objects).await?; From 612970fec614fccf92855ce71b8ef7d7a08b83e5 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Thu, 10 Oct 2024 11:55:48 +0200 Subject: [PATCH 12/18] fix(sdk): deadlock when executing async futures within sync context --- packages/rs-sdk/src/internal/sync.rs | 148 +++++++++++++++++- .../rs-sdk/tests/fetch/contested_resource.rs | 51 +++--- 2 files changed, 167 insertions(+), 32 deletions(-) diff --git a/packages/rs-sdk/src/internal/sync.rs b/packages/rs-sdk/src/internal/sync.rs index f88e6367fa5..878c8bf044e 100644 --- a/packages/rs-sdk/src/internal/sync.rs +++ b/packages/rs-sdk/src/internal/sync.rs @@ -1,8 +1,10 @@ -//! futures-related utilities to handle async code from sync code. - -use std::future::Future; - +//! Handle async calls from sync code. +//! +//! This is a workaround for an issue in tokio, where you cannot call `block_on` from sync call that is called +//! inside a tokio runtime. This module spawns async futures in active tokio runtime, and retrieves the result +//! using a channel. use drive_proof_verifier::error::ContextProviderError; +use std::future::Future; #[derive(Debug, thiserror::Error)] pub(crate) enum AsyncError { @@ -23,10 +25,142 @@ impl From<AsyncError> for crate::Error { } } -/// Block on the provided future and return the result. -pub(crate) fn block_on<F: Future + Send + 'static>(fut: F) -> Result<F::Output, AsyncError> +/// Blocks on the provided future and returns the result. +/// +/// This function is used to call async functions from sync code. +/// Requires the current thread to be running in a tokio runtime. +/// +/// Due to limitations of tokio runtime, we cannot use `tokio::runtime::Runtime::block_on` if we are already inside a tokio runtime. +/// This function is a workaround for that limitation. +pub(crate) fn block_on<F>(fut: F) -> Result<F::Output, ContextProviderError> where + F: Future + Send + 'static, F::Output: Send, { - Ok(futures::executor::block_on(fut)) + tracing::trace!("block_on: running async function from sync code"); + let rt = tokio::runtime::Handle::try_current().map_err(|e| { + ContextProviderError::AsyncError(format!( + "sync-async error: cannot get current tokio runtime handle: {:?}", + e + )) + })?; + let (tx, rx) = std::sync::mpsc::channel(); + tracing::trace!("block_on: Spawning worker"); + let hdl = rt.spawn(worker(fut, tx)); + tracing::trace!("block_on: Worker spawned"); + let recv = tokio::task::block_in_place(|| rx.recv()); + + let resp = recv.map_err(|e| { + ContextProviderError::AsyncError(format!( + "sync-async error: cannot receive response from async function: {:?}", + e + )) + })?; + + tracing::trace!("Response received"); + if !hdl.is_finished() { + tracing::debug!("async-sync worker future is not finished, aborting; this should not happen, but it's fine"); + hdl.abort(); // cleanup the worker future + } + + Ok(resp) +} + +/// Worker function that runs the provided future and sends the result back to the caller using oneshot channel. +async fn worker<F: Future>( + fut: F, + // response: oneshot::Sender<F::Output>, + response: std::sync::mpsc::Sender<F::Output>, +) -> Result<(), drive_proof_verifier::error::ContextProviderError> { + tracing::trace!("Worker start"); + let result = fut.await; + tracing::trace!("Worker async function completed, sending response"); + response.send(result).map_err(|e| { + ContextProviderError::Generic(format!("sync-async error: response cannot be sent: {}", e)) + })?; + tracing::trace!("Worker response sent"); + + Ok(()) +} + +#[cfg(test)] +mod test { + use super::*; + use std::future::Future; + use tokio::{ + runtime::Builder, + sync::mpsc::{self, Receiver}, + }; + + /// Test for block_on with async code that calls sync code, which then calls async code again. + /// + /// Given: An async function that calls a sync function, which then calls another async function. + /// When: The async function is executed using block_on. + /// Then: Other threads can still do some work + #[test] + fn test_block_on_nested_async_sync() { + let rt = Builder::new_multi_thread() + .worker_threads(1) // we should be good with 1 worker thread + .max_blocking_threads(1) // we should be good with 1 blocking thread + .enable_all() + .build() + .expect("Failed to create Tokio runtime"); + // we repeat this test a few times, to make sure it's stable + for _repeat in 0..5 { + // Create a Tokio runtime; we use the current thread runtime for this test. + + const MSGS: usize = 10; + let (tx, rx) = mpsc::channel::<usize>(1); + + // Spawn new worker that will just count. + let worker = async move { + for count in 0..MSGS { + tx.send(count).await.unwrap(); + } + }; + let worker_join = rt.spawn(worker); + // Define the number of levels of execution + let levels = 4; + + // Define the innermost async function + async fn innermost_async_function( + mut rx: Receiver<usize>, + ) -> Result<String, ContextProviderError> { + for i in 0..MSGS { + let count = rx.recv().await.unwrap(); + assert_eq!(count, i); + } + + Ok(String::from("Success")) + } + + // Define the nested sync function + fn nested_sync_function<F>(fut: F) -> Result<String, ContextProviderError> + where + F: Future<Output = Result<String, ContextProviderError>> + Send + 'static, + F::Output: Send, + { + block_on(fut)?.map_err(|e| ContextProviderError::Generic(e.to_string())) + } + + // Define the outer async function + async fn outer_async_function( + levels: usize, + rx: Receiver<usize>, + ) -> Result<String, ContextProviderError> { + let mut result = innermost_async_function(rx).await; + for _ in 0..levels { + result = nested_sync_function(async { result }); + } + result + } + + // Run the outer async function using block_on + let result = rt.block_on(outer_async_function(levels, rx)); + + rt.block_on(worker_join).unwrap(); + // Assert the result + assert_eq!(result.unwrap(), "Success"); + } + } } diff --git a/packages/rs-sdk/tests/fetch/contested_resource.rs b/packages/rs-sdk/tests/fetch/contested_resource.rs index 075e45f2439..643396d495a 100644 --- a/packages/rs-sdk/tests/fetch/contested_resource.rs +++ b/packages/rs-sdk/tests/fetch/contested_resource.rs @@ -19,7 +19,6 @@ use drive::query::{ vote_polls_by_document_type_query::VotePollsByDocumentTypeQuery, }; use drive_proof_verifier::types::ContestedResource; -use std::panic::catch_unwind; /// Test that we can fetch contested resources /// @@ -304,33 +303,35 @@ async fn contested_resources_fields( tracing::debug!(?expect, "Running test case"); // handle panics to not stop other test cases from running - let unwinded = catch_unwind(|| { - { - futures::executor::block_on(async { - let mut query = base_query(&cfg); - query_mut_fn(&mut query); - - let (test_case_id, sdk) = - setup_sdk_for_test_case(cfg, query.clone(), "contested_resources_fields").await; - tracing::debug!(test_case_id, ?query, "Executing query"); - - ContestedResource::fetch_many(&sdk, query).await - }) - } - }); - let result = match unwinded { + let join_handle = tokio::task::spawn(async move { + let mut query = base_query(&cfg); + query_mut_fn(&mut query); + + let (test_case_id, sdk) = + setup_sdk_for_test_case(cfg, query.clone(), "contested_resources_fields").await; + tracing::debug!(test_case_id, ?query, "Executing query"); + + ContestedResource::fetch_many(&sdk, query).await + }) + .await; + let result = match join_handle { Ok(r) => r, Err(e) => { - let msg = if let Some(s) = e.downcast_ref::<&str>() { - s.to_string() - } else if let Some(s) = e.downcast_ref::<String>() { - s.to_string() - } else { - format!("unknown panic type: {:?}", std::any::type_name_of_val(&e)) - }; + if e.is_panic() { + let e = e.into_panic(); + let msg = if let Some(s) = e.downcast_ref::<&str>() { + s.to_string() + } else if let Some(s) = e.downcast_ref::<String>() { + s.to_string() + } else { + format!("unknown panic type: {:?}", std::any::type_name_of_val(&e)) + }; - tracing::error!("PANIC: {}", msg); - Err(Error::Generic(msg)) + tracing::error!("PANIC: {}", msg); + Err(Error::Generic(msg)) + } else { + Err(Error::Generic(format!("JoinError: {:?}", e))) + } } }; From d9035bd27362d41066c1b6d76c9307e0fbd4b331 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Thu, 10 Oct 2024 12:05:29 +0200 Subject: [PATCH 13/18] refactor(sdk): move sync to main and pub --- packages/rs-sdk/src/internal/mod.rs | 3 --- packages/rs-sdk/src/lib.rs | 2 +- packages/rs-sdk/src/mock/provider.rs | 2 +- packages/rs-sdk/src/mock/sdk.rs | 2 +- packages/rs-sdk/src/{internal => }/sync.rs | 4 ++-- 5 files changed, 5 insertions(+), 8 deletions(-) delete mode 100644 packages/rs-sdk/src/internal/mod.rs rename packages/rs-sdk/src/{internal => }/sync.rs (98%) diff --git a/packages/rs-sdk/src/internal/mod.rs b/packages/rs-sdk/src/internal/mod.rs deleted file mode 100644 index 81012bb2824..00000000000 --- a/packages/rs-sdk/src/internal/mod.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! Various internal utilities used by the SDK. - -pub(crate) mod sync; diff --git a/packages/rs-sdk/src/lib.rs b/packages/rs-sdk/src/lib.rs index 63f73272159..1f928ab6db9 100644 --- a/packages/rs-sdk/src/lib.rs +++ b/packages/rs-sdk/src/lib.rs @@ -63,7 +63,6 @@ pub mod core; pub mod error; -mod internal; mod internal_cache; pub mod mock; pub mod platform; @@ -77,6 +76,7 @@ pub use dpp; pub use drive; pub use drive_proof_verifier::types as query_types; pub use rs_dapi_client as dapi_client; +pub mod sync; /// Version of the SDK pub const VERSION: &str = env!("CARGO_PKG_VERSION"); diff --git a/packages/rs-sdk/src/mock/provider.rs b/packages/rs-sdk/src/mock/provider.rs index 0a097f3ea85..879c4137ebe 100644 --- a/packages/rs-sdk/src/mock/provider.rs +++ b/packages/rs-sdk/src/mock/provider.rs @@ -1,8 +1,8 @@ //! Example ContextProvider that uses the Core gRPC API to fetch data from Platform. use crate::core::LowLevelDashCoreClient; -use crate::internal::sync::block_on; use crate::platform::Fetch; +use crate::sync::block_on; use crate::{Error, Sdk}; use arc_swap::ArcSwapAny; use dpp::prelude::{CoreBlockHeight, DataContract, Identifier}; diff --git a/packages/rs-sdk/src/mock/sdk.rs b/packages/rs-sdk/src/mock/sdk.rs index 4f1c228ef5e..bc9c3927716 100644 --- a/packages/rs-sdk/src/mock/sdk.rs +++ b/packages/rs-sdk/src/mock/sdk.rs @@ -2,11 +2,11 @@ //! //! See [MockDashPlatformSdk] for more details. use crate::{ - internal::sync::block_on, platform::{ types::{evonode::EvoNode, identity::IdentityRequest}, DocumentQuery, Fetch, FetchMany, Query, }, + sync::block_on, Error, Sdk, }; use arc_swap::ArcSwapOption; diff --git a/packages/rs-sdk/src/internal/sync.rs b/packages/rs-sdk/src/sync.rs similarity index 98% rename from packages/rs-sdk/src/internal/sync.rs rename to packages/rs-sdk/src/sync.rs index 878c8bf044e..7ad84afeb4b 100644 --- a/packages/rs-sdk/src/internal/sync.rs +++ b/packages/rs-sdk/src/sync.rs @@ -7,7 +7,7 @@ use drive_proof_verifier::error::ContextProviderError; use std::future::Future; #[derive(Debug, thiserror::Error)] -pub(crate) enum AsyncError { +pub enum AsyncError { #[error("asynchronous call from synchronous context failed: {0}")] #[allow(unused)] Generic(String), @@ -32,7 +32,7 @@ impl From<AsyncError> for crate::Error { /// /// Due to limitations of tokio runtime, we cannot use `tokio::runtime::Runtime::block_on` if we are already inside a tokio runtime. /// This function is a workaround for that limitation. -pub(crate) fn block_on<F>(fut: F) -> Result<F::Output, ContextProviderError> +pub fn block_on<F>(fut: F) -> Result<F::Output, ContextProviderError> where F: Future + Send + 'static, F::Output: Send, From 6994a5c495060ca93e2a4cc32478421e2f691c6f Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Thu, 10 Oct 2024 12:06:02 +0200 Subject: [PATCH 14/18] chore(sdk): Async error handling --- packages/rs-sdk/src/sync.rs | 45 ++++++++++++++++++++----------------- 1 file changed, 25 insertions(+), 20 deletions(-) diff --git a/packages/rs-sdk/src/sync.rs b/packages/rs-sdk/src/sync.rs index 7ad84afeb4b..d3c066e8cb5 100644 --- a/packages/rs-sdk/src/sync.rs +++ b/packages/rs-sdk/src/sync.rs @@ -4,15 +4,34 @@ //! inside a tokio runtime. This module spawns async futures in active tokio runtime, and retrieves the result //! using a channel. use drive_proof_verifier::error::ContextProviderError; -use std::future::Future; +use std::{future::Future, sync::mpsc::SendError}; +use tokio::runtime::TryCurrentError; #[derive(Debug, thiserror::Error)] pub enum AsyncError { + /// Not running inside tokio runtime + #[error("not running inside tokio runtime: {0}")] + NotInTokioRuntime(#[from] TryCurrentError), + + /// Cannot receive response from async function + #[error("cannot receive response from async function: {0}")] + RecvError(#[from] std::sync::mpsc::RecvError), + + /// Cannot send response from async function + #[error("cannot send response from async function: {0}")] + SendError(String), + #[error("asynchronous call from synchronous context failed: {0}")] #[allow(unused)] Generic(String), } +impl<T> From<SendError<T>> for AsyncError { + fn from(error: SendError<T>) -> Self { + Self::SendError(error.to_string()) + } +} + impl From<AsyncError> for ContextProviderError { fn from(error: AsyncError) -> Self { ContextProviderError::AsyncError(error.to_string()) @@ -32,30 +51,18 @@ impl From<AsyncError> for crate::Error { /// /// Due to limitations of tokio runtime, we cannot use `tokio::runtime::Runtime::block_on` if we are already inside a tokio runtime. /// This function is a workaround for that limitation. -pub fn block_on<F>(fut: F) -> Result<F::Output, ContextProviderError> +pub fn block_on<F>(fut: F) -> Result<F::Output, AsyncError> where F: Future + Send + 'static, F::Output: Send, { tracing::trace!("block_on: running async function from sync code"); - let rt = tokio::runtime::Handle::try_current().map_err(|e| { - ContextProviderError::AsyncError(format!( - "sync-async error: cannot get current tokio runtime handle: {:?}", - e - )) - })?; + let rt = tokio::runtime::Handle::try_current()?; let (tx, rx) = std::sync::mpsc::channel(); tracing::trace!("block_on: Spawning worker"); let hdl = rt.spawn(worker(fut, tx)); tracing::trace!("block_on: Worker spawned"); - let recv = tokio::task::block_in_place(|| rx.recv()); - - let resp = recv.map_err(|e| { - ContextProviderError::AsyncError(format!( - "sync-async error: cannot receive response from async function: {:?}", - e - )) - })?; + let resp = tokio::task::block_in_place(|| rx.recv())?; tracing::trace!("Response received"); if !hdl.is_finished() { @@ -71,13 +78,11 @@ async fn worker<F: Future>( fut: F, // response: oneshot::Sender<F::Output>, response: std::sync::mpsc::Sender<F::Output>, -) -> Result<(), drive_proof_verifier::error::ContextProviderError> { +) -> Result<(), AsyncError> { tracing::trace!("Worker start"); let result = fut.await; tracing::trace!("Worker async function completed, sending response"); - response.send(result).map_err(|e| { - ContextProviderError::Generic(format!("sync-async error: response cannot be sent: {}", e)) - })?; + response.send(result)?; tracing::trace!("Worker response sent"); Ok(()) From 909921c004f81b1f026f2c77c6c3d68989095a5a Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Thu, 10 Oct 2024 12:42:51 +0200 Subject: [PATCH 15/18] chore: apply feedback --- packages/rs-sdk/src/sdk.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index b4a36d4fc3c..665208a3dab 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -799,6 +799,8 @@ impl SdkBuilder { if sdk.context_provider.load().is_none() { #[cfg(feature = "mocks")] if !self.core_ip.is_empty() { + tracing::warn!( + "ContextProvider not set, falling back to a mock one; use SdkBuilder::with_context_provider() to set it up"); let mut context_provider = GrpcContextProvider::new(None, &self.core_ip, self.core_port, &self.core_user, &self.core_password, self.data_contract_cache_size, self.quorum_public_keys_cache_size)?; @@ -812,12 +814,16 @@ impl SdkBuilder { sdk.context_provider.swap(Some(Arc::new(Box::new(context_provider.clone())))); context_provider.set_sdk(Some(sdk.clone())); } else{ - tracing::warn!( - "Configure ContextProvider with Sdk::with_context_provider(); otherwise Sdk will fail"); + return Err(Error::Config(concat!( + "context provider is not set, configure it with SdkBuilder::with_context_provider() ", + "or configure Core access with SdkBuilder::with_core() to use mock context provider") + .to_string())); } #[cfg(not(feature = "mocks"))] - tracing::warn!( - "Configure ContextProvider with Sdk::with_context_provider(); otherwise Sdk will fail"); + return Err(Error::Config(concat!( + "context provider is not set, configure it with SdkBuilder::with_context_provider() ", + "or enable `mocks` feature to use mock context provider") + .to_string())); }; sdk From cc11cd89c0d36433a56a3232ecb3627a8f31c04b Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Thu, 10 Oct 2024 13:14:53 +0200 Subject: [PATCH 16/18] refactor: minor context provider refactoring --- packages/rs-sdk/src/sdk.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index 665208a3dab..a6b56409255 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -257,9 +257,8 @@ impl Sdk { where O::Request: Mockable, { - let provider_guard = self.context_provider.load(); - let provider = provider_guard - .as_ref() + let provider = self + .context_provider() .ok_or(drive_proof_verifier::Error::ContextProviderNotSet)?; match self.inner { @@ -277,9 +276,12 @@ impl Sdk { } } } + + /// Return [ContextProvider] used by the SDK. pub fn context_provider(&self) -> Option<impl ContextProvider> { let provider_guard = self.context_provider.load(); let provider = provider_guard.as_ref().map(Arc::clone); + provider } From 00620f82f35b41b449f1c2c0c90ecae3a3fa0810 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Thu, 10 Oct 2024 13:23:00 +0200 Subject: [PATCH 17/18] chore: apply linter issues --- packages/rs-sdk/Cargo.toml | 2 +- packages/rs-sdk/src/platform/block_info_from_metadata.rs | 4 ++-- .../rs-sdk/src/platform/transition/withdraw_from_identity.rs | 5 +---- packages/rs-sdk/tests/fetch/config.rs | 1 + packages/rs-sdk/tests/fetch/contested_resource_vote_state.rs | 2 +- packages/rs-sdk/tests/fetch/mock_fetch.rs | 3 +-- 6 files changed, 7 insertions(+), 10 deletions(-) diff --git a/packages/rs-sdk/Cargo.toml b/packages/rs-sdk/Cargo.toml index 6caca193d2a..d15e3bb3653 100644 --- a/packages/rs-sdk/Cargo.toml +++ b/packages/rs-sdk/Cargo.toml @@ -18,7 +18,7 @@ drive-proof-verifier = { path = "../rs-drive-proof-verifier" } dapi-grpc-macros = { path = "../rs-dapi-grpc-macros" } http = { version = "1.1" } thiserror = "1.0.64" -tokio = { version = "1.40", features = ["macros"] } +tokio = { version = "1.40", features = ["macros", "rt-multi-thread"] } tokio-util = { version = "0.7.12" } async-trait = { version = "0.1.83" } ciborium = { git = "https://github.com/qrayven/ciborium", branch = "feat-ser-null-as-undefined" } diff --git a/packages/rs-sdk/src/platform/block_info_from_metadata.rs b/packages/rs-sdk/src/platform/block_info_from_metadata.rs index 713e1e176d4..5e96e136550 100644 --- a/packages/rs-sdk/src/platform/block_info_from_metadata.rs +++ b/packages/rs-sdk/src/platform/block_info_from_metadata.rs @@ -12,8 +12,8 @@ use drive::error::proof::ProofError; /// /// # Parameters /// - `response_metadata`: A reference to `ResponseMetadata` obtained from a platform response. -/// This metadata includes various block-related information such as time in milliseconds, -/// height, core chain locked height, and epoch. +/// This metadata includes various block-related information such as time in milliseconds, +/// height, core chain locked height, and epoch. /// /// # Returns /// If successful, returns `Ok(BlockInfo)` where `BlockInfo` contains: diff --git a/packages/rs-sdk/src/platform/transition/withdraw_from_identity.rs b/packages/rs-sdk/src/platform/transition/withdraw_from_identity.rs index ce6d32e0716..1d72c86e07c 100644 --- a/packages/rs-sdk/src/platform/transition/withdraw_from_identity.rs +++ b/packages/rs-sdk/src/platform/transition/withdraw_from_identity.rs @@ -4,7 +4,6 @@ use dpp::identity::accessors::IdentityGettersV0; use dpp::identity::core_script::CoreScript; use dpp::identity::signer::Signer; use dpp::identity::{Identity, IdentityPublicKey}; -use dpp::prelude::UserFeeIncrease; use crate::platform::transition::broadcast::BroadcastStateTransition; use crate::platform::transition::put_settings::PutSettings; @@ -47,9 +46,7 @@ impl WithdrawFromIdentity for Identity { ) -> Result<u64, Error> { let new_identity_nonce = sdk.get_identity_nonce(self.id(), true, settings).await?; let script = address.map(|address| CoreScript::new(address.script_pubkey())); - let user_fee_increase = settings - .map(|settings| settings.user_fee_increase) - .flatten(); + let user_fee_increase = settings.and_then(|settings| settings.user_fee_increase); let state_transition = IdentityCreditWithdrawalTransition::try_from_identity( self, script, diff --git a/packages/rs-sdk/tests/fetch/config.rs b/packages/rs-sdk/tests/fetch/config.rs index c05b9cef2d5..1b6efbc615e 100644 --- a/packages/rs-sdk/tests/fetch/config.rs +++ b/packages/rs-sdk/tests/fetch/config.rs @@ -72,6 +72,7 @@ pub struct Config { /// ID of document of the type [`existing_document_type_name`](Config::existing_document_type_name) /// in [`existing_data_contract_id`](Config::existing_data_contract_id). #[serde(default = "Config::default_document_id")] + #[allow(unused)] pub existing_document_id: Identifier, // Hex-encoded ProTxHash of the existing HP masternode #[serde(default = "Config::default_protxhash")] diff --git a/packages/rs-sdk/tests/fetch/contested_resource_vote_state.rs b/packages/rs-sdk/tests/fetch/contested_resource_vote_state.rs index 69b66e0e486..fef6138902b 100644 --- a/packages/rs-sdk/tests/fetch/contested_resource_vote_state.rs +++ b/packages/rs-sdk/tests/fetch/contested_resource_vote_state.rs @@ -276,7 +276,7 @@ async fn contested_resource_vote_states_with_limit_PLAN_674() { type MutFn = fn(&mut ContestedDocumentVotePollDriveQuery); #[test_case(|q| q.limit = Some(0), Err("limit 0 out of bounds of [1, 100]"); "limit 0")] -#[test_case(|q| q.limit = Some(std::u16::MAX), Err("limit 65535 out of bounds of [1, 100]"); "limit std::u16::MAX")] +#[test_case(|q| q.limit = Some(u16::MAX), Err("limit 65535 out of bounds of [1, 100]"); "limit u16::MAX")] #[test_case(|q| q.start_at = Some(([0x11; 32], true)), Ok("Contenders { winner: None, contenders: {Identifier("); "start_at does not exist should return next contenders")] #[test_case(|q| q.start_at = Some(([0xff; 32], true)), Ok("Contenders { winner: None, contenders: {}, abstain_vote_tally: None, lock_vote_tally: None }"); "start_at 0xff;32 should return zero contenders")] #[test_case(|q| q.vote_poll.document_type_name = "nx doctype".to_string(), Err(r#"code: InvalidArgument, message: "document type nx doctype not found"#); "non existing document type returns InvalidArgument")] diff --git a/packages/rs-sdk/tests/fetch/mock_fetch.rs b/packages/rs-sdk/tests/fetch/mock_fetch.rs index a5d49160772..1b96614ecd4 100644 --- a/packages/rs-sdk/tests/fetch/mock_fetch.rs +++ b/packages/rs-sdk/tests/fetch/mock_fetch.rs @@ -1,5 +1,4 @@ -//! -//! +//! Tests of mocked Fetch trait implementations. use super::common::{mock_data_contract, mock_document_type}; use dash_sdk::{ From a91528827637216d376020a7461e51853458da43 Mon Sep 17 00:00:00 2001 From: Lukasz Klimek <842586+lklimek@users.noreply.github.com> Date: Thu, 10 Oct 2024 14:07:30 +0200 Subject: [PATCH 18/18] refactor(sdk): simplify context provider load --- packages/rs-sdk/src/sdk.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/rs-sdk/src/sdk.rs b/packages/rs-sdk/src/sdk.rs index a6b56409255..f7f938703d7 100644 --- a/packages/rs-sdk/src/sdk.rs +++ b/packages/rs-sdk/src/sdk.rs @@ -217,9 +217,8 @@ impl Sdk { where O::Request: Mockable, { - let provider_guard = self.context_provider.load(); - let provider = provider_guard - .as_ref() + let provider = self + .context_provider() .ok_or(drive_proof_verifier::Error::ContextProviderNotSet)?; match self.inner {