Skip to content

Commit

Permalink
refactor(sdk): improve context provider async processing
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Oct 9, 2024
1 parent 9921618 commit f5aa584
Show file tree
Hide file tree
Showing 13 changed files with 185 additions and 132 deletions.
20 changes: 0 additions & 20 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 3 additions & 2 deletions packages/rs-dapi-client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ edition = "2021"

[features]
default = ["mocks", "offline-testing"]
tokio-sleep = ["backon/tokio-sleep"]
mocks = [
"dep:sha2",
"dep:hex",
Expand All @@ -20,7 +19,9 @@ dump = ["mocks"]
offline-testing = []

[dependencies]
backon = { version = "1.2"}
backon = { version = "1.2", default-features = false, features = [
"tokio-sleep",
] }
dapi-grpc = { path = "../dapi-grpc", features = [
"core",
"platform",
Expand Down
4 changes: 4 additions & 0 deletions packages/rs-drive-proof-verifier/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,10 @@ pub enum ContextProviderError {
/// Core Fork Error
#[error("activation fork error: {0}")]
ActivationForkError(String),

/// Async error, eg. when tokio runtime fails
#[error("async error: {0}")]
AsyncError(String),
}

impl From<drive::error::Error> for Error {
Expand Down
2 changes: 0 additions & 2 deletions packages/rs-sdk/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ derive_more = { version = "1.0", features = ["from"] }
dashcore-rpc = { git = "https://github.com/dashpay/rust-dashcore-rpc", tag = "v0.15.4" }
lru = { version = "0.12.3", optional = true }
bip37-bloom-filter = { git = "https://github.com/dashpay/rs-bip37-bloom-filter", branch = "develop" }
pollster = { version = "0.3.0" }

[dev-dependencies]
tokio = { version = "1.40", features = ["macros", "rt-multi-thread"] }
Expand All @@ -57,7 +56,6 @@ test-case = { version = "3.3.1" }

[features]
default = ["mocks", "offline-testing"]
tokio-sleep = ["rs-dapi-client/tokio-sleep"]

mocks = [
"dep:serde",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,29 @@ use std::{fmt::Debug, sync::Mutex};

/// Core RPC client that can be used to retrieve quorum keys from core.
///
/// Implements [`ContextProvider`] trait.
///
/// TODO: This is a temporary implementation, effective until we integrate SPV.
pub struct CoreClient {
pub struct LowLevelDashCoreClient {
core: Mutex<Client>,
server_address: String,
core_user: String,
core_password: String,
core_port: u16,
}

impl Debug for CoreClient {
impl Clone for LowLevelDashCoreClient {
// As Client does not implement Clone, we just create a new instance of CoreClient here.
fn clone(&self) -> Self {
LowLevelDashCoreClient::new(
&self.server_address,
self.core_port,
&self.core_user,
&self.core_password,
)
.expect("Failed to clone CoreClient when cloning, this should not happen")
}
}

impl Debug for LowLevelDashCoreClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CoreClient")
.field("server_address", &self.server_address)
Expand All @@ -37,7 +49,7 @@ impl Debug for CoreClient {
}
}

impl CoreClient {
impl LowLevelDashCoreClient {
/// Create new Dash Core client.
///
/// # Arguments
Expand All @@ -63,13 +75,14 @@ impl CoreClient {
core: Mutex::new(core),
server_address: server_address.to_string(),
core_user: core_user.to_string(),
core_password: core_password.to_string(),
core_port,
})
}
}

// Wallet functions
impl CoreClient {
impl LowLevelDashCoreClient {
/// List unspent transactions
///
/// ## Arguments
Expand Down
4 changes: 4 additions & 0 deletions packages/rs-sdk/src/core/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
//! Dash Core SDK implementation.
//!
//! TODO: This is work in progress.
#[cfg(feature = "mocks")]
mod dash_core_client;
mod transaction;
#[cfg(feature = "mocks")]
pub use dash_core_client::LowLevelDashCoreClient;
3 changes: 3 additions & 0 deletions packages/rs-sdk/src/internal/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
//! Various internal utilities used by the SDK.
pub(crate) mod sync;
32 changes: 32 additions & 0 deletions packages/rs-sdk/src/internal/sync.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
//! futures-related utilities to handle async code from sync code.
use std::future::Future;

use drive_proof_verifier::error::ContextProviderError;

#[derive(Debug, thiserror::Error)]
pub(crate) enum AsyncError {
#[error("asynchronous call from synchronous context failed: {0}")]
#[allow(unused)]
Generic(String),
}

impl From<AsyncError> for ContextProviderError {
fn from(error: AsyncError) -> Self {
ContextProviderError::AsyncError(error.to_string())
}
}

impl From<AsyncError> for crate::Error {
fn from(error: AsyncError) -> Self {
Self::ContextProviderError(error.into())
}
}

/// Block on the provided future and return the result.
pub(crate) fn block_on<F: Future + Send + 'static>(fut: F) -> Result<F::Output, AsyncError>
where
F::Output: Send,
{
Ok(futures::executor::block_on(fut))
}
3 changes: 1 addition & 2 deletions packages/rs-sdk/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,8 @@
#![allow(rustdoc::private_intra_doc_links)]

pub mod core;
#[cfg(feature = "mocks")]
mod core_client;
pub mod error;
mod internal;
mod internal_cache;
pub mod mock;
pub mod platform;
Expand Down
15 changes: 8 additions & 7 deletions packages/rs-sdk/src/mock/provider.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! Example ContextProvider that uses the Core gRPC API to fetch data from Platform.
use crate::core_client::CoreClient;
use crate::core::LowLevelDashCoreClient;
use crate::internal::sync::block_on;
use crate::platform::Fetch;
use crate::{Error, Sdk};
use arc_swap::ArcSwapAny;
use dpp::prelude::{CoreBlockHeight, DataContract, Identifier};
use drive_proof_verifier::error::ContextProviderError;
use drive_proof_verifier::ContextProvider;
use pollster::FutureExt;
use std::hash::Hash;
use std::num::NonZeroUsize;
use std::sync::Arc;
Expand All @@ -17,7 +17,7 @@ use std::sync::Arc;
/// Example [ContextProvider] used by the Sdk for testing purposes.
pub struct GrpcContextProvider {
/// Core client
core: CoreClient,
core: LowLevelDashCoreClient,
/// [Sdk] to use when fetching data from Platform
///
/// Note that if the `sdk` is `None`, the context provider will not be able to fetch data itself and will rely on
Expand Down Expand Up @@ -62,7 +62,8 @@ impl GrpcContextProvider {
data_contracts_cache_size: NonZeroUsize,
quorum_public_keys_cache_size: NonZeroUsize,
) -> Result<Self, Error> {
let core_client = CoreClient::new(core_ip, core_port, core_user, core_password)?;
let core_client =
LowLevelDashCoreClient::new(core_ip, core_port, core_user, core_password)?;
Ok(Self {
core: core_client,
sdk: ArcSwapAny::new(Arc::new(sdk)),
Expand Down Expand Up @@ -197,9 +198,9 @@ impl ContextProvider for GrpcContextProvider {

let sdk_cloned = sdk.clone();

let data_contract: Option<DataContract> = DataContract::fetch(&sdk_cloned, contract_id)
.block_on()
.map_err(|e| ContextProviderError::DataContractFailure(e.to_string()))?;
let data_contract: Option<DataContract> =
block_on(async move { DataContract::fetch(&sdk_cloned, contract_id).await })?
.map_err(|e| ContextProviderError::DataContractFailure(e.to_string()))?;

if let Some(ref dc) = data_contract {
self.data_contracts_cache.put(*data_contract_id, dc.clone());
Expand Down
Loading

0 comments on commit f5aa584

Please sign in to comment.