Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

impl Stream on Subscription and tweak built-in next() method to align #601

Merged
merged 7 commits into from
Dec 10, 2021
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
*/target
**/*.rs.bk
Cargo.lock

.DS_Store

#Added by cargo
#
Expand Down
2 changes: 1 addition & 1 deletion benches/bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ fn run_sub_round_trip(rt: &TokioRuntime, crit: &mut Criterion, client: Arc<impl
})
},
|mut sub| async move {
black_box(sub.next().await.unwrap());
black_box(sub.next().await.transpose().unwrap());
// Note that this benchmark will include costs for measuring `drop` for subscription,
// since it's not possible to combine both `iter_with_setup` and `iter_with_large_drop`.
// To estimate pure cost of method, one should subtract the result of `unsub` bench
Expand Down
2 changes: 1 addition & 1 deletion examples/proc_macro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ async fn main() -> anyhow::Result<()> {

let mut sub: Subscription<Vec<ExampleHash>> =
RpcClient::<ExampleHash, ExampleStorageKey>::subscribe_storage(&client, None).await.unwrap();
assert_eq!(Some(vec![[0; 32]]), sub.next().await.unwrap());
assert_eq!(Some(vec![[0; 32]]), sub.next().await.transpose().unwrap());

Ok(())
}
Expand Down
6 changes: 3 additions & 3 deletions proc-macros/tests/ui/correct/basic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ async fn main() {
assert_eq!(client.request::<bool>("foo_optional_param", rpc_params![1]).await.unwrap(), true);

let mut sub = client.sub().await.unwrap();
let first_recv = sub.next().await.unwrap();
let first_recv = sub.next().await.transpose().unwrap();
assert_eq!(first_recv, Some("Response_A".to_string()));
let second_recv = sub.next().await.unwrap();
let second_recv = sub.next().await.transpose().unwrap();
assert_eq!(second_recv, Some("Response_B".to_string()));

let mut sub = client.sub_with_override_notif_method().await.unwrap();
let recv = sub.next().await.unwrap();
let recv = sub.next().await.transpose().unwrap();
assert_eq!(recv, Some(1));
}
24 changes: 12 additions & 12 deletions tests/tests/integration_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,10 @@ async fn ws_subscription_works() {
let mut foo_sub: Subscription<u64> = client.subscribe("subscribe_foo", None, "unsubscribe_foo").await.unwrap();

for _ in 0..10 {
let hello = hello_sub.next().await.unwrap();
let foo = foo_sub.next().await.unwrap();
assert_eq!(hello, Some("hello from subscription".into()));
assert_eq!(foo, Some(1337));
let hello = hello_sub.next().await.unwrap().unwrap();
let foo = foo_sub.next().await.unwrap().unwrap();
assert_eq!(hello, "hello from subscription".to_string());
assert_eq!(foo, 1337);
}
}

Expand Down Expand Up @@ -181,11 +181,11 @@ async fn ws_subscription_without_polling_doesnt_make_client_unuseable() {

// Capacity is `num_sender` + `capacity`
for _ in 0..5 {
assert!(hello_sub.next().await.unwrap().is_some());
assert!(hello_sub.next().await.unwrap().is_ok());
}

// NOTE: this is now unuseable and unregistered.
assert!(hello_sub.next().await.unwrap().is_none());
assert!(hello_sub.next().await.is_none());

// The client should still be useable => make sure it still works.
let _hello_req: JsonValue = client.request("say_hello", None).await.unwrap();
Expand All @@ -194,7 +194,7 @@ async fn ws_subscription_without_polling_doesnt_make_client_unuseable() {
let mut other_sub: Subscription<JsonValue> =
client.subscribe("subscribe_hello", None, "unsubscribe_hello").await.unwrap();

other_sub.next().await.unwrap();
other_sub.next().await.unwrap().unwrap();
}

#[tokio::test]
Expand Down Expand Up @@ -285,7 +285,7 @@ async fn server_should_be_able_to_close_subscriptions() {

let res = sub.next().await;

assert!(matches!(res, Err(Error::SubscriptionClosed(_))));
assert!(matches!(res, Some(Err(Error::SubscriptionClosed(_)))));
}

#[tokio::test]
Expand All @@ -297,7 +297,7 @@ async fn ws_close_pending_subscription_when_server_terminated() {

let mut sub: Subscription<String> = c1.subscribe("subscribe_hello", None, "unsubscribe_hello").await.unwrap();

assert!(matches!(sub.next().await, Ok(Some(_))));
assert!(matches!(sub.next().await, Some(Ok(_))));

handle.stop().unwrap().await;

Expand All @@ -310,7 +310,7 @@ async fn ws_close_pending_subscription_when_server_terminated() {
for _ in 0..2 {
match sub.next().await {
// All good, exit test
Ok(None) => return,
None => return,
// Try again
_ => continue,
}
Expand Down Expand Up @@ -352,9 +352,9 @@ async fn ws_server_should_stop_subscription_after_client_drop() {

let mut sub: Subscription<usize> = client.subscribe("subscribe_hello", None, "unsubscribe_hello").await.unwrap();

let res = sub.next().await.unwrap();
let res = sub.next().await.unwrap().unwrap();

assert_eq!(res.as_ref(), Some(&1));
assert_eq!(res, 1);
drop(client);
// assert that the server received `SubscriptionClosed` after the client was dropped.
assert!(matches!(rx.next().await.unwrap(), SubscriptionClosedError { .. }));
Expand Down
16 changes: 8 additions & 8 deletions tests/tests/proc_macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,17 +214,17 @@ async fn proc_macros_generic_ws_client_api() {

// Sub without params
let mut sub = client.sub().await.unwrap();
let first_recv = sub.next().await.unwrap();
assert_eq!(first_recv, Some("Response_A".to_string()));
let second_recv = sub.next().await.unwrap();
assert_eq!(second_recv, Some("Response_B".to_string()));
let first_recv = sub.next().await.unwrap().unwrap();
assert_eq!(first_recv, "Response_A".to_string());
let second_recv = sub.next().await.unwrap().unwrap();
assert_eq!(second_recv, "Response_B".to_string());

// Sub with params
let mut sub = client.sub_with_params(42).await.unwrap();
let first_recv = sub.next().await.unwrap();
assert_eq!(first_recv, Some(42));
let second_recv = sub.next().await.unwrap();
assert_eq!(second_recv, Some(42));
let first_recv = sub.next().await.unwrap().unwrap();
assert_eq!(first_recv, 42);
let second_recv = sub.next().await.unwrap().unwrap();
assert_eq!(second_recv, 42);
}

#[tokio::test]
Expand Down
43 changes: 31 additions & 12 deletions types/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,17 @@
use crate::{error::SubscriptionClosedError, v2::SubscriptionId, Error};
use core::marker::PhantomData;
use futures_channel::{mpsc, oneshot};
use futures_util::{future::FutureExt, sink::SinkExt, stream::StreamExt};
use futures_util::{
future::FutureExt,
sink::SinkExt,
stream::{Stream, StreamExt},
};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use serde_json::Value as JsonValue;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::task;

/// Subscription kind
#[derive(Debug)]
Expand Down Expand Up @@ -68,6 +74,10 @@ pub struct Subscription<Notif> {
marker: PhantomData<Notif>,
}

// `Subscription` does not automatically implement this due to `PhantomData<Notif>`,
// but type type has no need to be pinned.
impl<Notif> std::marker::Unpin for Subscription<Notif> {}

impl<Notif> Subscription<Notif> {
/// Create a new subscription.
pub fn new(
Expand Down Expand Up @@ -157,17 +167,26 @@ where
Notif: DeserializeOwned,
{
/// Returns the next notification from the stream.
/// This may return `Ok(None)` if the subscription has been terminated,
/// may happen if the channel becomes full or is dropped.
pub async fn next(&mut self) -> Result<Option<Notif>, Error> {
match self.notifs_rx.next().await {
Some(n) => match serde_json::from_value::<NotifResponse<Notif>>(n) {
Ok(NotifResponse::Ok(parsed)) => Ok(Some(parsed)),
Ok(NotifResponse::Err(e)) => Err(Error::SubscriptionClosed(e)),
Err(e) => Err(Error::ParseError(e)),
},
None => Ok(None),
}
/// This may return `None` if the subscription has been terminated,
/// which may happen if the channel becomes full or is dropped.
pub async fn next(&mut self) -> Option<Result<Notif, Error>> {
StreamExt::next(self).await
}
}

impl<Notif> Stream for Subscription<Notif>
where
Notif: DeserializeOwned,
{
type Item = Result<Notif, Error>;
fn poll_next(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> task::Poll<Option<Self::Item>> {
let n = futures_util::ready!(self.notifs_rx.poll_next_unpin(cx));
let res = n.and_then(|n| match serde_json::from_value::<NotifResponse<Notif>>(n) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does map work here?

I don't mind but clippy doesn't like and_then here...

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ooh good point! I thought I needed and_then but then didn't realise that I then surrounded all of the result arms in Some :)

Ok(NotifResponse::Ok(parsed)) => Some(Ok(parsed)),
Ok(NotifResponse::Err(e)) => Some(Err(Error::SubscriptionClosed(e))),
Err(e) => Some(Err(Error::ParseError(e))),
});
task::Poll::Ready(res)
}
}

Expand Down
6 changes: 3 additions & 3 deletions ws-client/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,18 +173,18 @@ async fn notification_without_polling_doesnt_make_client_unuseable() {

// Capacity is `num_sender` + `capacity`
for _ in 0..5 {
assert!(nh.next().with_default_timeout().await.unwrap().unwrap().is_some());
assert!(nh.next().with_default_timeout().await.unwrap().unwrap().is_ok());
}

// NOTE: this is now unuseable and unregistered.
assert!(nh.next().with_default_timeout().await.unwrap().unwrap().is_none());
assert!(nh.next().with_default_timeout().await.unwrap().is_none());

// The same subscription should be possible to register again.
let mut other_nh: Subscription<String> =
client.subscribe_to_method("test").with_default_timeout().await.unwrap().unwrap();

// check that the new subscription works.
assert!(other_nh.next().with_default_timeout().await.unwrap().unwrap().is_some());
assert!(other_nh.next().with_default_timeout().await.unwrap().unwrap().is_ok());
assert!(client.is_connected());
}

Expand Down