diff --git a/Cargo.lock b/Cargo.lock index 4af5a74799..92ff215c4b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -309,11 +309,12 @@ dependencies = [ [[package]] name = "backon" -version = "0.5.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "33e5b65cc81d81fbb8488f36458ab4771be35a722967bbc959df28b47397e3ff" +checksum = "e4fa97bb310c33c811334143cf64c5bb2b7b3c06e453db6b095d7061eff8f113" dependencies = [ "fastrand", + "tokio", ] [[package]] @@ -3955,9 +3956,9 @@ dependencies = [ [[package]] name = "rustls-native-certs" -version = "0.7.2" +version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04182dffc9091a404e0fc069ea5cd60e5b866c3adf881eff99a32d048242dffa" +checksum = "fcaf18a4f2be7326cd874a5fa579fae794320a0f388d365dca7e480e55f83f8a" dependencies = [ "openssl-probe", "rustls-pemfile", @@ -4791,9 +4792,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.15" +version = "0.1.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" dependencies = [ "futures-core", "pin-project-lite", @@ -4884,9 +4885,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.12.1" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38659f4a91aba8598d27821589f5db7dddd94601e7a01b1e485a50e5484c7401" +checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" dependencies = [ "async-stream", "async-trait", diff --git a/packages/rs-dapi-client/Cargo.toml b/packages/rs-dapi-client/Cargo.toml index e5dc961574..fb06b6abdc 100644 --- a/packages/rs-dapi-client/Cargo.toml +++ b/packages/rs-dapi-client/Cargo.toml @@ -19,8 +19,14 @@ dump = ["mocks"] offline-testing = [] [dependencies] -backon = "0.5" -dapi-grpc = { path = "../dapi-grpc" } +backon = { version = "1.2", default-features = false, features = [ + "tokio-sleep", +] } +dapi-grpc = { path = "../dapi-grpc", features = [ + "core", + "platform", + "client", +], default-features = false } futures = "0.3.28" http-serde = { version = "2.1", optional = true } rand = { version = "0.8.5", features = ["small_rng"] } diff --git a/packages/rs-dapi-client/src/connection_pool.rs b/packages/rs-dapi-client/src/connection_pool.rs index 16d8cf030d..97dd991d50 100644 --- a/packages/rs-dapi-client/src/connection_pool.rs +++ b/packages/rs-dapi-client/src/connection_pool.rs @@ -67,19 +67,21 @@ impl ConnectionPool { /// * `prefix` - Prefix for the item in the pool. Used to distinguish between Core and Platform clients. /// * `uri` - URI of the node. /// * `settings` - Applied request settings. - pub fn get_or_create( + pub fn get_or_create<E>( &self, prefix: PoolPrefix, uri: &Uri, settings: Option<&AppliedRequestSettings>, - create: impl FnOnce() -> PoolItem, - ) -> PoolItem { + create: impl FnOnce() -> Result<PoolItem, E>, + ) -> Result<PoolItem, E> { if let Some(cli) = self.get(prefix, uri, settings) { - return cli; + return Ok(cli); } let cli = create(); - self.put(uri, settings, cli.clone()); + if let Ok(cli) = &cli { + self.put(uri, settings, cli.clone()); + } cli } diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 17748ab2b0..372b28bc3f 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -199,7 +199,13 @@ impl DapiRequestExecutor for DapiClient { address.uri().clone(), &applied_settings, &pool, - ); + ) + .map_err(|e| { + DapiClientError::<<R::Client as TransportClient>::Error>::Transport( + e, + address.clone(), + ) + })?; let response = transport_request .execute_transport(&mut transport_client, &applied_settings) @@ -250,7 +256,7 @@ impl DapiRequestExecutor for DapiClient { // Start the routine with retry policy applied: // We allow let_and_return because `result` is used later if dump feature is enabled let result = routine - .retry(&retry_settings) + .retry(retry_settings) .notify(|error, duration| { tracing::warn!( ?error, diff --git a/packages/rs-dapi-client/src/transport.rs b/packages/rs-dapi-client/src/transport.rs index a5459834e1..600189fc2f 100644 --- a/packages/rs-dapi-client/src/transport.rs +++ b/packages/rs-dapi-client/src/transport.rs @@ -51,12 +51,12 @@ pub trait TransportClient: Send + Sized { type Error: CanRetry + Send + Debug + Mockable; /// Build client using node's url. - fn with_uri(uri: Uri, pool: &ConnectionPool) -> Self; + fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, Self::Error>; /// Build client using node's url and [AppliedRequestSettings]. fn with_uri_and_settings( uri: Uri, settings: &AppliedRequestSettings, pool: &ConnectionPool, - ) -> Self; + ) -> Result<Self, Self::Error>; } diff --git a/packages/rs-dapi-client/src/transport/grpc.rs b/packages/rs-dapi-client/src/transport/grpc.rs index 98976ed08e..d5180099d0 100644 --- a/packages/rs-dapi-client/src/transport/grpc.rs +++ b/packages/rs-dapi-client/src/transport/grpc.rs @@ -8,7 +8,7 @@ use crate::{request_settings::AppliedRequestSettings, RequestSettings}; use dapi_grpc::core::v0::core_client::CoreClient; use dapi_grpc::core::v0::{self as core_proto}; use dapi_grpc::platform::v0::{self as platform_proto, platform_client::PlatformClient}; -use dapi_grpc::tonic::transport::Uri; +use dapi_grpc::tonic::transport::{ClientTlsConfig, Uri}; use dapi_grpc::tonic::Streaming; use dapi_grpc::tonic::{transport::Channel, IntoRequest}; use futures::{future::BoxFuture, FutureExt, TryFutureExt}; @@ -18,8 +18,16 @@ pub type PlatformGrpcClient = PlatformClient<Channel>; /// Core Client using gRPC transport. pub type CoreGrpcClient = CoreClient<Channel>; -fn create_channel(uri: Uri, settings: Option<&AppliedRequestSettings>) -> Channel { - let mut builder = Channel::builder(uri); +fn create_channel( + uri: Uri, + settings: Option<&AppliedRequestSettings>, +) -> Result<Channel, dapi_grpc::tonic::transport::Error> { + let mut builder = Channel::builder(uri).tls_config( + ClientTlsConfig::new() + .with_native_roots() + .with_webpki_roots() + .assume_http2(true), + )?; if let Some(settings) = settings { if let Some(timeout) = settings.connect_timeout { @@ -27,50 +35,84 @@ fn create_channel(uri: Uri, settings: Option<&AppliedRequestSettings>) -> Channe } } - builder.connect_lazy() + Ok(builder.connect_lazy()) } impl TransportClient for PlatformGrpcClient { type Error = dapi_grpc::tonic::Status; - fn with_uri(uri: Uri, pool: &ConnectionPool) -> Self { - pool.get_or_create(PoolPrefix::Platform, &uri, None, || { - Self::new(create_channel(uri.clone(), None)).into() - }) - .into() + fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, Self::Error> { + Ok(pool + .get_or_create(PoolPrefix::Platform, &uri, None, || { + match create_channel(uri.clone(), None) { + Ok(channel) => Ok(Self::new(channel).into()), + Err(e) => Err(dapi_grpc::tonic::Status::failed_precondition(format!( + "Channel creation failed: {}", + e + ))), + } + })? + .into()) } fn with_uri_and_settings( uri: Uri, settings: &AppliedRequestSettings, pool: &ConnectionPool, - ) -> Self { - pool.get_or_create(PoolPrefix::Platform, &uri, Some(settings), || { - Self::new(create_channel(uri.clone(), Some(settings))).into() - }) - .into() + ) -> Result<Self, Self::Error> { + Ok(pool + .get_or_create( + PoolPrefix::Platform, + &uri, + Some(settings), + || match create_channel(uri.clone(), Some(settings)) { + Ok(channel) => Ok(Self::new(channel).into()), + Err(e) => Err(dapi_grpc::tonic::Status::failed_precondition(format!( + "Channel creation failed: {}", + e + ))), + }, + )? + .into()) } } impl TransportClient for CoreGrpcClient { type Error = dapi_grpc::tonic::Status; - fn with_uri(uri: Uri, pool: &ConnectionPool) -> Self { - pool.get_or_create(PoolPrefix::Core, &uri, None, || { - Self::new(create_channel(uri.clone(), None)).into() - }) - .into() + fn with_uri(uri: Uri, pool: &ConnectionPool) -> Result<Self, Self::Error> { + Ok(pool + .get_or_create(PoolPrefix::Core, &uri, None, || { + match create_channel(uri.clone(), None) { + Ok(channel) => Ok(Self::new(channel).into()), + Err(e) => Err(dapi_grpc::tonic::Status::failed_precondition(format!( + "Channel creation failed: {}", + e + ))), + } + })? + .into()) } fn with_uri_and_settings( uri: Uri, settings: &AppliedRequestSettings, pool: &ConnectionPool, - ) -> Self { - pool.get_or_create(PoolPrefix::Core, &uri, Some(settings), || { - Self::new(create_channel(uri.clone(), Some(settings))).into() - }) - .into() + ) -> Result<Self, Self::Error> { + Ok(pool + .get_or_create( + PoolPrefix::Core, + &uri, + Some(settings), + || match create_channel(uri.clone(), Some(settings)) { + Ok(channel) => Ok(Self::new(channel).into()), + Err(e) => Err(dapi_grpc::tonic::Status::failed_precondition(format!( + "Channel creation failed: {}", + e + ))), + }, + )? + .into()) } } diff --git a/packages/rs-sdk/src/platform/types/evonode.rs b/packages/rs-sdk/src/platform/types/evonode.rs index 01c0630b49..77e893dfd5 100644 --- a/packages/rs-sdk/src/platform/types/evonode.rs +++ b/packages/rs-sdk/src/platform/types/evonode.rs @@ -10,7 +10,7 @@ use futures::{FutureExt, TryFutureExt}; use rs_dapi_client::transport::{ AppliedRequestSettings, PlatformGrpcClient, TransportClient, TransportRequest, }; -use rs_dapi_client::{Address, ConnectionPool, RequestSettings}; +use rs_dapi_client::{Address, ConnectionPool, DapiClientError, RequestSettings}; #[cfg(feature = "mocks")] use serde::{Deserialize, Serialize}; use std::fmt::Debug; @@ -74,7 +74,16 @@ impl TransportRequest for EvoNode { // We also create a new client to use with this request, so that the user does not need to // reconfigure SDK to use a single node. let pool = ConnectionPool::new(1); - let mut client = Self::Client::with_uri_and_settings(uri.clone(), settings, &pool); + // We create a new client with the given URI and settings + let client_result = Self::Client::with_uri_and_settings(uri.clone(), settings, &pool); + + // Handle the result manually to create a proper error response + let mut client = match client_result { + Ok(client) => client, + Err(e) => { + return async { Err(e) }.boxed(); + } + }; let mut grpc_request = GetStatusRequest { version: Some(get_status_request::Version::V0(GetStatusRequestV0 {})), }