Skip to content

Commit

Permalink
feat: async API when Response has been processed. (#1281)
Browse files Browse the repository at this point in the history
* feat: add `RpcModule::register_raw_method`

* add proc macro support

* rename API

* simplify API with MethodResponse::notify_when_sent

* improve notify API

* fix nits

* introduce ResponsePayloadV2

* impl ResponsePayloadV2 for T

* cleanup

* client: proc macro support for custom ret_ty

* add tests

* address grumbles

* remove unused code

* fix tests

* proc: revert unrelated changes

* remove panics; move should be enough

* bring back UI tests

* grumbles: remove NotifiedError

* break stuff for uniform API

* make more stuff private

* remove ResponseErrorUnit type alias

* fix ui tests

* Update proc-macros/src/render_server.rs

* Rename ws_notify_on_method_answered.rs to response_payload_notify_on_response.rs

* remove unit_error APIs

* replace notify_on_x with notify_on_completion

* Update server/src/transport/ws.rs
  • Loading branch information
niklasad1 authored Feb 6, 2024
1 parent 8f73dbe commit 387f52f
Show file tree
Hide file tree
Showing 22 changed files with 971 additions and 416 deletions.
107 changes: 57 additions & 50 deletions core/src/client/async_client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ mod utils;
use crate::client::async_client::helpers::{process_subscription_close_response, InnerBatchResponse};
use crate::client::async_client::utils::MaybePendingFutures;
use crate::client::{
BatchMessage, BatchResponse, ClientT, ReceivedMessage, RegisterNotificationMessage, RequestMessage,
Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT, Error
BatchMessage, BatchResponse, ClientT, Error, ReceivedMessage, RegisterNotificationMessage, RequestMessage,
Subscription, SubscriptionClientT, SubscriptionKind, SubscriptionMessage, TransportReceiverT, TransportSenderT,
};
use crate::error::RegisterMethodError;
use crate::params::{BatchRequestBuilder, EmptyBatchRequest};
Expand Down Expand Up @@ -64,7 +64,7 @@ use serde::de::DeserializeOwned;
use tokio::sync::{mpsc, oneshot};
use tracing::instrument;

use self::utils::{IntervalStream, InactivityCheck};
use self::utils::{InactivityCheck, IntervalStream};

use super::{generate_batch_id_range, FrontToBack, IdKind, RequestIdManager};

Expand Down Expand Up @@ -94,11 +94,7 @@ pub struct PingConfig {

impl Default for PingConfig {
fn default() -> Self {
Self {
ping_interval: Duration::from_secs(30),
max_failures: 1,
inactive_limit: Duration::from_secs(40),
}
Self { ping_interval: Duration::from_secs(30), max_failures: 1, inactive_limit: Duration::from_secs(40) }
}
}

Expand Down Expand Up @@ -126,9 +122,9 @@ impl PingConfig {

/// Configure how many times the connection is allowed be
/// inactive until the connection is closed.
///
///
/// # Panics
///
///
/// This method panics if `max` == 0.
pub fn max_failures(mut self, max: usize) -> Self {
assert!(max > 0);
Expand All @@ -137,7 +133,6 @@ impl PingConfig {
}
}


#[derive(Debug, Default, Clone)]
pub(crate) struct ThreadSafeRequestManager(Arc<std::sync::Mutex<RequestManager>>);

Expand Down Expand Up @@ -179,7 +174,9 @@ impl ErrorFromBack {
// This should never happen because the receiving end is still alive.
// Before shutting down the background task a error message should
// be emitted.
Err(_) => Error::Custom("Error reason could not be found. This is a bug. Please open an issue.".to_string()),
Err(_) => Error::Custom(
"Error reason could not be found. This is a bug. Please open an issue.".to_string(),
),
});
*write = Some(ReadErrorOnce::Read(arc_err.clone()));
arc_err
Expand Down Expand Up @@ -281,7 +278,7 @@ impl ClientBuilder {
}

/// Enable WebSocket ping/pong on the client.
///
///
/// This only works if the transport supports WebSocket pings.
///
/// Default: pings are disabled.
Expand Down Expand Up @@ -332,11 +329,16 @@ impl ClientBuilder {
Some(p) => {
// NOTE: This emits a tick immediately to sync how the `inactive_interval` works
// because it starts measuring when the client start-ups.
let ping_interval = IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(p.ping_interval)));
let ping_interval = IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(
tokio::time::interval(p.ping_interval),
));

let inactive_interval = {
let inactive_interval = {
let start = tokio::time::Instant::now() + p.inactive_limit;
IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(start, p.inactive_limit)))
IntervalStream::new(tokio_stream::wrappers::IntervalStream::new(tokio::time::interval_at(
start,
p.inactive_limit,
)))
};

let inactivity_check = InactivityCheck::new(p.inactive_limit, p.max_failures);
Expand Down Expand Up @@ -386,8 +388,8 @@ impl ClientBuilder {
{
use futures_util::stream::Pending;

type PendingIntervalStream = IntervalStream<Pending<()>>;
type PendingIntervalStream = IntervalStream<Pending<()>>;

let (to_back, from_front) = mpsc::channel(self.max_concurrent_requests);
let (err_to_front, err_from_back) = oneshot::channel::<Error>();
let max_buffer_capacity_per_subscription = self.max_buffer_capacity_per_subscription;
Expand Down Expand Up @@ -466,12 +468,12 @@ impl Client {

/// This is similar to [`Client::on_disconnect`] but it can be used to get
/// the reason why the client was disconnected but it's not cancel-safe.
///
///
/// The typical use-case is that this method will be called after
/// [`Client::on_disconnect`] has returned in a "select loop".
///
///
/// # Cancel-safety
///
///
/// This method is not cancel-safe
pub async fn disconnect_reason(&self) -> Error {
self.error.read_error().await
Expand Down Expand Up @@ -554,7 +556,7 @@ impl ClientT for Client {
Err(_) => return Err(self.disconnect_reason().await),
};

rx_log_from_json(&Response::new(ResponsePayload::result_borrowed(&json_value), id), self.max_log_length);
rx_log_from_json(&Response::new(ResponsePayload::success_borrowed(&json_value), id), self.max_log_length);

serde_json::from_value(json_value).map_err(Error::ParseError)
}
Expand Down Expand Up @@ -643,9 +645,7 @@ impl SubscriptionClientT for Client {
Notif: DeserializeOwned,
{
if subscribe_method == unsubscribe_method {
return Err(RegisterMethodError::SubscriptionNameConflict(
unsubscribe_method.to_owned(),
).into());
return Err(RegisterMethodError::SubscriptionNameConflict(unsubscribe_method.to_owned()).into());
}

let guard = self.id_manager.next_request_two_ids()?;
Expand Down Expand Up @@ -680,7 +680,7 @@ impl SubscriptionClientT for Client {
Err(_) => return Err(self.disconnect_reason().await),
};

rx_log_from_json(&Response::new(ResponsePayload::result_borrowed(&sub_id), id_unsub), self.max_log_length);
rx_log_from_json(&Response::new(ResponsePayload::success_borrowed(&sub_id), id_unsub), self.max_log_length);

Ok(Subscription::new(self.to_back.clone(), notifs_rx, SubscriptionKind::Subscription(sub_id)))
}
Expand Down Expand Up @@ -901,8 +901,7 @@ async fn handle_frontend_messages<S: TransportSenderT>(
if manager.lock().insert_notification_handler(&reg.method, subscribe_tx).is_ok() {
let _ = reg.send_back.send(Ok((subscribe_rx, reg.method)));
} else {
let _ =
reg.send_back.send(Err(RegisterMethodError::AlreadyRegistered(reg.method).into()));
let _ = reg.send_back.send(Err(RegisterMethodError::AlreadyRegistered(reg.method).into()));
}
}
// User dropped the NotificationHandler for this method
Expand Down Expand Up @@ -950,30 +949,30 @@ where

// This is safe because `tokio::time::Interval`, `tokio::mpsc::Sender` and `tokio::mpsc::Receiver`
// are cancel-safe.
let res = loop {
tokio::select! {
biased;
_ = close_tx.closed() => break Ok(()),
maybe_msg = from_frontend.recv() => {
let Some(msg) = maybe_msg else {
break Ok(());
};

if let Err(e) =
handle_frontend_messages(msg, &manager, &mut sender, max_buffer_capacity_per_subscription).await
{
tracing::error!(target: LOG_TARGET, "ws send failed: {e}");
break Err(Error::Transport(e.into()));
}
let res = loop {
tokio::select! {
biased;
_ = close_tx.closed() => break Ok(()),
maybe_msg = from_frontend.recv() => {
let Some(msg) = maybe_msg else {
break Ok(());
};

if let Err(e) =
handle_frontend_messages(msg, &manager, &mut sender, max_buffer_capacity_per_subscription).await
{
tracing::error!(target: LOG_TARGET, "ws send failed: {e}");
break Err(Error::Transport(e.into()));
}
_ = ping_interval.next() => {
if let Err(err) = sender.send_ping().await {
tracing::error!(target: LOG_TARGET, "Send ws ping failed: {err}");
break Err(Error::Transport(err.into()));
}
}
_ = ping_interval.next() => {
if let Err(err) = sender.send_ping().await {
tracing::error!(target: LOG_TARGET, "Send ws ping failed: {err}");
break Err(Error::Transport(err.into()));
}
}
};
}
};

from_frontend.close();
let _ = sender.close().await;
Expand All @@ -995,7 +994,15 @@ where
R: TransportReceiverT,
S: Stream + Unpin,
{
let ReadTaskParams { receiver, close_tx, to_send_task, manager, max_buffer_capacity_per_subscription, mut inactivity_check, mut inactivity_stream } = params;
let ReadTaskParams {
receiver,
close_tx,
to_send_task,
manager,
max_buffer_capacity_per_subscription,
mut inactivity_check,
mut inactivity_stream,
} = params;

let backend_event = futures_util::stream::unfold(receiver, |mut receiver| async {
let res = receiver.receive().await;
Expand Down
Loading

0 comments on commit 387f52f

Please sign in to comment.