From 958aee9a6153c1391dc7b16d0ea9f2df93f44d15 Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Thu, 23 May 2024 18:28:49 +0200 Subject: [PATCH 1/2] refactor(client): remove MaxSlots limit --- client/http-client/src/client.rs | 20 ++------ core/src/client/async_client/mod.rs | 15 +++--- core/src/client/error.rs | 3 -- core/src/client/mod.rs | 73 +++-------------------------- tests/tests/integration_tests.rs | 38 --------------- 5 files changed, 17 insertions(+), 132 deletions(-) diff --git a/client/http-client/src/client.rs b/client/http-client/src/client.rs index 7e4fcd1cdf..76d6c1e25e 100644 --- a/client/http-client/src/client.rs +++ b/client/http-client/src/client.rs @@ -104,12 +104,6 @@ impl HttpClientBuilder { self } - /// Set max concurrent requests. - pub fn max_concurrent_requests(mut self, max: usize) -> Self { - self.max_concurrent_requests = max; - self - } - /// Force to use the rustls native certificate store. /// /// Since multiple certificate stores can be optionally enabled, this option will @@ -198,7 +192,6 @@ where let Self { max_request_size, max_response_size, - max_concurrent_requests, request_timeout, certificate_store, id_kind, @@ -220,11 +213,7 @@ where .build(target) .map_err(|e| Error::Transport(e.into()))?; - Ok(HttpClient { - transport, - id_manager: Arc::new(RequestIdManager::new(max_concurrent_requests, id_kind)), - request_timeout, - }) + Ok(HttpClient { transport, id_manager: Arc::new(RequestIdManager::new(id_kind)), request_timeout }) } } @@ -303,8 +292,7 @@ where R: DeserializeOwned, Params: ToRpcParams + Send, { - let guard = self.id_manager.next_request_id()?; - let id = guard.inner(); + let id = self.id_manager.next_request_id(); let params = params.to_rpc_params()?; let request = RequestSer::borrowed(&id, &method, params.as_deref()); @@ -340,8 +328,8 @@ where R: DeserializeOwned + fmt::Debug + 'a, { let batch = batch.build()?; - let guard = self.id_manager.next_request_id()?; - let id_range = generate_batch_id_range(&guard, batch.len() as u64)?; + let id = self.id_manager.next_request_id(); + let id_range = generate_batch_id_range(id, batch.len() as u64)?; let mut batch_request = Vec::with_capacity(batch.len()); for ((method, params), id) in batch.into_iter().zip(id_range.clone()) { diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 0bd8263ff9..0af94f689a 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -345,7 +345,7 @@ impl ClientBuilder { to_back: to_back.clone(), request_timeout: self.request_timeout, error: ErrorFromBack::new(to_back, disconnect_reason), - id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind), + id_manager: RequestIdManager::new(self.id_kind), max_log_length: self.max_log_length, on_exit: Some(client_dropped_tx), } @@ -479,7 +479,7 @@ impl ClientT for Client { Params: ToRpcParams + Send, { // NOTE: we use this to guard against max number of concurrent requests. - let _req_id = self.id_manager.next_request_id()?; + let _req_id = self.id_manager.next_request_id(); let params = params.to_rpc_params()?; let notif = NotificationSer::borrowed(&method, params.as_deref()); @@ -505,8 +505,7 @@ impl ClientT for Client { Params: ToRpcParams + Send, { let (send_back_tx, send_back_rx) = oneshot::channel(); - let guard = self.id_manager.next_request_id()?; - let id = guard.inner(); + let id = self.id_manager.next_request_id(); let params = params.to_rpc_params()?; let raw = @@ -540,8 +539,8 @@ impl ClientT for Client { R: DeserializeOwned, { let batch = batch.build()?; - let guard = self.id_manager.next_request_id()?; - let id_range = generate_batch_id_range(&guard, batch.len() as u64)?; + let id = self.id_manager.next_request_id(); + let id_range = generate_batch_id_range(id, batch.len() as u64)?; let mut batches = Vec::with_capacity(batch.len()); for ((method, params), id) in batch.into_iter().zip(id_range.clone()) { @@ -621,8 +620,8 @@ impl SubscriptionClientT for Client { return Err(RegisterMethodError::SubscriptionNameConflict(unsubscribe_method.to_owned()).into()); } - let guard = self.id_manager.next_request_two_ids()?; - let (id_sub, id_unsub) = guard.inner(); + let id_sub = self.id_manager.next_request_id(); + let id_unsub = self.id_manager.next_request_id(); let params = params.to_rpc_params()?; let raw = serde_json::to_string(&RequestSer::borrowed(&id_sub, &subscribe_method, params.as_deref())) diff --git a/core/src/client/error.rs b/core/src/client/error.rs index 1948af9eee..ab2d46dcce 100644 --- a/core/src/client/error.rs +++ b/core/src/client/error.rs @@ -54,9 +54,6 @@ pub enum Error { /// Request timeout #[error("Request timeout")] RequestTimeout, - /// Max number of request slots exceeded. - #[error("Max concurrent requests exceeded")] - MaxSlotsExceeded, /// Custom error. #[error("Custom error: {0}")] Custom(String), diff --git a/core/src/client/mod.rs b/core/src/client/mod.rs index 0bd6eb40ba..ce071f15a4 100644 --- a/core/src/client/mod.rs +++ b/core/src/client/mod.rs @@ -455,10 +455,6 @@ impl Drop for Subscription { #[derive(Debug)] /// Keep track of request IDs. pub struct RequestIdManager { - // Current pending requests. - current_pending: Arc<()>, - /// Max concurrent pending requests allowed. - max_concurrent_requests: usize, /// Get the next request ID. current_id: CurrentId, /// Request ID type. @@ -467,38 +463,15 @@ pub struct RequestIdManager { impl RequestIdManager { /// Create a new `RequestIdGuard` with the provided concurrency limit. - pub fn new(limit: usize, id_kind: IdKind) -> Self { - Self { current_pending: Arc::new(()), max_concurrent_requests: limit, current_id: CurrentId::new(), id_kind } - } - - fn get_slot(&self) -> Result, Error> { - // Strong count is 1 at start, so that's why we use `>` and not `>=`. - if Arc::strong_count(&self.current_pending) > self.max_concurrent_requests { - Err(Error::MaxSlotsExceeded) - } else { - Ok(self.current_pending.clone()) - } + pub fn new(id_kind: IdKind) -> Self { + Self { current_id: CurrentId::new(), id_kind } } /// Attempts to get the next request ID. /// /// Fails if request limit has been exceeded. - pub fn next_request_id(&self) -> Result>, Error> { - let rc = self.get_slot()?; - let id = self.id_kind.into_id(self.current_id.next()); - - Ok(RequestIdGuard { _rc: rc, id }) - } - - /// Attempts to get fetch two ids (used for subscriptions) but only - /// occupy one slot in the request guard. - /// - /// Fails if request limit has been exceeded. - pub fn next_request_two_ids(&self) -> Result, Id<'static>)>, Error> { - let rc = self.get_slot()?; - let id1 = self.id_kind.into_id(self.current_id.next()); - let id2 = self.id_kind.into_id(self.current_id.next()); - Ok(RequestIdGuard { _rc: rc, id: (id1, id2) }) + pub fn next_request_id(&self) -> Id<'static> { + self.id_kind.into_id(self.current_id.next()) } /// Get a handle to the `IdKind`. @@ -507,21 +480,6 @@ impl RequestIdManager { } } -/// Reference counted request ID. -#[derive(Debug)] -pub struct RequestIdGuard { - id: T, - /// Reference count decreased when dropped. - _rc: Arc<()>, -} - -impl RequestIdGuard { - /// Get the actual ID or IDs. - pub fn inner(&self) -> T { - self.id.clone() - } -} - /// What certificate store to use #[derive(Clone, Copy, Debug, PartialEq, Eq)] #[non_exhaustive] @@ -568,8 +526,8 @@ impl CurrentId { } /// Generate a range of IDs to be used in a batch request. -pub fn generate_batch_id_range(guard: &RequestIdGuard, len: u64) -> Result, Error> { - let id_start = guard.inner().try_parse_inner_as_number()?; +pub fn generate_batch_id_range(id: Id, len: u64) -> Result, Error> { + let id_start = id.try_parse_inner_as_number()?; let id_end = id_start .checked_add(len) .ok_or_else(|| Error::Custom("BatchID range wrapped; restart the client or try again later".to_string()))?; @@ -704,22 +662,3 @@ fn subscription_channel(max_buf_size: usize) -> (SubscriptionSender, Subscriptio (SubscriptionSender { inner: tx, lagged: lagged_tx }, SubscriptionReceiver { inner: rx, lagged: lagged_rx }) } - -#[cfg(test)] -mod tests { - use super::{IdKind, RequestIdManager}; - - #[test] - fn request_id_guard_works() { - let manager = RequestIdManager::new(2, IdKind::Number); - let _first = manager.next_request_id().unwrap(); - - { - let _second = manager.next_request_two_ids().unwrap(); - assert!(manager.next_request_id().is_err()); - // second dropped here. - } - - assert!(manager.next_request_id().is_ok()); - } -} diff --git a/tests/tests/integration_tests.rs b/tests/tests/integration_tests.rs index 672dfb782e..d6cd9dd313 100644 --- a/tests/tests/integration_tests.rs +++ b/tests/tests/integration_tests.rs @@ -275,23 +275,6 @@ async fn http_method_call_str_id_works() { assert_eq!(&response, "hello"); } -#[tokio::test] -async fn http_concurrent_method_call_limits_works() { - init_logger(); - - let server_addr = server().await; - let uri = format!("http://{}", server_addr); - let client = HttpClientBuilder::default().max_concurrent_requests(1).build(&uri).unwrap(); - - let (first, second) = tokio::join!( - client.request::("say_hello", rpc_params!()), - client.request::("say_hello", rpc_params![]), - ); - - assert!(first.is_ok()); - assert!(matches!(second, Err(Error::MaxSlotsExceeded))); -} - #[tokio::test] async fn ws_subscription_several_clients() { init_logger(); @@ -418,27 +401,6 @@ async fn ws_making_more_requests_than_allowed_should_not_deadlock() { } } -#[tokio::test] -async fn http_making_more_requests_than_allowed_should_not_deadlock() { - init_logger(); - - let server_addr = server().await; - let server_url = format!("http://{}", server_addr); - let client = HttpClientBuilder::default().max_concurrent_requests(2).build(&server_url).unwrap(); - let client = Arc::new(client); - - let mut requests = Vec::new(); - - for _ in 0..6 { - let c = client.clone(); - requests.push(tokio::spawn(async move { c.request::("say_hello", rpc_params![]).await })); - } - - for req in requests { - let _ = req.await.unwrap(); - } -} - #[tokio::test] async fn https_works() { init_logger(); From e72df136f93f9ade666843ded0432e8ab1df1eed Mon Sep 17 00:00:00 2001 From: Niklas Adolfsson Date: Fri, 24 May 2024 09:01:55 +0200 Subject: [PATCH 2/2] fix wasm build --- core/src/client/async_client/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/client/async_client/mod.rs b/core/src/client/async_client/mod.rs index 0af94f689a..5b00b0dd36 100644 --- a/core/src/client/async_client/mod.rs +++ b/core/src/client/async_client/mod.rs @@ -403,7 +403,7 @@ impl ClientBuilder { to_back: to_back.clone(), request_timeout: self.request_timeout, error: ErrorFromBack::new(to_back, disconnect_reason), - id_manager: RequestIdManager::new(self.max_concurrent_requests, self.id_kind), + id_manager: RequestIdManager::new(self.id_kind), max_log_length: self.max_log_length, on_exit: Some(client_dropped_tx), }