diff --git a/Cargo.toml b/Cargo.toml index d39a17ba7b..e331563208 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -24,9 +24,9 @@ client = ["substrate-subxt-client"] log = "0.4.13" thiserror = "1.0.23" futures = "0.3.10" -jsonrpsee-types = "0.2.0-alpha" -jsonrpsee-ws-client = "0.2.0-alpha" -jsonrpsee-http-client = "0.2.0-alpha" +jsonrpsee-types = "0.2.0-alpha.2" +jsonrpsee-ws-client = "0.2.0-alpha.2" +jsonrpsee-http-client = "0.2.0-alpha.2" num-traits = { version = "0.2.14", default-features = false } serde = { version = "1.0.119", features = ["derive"] } serde_json = "1.0.61" diff --git a/client/src/lib.rs b/client/src/lib.rs index 4297498684..4ce993f12b 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -18,7 +18,13 @@ #![deny(missing_docs)] -use async_std::task; +use async_std::{ + sync::{ + Arc, + RwLock, + }, + task, +}; use futures::{ channel::{ mpsc, @@ -36,7 +42,10 @@ use futures01::sync::mpsc as mpsc01; use jsonrpsee_types::{ client::{ FrontToBack, + NotificationMessage, + RequestMessage, Subscription, + SubscriptionMessage, }, error::Error as JsonRpseeError, jsonrpc::{ @@ -75,7 +84,10 @@ use sc_service::{ RpcSession, TaskManager, }; -use std::marker::PhantomData; +use std::{ + collections::HashMap, + marker::PhantomData, +}; use thiserror::Error; const DEFAULT_CHANNEL_SIZE: usize = 16; @@ -102,15 +114,31 @@ impl SubxtClient { pub fn new(mut task_manager: TaskManager, rpc: RpcHandlers) -> Self { let (to_back, from_front) = mpsc::channel(DEFAULT_CHANNEL_SIZE); + let request_id = Arc::new(RwLock::new(u64::MIN)); + let subscriptions = Arc::new(RwLock::new(HashMap::::new())); + task::spawn( select( Box::pin(from_front.for_each(move |message: FrontToBack| { let rpc = rpc.clone(); let (to_front, from_back) = mpsc01::channel(DEFAULT_CHANNEL_SIZE); let session = RpcSession::new(to_front.clone()); + + let request_id = request_id.clone(); + let subscriptions = subscriptions.clone(); + async move { + let request_id = { + let mut request_id = request_id.write().await; + *request_id = request_id.wrapping_add(1); + *request_id + }; + match message { - FrontToBack::Notification { method, params } => { + FrontToBack::Notification(NotificationMessage { + method, + params, + }) => { let request = Request::Single(Call::Notification(Notification { jsonrpc: Version::V2, @@ -122,17 +150,17 @@ impl SubxtClient { } } - FrontToBack::StartRequest { + FrontToBack::StartRequest(RequestMessage { method, params, send_back, - } => { + }) => { let request = Request::Single(Call::MethodCall(MethodCall { jsonrpc: Version::V2, method: method.into(), params: params.into(), - id: Id::Num(0), + id: Id::Num(request_id), })); if let Ok(message) = serde_json::to_string(&request) { if let Some(response) = @@ -153,25 +181,31 @@ impl SubxtClient { } }; - send_back - .send(result) - .expect("failed to send request response"); + send_back.map(|tx| { + tx.send(result) + .expect("failed to send request response") + }); } } } - FrontToBack::Subscribe { + FrontToBack::Subscribe(SubscriptionMessage { subscribe_method, params, - unsubscribe_method: _, + unsubscribe_method, send_back, - } => { + }) => { + { + let mut subscriptions = subscriptions.write().await; + subscriptions.insert(request_id, unsubscribe_method); + } + let request = Request::Single(Call::MethodCall(MethodCall { jsonrpc: Version::V2, method: subscribe_method, params, - id: Id::Num(0), + id: Id::Num(request_id), })); let (mut send_front_sub, send_back_sub) = @@ -188,10 +222,7 @@ impl SubxtClient { Output::Success(_) => { Ok(( send_back_sub, - // NOTE: The ID is used to unsubscribe to specific subscription - // which the `SubxtClient` doesn't support so hardcoding it to `0` - // is fine. - SubscriptionId::Num(0), + SubscriptionId::Num(request_id), )) } Output::Failure(failure) => { @@ -219,16 +250,38 @@ impl SubxtClient { &response ) .expect("failed to decode subscription notif"); - send_front_sub + // ignore send error since the channel is probably closed + let _ = send_front_sub .send(notif.params.result) - .await - .expect("failed to send subscription notif") + .await; } }); } - FrontToBack::SubscriptionClosed(_) => { - // NOTE: unsubscriptions are not supported by SubxtClient. + FrontToBack::SubscriptionClosed(subscription_id) => { + let sub_id = + if let SubscriptionId::Num(num) = subscription_id { + num + } else { + unreachable!("subscription id should be num") + }; + let json_sub_id = jsonrpc::to_value(sub_id).unwrap(); + + let subscriptions = subscriptions.read().await; + if let Some(unsubscribe) = subscriptions.get(&sub_id) { + let request = + Request::Single(Call::MethodCall(MethodCall { + jsonrpc: Version::V2, + method: unsubscribe.into(), + params: jsonrpc::Params::Array(vec![ + json_sub_id, + ]), + id: Id::Num(request_id), + })); + if let Ok(message) = serde_json::to_string(&request) { + rpc.rpc_query(&session, &message).await; + } + } } } } @@ -265,10 +318,10 @@ impl SubxtClient { { self.to_back .clone() - .send(FrontToBack::Notification { + .send(FrontToBack::Notification(NotificationMessage { method: method.into(), params: params.into(), - }) + })) .await .map_err(|e| JsonRpseeError::TransportError(Box::new(e))) } @@ -288,11 +341,11 @@ impl SubxtClient { self.to_back .clone() - .send(FrontToBack::StartRequest { + .send(FrontToBack::StartRequest(RequestMessage { method: method.into(), params: params.into(), - send_back: send_back_tx, - }) + send_back: Some(send_back_tx), + })) .await .map_err(|e| JsonRpseeError::TransportError(Box::new(e)))?; @@ -324,12 +377,12 @@ impl SubxtClient { let (send_back_tx, send_back_rx) = oneshot::channel(); self.to_back .clone() - .send(FrontToBack::Subscribe { + .send(FrontToBack::Subscribe(SubscriptionMessage { subscribe_method, unsubscribe_method, params, send_back: send_back_tx, - }) + })) .await .map_err(JsonRpseeError::Internal)?; diff --git a/src/lib.rs b/src/lib.rs index b1b3c9c0a8..ad1f08db87 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -195,9 +195,8 @@ impl ClientBuilder { let url = self.url.as_deref().unwrap_or("ws://127.0.0.1:9944"); if url.starts_with("ws://") || url.starts_with("wss://") { let mut config = WsConfig::with_url(&url); - // max notifs per subscription capacity. - config.max_subscription_capacity = 4096; - RpcClient::WebSocket(WsClient::new(WsConfig::with_url(&url)).await?) + config.max_notifs_per_subscription = 4096; + RpcClient::WebSocket(WsClient::new(config).await?) } else { let client = HttpClient::new(url, HttpConfig::default())?; RpcClient::Http(Arc::new(client)) diff --git a/src/rpc.rs b/src/rpc.rs index c1a0ef173a..0482c700de 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -171,6 +171,7 @@ pub enum RpcClient { } impl RpcClient { + /// Start a JSON-RPC request. pub async fn request( &self, method: &str, @@ -186,6 +187,7 @@ impl RpcClient { } } + /// Start a JSON-RPC Subscription. pub async fn subscribe( &self, subscribe_method: &str,