From f3dcdd0efa87a1f039dc6f14be167de8a9a3d867 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 6 Aug 2024 23:01:27 +0200 Subject: [PATCH 01/11] feat: add native subxt rpc reconn client --- Cargo.lock | 19 +- Cargo.toml | 1 + subxt/Cargo.toml | 16 +- .../backend/rpc/reconnecting_rpc_client.rs | 270 -------- .../rpc/reconnecting_rpc_client/mod.rs | 630 ++++++++++++++++++ .../rpc/reconnecting_rpc_client/platform.rs | 77 +++ .../rpc/reconnecting_rpc_client/tests.rs | 285 ++++++++ .../rpc/reconnecting_rpc_client/utils.rs | 103 +++ subxt/src/macros.rs | 2 +- 9 files changed, 1108 insertions(+), 295 deletions(-) delete mode 100644 subxt/src/backend/rpc/reconnecting_rpc_client.rs create mode 100644 subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs create mode 100644 subxt/src/backend/rpc/reconnecting_rpc_client/platform.rs create mode 100644 subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs create mode 100644 subxt/src/backend/rpc/reconnecting_rpc_client/utils.rs diff --git a/Cargo.lock b/Cargo.lock index 79d256b1fa..645b4c0ba5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3477,23 +3477,6 @@ dependencies = [ "crossbeam-utils", ] -[[package]] -name = "reconnecting-jsonrpsee-ws-client" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06fa4f17e09edfc3131636082faaec633c7baa269396b4004040bc6c52f49f65" -dependencies = [ - "cfg_aliases", - "finito", - "futures", - "jsonrpsee", - "serde_json", - "thiserror", - "tokio", - "tracing", - "wasm-bindgen-futures", -] - [[package]] name = "redox_syscall" version = "0.5.1" @@ -4847,6 +4830,7 @@ dependencies = [ "bitvec", "derive-where", "either", + "finito", "frame-metadata 16.0.0", "futures", "getrandom", @@ -4856,7 +4840,6 @@ dependencies = [ "jsonrpsee", "parity-scale-codec", "primitive-types", - "reconnecting-jsonrpsee-ws-client", "scale-bits", "scale-decode", "scale-encode", diff --git a/Cargo.toml b/Cargo.toml index aae32ab408..b8ecaf2033 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -77,6 +77,7 @@ console_error_panic_hook = "0.1.7" darling = "0.20.10" derive-where = "1.2.7" either = { version = "1.13.0", default-features = false } +finito = { version = "0.1.0", default-features = false } frame-metadata = { version = "16.0.0", default-features = false } futures = { version = "0.3.30", default-features = false, features = ["std"] } getrandom = { version = "0.2", default-features = false } diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 24f84b23f6..c161c2fa9d 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -27,9 +27,10 @@ default = ["jsonrpsee", "native"] native = [ "jsonrpsee?/async-client", "jsonrpsee?/client-ws-transport-tls", + "jsonrpsee?/ws-client", "subxt-lightclient?/native", "tokio-util", - "reconnecting-jsonrpsee-ws-client?/native", + "tokio?/sync", ] # Enable this for web/wasm builds. @@ -37,15 +38,17 @@ native = [ web = [ "jsonrpsee?/async-wasm-client", "jsonrpsee?/client-web-transport", + "jsonrpsee?/wasm-client", "getrandom/js", "subxt-lightclient?/web", "subxt-macro/web", "instant/wasm-bindgen", - "reconnecting-jsonrpsee-ws-client?/web", + "tokio?/sync", + "finito?/wasm-bindgen", ] # Enable this to use the reconnecting rpc client -unstable-reconnecting-rpc-client = ["dep:reconnecting-jsonrpsee-ws-client"] +unstable-reconnecting-rpc-client = ["dep:finito", "dep:tokio"] # Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`). jsonrpsee = [ @@ -100,9 +103,6 @@ subxt-core = { workspace = true, features = ["std"] } subxt-metadata = { workspace = true, features = ["std"] } subxt-lightclient = { workspace = true, optional = true, default-features = false } -# Reconnecting jsonrpc ws client -reconnecting-jsonrpsee-ws-client = { version = "0.4.3", optional = true, default-features = false } - # For parsing urls to disallow insecure schemes url = { workspace = true } @@ -112,6 +112,10 @@ getrandom = { workspace = true, optional = true } # Included if "native" feature is enabled tokio-util = { workspace = true, features = ["compat"], optional = true } +# Included if the reconnecting rpc client feature is enabled +tokio = { workspace = true, optional = true } +finito = { workspace = true, optional = true } + [dev-dependencies] bitvec = { workspace = true } codec = { workspace = true, features = ["derive", "bit-vec"] } diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client.rs b/subxt/src/backend/rpc/reconnecting_rpc_client.rs deleted file mode 100644 index bfefa461c5..0000000000 --- a/subxt/src/backend/rpc/reconnecting_rpc_client.rs +++ /dev/null @@ -1,270 +0,0 @@ -// Copyright 2019-2024 Parity Technologies (UK) Ltd. -// This file is dual-licensed as Apache-2.0 or GPL-3.0. -// see LICENSE for license details. - -use super::{RawRpcFuture, RawRpcSubscription, RpcClientT}; -use crate::error::RpcError; -use futures::{Future, FutureExt, StreamExt, TryStreamExt}; -use reconnecting_jsonrpsee_ws_client::{CallRetryPolicy, Client as InnerClient, SubscriptionId}; -use serde_json::value::RawValue; -use std::time::Duration; - -pub use reconnecting_jsonrpsee_ws_client::{ - ExponentialBackoff, FibonacciBackoff, FixedInterval, IdKind, -}; - -#[cfg(feature = "native")] -use reconnecting_jsonrpsee_ws_client::{HeaderMap, PingConfig}; - -/// Builder for [`Client`]. -#[derive(Debug, Clone)] -pub struct Builder

{ - max_request_size: u32, - max_response_size: u32, - retry_policy: P, - max_redirections: u32, - id_kind: IdKind, - max_log_len: u32, - max_concurrent_requests: u32, - request_timeout: Duration, - connection_timeout: Duration, - #[cfg(feature = "native")] - ping_config: Option, - #[cfg(feature = "native")] - headers: HeaderMap, -} - -impl Default for Builder { - fn default() -> Self { - Self { - max_request_size: 10 * 1024 * 1024, - max_response_size: 10 * 1024 * 1024, - retry_policy: ExponentialBackoff::from_millis(10).max_delay(Duration::from_secs(60)), - max_redirections: 5, - id_kind: IdKind::Number, - max_log_len: 1024, - max_concurrent_requests: 1024, - request_timeout: Duration::from_secs(60), - connection_timeout: Duration::from_secs(10), - #[cfg(feature = "native")] - ping_config: Some(PingConfig::new()), - #[cfg(feature = "native")] - headers: HeaderMap::new(), - } - } -} - -impl Builder { - /// Create a new builder. - pub fn new() -> Self { - Self::default() - } -} - -impl

Builder

-where - P: Iterator + Send + Sync + 'static + Clone, -{ - /// Configure the min response size a for websocket message. - /// - /// Default: 10MB - pub fn max_request_size(mut self, max: u32) -> Self { - self.max_request_size = max; - self - } - - /// Configure the max response size a for websocket message. - /// - /// Default: 10MB - pub fn max_response_size(mut self, max: u32) -> Self { - self.max_response_size = max; - self - } - - /// Set the max number of redirections to perform until a connection is regarded as failed. - /// - /// Default: 5 - pub fn max_redirections(mut self, redirect: u32) -> Self { - self.max_redirections = redirect; - self - } - - /// Configure how many concurrent method calls are allowed. - /// - /// Default: 1024 - pub fn max_concurrent_requests(mut self, max: u32) -> Self { - self.max_concurrent_requests = max; - self - } - - /// Configure how long until a method call is regarded as failed. - /// - /// Default: 1 minute - pub fn request_timeout(mut self, timeout: Duration) -> Self { - self.request_timeout = timeout; - self - } - - /// Set connection timeout for the WebSocket handshake - /// - /// Default: 10 seconds - pub fn connection_timeout(mut self, timeout: Duration) -> Self { - self.connection_timeout = timeout; - self - } - - /// Configure the data type of the request object ID - /// - /// Default: number - pub fn id_format(mut self, kind: IdKind) -> Self { - self.id_kind = kind; - self - } - - /// Set maximum length for logging calls and responses. - /// Logs bigger than this limit will be truncated. - /// - /// Default: 1024 - pub fn set_max_logging_length(mut self, max: u32) -> Self { - self.max_log_len = max; - self - } - - /// Configure which retry policy to use. - /// - /// Default: Exponential backoff 10ms - pub fn retry_policy + Send + Sync + 'static + Clone>( - self, - retry_policy: T, - ) -> Builder { - Builder { - max_request_size: self.max_request_size, - max_response_size: self.max_response_size, - retry_policy, - max_redirections: self.max_redirections, - max_log_len: self.max_log_len, - id_kind: self.id_kind, - max_concurrent_requests: self.max_concurrent_requests, - request_timeout: self.request_timeout, - connection_timeout: self.connection_timeout, - #[cfg(feature = "native")] - ping_config: self.ping_config, - #[cfg(feature = "native")] - headers: self.headers, - } - } - - #[cfg(feature = "native")] - #[cfg_attr(docsrs, doc(cfg(feature = "native")))] - /// Configure the WebSocket ping/pong interval. - /// - /// Default: 30 seconds. - pub fn enable_ws_ping(mut self, ping_config: PingConfig) -> Self { - self.ping_config = Some(ping_config); - self - } - - #[cfg(feature = "native")] - #[cfg_attr(docsrs, doc(cfg(feature = "native")))] - /// Disable WebSocket ping/pongs. - /// - /// Default: 30 seconds. - pub fn disable_ws_ping(mut self) -> Self { - self.ping_config = None; - self - } - - #[cfg(feature = "native")] - #[cfg_attr(docsrs, doc(cfg(native)))] - /// Configure custom headers to use in the WebSocket handshake. - pub fn set_headers(mut self, headers: HeaderMap) -> Self { - self.headers = headers; - self - } - - /// Build and connect to the target. - pub async fn build(self, url: String) -> Result { - let client = InnerClient::builder() - .retry_policy(self.retry_policy) - .build(url) - .await - .map_err(|e| RpcError::ClientError(Box::new(e)))?; - - Ok(Client(client)) - } -} - -/// Reconnecting rpc client. -#[derive(Debug, Clone)] -pub struct Client(InnerClient); - -impl Client { - /// Create a builder. - pub fn builder() -> Builder { - Builder::new() - } - - /// A future that resolves when the client has initiated a reconnection. - /// This method returns another future that resolves when the client has reconnected. - /// - /// This may be called multiple times. - pub async fn reconnect_initiated(&self) -> impl Future + '_ { - self.0.reconnect_started().await; - self.0.reconnected() - } - - /// Get how many times the client has reconnected successfully. - pub fn reconnect_count(&self) -> usize { - self.0.reconnect_count() - } -} - -impl RpcClientT for Client { - fn request_raw<'a>( - &'a self, - method: &'a str, - params: Option>, - ) -> RawRpcFuture<'a, Box> { - async { - self.0 - .request_raw_with_policy(method.to_string(), params, CallRetryPolicy::Drop) - .await - .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string())) - } - .boxed() - } - - fn subscribe_raw<'a>( - &'a self, - sub: &'a str, - params: Option>, - unsub: &'a str, - ) -> RawRpcFuture<'a, RawRpcSubscription> { - async { - let sub = self - .0 - .subscribe_raw_with_policy( - sub.to_string(), - params, - unsub.to_string(), - CallRetryPolicy::Drop, - ) - .await - .map_err(|e| RpcError::ClientError(Box::new(e)))?; - - let id = match sub.id() { - SubscriptionId::Num(n) => n.to_string(), - SubscriptionId::Str(s) => s.to_string(), - }; - let stream = sub - .map_err(|e| RpcError::DisconnectedWillReconnect(e.to_string())) - .boxed(); - - Ok(RawRpcSubscription { - stream, - id: Some(id), - }) - } - .boxed() - } -} diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs new file mode 100644 index 0000000000..819877218c --- /dev/null +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs @@ -0,0 +1,630 @@ +// Copyright 2019-2024 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +//! # reconnecting-jsonrpsee-ws-client +//! + +mod platform; +#[cfg(test)] +mod tests; +mod utils; + +use std::{ + pin::Pin, + sync::Arc, + task::{self, Poll}, + time::Duration, +}; + +use super::{RawRpcFuture, RawRpcSubscription, RpcClientT}; +use crate::error::RpcError as SubxtRpcError; + +use finito::Retry; +use futures::{Future, FutureExt, Stream, StreamExt, TryStreamExt}; +use jsonrpsee::core::{ + client::{ + Client as WsClient, ClientT, Subscription as RpcSubscription, SubscriptionClientT, + SubscriptionKind, + }, + traits::ToRpcParams, +}; +use serde_json::value::RawValue; +use tokio::sync::{ + mpsc::{self, UnboundedReceiver, UnboundedSender}, + oneshot, Notify, +}; +use utils::display_close_reason; +use utils::{reconnect_channel, ReconnectRx, ReconnectTx}; + +// re-exports +pub use finito::{ExponentialBackoff, FibonacciBackoff, FixedInterval}; +pub use jsonrpsee::core::client::IdKind; +pub use jsonrpsee::{core::client::error::Error as RpcError, rpc_params, types::SubscriptionId}; + +#[cfg(feature = "native")] +pub use jsonrpsee::ws_client::{HeaderMap, PingConfig}; + +const LOG_TARGET: &str = "subxt-reconnecting-rpc-client"; + +/// Method result. +pub type MethodResult = Result, Error>; +/// Subscription result. +pub type SubscriptionResult = Result, DisconnectedWillReconnect>; + +/// The connection was closed, reconnect initiated and the subscription was dropped. +#[derive(Debug, thiserror::Error)] +#[error("The connection was closed because of `{0:?}` and reconnect initiated")] +pub struct DisconnectedWillReconnect(String); + +/// Serialized JSON-RPC params. +#[derive(Debug, Clone)] +pub struct RpcParams(Option>); + +impl RpcParams { + /// Create new [`RpcParams`] from JSON. + pub fn new(json: Option>) -> Self { + Self(json) + } +} + +impl ToRpcParams for RpcParams { + fn to_rpc_params(self) -> Result>, serde_json::Error> { + Ok(self.0) + } +} + +#[derive(Debug)] +enum Op { + Call { + method: String, + params: RpcParams, + send_back: oneshot::Sender, + }, + Subscription { + subscribe_method: String, + params: RpcParams, + unsubscribe_method: String, + send_back: oneshot::Sender>, + }, +} + +/// Error that can occur when for a RPC call or subscription. +#[derive(Debug, thiserror::Error)] +pub enum Error { + /// The client was dropped by the user. + #[error("The client was dropped")] + Dropped, + /// The connection was closed and reconnect initiated. + #[error(transparent)] + DisconnectedWillReconnect(#[from] DisconnectedWillReconnect), + /// Other rpc error. + #[error("{0}")] + RpcError(RpcError), +} + +/// Represent a single subscription. +pub struct Subscription { + id: SubscriptionId<'static>, + stream: mpsc::UnboundedReceiver, +} + +impl Subscription { + /// Returns the next notification from the stream. + /// This may return `None` if the subscription has been terminated, + /// which may happen if the channel becomes full or is dropped. + /// + /// **Note:** This has an identical signature to the [`StreamExt::next`] + /// method (and delegates to that). Import [`StreamExt`] if you'd like + /// access to other stream combinator methods. + #[allow(clippy::should_implement_trait)] + pub async fn next(&mut self) -> Option { + StreamExt::next(self).await + } + + /// Get the subscription ID. + pub fn id(&self) -> SubscriptionId<'static> { + self.id.clone() + } +} + +impl Stream for Subscription { + type Item = SubscriptionResult; + + fn poll_next( + mut self: Pin<&mut Self>, + cx: &mut task::Context<'_>, + ) -> task::Poll> { + match self.stream.poll_recv(cx) { + Poll::Ready(Some(msg)) => Poll::Ready(Some(msg)), + Poll::Ready(None) => Poll::Ready(None), + Poll::Pending => Poll::Pending, + } + } +} + +impl std::fmt::Debug for Subscription { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_fmt(format_args!("Subscription {:?}", self.id)) + } +} + +/// JSON-RPC client that reconnects automatically and may loose +/// subscription notifications when it reconnects. +#[derive(Clone, Debug)] +pub struct Client { + tx: mpsc::UnboundedSender, + reconnect: ReconnectRx, +} + +/// Builder for [`Client`]. +#[derive(Clone, Debug)] +pub struct ClientBuilder

{ + max_request_size: u32, + max_response_size: u32, + retry_policy: P, + #[cfg(feature = "native")] + ping_config: Option, + #[cfg(feature = "native")] + // web doesn't support custom headers + // https://stackoverflow.com/a/4361358/6394734 + headers: HeaderMap, + max_redirections: u32, + id_kind: IdKind, + max_log_len: u32, + max_concurrent_requests: u32, + request_timeout: Duration, + connection_timeout: Duration, +} + +impl Default for ClientBuilder { + fn default() -> Self { + Self { + max_request_size: 10 * 1024 * 1024, + max_response_size: 10 * 1024 * 1024, + retry_policy: ExponentialBackoff::from_millis(10).max_delay(Duration::from_secs(60)), + #[cfg(feature = "native")] + ping_config: Some(PingConfig::new()), + #[cfg(feature = "native")] + headers: HeaderMap::new(), + max_redirections: 5, + id_kind: IdKind::Number, + max_log_len: 1024, + max_concurrent_requests: 1024, + request_timeout: Duration::from_secs(60), + connection_timeout: Duration::from_secs(10), + } + } +} + +impl ClientBuilder { + /// Create a new builder. + pub fn new() -> Self { + Self::default() + } +} + +impl

ClientBuilder

+where + P: Iterator + Send + Sync + 'static + Clone, +{ + /// Configure the min response size a for websocket message. + /// + /// Default: 10MB + pub fn max_request_size(mut self, max: u32) -> Self { + self.max_request_size = max; + self + } + + /// Configure the max response size a for websocket message. + /// + /// Default: 10MB + pub fn max_response_size(mut self, max: u32) -> Self { + self.max_response_size = max; + self + } + + /// Set the max number of redirections to perform until a connection is regarded as failed. + /// + /// Default: 5 + pub fn max_redirections(mut self, redirect: u32) -> Self { + self.max_redirections = redirect; + self + } + + /// Configure how many concurrent method calls are allowed. + /// + /// Default: 1024 + pub fn max_concurrent_requests(mut self, max: u32) -> Self { + self.max_concurrent_requests = max; + self + } + + /// Configure how long until a method call is regarded as failed. + /// + /// Default: 1 minute + pub fn request_timeout(mut self, timeout: Duration) -> Self { + self.request_timeout = timeout; + self + } + + /// Set connection timeout for the WebSocket handshake + /// + /// Default: 10 seconds + pub fn connection_timeout(mut self, timeout: Duration) -> Self { + self.connection_timeout = timeout; + self + } + + /// Configure the data type of the request object ID + /// + /// Default: number + pub fn id_format(mut self, kind: IdKind) -> Self { + self.id_kind = kind; + self + } + + /// Set maximum length for logging calls and responses. + /// Logs bigger than this limit will be truncated. + /// + /// Default: 1024 + pub fn set_max_logging_length(mut self, max: u32) -> Self { + self.max_log_len = max; + self + } + + #[cfg(feature = "native")] + #[cfg_attr(docsrs, doc(cfg(feature = "native")))] + /// Configure custom headers to use in the WebSocket handshake. + pub fn set_headers(mut self, headers: HeaderMap) -> Self { + self.headers = headers; + self + } + + /// Configure which retry policy to use when a connection is lost. + /// + /// Default: Exponential backoff 10ms + pub fn retry_policy(self, retry_policy: T) -> ClientBuilder { + ClientBuilder { + max_request_size: self.max_request_size, + max_response_size: self.max_response_size, + retry_policy, + #[cfg(feature = "native")] + ping_config: self.ping_config, + #[cfg(feature = "native")] + headers: self.headers, + max_redirections: self.max_redirections, + max_log_len: self.max_log_len, + id_kind: self.id_kind, + max_concurrent_requests: self.max_concurrent_requests, + request_timeout: self.request_timeout, + connection_timeout: self.connection_timeout, + } + } + + #[cfg(feature = "native")] + #[cfg_attr(docsrs, doc(cfg(feature = "native")))] + /// Configure the WebSocket ping/pong interval. + /// + /// Default: 30 seconds. + pub fn enable_ws_ping(mut self, ping_config: PingConfig) -> Self { + self.ping_config = Some(ping_config); + self + } + + #[cfg(feature = "native")] + #[cfg_attr(docsrs, doc(cfg(feature = "native")))] + /// Disable WebSocket ping/pongs. + /// + /// Default: 30 seconds. + pub fn disable_ws_ping(mut self) -> Self { + self.ping_config = None; + self + } + + /// Build and connect to the target. + pub async fn build(self, url: String) -> Result { + let (tx, rx) = mpsc::unbounded_channel(); + let client = Retry::new(self.retry_policy.clone(), || { + platform::ws_client(url.as_ref(), &self) + }) + .await?; + let (reconn_tx, reconn_rx) = reconnect_channel(); + + platform::spawn(background_task(client, rx, url, reconn_tx, self)); + + Ok(Client { + tx, + reconnect: reconn_rx, + }) + } +} + +impl Client { + /// Perform a JSON-RPC method call. + pub async fn request( + &self, + method: String, + params: Option>, + ) -> Result, Error> { + let (tx, rx) = oneshot::channel(); + self.tx + .send(Op::Call { + method, + params: RpcParams(params), + send_back: tx, + }) + .map_err(|_| Error::Dropped)?; + + rx.await.map_err(|_| Error::Dropped)? + } + + /// Perform a JSON-RPC subscription. + pub async fn subscribe( + &self, + subscribe_method: String, + params: Option>, + unsubscribe_method: String, + ) -> Result { + let (tx, rx) = oneshot::channel(); + self.tx + .send(Op::Subscription { + subscribe_method, + params: RpcParams::new(params), + unsubscribe_method, + send_back: tx, + }) + .map_err(|_| Error::Dropped)?; + rx.await.map_err(|_| Error::Dropped)? + } + + /// A future that resolves when the client has initiated a reconnection. + /// This method returns another future that resolves when the client has reconnected. + /// + /// This may be called multiple times. + pub async fn reconnect_initiated(&self) -> impl Future + '_ { + self.reconnect.reconnect_started().await; + self.reconnect.reconnected() + } + + /// Get how many times the client has reconnected successfully. + pub fn reconnect_count(&self) -> usize { + self.reconnect.count() + } +} + +impl Client { + /// Create a builder. + pub fn builder() -> ClientBuilder { + ClientBuilder::new() + } +} + +impl RpcClientT for Client { + fn request_raw<'a>( + &'a self, + method: &'a str, + params: Option>, + ) -> RawRpcFuture<'a, Box> { + async { + self.request(method.to_string(), params) + .await + .map_err(|e| SubxtRpcError::DisconnectedWillReconnect(e.to_string())) + } + .boxed() + } + + fn subscribe_raw<'a>( + &'a self, + sub: &'a str, + params: Option>, + unsub: &'a str, + ) -> RawRpcFuture<'a, RawRpcSubscription> { + async { + let sub = self + .subscribe(sub.to_string(), params, unsub.to_string()) + .await + .map_err(|e| SubxtRpcError::ClientError(Box::new(e)))?; + + let id = match sub.id() { + SubscriptionId::Num(n) => n.to_string(), + SubscriptionId::Str(s) => s.to_string(), + }; + let stream = sub + .map_err(|e| SubxtRpcError::DisconnectedWillReconnect(e.to_string())) + .boxed(); + + Ok(RawRpcSubscription { + stream, + id: Some(id), + }) + } + .boxed() + } +} + +async fn background_task

( + mut client: Arc, + mut rx: UnboundedReceiver, + url: String, + reconn: ReconnectTx, + client_builder: ClientBuilder

, +) where + P: Iterator + Send + 'static + Clone, +{ + let disconnect = Arc::new(tokio::sync::Notify::new()); + + loop { + tokio::select! { + // An incoming JSON-RPC call to dispatch. + next_message = rx.recv() => { + match next_message { + None => break, + Some(op) => { + tokio::spawn(dispatch_call(client.clone(), op, disconnect.clone())); + } + }; + } + // The connection was terminated and try to reconnect. + _ = client.on_disconnect() => { + let params = ReconnectParams { + url: &url, + reconnect: reconn.clone(), + client_builder: &client_builder, + close_reason: client.disconnect_reason().await, + }; + + client = match reconnect(params).await { + Ok(client) => client, + Err(e) => { + tracing::debug!(target: LOG_TARGET, "Failed to reconnect: {e}; terminating the connection"); + break; + } + }; + } + } + } + + disconnect.notify_waiters(); +} + +async fn dispatch_call(client: Arc, op: Op, on_disconnect: Arc) { + match op { + Op::Call { + method, + params, + send_back, + } => { + match client.request::, _>(&method, params).await { + Ok(rp) => { + // Fails only if the request is dropped by the client. + let _ = send_back.send(Ok(rp)); + } + Err(RpcError::RestartNeeded(e)) => { + // Fails only if the request is dropped by the client. + let _ = send_back.send(Err(DisconnectedWillReconnect(e.to_string()).into())); + } + Err(e) => { + // Fails only if the request is dropped by the client. + let _ = send_back.send(Err(Error::RpcError(e))); + } + } + } + Op::Subscription { + subscribe_method, + params, + unsubscribe_method, + send_back, + } => { + match client + .subscribe::, _>( + &subscribe_method, + params.clone(), + &unsubscribe_method, + ) + .await + { + Ok(sub) => { + let (tx, rx) = mpsc::unbounded_channel(); + let sub_id = match sub.kind() { + SubscriptionKind::Subscription(id) => id.clone().into_owned(), + _ => unreachable!("No method subscriptions possible in this crate; qed"), + }; + + platform::spawn(subscription_handler( + tx.clone(), + sub, + on_disconnect.clone(), + client.clone(), + )); + + let stream = Subscription { + id: sub_id, + stream: rx, + }; + + // Fails only if the request is dropped by the client. + let _ = send_back.send(Ok(stream)); + } + Err(RpcError::RestartNeeded(e)) => { + // Fails only if the request is dropped by the client. + let _ = send_back.send(Err(DisconnectedWillReconnect(e.to_string()).into())); + } + Err(e) => { + // Fails only if the request is dropped. + let _ = send_back.send(Err(Error::RpcError(e))); + } + } + } + } +} + +/// Handler for each individual subscription. +async fn subscription_handler( + sub_tx: UnboundedSender, + mut rpc_sub: RpcSubscription>, + client_closed: Arc, + client: Arc, +) { + loop { + tokio::select! { + next_msg = rpc_sub.next() => { + let Some(notif) = next_msg else { + let close = client.disconnect_reason().await; + _ = sub_tx.send(Err(DisconnectedWillReconnect(close.to_string()))); + break; + }; + + let msg = notif.expect("RawValue is valid JSON; qed"); + + // Fails only if subscription was closed by the user. + if sub_tx.send(Ok(msg)).is_err() { + break; + } + } + // This channel indices whether the subscription was closed by user. + _ = sub_tx.closed() => { + break; + } + // This channel indicates whether the main task has been closed. + // at this point no further messages are processed. + _ = client_closed.notified() => { + break; + } + } + } +} + +struct ReconnectParams<'a, P> { + url: &'a str, + reconnect: ReconnectTx, + client_builder: &'a ClientBuilder

, + close_reason: RpcError, +} + +async fn reconnect

(params: ReconnectParams<'_, P>) -> Result, RpcError> +where + P: Iterator + Send + 'static + Clone, +{ + let ReconnectParams { + url, + reconnect, + client_builder, + close_reason, + } = params; + + let retry_policy = client_builder.retry_policy.clone(); + + tracing::debug!(target: LOG_TARGET, "Connection to {url} was closed: `{}`; starting to reconnect", display_close_reason(&close_reason)); + reconnect.reconnect_initiated(); + + let client = Retry::new(retry_policy.clone(), || { + platform::ws_client(url, client_builder) + }) + .await?; + + reconnect.reconnected(); + tracing::debug!(target: LOG_TARGET, "Connection to {url} was successfully re-established"); + + Ok(client) +} diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/platform.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/platform.rs new file mode 100644 index 0000000000..043fef3de9 --- /dev/null +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/platform.rs @@ -0,0 +1,77 @@ +// Copyright 2019-2024 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +use crate::backend::rpc::reconnecting_rpc_client::{ClientBuilder, RpcError}; +use jsonrpsee::core::client::Client; +use std::sync::Arc; + +#[cfg(feature = "native")] +pub use tokio::spawn; + +#[cfg(feature = "web")] +pub use wasm_bindgen_futures::spawn_local as spawn; + +#[cfg(feature = "native")] +pub async fn ws_client

(url: &str, builder: &ClientBuilder

) -> Result, RpcError> { + use jsonrpsee::ws_client::WsClientBuilder; + + let ClientBuilder { + max_request_size, + max_response_size, + ping_config, + headers, + max_redirections, + id_kind, + max_concurrent_requests, + max_log_len, + request_timeout, + connection_timeout, + .. + } = builder; + + let mut ws_client_builder = WsClientBuilder::new() + .max_request_size(*max_request_size) + .max_response_size(*max_response_size) + .set_headers(headers.clone()) + .max_redirections(*max_redirections as usize) + .max_buffer_capacity_per_subscription(tokio::sync::Semaphore::MAX_PERMITS) + .max_concurrent_requests(*max_concurrent_requests as usize) + .set_max_logging_length(*max_log_len) + .set_tcp_no_delay(true) + .request_timeout(*request_timeout) + .connection_timeout(*connection_timeout) + .id_format(*id_kind); + + if let Some(ping) = ping_config { + ws_client_builder = ws_client_builder.enable_ws_ping(*ping); + } + + let client = ws_client_builder.build(url).await?; + + Ok(Arc::new(client)) +} + +#[cfg(feature = "web")] +pub async fn ws_client

(url: &str, builder: &ClientBuilder

) -> Result, RpcError> { + use jsonrpsee::wasm_client::WasmClientBuilder; + + let ClientBuilder { + id_kind, + max_concurrent_requests, + max_log_len, + request_timeout, + .. + } = builder; + + let ws_client_builder = WasmClientBuilder::new() + .max_buffer_capacity_per_subscription(tokio::sync::Semaphore::MAX_PERMITS) + .max_concurrent_requests(*max_concurrent_requests as usize) + .set_max_logging_length(*max_log_len) + .request_timeout(*request_timeout) + .id_format(*id_kind); + + let client = ws_client_builder.build(url).await?; + + Ok(Arc::new(client)) +} diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs new file mode 100644 index 0000000000..99317c34fb --- /dev/null +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs @@ -0,0 +1,285 @@ +// Copyright 2019-2024 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +#![cfg(test)] + +use super::*; +use futures::{future::Either, stream::FuturesUnordered, FutureExt, TryStreamExt}; + +use jsonrpsee_server::{ + http, stop_channel, ws, ConnectionGuard, ConnectionState, HttpRequest, HttpResponse, RpcModule, + RpcServiceBuilder, ServerConfig, SubscriptionMessage, +}; +use tower::BoxError; +use tracing_subscriber::util::SubscriberInitExt; + +fn init_logger() { + let filter = tracing_subscriber::EnvFilter::from_default_env(); + let _ = tracing_subscriber::fmt() + .with_env_filter(filter) + .finish() + .try_init(); +} + +#[tokio::test] +async fn call_works() { + init_logger(); + let (_handle, addr) = run_server().await.unwrap(); + + let client = Client::builder().build(addr).await.unwrap(); + + assert!(client.request("say_hello".to_string(), None).await.is_ok(),) +} + +#[tokio::test] +async fn sub_works() { + init_logger(); + let (_handle, addr) = run_server().await.unwrap(); + + let client = Client::builder() + .retry_policy(ExponentialBackoff::from_millis(50)) + .build(addr) + .await + .unwrap(); + + let mut sub = client + .subscribe( + "subscribe_lo".to_string(), + None, + "unsubscribe_lo".to_string(), + ) + .await + .unwrap(); + + assert!(sub.next().await.is_some()); +} + +#[tokio::test] +async fn sub_with_reconnect() { + init_logger(); + let (handle, addr) = run_server().await.unwrap(); + let client = Client::builder().build(addr.clone()).await.unwrap(); + + let mut sub = client + .subscribe( + "subscribe_lo".to_string(), + None, + "unsubscribe_lo".to_string(), + ) + .await + .unwrap(); + + let _ = handle.send(()); + client.reconnect_initiated().await; + + assert!(matches!(sub.next().await, Some(Ok(_)))); + assert!(matches!( + sub.next().await, + Some(Err(DisconnectedWillReconnect(_))) + )); + + // Restart the server. + let (_handle, _) = run_server_with_settings(Some(&addr), false).await.unwrap(); + client.reconnected().await; + + assert_eq!(client.reconnect_count(), 1); + + // Subscription should work after reconnect. + let mut sub = client + .subscribe( + "subscribe_lo".to_string(), + None, + "unsubscribe_lo".to_string(), + ) + .await + .unwrap(); + + assert!(matches!(sub.next().await, Some(Ok(_)))); +} + +#[tokio::test] +async fn call_with_reconnect() { + init_logger(); + let (handle, addr) = run_server_with_settings(None, true).await.unwrap(); + + let client = Arc::new(Client::builder().build(addr.clone()).await.unwrap()); + + let req_fut = client.request("say_hello".to_string(), None).boxed(); + let timeout_fut = tokio::time::sleep(Duration::from_secs(5)); + + // If the call isn't replied in 5 secs then it's regarded as it's still pending. + let req_fut = match futures::future::select(Box::pin(timeout_fut), req_fut).await { + Either::Left((_, f)) => f, + Either::Right(_) => panic!("RPC call finished"), + }; + + // Close the connection with a pending call. + let _ = handle.send(()); + client.reconnect_initiated().await; + + // Restart the server + let (_handle, _) = run_server_with_settings(Some(&addr), false).await.unwrap(); + + client.reconnected().await; + + // This call should fail because reconnect. + assert!(req_fut.await.is_err()); + // Future call should work after reconnect. + assert!(client.request("say_hello".to_string(), None).await.is_ok()); +} + +async fn run_server() -> anyhow::Result<(tokio::sync::broadcast::Sender<()>, String)> { + run_server_with_settings(None, false).await +} + +async fn run_server_with_settings( + url: Option<&str>, + dont_respond_to_method_calls: bool, +) -> anyhow::Result<(tokio::sync::broadcast::Sender<()>, String)> { + use jsonrpsee_server::HttpRequest; + + let sockaddr = match url { + Some(url) => url.strip_prefix("ws://").unwrap(), + None => "127.0.0.1:0", + }; + + let mut i = 0; + + let listener = loop { + if let Ok(l) = tokio::net::TcpListener::bind(sockaddr).await { + break l; + } + tokio::time::sleep(Duration::from_millis(100)).await; + + if i >= 10 { + panic!("Addr already in use"); + } + + i += 1; + }; + + let mut module = RpcModule::new(()); + + if dont_respond_to_method_calls { + module.register_async_method("say_hello", |_, _, _| async { + futures::future::pending::<()>().await; + "timeout" + })?; + } else { + module.register_async_method("say_hello", |_, _, _| async { "lo" })?; + } + + module.register_subscription( + "subscribe_lo", + "subscribe_lo", + "unsubscribe_lo", + |_params, pending, _ctx, _| async move { + let sink = pending.accept().await.unwrap(); + let i = 0; + + loop { + if sink + .send(SubscriptionMessage::from_json(&i).unwrap()) + .await + .is_err() + { + break; + } + tokio::time::sleep(std::time::Duration::from_secs(6)).await; + } + }, + )?; + + let (tx, mut rx) = tokio::sync::broadcast::channel(4); + let tx2 = tx.clone(); + let (stop_handle, server_handle) = stop_channel(); + let addr = listener.local_addr().expect("Could not find local addr"); + + tokio::spawn(async move { + loop { + let sock = tokio::select! { + res = listener.accept() => { + match res { + Ok((stream, _remote_addr)) => stream, + Err(e) => { + tracing::error!("Failed to accept connection: {:?}", e); + continue; + } + } + } + _ = rx.recv() => { + break + } + }; + + let module = module.clone(); + let rx2 = tx2.subscribe(); + let tx2 = tx2.clone(); + let stop_handle2 = stop_handle.clone(); + + let svc = tower::service_fn(move |req: HttpRequest| { + let module = module.clone(); + let tx = tx2.clone(); + let stop_handle = stop_handle2.clone(); + + let conn_permit = ConnectionGuard::new(1).try_acquire().unwrap(); + + if ws::is_upgrade_request(&req) { + let rpc_service = RpcServiceBuilder::new(); + let conn = ConnectionState::new(stop_handle, 1, conn_permit); + + async move { + let mut rx = tx.subscribe(); + + let (rp, conn_fut) = + ws::connect(req, ServerConfig::default(), module, conn, rpc_service) + .await + .unwrap(); + + tokio::spawn(async move { + tokio::select! { + _ = conn_fut => (), + _ = rx.recv() => {}, + } + }); + + Ok::<_, BoxError>(rp) + } + .boxed() + } else { + async { Ok(http::response::denied()) }.boxed() + } + }); + + tokio::spawn(serve_with_graceful_shutdown(sock, svc, rx2)); + } + + drop(server_handle); + }); + + Ok((tx, format!("ws://{}", addr))) +} + +async fn serve_with_graceful_shutdown( + io: I, + service: S, + mut rx: tokio::sync::broadcast::Receiver<()>, +) where + S: tower::Service, Response = HttpResponse> + + Clone + + Send + + 'static, + S::Future: Send, + S::Response: Send, + S::Error: Into, + B: http_body::Body + Send + 'static, + B::Error: Into, + I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, +{ + if let Err(e) = + jsonrpsee_server::serve_with_graceful_shutdown(io, service, rx.recv().map(|_| ())).await + { + tracing::error!("Error while serving: {:?}", e); + } +} diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/utils.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/utils.rs new file mode 100644 index 0000000000..5b11969b49 --- /dev/null +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/utils.rs @@ -0,0 +1,103 @@ +// Copyright 2019-2024 Parity Technologies (UK) Ltd. +// This file is dual-licensed as Apache-2.0 or GPL-3.0. +// see LICENSE for license details. + +//! Utils. + +use crate::backend::rpc::reconnecting_rpc_client::RpcError; +use std::{ + sync::atomic::{AtomicUsize, Ordering}, + sync::Arc, +}; +use tokio::sync::Notify; + +#[derive(Clone, Debug)] +pub struct ReconnectCounter(Arc); + +impl Default for ReconnectCounter { + fn default() -> Self { + Self::new() + } +} + +impl ReconnectCounter { + pub fn new() -> Self { + Self(Arc::new(AtomicUsize::new(0))) + } + + pub fn get(&self) -> usize { + self.0.load(Ordering::SeqCst) + } + + pub fn inc(&self) { + self.0.fetch_add(1, Ordering::SeqCst); + } +} + +pub fn reconnect_channel() -> (ReconnectTx, ReconnectRx) { + let count = ReconnectCounter::new(); + let reconn_init = Arc::new(Notify::new()); + let reconn_compl = Arc::new(Notify::new()); + ( + ReconnectTx { + reconn_init: reconn_init.clone(), + reconn_compl: reconn_compl.clone(), + count: count.clone(), + }, + ReconnectRx { + reconn_init, + reconn_compl, + count, + }, + ) +} + +#[derive(Debug, Clone)] +pub struct ReconnectTx { + reconn_init: Arc, + reconn_compl: Arc, + count: ReconnectCounter, +} + +impl ReconnectTx { + pub fn reconnect_initiated(&self) { + self.reconn_init.notify_one(); + } + + pub fn reconnected(&self) { + self.reconn_compl.notify_one(); + self.count.inc(); + } + + pub fn count(&self) -> usize { + self.count.get() + } +} + +#[derive(Debug, Clone)] +pub struct ReconnectRx { + reconn_init: Arc, + reconn_compl: Arc, + count: ReconnectCounter, +} + +impl ReconnectRx { + pub async fn reconnect_started(&self) { + self.reconn_init.notified().await; + } + + pub async fn reconnected(&self) { + self.reconn_compl.notified().await; + } + + pub fn count(&self) -> usize { + self.count.get() + } +} + +pub fn display_close_reason(err: &RpcError) -> String { + match err { + RpcError::RestartNeeded(e) => e.to_string(), + other => other.to_string(), + } +} diff --git a/subxt/src/macros.rs b/subxt/src/macros.rs index 8e5a8283df..84cad40d2c 100644 --- a/subxt/src/macros.rs +++ b/subxt/src/macros.rs @@ -56,7 +56,7 @@ macro_rules! cfg_jsonrpsee_web { macro_rules! cfg_reconnecting_rpc_client { ($($item:item)*) => { $( - #[cfg(feature = "unstable-reconnecting-rpc-client")] + #[cfg(all(feature = "unstable-reconnecting-rpc-client", any(feature = "native", feature = "web")))] #[cfg_attr(docsrs, doc(cfg(feature = "unstable-reconnecting-rpc-client")))] $item )* From 901ce74c9200ab0d842682717e2e229cea5c6286 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 22 Aug 2024 10:18:15 +0200 Subject: [PATCH 02/11] add jsonrpsee dep to reconnecting-client --- subxt/Cargo.toml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index c161c2fa9d..340719bd7a 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -48,7 +48,7 @@ web = [ ] # Enable this to use the reconnecting rpc client -unstable-reconnecting-rpc-client = ["dep:finito", "dep:tokio"] +unstable-reconnecting-rpc-client = ["dep:finito", "dep:tokio", "jsonrpsee"] # Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`). jsonrpsee = [ @@ -113,6 +113,8 @@ getrandom = { workspace = true, optional = true } tokio-util = { workspace = true, features = ["compat"], optional = true } # Included if the reconnecting rpc client feature is enabled +# Only the `tokio/sync` is used in the reconnecting rpc client +# and that compiles both for native and web. tokio = { workspace = true, optional = true } finito = { workspace = true, optional = true } From 63eefcc1a1106a9569886efc82dcc1c362ca804f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 22 Aug 2024 10:22:12 +0200 Subject: [PATCH 03/11] Update subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs --- subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs index 99317c34fb..bb9ee559cf 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs @@ -6,7 +6,6 @@ use super::*; use futures::{future::Either, stream::FuturesUnordered, FutureExt, TryStreamExt}; - use jsonrpsee_server::{ http, stop_channel, ws, ConnectionGuard, ConnectionState, HttpRequest, HttpResponse, RpcModule, RpcServiceBuilder, ServerConfig, SubscriptionMessage, From 01b53194e95d33742bbb56a0fb4ea8dc7a116de9 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 23 Aug 2024 15:24:11 +0200 Subject: [PATCH 04/11] fix grumbles --- .github/workflows/rust.yml | 2 +- Cargo.lock | 50 +++++++++++++ subxt/Cargo.toml | 5 ++ .../rpc/reconnecting_rpc_client/mod.rs | 70 ++++++++++++++----- .../rpc/reconnecting_rpc_client/tests.rs | 39 ++++------- .../rpc/reconnecting_rpc_client/utils.rs | 12 ++-- 6 files changed, 127 insertions(+), 51 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index f049cf6d91..59bec22cc8 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -295,7 +295,7 @@ jobs: uses: actions-rs/cargo@v1.0.3 with: command: nextest - args: run --workspace + args: run --workspace --features unstable-reconnecting-rpc-client - if: "failure()" uses: "andymckay/cancel-action@a955d435292c0d409d104b57d8e78435a93a6ef1" # v0.5 diff --git a/Cargo.lock b/Cargo.lock index d5cd3b292a..e99970257e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2163,6 +2163,12 @@ version = "1.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904" +[[package]] +name = "httpdate" +version = "1.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df3b46402a9d5adb4c86a0cf463f42e19994e3ee891101b1841f30a545cb49a9" + [[package]] name = "hyper" version = "1.3.1" @@ -2176,6 +2182,7 @@ dependencies = [ "http 1.1.0", "http-body", "httparse", + "httpdate", "itoa", "pin-project-lite", "smallvec", @@ -2459,9 +2466,11 @@ dependencies = [ "jsonrpsee-client-transport", "jsonrpsee-core", "jsonrpsee-http-client", + "jsonrpsee-server", "jsonrpsee-types", "jsonrpsee-wasm-client", "jsonrpsee-ws-client", + "tokio", ] [[package]] @@ -2505,7 +2514,9 @@ dependencies = [ "http-body", "http-body-util", "jsonrpsee-types", + "parking_lot", "pin-project", + "rand", "rustc-hash", "serde", "serde_json", @@ -2541,6 +2552,34 @@ dependencies = [ "url", ] +[[package]] +name = "jsonrpsee-server" +version = "0.23.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "654afab2e92e5d88ebd8a39d6074483f3f2bfdf91c5ac57fe285e7127cdd4f51" +dependencies = [ + "anyhow", + "futures-util", + "http 1.1.0", + "http-body", + "http-body-util", + "hyper", + "hyper-util", + "jsonrpsee-core", + "jsonrpsee-types", + "pin-project", + "route-recognizer", + "serde", + "serde_json", + "soketto 0.8.0", + "thiserror", + "tokio", + "tokio-stream", + "tokio-util", + "tower", + "tracing", +] + [[package]] name = "jsonrpsee-types" version = "0.23.2" @@ -3584,6 +3623,12 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "route-recognizer" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afab94fb28594581f62d981211a9a4d53cc8130bbcbbb89a0440d9b8e81a7746" + [[package]] name = "rustc-demangle" version = "0.1.23" @@ -4363,6 +4408,7 @@ dependencies = [ "base64 0.22.0", "bytes", "futures", + "http 1.1.0", "httparse", "log", "rand", @@ -4835,6 +4881,8 @@ dependencies = [ "futures", "getrandom", "hex", + "http-body", + "hyper", "impl-serde", "instant", "jsonrpsee", @@ -4859,6 +4907,7 @@ dependencies = [ "thiserror", "tokio", "tokio-util", + "tower", "tracing", "tracing-subscriber 0.3.18", "url", @@ -5221,6 +5270,7 @@ dependencies = [ "futures-core", "pin-project-lite", "tokio", + "tokio-util", ] [[package]] diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 340719bd7a..110eda6281 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -133,6 +133,11 @@ subxt-signer = { path = "../signer", features = ["unstable-eth"] } # the light-client wlll emit INFO logs with # `GrandPa warp sync finished` and `Finalized block runtime ready.` tracing-subscriber = { workspace = true } +# These deps are needed to test the reconnecting rpc client +jsonrpsee = { workspace = true, features = ["server"] } +tower = "0.4" +hyper = "1" +http-body = "1" [[example]] name = "light_client_basic" diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs index 819877218c..e81661a3a5 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs @@ -4,6 +4,47 @@ //! # reconnecting-jsonrpsee-ws-client //! +//! A simple reconnecting JSON-RPC WebSocket client for subxt which +//! automatically reconnects when the connection is lost but +//! it doesn't retain subscriptions and pending method calls when it reconnects. +//! +//! The logic which action to take for individual calls and subscriptions are +//! handled by the subxt backend implementations. +//! +//! # Example +//! +//! ```no_run +//! use std::time::Duration; +//! use futures::StreamExt; +//! use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExpontentialBackoff}; +//! use subxt::{OnlineClient, PolkadotConfig}; +//! +//! async fn main() { +//! let rpc = Client::builder() +//! .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)) +//! .build("ws://localhost:9944".to_string()) +//! .await +//! .unwrap(); +//! +//! let subxt_client: OnlineClient = OnlineClient::from_rpc_client(rpc.clone()).await.unwrap(); +//! let blocks_sub = subxt_client.blocks().subscribe_finalized().await.unwrap(); +//! +//! while let Some(block) = blocks_sub.next().await { +//! let block = match block { +//! Ok(b) => b, +//! Err(e) => { +//! if e.is_disconnected_will_reconnect() { +//! println!("The RPC connection was lost and we may have missed a few blocks"); +//! continue; +//! } else { +//! panic!("Error: {}", e); +//! } +//! } +//! }; +//! println!("Block #{} ({})", block.number(), block.hash()); +//! } +//! } +//! ``` mod platform; #[cfg(test)] @@ -57,16 +98,9 @@ pub type SubscriptionResult = Result, DisconnectedWillReconnect>; #[error("The connection was closed because of `{0:?}` and reconnect initiated")] pub struct DisconnectedWillReconnect(String); -/// Serialized JSON-RPC params. +/// New-type pattern which implements [`ToRpcParams`] that is required by jsonrpsee. #[derive(Debug, Clone)] -pub struct RpcParams(Option>); - -impl RpcParams { - /// Create new [`RpcParams`] from JSON. - pub fn new(json: Option>) -> Self { - Self(json) - } -} +struct RpcParams(Option>); impl ToRpcParams for RpcParams { fn to_rpc_params(self) -> Result>, serde_json::Error> { @@ -145,7 +179,9 @@ impl Stream for Subscription { impl std::fmt::Debug for Subscription { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.write_fmt(format_args!("Subscription {:?}", self.id)) + f.debug_struct("Subscription") + .field("id", &self.id) + .finish() } } @@ -341,6 +377,11 @@ where } impl Client { + /// Create a builder. + pub fn builder() -> ClientBuilder { + ClientBuilder::new() + } + /// Perform a JSON-RPC method call. pub async fn request( &self, @@ -370,7 +411,7 @@ impl Client { self.tx .send(Op::Subscription { subscribe_method, - params: RpcParams::new(params), + params: RpcParams(params), unsubscribe_method, send_back: tx, }) @@ -393,13 +434,6 @@ impl Client { } } -impl Client { - /// Create a builder. - pub fn builder() -> ClientBuilder { - ClientBuilder::new() - } -} - impl RpcClientT for Client { fn request_raw<'a>( &'a self, diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs index 99317c34fb..e5ff0781cf 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs @@ -5,26 +5,17 @@ #![cfg(test)] use super::*; -use futures::{future::Either, stream::FuturesUnordered, FutureExt, TryStreamExt}; +use futures::{future::Either, FutureExt}; -use jsonrpsee_server::{ +use jsonrpsee::core::BoxError; +use jsonrpsee::server::{ http, stop_channel, ws, ConnectionGuard, ConnectionState, HttpRequest, HttpResponse, RpcModule, RpcServiceBuilder, ServerConfig, SubscriptionMessage, }; -use tower::BoxError; -use tracing_subscriber::util::SubscriberInitExt; - -fn init_logger() { - let filter = tracing_subscriber::EnvFilter::from_default_env(); - let _ = tracing_subscriber::fmt() - .with_env_filter(filter) - .finish() - .try_init(); -} #[tokio::test] async fn call_works() { - init_logger(); + tracing_subscriber::fmt::init(); let (_handle, addr) = run_server().await.unwrap(); let client = Client::builder().build(addr).await.unwrap(); @@ -34,7 +25,7 @@ async fn call_works() { #[tokio::test] async fn sub_works() { - init_logger(); + tracing_subscriber::fmt::init(); let (_handle, addr) = run_server().await.unwrap(); let client = Client::builder() @@ -57,7 +48,7 @@ async fn sub_works() { #[tokio::test] async fn sub_with_reconnect() { - init_logger(); + tracing_subscriber::fmt::init(); let (handle, addr) = run_server().await.unwrap(); let client = Client::builder().build(addr.clone()).await.unwrap(); @@ -71,7 +62,7 @@ async fn sub_with_reconnect() { .unwrap(); let _ = handle.send(()); - client.reconnect_initiated().await; + let reconnected = client.reconnect_initiated().await; assert!(matches!(sub.next().await, Some(Ok(_)))); assert!(matches!( @@ -81,7 +72,7 @@ async fn sub_with_reconnect() { // Restart the server. let (_handle, _) = run_server_with_settings(Some(&addr), false).await.unwrap(); - client.reconnected().await; + reconnected.await; assert_eq!(client.reconnect_count(), 1); @@ -100,7 +91,7 @@ async fn sub_with_reconnect() { #[tokio::test] async fn call_with_reconnect() { - init_logger(); + tracing_subscriber::fmt::init(); let (handle, addr) = run_server_with_settings(None, true).await.unwrap(); let client = Arc::new(Client::builder().build(addr.clone()).await.unwrap()); @@ -116,12 +107,12 @@ async fn call_with_reconnect() { // Close the connection with a pending call. let _ = handle.send(()); - client.reconnect_initiated().await; + let reconnected = client.reconnect_initiated().await; // Restart the server let (_handle, _) = run_server_with_settings(Some(&addr), false).await.unwrap(); - client.reconnected().await; + reconnected.await; // This call should fail because reconnect. assert!(req_fut.await.is_err()); @@ -129,15 +120,15 @@ async fn call_with_reconnect() { assert!(client.request("say_hello".to_string(), None).await.is_ok()); } -async fn run_server() -> anyhow::Result<(tokio::sync::broadcast::Sender<()>, String)> { +async fn run_server() -> Result<(tokio::sync::broadcast::Sender<()>, String), BoxError> { run_server_with_settings(None, false).await } async fn run_server_with_settings( url: Option<&str>, dont_respond_to_method_calls: bool, -) -> anyhow::Result<(tokio::sync::broadcast::Sender<()>, String)> { - use jsonrpsee_server::HttpRequest; +) -> Result<(tokio::sync::broadcast::Sender<()>, String), BoxError> { + use jsonrpsee::server::HttpRequest; let sockaddr = match url { Some(url) => url.strip_prefix("ws://").unwrap(), @@ -278,7 +269,7 @@ async fn serve_with_graceful_shutdown( I: tokio::io::AsyncRead + tokio::io::AsyncWrite + Send + Unpin + 'static, { if let Err(e) = - jsonrpsee_server::serve_with_graceful_shutdown(io, service, rx.recv().map(|_| ())).await + jsonrpsee::server::serve_with_graceful_shutdown(io, service, rx.recv().map(|_| ())).await { tracing::error!("Error while serving: {:?}", e); } diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/utils.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/utils.rs index 5b11969b49..679c8c51cb 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/utils.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/utils.rs @@ -12,7 +12,7 @@ use std::{ use tokio::sync::Notify; #[derive(Clone, Debug)] -pub struct ReconnectCounter(Arc); +pub(crate) struct ReconnectCounter(Arc); impl Default for ReconnectCounter { fn default() -> Self { @@ -34,7 +34,7 @@ impl ReconnectCounter { } } -pub fn reconnect_channel() -> (ReconnectTx, ReconnectRx) { +pub(crate) fn reconnect_channel() -> (ReconnectTx, ReconnectRx) { let count = ReconnectCounter::new(); let reconn_init = Arc::new(Notify::new()); let reconn_compl = Arc::new(Notify::new()); @@ -53,7 +53,7 @@ pub fn reconnect_channel() -> (ReconnectTx, ReconnectRx) { } #[derive(Debug, Clone)] -pub struct ReconnectTx { +pub(crate) struct ReconnectTx { reconn_init: Arc, reconn_compl: Arc, count: ReconnectCounter, @@ -68,14 +68,10 @@ impl ReconnectTx { self.reconn_compl.notify_one(); self.count.inc(); } - - pub fn count(&self) -> usize { - self.count.get() - } } #[derive(Debug, Clone)] -pub struct ReconnectRx { +pub(crate) struct ReconnectRx { reconn_init: Arc, reconn_compl: Arc, count: ReconnectCounter, From 7aef2782dc5bd9d24c58dbe3ae9099f27985083a Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 23 Aug 2024 16:08:59 +0200 Subject: [PATCH 05/11] add simple wasm test for reconnecting client --- .github/workflows/rust.yml | 2 +- Cargo.lock | 1 + subxt/Cargo.toml | 3 +- .../rpc/reconnecting_rpc_client/mod.rs | 35 ++++++++++--------- .../rpc/reconnecting_rpc_client/platform.rs | 16 ++++++--- testing/wasm-rpc-tests/Cargo.lock | 27 ++------------ testing/wasm-rpc-tests/Cargo.toml | 2 +- testing/wasm-rpc-tests/tests/wasm.rs | 13 ++++++- 8 files changed, 49 insertions(+), 50 deletions(-) diff --git a/.github/workflows/rust.yml b/.github/workflows/rust.yml index 59bec22cc8..e1819ecf19 100644 --- a/.github/workflows/rust.yml +++ b/.github/workflows/rust.yml @@ -261,7 +261,7 @@ jobs: uses: actions-rs/cargo@v1.0.3 with: command: test - args: --doc + args: --doc --features unstable-reconnecting-rpc-client - if: "failure()" uses: "andymckay/cancel-action@a955d435292c0d409d104b57d8e78435a93a6ef1" # v0.5 diff --git a/Cargo.lock b/Cargo.lock index e99970257e..df2c6bd996 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4911,6 +4911,7 @@ dependencies = [ "tracing", "tracing-subscriber 0.3.18", "url", + "wasm-bindgen-futures", ] [[package]] diff --git a/subxt/Cargo.toml b/subxt/Cargo.toml index 110eda6281..0ccffb7d7b 100644 --- a/subxt/Cargo.toml +++ b/subxt/Cargo.toml @@ -48,7 +48,7 @@ web = [ ] # Enable this to use the reconnecting rpc client -unstable-reconnecting-rpc-client = ["dep:finito", "dep:tokio", "jsonrpsee"] +unstable-reconnecting-rpc-client = ["dep:finito", "dep:tokio", "jsonrpsee", "wasm-bindgen-futures"] # Enable this to use jsonrpsee (allowing for example `OnlineClient::from_url`). jsonrpsee = [ @@ -117,6 +117,7 @@ tokio-util = { workspace = true, features = ["compat"], optional = true } # and that compiles both for native and web. tokio = { workspace = true, optional = true } finito = { workspace = true, optional = true } +wasm-bindgen-futures = { workspace = true, optional = true } [dev-dependencies] bitvec = { workspace = true } diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs index e81661a3a5..f7ace2b0a9 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs @@ -20,7 +20,7 @@ //! use subxt::{OnlineClient, PolkadotConfig}; //! //! async fn main() { -//! let rpc = Client::builder() +//! let rpc = RpcClient::builder() //! .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)) //! .build("ws://localhost:9944".to_string()) //! .await @@ -70,6 +70,7 @@ use jsonrpsee::core::{ }, traits::ToRpcParams, }; +use platform::spawn; use serde_json::value::RawValue; use tokio::sync::{ mpsc::{self, UnboundedReceiver, UnboundedSender}, @@ -188,14 +189,14 @@ impl std::fmt::Debug for Subscription { /// JSON-RPC client that reconnects automatically and may loose /// subscription notifications when it reconnects. #[derive(Clone, Debug)] -pub struct Client { +pub struct RpcClient { tx: mpsc::UnboundedSender, reconnect: ReconnectRx, } /// Builder for [`Client`]. #[derive(Clone, Debug)] -pub struct ClientBuilder

{ +pub struct RpcClientBuilder

{ max_request_size: u32, max_response_size: u32, retry_policy: P, @@ -213,7 +214,7 @@ pub struct ClientBuilder

{ connection_timeout: Duration, } -impl Default for ClientBuilder { +impl Default for RpcClientBuilder { fn default() -> Self { Self { max_request_size: 10 * 1024 * 1024, @@ -233,14 +234,14 @@ impl Default for ClientBuilder { } } -impl ClientBuilder { +impl RpcClientBuilder { /// Create a new builder. pub fn new() -> Self { Self::default() } } -impl

ClientBuilder

+impl

RpcClientBuilder

where P: Iterator + Send + Sync + 'static + Clone, { @@ -320,8 +321,8 @@ where /// Configure which retry policy to use when a connection is lost. /// /// Default: Exponential backoff 10ms - pub fn retry_policy(self, retry_policy: T) -> ClientBuilder { - ClientBuilder { + pub fn retry_policy(self, retry_policy: T) -> RpcClientBuilder { + RpcClientBuilder { max_request_size: self.max_request_size, max_response_size: self.max_response_size, retry_policy, @@ -359,7 +360,7 @@ where } /// Build and connect to the target. - pub async fn build(self, url: String) -> Result { + pub async fn build(self, url: String) -> Result { let (tx, rx) = mpsc::unbounded_channel(); let client = Retry::new(self.retry_policy.clone(), || { platform::ws_client(url.as_ref(), &self) @@ -369,17 +370,17 @@ where platform::spawn(background_task(client, rx, url, reconn_tx, self)); - Ok(Client { + Ok(RpcClient { tx, reconnect: reconn_rx, }) } } -impl Client { +impl RpcClient { /// Create a builder. - pub fn builder() -> ClientBuilder { - ClientBuilder::new() + pub fn builder() -> RpcClientBuilder { + RpcClientBuilder::new() } /// Perform a JSON-RPC method call. @@ -434,7 +435,7 @@ impl Client { } } -impl RpcClientT for Client { +impl RpcClientT for RpcClient { fn request_raw<'a>( &'a self, method: &'a str, @@ -482,7 +483,7 @@ async fn background_task

( mut rx: UnboundedReceiver, url: String, reconn: ReconnectTx, - client_builder: ClientBuilder

, + client_builder: RpcClientBuilder

, ) where P: Iterator + Send + 'static + Clone, { @@ -495,7 +496,7 @@ async fn background_task

( match next_message { None => break, Some(op) => { - tokio::spawn(dispatch_call(client.clone(), op, disconnect.clone())); + spawn(dispatch_call(client.clone(), op, disconnect.clone())); } }; } @@ -632,7 +633,7 @@ async fn subscription_handler( struct ReconnectParams<'a, P> { url: &'a str, reconnect: ReconnectTx, - client_builder: &'a ClientBuilder

, + client_builder: &'a RpcClientBuilder

, close_reason: RpcError, } diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/platform.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/platform.rs index 043fef3de9..2123ba8cbd 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/platform.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/platform.rs @@ -2,7 +2,7 @@ // This file is dual-licensed as Apache-2.0 or GPL-3.0. // see LICENSE for license details. -use crate::backend::rpc::reconnecting_rpc_client::{ClientBuilder, RpcError}; +use crate::backend::rpc::reconnecting_rpc_client::{RpcClientBuilder, RpcError}; use jsonrpsee::core::client::Client; use std::sync::Arc; @@ -13,10 +13,13 @@ pub use tokio::spawn; pub use wasm_bindgen_futures::spawn_local as spawn; #[cfg(feature = "native")] -pub async fn ws_client

(url: &str, builder: &ClientBuilder

) -> Result, RpcError> { +pub async fn ws_client

( + url: &str, + builder: &RpcClientBuilder

, +) -> Result, RpcError> { use jsonrpsee::ws_client::WsClientBuilder; - let ClientBuilder { + let RpcClientBuilder { max_request_size, max_response_size, ping_config, @@ -53,10 +56,13 @@ pub async fn ws_client

(url: &str, builder: &ClientBuilder

) -> Result(url: &str, builder: &ClientBuilder

) -> Result, RpcError> { +pub async fn ws_client

( + url: &str, + builder: &RpcClientBuilder

, +) -> Result, RpcError> { use jsonrpsee::wasm_client::WasmClientBuilder; - let ClientBuilder { + let RpcClientBuilder { id_kind, max_concurrent_requests, max_log_len, diff --git a/testing/wasm-rpc-tests/Cargo.lock b/testing/wasm-rpc-tests/Cargo.lock index c32ad2358c..9e7f52ba09 100644 --- a/testing/wasm-rpc-tests/Cargo.lock +++ b/testing/wasm-rpc-tests/Cargo.lock @@ -307,12 +307,6 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" -[[package]] -name = "cfg_aliases" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" - [[package]] name = "chacha20" version = "0.9.1" @@ -1793,23 +1787,6 @@ dependencies = [ "getrandom", ] -[[package]] -name = "reconnecting-jsonrpsee-ws-client" -version = "0.4.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "06fa4f17e09edfc3131636082faaec633c7baa269396b4004040bc6c52f49f65" -dependencies = [ - "cfg_aliases", - "finito", - "futures", - "jsonrpsee", - "serde_json", - "thiserror", - "tokio", - "tracing", - "wasm-bindgen-futures", -] - [[package]] name = "ring" version = "0.17.8" @@ -2494,6 +2471,7 @@ dependencies = [ "async-trait", "derive-where", "either", + "finito", "frame-metadata 16.0.0", "futures", "getrandom", @@ -2503,7 +2481,6 @@ dependencies = [ "jsonrpsee", "parity-scale-codec", "primitive-types", - "reconnecting-jsonrpsee-ws-client", "scale-bits", "scale-decode", "scale-encode", @@ -2517,8 +2494,10 @@ dependencies = [ "subxt-macro", "subxt-metadata", "thiserror", + "tokio", "tracing", "url", + "wasm-bindgen-futures", ] [[package]] diff --git a/testing/wasm-rpc-tests/Cargo.toml b/testing/wasm-rpc-tests/Cargo.toml index 7880c9964a..103091739c 100644 --- a/testing/wasm-rpc-tests/Cargo.toml +++ b/testing/wasm-rpc-tests/Cargo.toml @@ -15,4 +15,4 @@ futures-util = "0.3.30" # This crate is not a part of the workspace, because it # requires the "jsonrpsee web" features to be enabled, which we don't # want enabled for workspace builds in general. -subxt = { path = "../../subxt", default-features = false, features = ["web", "jsonrpsee"] } +subxt = { path = "../../subxt", default-features = false, features = ["web", "jsonrpsee", "unstable-reconnecting-rpc-client"] } diff --git a/testing/wasm-rpc-tests/tests/wasm.rs b/testing/wasm-rpc-tests/tests/wasm.rs index 6f8ae1e891..e708c0be33 100644 --- a/testing/wasm-rpc-tests/tests/wasm.rs +++ b/testing/wasm-rpc-tests/tests/wasm.rs @@ -1,6 +1,7 @@ #![cfg(target_arch = "wasm32")] use subxt::config::SubstrateConfig; +use subxt::backend::rpc::reconnecting_rpc_client::RpcClient as ReconnectingRpcClient; use wasm_bindgen_test::*; wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); @@ -32,5 +33,15 @@ async fn wasm_ws_transport_works() { .unwrap(); let mut stream = client.backend().stream_best_block_headers().await.unwrap(); - stream.next().await; + assert!(stream.next().await.is_some()); } + +#[wasm_bindgen_test] +async fn reconnecting_rpc_client_ws_transport_works() { + let rpc = ReconnectingRpcClient::builder().build("ws://127.0.0.1:9944".to_string()).await.unwrap(); + let client = subxt::client::OnlineClient::::from_rpc_client(rpc.clone()).await.unwrap(); + + let mut stream = client.backend().stream_best_block_headers().await.unwrap(); + assert!(stream.next().await.is_some()); +} + From 9f18d599bff3a957ce64ca45327bb73ab6d13cd2 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 23 Aug 2024 16:39:59 +0200 Subject: [PATCH 06/11] fix test build --- subxt/examples/setup_reconnecting_rpc_client.rs | 4 ++-- subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs | 7 ++++--- subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs | 10 ++++------ 3 files changed, 10 insertions(+), 11 deletions(-) diff --git a/subxt/examples/setup_reconnecting_rpc_client.rs b/subxt/examples/setup_reconnecting_rpc_client.rs index c393f2cffa..b61f7716fe 100644 --- a/subxt/examples/setup_reconnecting_rpc_client.rs +++ b/subxt/examples/setup_reconnecting_rpc_client.rs @@ -9,7 +9,7 @@ use std::time::Duration; use futures::StreamExt; -use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExponentialBackoff}; +use subxt::backend::rpc::reconnecting_rpc_client::{RpcClient, ExponentialBackoff}; use subxt::{OnlineClient, PolkadotConfig}; // Generate an interface that we can use from the node's metadata. @@ -21,7 +21,7 @@ async fn main() -> Result<(), Box> { tracing_subscriber::fmt::init(); // Create a new client with with a reconnecting RPC client. - let rpc = Client::builder() + let rpc = RpcClient::builder() // Reconnect with exponential backoff // // This API is "iterator-like" and we use `take` to limit the number of retries. diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs index f7ace2b0a9..6fd5029b87 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs @@ -16,18 +16,19 @@ //! ```no_run //! use std::time::Duration; //! use futures::StreamExt; -//! use subxt::backend::rpc::reconnecting_rpc_client::{Client, ExpontentialBackoff}; +//! use subxt::backend::rpc::reconnecting_rpc_client::{RpcClient, ExponentialBackoff}; //! use subxt::{OnlineClient, PolkadotConfig}; //! +//! #[tokio::main] //! async fn main() { //! let rpc = RpcClient::builder() -//! .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10)) +//! .retry_policy(ExponentialBackoff::from_millis(100).max_delay(Duration::from_secs(10))) //! .build("ws://localhost:9944".to_string()) //! .await //! .unwrap(); //! //! let subxt_client: OnlineClient = OnlineClient::from_rpc_client(rpc.clone()).await.unwrap(); -//! let blocks_sub = subxt_client.blocks().subscribe_finalized().await.unwrap(); +//! let mut blocks_sub = subxt_client.blocks().subscribe_finalized().await.unwrap(); //! //! while let Some(block) = blocks_sub.next().await { //! let block = match block { diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs index e5ff0781cf..7577a51bdb 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs @@ -2,8 +2,6 @@ // This file is dual-licensed as Apache-2.0 or GPL-3.0. // see LICENSE for license details. -#![cfg(test)] - use super::*; use futures::{future::Either, FutureExt}; @@ -18,7 +16,7 @@ async fn call_works() { tracing_subscriber::fmt::init(); let (_handle, addr) = run_server().await.unwrap(); - let client = Client::builder().build(addr).await.unwrap(); + let client = RpcClient::builder().build(addr).await.unwrap(); assert!(client.request("say_hello".to_string(), None).await.is_ok(),) } @@ -28,7 +26,7 @@ async fn sub_works() { tracing_subscriber::fmt::init(); let (_handle, addr) = run_server().await.unwrap(); - let client = Client::builder() + let client = RpcClient::builder() .retry_policy(ExponentialBackoff::from_millis(50)) .build(addr) .await @@ -50,7 +48,7 @@ async fn sub_works() { async fn sub_with_reconnect() { tracing_subscriber::fmt::init(); let (handle, addr) = run_server().await.unwrap(); - let client = Client::builder().build(addr.clone()).await.unwrap(); + let client = RpcClient::builder().build(addr.clone()).await.unwrap(); let mut sub = client .subscribe( @@ -94,7 +92,7 @@ async fn call_with_reconnect() { tracing_subscriber::fmt::init(); let (handle, addr) = run_server_with_settings(None, true).await.unwrap(); - let client = Arc::new(Client::builder().build(addr.clone()).await.unwrap()); + let client = Arc::new(RpcClient::builder().build(addr.clone()).await.unwrap()); let req_fut = client.request("say_hello".to_string(), None).boxed(); let timeout_fut = tokio::time::sleep(Duration::from_secs(5)); From d9424712f3b4b32e1ecd7788389a879c0d33ac0d Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 23 Aug 2024 16:55:47 +0200 Subject: [PATCH 07/11] cargo fmt --- subxt/examples/setup_reconnecting_rpc_client.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/subxt/examples/setup_reconnecting_rpc_client.rs b/subxt/examples/setup_reconnecting_rpc_client.rs index b61f7716fe..030e664a71 100644 --- a/subxt/examples/setup_reconnecting_rpc_client.rs +++ b/subxt/examples/setup_reconnecting_rpc_client.rs @@ -9,7 +9,7 @@ use std::time::Duration; use futures::StreamExt; -use subxt::backend::rpc::reconnecting_rpc_client::{RpcClient, ExponentialBackoff}; +use subxt::backend::rpc::reconnecting_rpc_client::{ExponentialBackoff, RpcClient}; use subxt::{OnlineClient, PolkadotConfig}; // Generate an interface that we can use from the node's metadata. From f0d8282c975cb480a3ebf5f07fd64008ba6e9f64 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 27 Aug 2024 11:55:39 +0200 Subject: [PATCH 08/11] remove reconnect apis --- .../examples/setup_reconnecting_rpc_client.rs | 18 ---- .../rpc/reconnecting_rpc_client/mod.rs | 32 +------ .../rpc/reconnecting_rpc_client/tests.rs | 12 +-- .../rpc/reconnecting_rpc_client/utils.rs | 85 ------------------- 4 files changed, 10 insertions(+), 137 deletions(-) diff --git a/subxt/examples/setup_reconnecting_rpc_client.rs b/subxt/examples/setup_reconnecting_rpc_client.rs index 030e664a71..42ca1a766e 100644 --- a/subxt/examples/setup_reconnecting_rpc_client.rs +++ b/subxt/examples/setup_reconnecting_rpc_client.rs @@ -53,22 +53,6 @@ async fn main() -> Result<(), Box> { let api: OnlineClient = OnlineClient::from_rpc_client(rpc.clone()).await?; - // Optionally print if the RPC client reconnects. - let rpc2 = rpc.clone(); - tokio::spawn(async move { - loop { - // The connection was lost and the client is trying to reconnect. - let reconnected = rpc2.reconnect_initiated().await; - let now = std::time::Instant::now(); - // The connection was re-established. - reconnected.await; - println!( - "RPC client reconnection took `{}s`", - now.elapsed().as_secs() - ); - } - }); - // Run for at most 100 blocks and print a bunch of information about it. // // The subscription is automatically re-started when the RPC client has reconnected. @@ -96,7 +80,5 @@ async fn main() -> Result<(), Box> { println!("Block #{block_number} ({block_hash})"); } - println!("RPC client reconnected `{}` times", rpc.reconnect_count()); - Ok(()) } diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs index 6fd5029b87..875f3f34c1 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/mod.rs @@ -63,7 +63,7 @@ use super::{RawRpcFuture, RawRpcSubscription, RpcClientT}; use crate::error::RpcError as SubxtRpcError; use finito::Retry; -use futures::{Future, FutureExt, Stream, StreamExt, TryStreamExt}; +use futures::{FutureExt, Stream, StreamExt, TryStreamExt}; use jsonrpsee::core::{ client::{ Client as WsClient, ClientT, Subscription as RpcSubscription, SubscriptionClientT, @@ -78,7 +78,6 @@ use tokio::sync::{ oneshot, Notify, }; use utils::display_close_reason; -use utils::{reconnect_channel, ReconnectRx, ReconnectTx}; // re-exports pub use finito::{ExponentialBackoff, FibonacciBackoff, FixedInterval}; @@ -192,7 +191,6 @@ impl std::fmt::Debug for Subscription { #[derive(Clone, Debug)] pub struct RpcClient { tx: mpsc::UnboundedSender, - reconnect: ReconnectRx, } /// Builder for [`Client`]. @@ -367,14 +365,10 @@ where platform::ws_client(url.as_ref(), &self) }) .await?; - let (reconn_tx, reconn_rx) = reconnect_channel(); - platform::spawn(background_task(client, rx, url, reconn_tx, self)); + platform::spawn(background_task(client, rx, url, self)); - Ok(RpcClient { - tx, - reconnect: reconn_rx, - }) + Ok(RpcClient { tx }) } } @@ -420,20 +414,6 @@ impl RpcClient { .map_err(|_| Error::Dropped)?; rx.await.map_err(|_| Error::Dropped)? } - - /// A future that resolves when the client has initiated a reconnection. - /// This method returns another future that resolves when the client has reconnected. - /// - /// This may be called multiple times. - pub async fn reconnect_initiated(&self) -> impl Future + '_ { - self.reconnect.reconnect_started().await; - self.reconnect.reconnected() - } - - /// Get how many times the client has reconnected successfully. - pub fn reconnect_count(&self) -> usize { - self.reconnect.count() - } } impl RpcClientT for RpcClient { @@ -483,7 +463,6 @@ async fn background_task

( mut client: Arc, mut rx: UnboundedReceiver, url: String, - reconn: ReconnectTx, client_builder: RpcClientBuilder

, ) where P: Iterator + Send + 'static + Clone, @@ -505,7 +484,6 @@ async fn background_task

( _ = client.on_disconnect() => { let params = ReconnectParams { url: &url, - reconnect: reconn.clone(), client_builder: &client_builder, close_reason: client.disconnect_reason().await, }; @@ -633,7 +611,6 @@ async fn subscription_handler( struct ReconnectParams<'a, P> { url: &'a str, - reconnect: ReconnectTx, client_builder: &'a RpcClientBuilder

, close_reason: RpcError, } @@ -644,7 +621,6 @@ where { let ReconnectParams { url, - reconnect, client_builder, close_reason, } = params; @@ -652,14 +628,12 @@ where let retry_policy = client_builder.retry_policy.clone(); tracing::debug!(target: LOG_TARGET, "Connection to {url} was closed: `{}`; starting to reconnect", display_close_reason(&close_reason)); - reconnect.reconnect_initiated(); let client = Retry::new(retry_policy.clone(), || { platform::ws_client(url, client_builder) }) .await?; - reconnect.reconnected(); tracing::debug!(target: LOG_TARGET, "Connection to {url} was successfully re-established"); Ok(client) diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs index 7577a51bdb..48d00b3064 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs @@ -60,7 +60,9 @@ async fn sub_with_reconnect() { .unwrap(); let _ = handle.send(()); - let reconnected = client.reconnect_initiated().await; + + // Hack to wait for the server to restart. + tokio::time::sleep(Duration::from_millis(100)).await; assert!(matches!(sub.next().await, Some(Ok(_)))); assert!(matches!( @@ -70,9 +72,9 @@ async fn sub_with_reconnect() { // Restart the server. let (_handle, _) = run_server_with_settings(Some(&addr), false).await.unwrap(); - reconnected.await; - assert_eq!(client.reconnect_count(), 1); + // Hack to wait for the server to restart. + tokio::time::sleep(Duration::from_millis(100)).await; // Subscription should work after reconnect. let mut sub = client @@ -105,12 +107,12 @@ async fn call_with_reconnect() { // Close the connection with a pending call. let _ = handle.send(()); - let reconnected = client.reconnect_initiated().await; // Restart the server let (_handle, _) = run_server_with_settings(Some(&addr), false).await.unwrap(); - reconnected.await; + // Hack to wait for the server to restart. + tokio::time::sleep(Duration::from_millis(100)).await; // This call should fail because reconnect. assert!(req_fut.await.is_err()); diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/utils.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/utils.rs index 679c8c51cb..880708dcdf 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/utils.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/utils.rs @@ -5,91 +5,6 @@ //! Utils. use crate::backend::rpc::reconnecting_rpc_client::RpcError; -use std::{ - sync::atomic::{AtomicUsize, Ordering}, - sync::Arc, -}; -use tokio::sync::Notify; - -#[derive(Clone, Debug)] -pub(crate) struct ReconnectCounter(Arc); - -impl Default for ReconnectCounter { - fn default() -> Self { - Self::new() - } -} - -impl ReconnectCounter { - pub fn new() -> Self { - Self(Arc::new(AtomicUsize::new(0))) - } - - pub fn get(&self) -> usize { - self.0.load(Ordering::SeqCst) - } - - pub fn inc(&self) { - self.0.fetch_add(1, Ordering::SeqCst); - } -} - -pub(crate) fn reconnect_channel() -> (ReconnectTx, ReconnectRx) { - let count = ReconnectCounter::new(); - let reconn_init = Arc::new(Notify::new()); - let reconn_compl = Arc::new(Notify::new()); - ( - ReconnectTx { - reconn_init: reconn_init.clone(), - reconn_compl: reconn_compl.clone(), - count: count.clone(), - }, - ReconnectRx { - reconn_init, - reconn_compl, - count, - }, - ) -} - -#[derive(Debug, Clone)] -pub(crate) struct ReconnectTx { - reconn_init: Arc, - reconn_compl: Arc, - count: ReconnectCounter, -} - -impl ReconnectTx { - pub fn reconnect_initiated(&self) { - self.reconn_init.notify_one(); - } - - pub fn reconnected(&self) { - self.reconn_compl.notify_one(); - self.count.inc(); - } -} - -#[derive(Debug, Clone)] -pub(crate) struct ReconnectRx { - reconn_init: Arc, - reconn_compl: Arc, - count: ReconnectCounter, -} - -impl ReconnectRx { - pub async fn reconnect_started(&self) { - self.reconn_init.notified().await; - } - - pub async fn reconnected(&self) { - self.reconn_compl.notified().await; - } - - pub fn count(&self) -> usize { - self.count.get() - } -} pub fn display_close_reason(err: &RpcError) -> String { match err { From fe6e62d2387201863696b8f629e26732217cae30 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 27 Aug 2024 14:54:33 +0200 Subject: [PATCH 09/11] Update testing/wasm-rpc-tests/tests/wasm.rs --- testing/wasm-rpc-tests/tests/wasm.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/testing/wasm-rpc-tests/tests/wasm.rs b/testing/wasm-rpc-tests/tests/wasm.rs index e708c0be33..a62102333d 100644 --- a/testing/wasm-rpc-tests/tests/wasm.rs +++ b/testing/wasm-rpc-tests/tests/wasm.rs @@ -40,7 +40,6 @@ async fn wasm_ws_transport_works() { async fn reconnecting_rpc_client_ws_transport_works() { let rpc = ReconnectingRpcClient::builder().build("ws://127.0.0.1:9944".to_string()).await.unwrap(); let client = subxt::client::OnlineClient::::from_rpc_client(rpc.clone()).await.unwrap(); - let mut stream = client.backend().stream_best_block_headers().await.unwrap(); assert!(stream.next().await.is_some()); } From 7f271ca3facc9545b9c1ba25578cf2adc748963f Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 27 Aug 2024 14:54:56 +0200 Subject: [PATCH 10/11] Update subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs --- subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs index 48d00b3064..b215ce8305 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs @@ -15,7 +15,6 @@ use jsonrpsee::server::{ async fn call_works() { tracing_subscriber::fmt::init(); let (_handle, addr) = run_server().await.unwrap(); - let client = RpcClient::builder().build(addr).await.unwrap(); assert!(client.request("say_hello".to_string(), None).await.is_ok(),) From 37727c08d17d81b2d6f905e536bbf51a3c5a9a2d Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Tue, 27 Aug 2024 14:55:17 +0200 Subject: [PATCH 11/11] Update subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs --- subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs index b215ce8305..f5571aadb7 100644 --- a/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs +++ b/subxt/src/backend/rpc/reconnecting_rpc_client/tests.rs @@ -16,7 +16,6 @@ async fn call_works() { tracing_subscriber::fmt::init(); let (_handle, addr) = run_server().await.unwrap(); let client = RpcClient::builder().build(addr).await.unwrap(); - assert!(client.request("say_hello".to_string(), None).await.is_ok(),) }