Skip to content

Commit

Permalink
xxx fix Sendness issues
Browse files Browse the repository at this point in the history
  • Loading branch information
Twey committed Mar 7, 2024
1 parent 3ea727e commit 54a87b3
Show file tree
Hide file tree
Showing 19 changed files with 81 additions and 102 deletions.
12 changes: 12 additions & 0 deletions examples/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions linera-core/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use crate::{
},
local_node::{LocalNodeClient, LocalNodeError},
node::{
CrossChainMessageDelivery, NodeError, NotificationStream, LocalValidatorNode,
LocalValidatorNodeProvider,
CrossChainMessageDelivery, LocalValidatorNode, LocalValidatorNodeProvider, NodeError,
NotificationStream,
},
notifier::Notifier,
updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater},
Expand Down
4 changes: 2 additions & 2 deletions linera-core/src/local_node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

use crate::{
data_types::{BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse},
node::{NotificationStream, LocalValidatorNode},
node::{LocalValidatorNode, NotificationStream},
notifier::Notifier,
worker::{Notification, ValidatorWorker, WorkerError, WorkerState},
};
Expand Down Expand Up @@ -446,7 +446,7 @@ where
chain_id: ChainId,
) -> Result<(), LocalNodeError>
where
A: LocalValidatorNode + Clone + 'static,
A: LocalValidatorNode + Clone + 'static,
{
let local_info = self.local_chain_info(chain_id).await?;
let range = BlockHeightRange {
Expand Down
18 changes: 13 additions & 5 deletions linera-core/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{
data_types::{ChainInfoQuery, ChainInfoResponse},
worker::{Notification, WorkerError},
};
use futures::{Future, Stream, stream::{BoxStream, LocalBoxStream}};
use futures::stream::{BoxStream, LocalBoxStream, Stream};
use linera_base::{
crypto::CryptoError,
data_types::{ArithmeticError, BlockHeight},
Expand Down Expand Up @@ -74,7 +74,10 @@ pub trait LocalValidatorNode {
async fn get_version_info(&mut self) -> Result<VersionInfo, NodeError>;

/// Subscribes to receiving notifications for a collection of chains.
async fn subscribe(&mut self, chains: Vec<ChainId>) -> Result<Self::NotificationStream, NodeError>;
async fn subscribe(
&mut self,
chains: Vec<ChainId>,
) -> Result<Self::NotificationStream, NodeError>;
}

/// Turn an address into a validator node.
Expand Down Expand Up @@ -106,11 +109,16 @@ pub trait LocalValidatorNodeProvider {
}
}

pub trait ValidatorNodeProvider: LocalValidatorNodeProvider<Node=<Self as ValidatorNodeProvider>::Node> {
type Node: ValidatorNode;
pub trait ValidatorNodeProvider:
LocalValidatorNodeProvider<Node = <Self as ValidatorNodeProvider>::Node>
{
type Node: ValidatorNode + Send + Sync;
}

impl<T: LocalValidatorNodeProvider> ValidatorNodeProvider for T where T::Node: ValidatorNode {
impl<T: LocalValidatorNodeProvider> ValidatorNodeProvider for T
where
T::Node: ValidatorNode + Send + Sync,
{
type Node = <T as LocalValidatorNodeProvider>::Node;
}

Expand Down
17 changes: 10 additions & 7 deletions linera-core/src/unit_tests/client_test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,10 @@
use crate::{
client::{ChainClient, ChainClientBuilder},
data_types::*,
node::{CrossChainMessageDelivery, NodeError, LocalNotificationStream, LocalValidatorNode, LocalValidatorNodeProvider},
node::{
CrossChainMessageDelivery, LocalValidatorNodeProvider, NodeError, NotificationStream,
ValidatorNode,
},
notifier::Notifier,
worker::{Notification, ValidatorWorker, WorkerState},
};
Expand Down Expand Up @@ -90,12 +93,12 @@ pub struct LocalValidatorClient<S> {
client: Arc<Mutex<LocalValidator<S>>>,
}

impl<S> LocalValidatorNode for LocalValidatorClient<S>
impl<S> ValidatorNode for LocalValidatorClient<S>
where
S: Storage + Clone + Send + Sync + 'static,
ViewError: From<S::ContextError>,
{
type NotificationStream = LocalNotificationStream;
type NotificationStream = NotificationStream;

async fn handle_block_proposal(
&mut self,
Expand Down Expand Up @@ -141,7 +144,7 @@ where
.await
}

async fn subscribe(&mut self, chains: Vec<ChainId>) -> Result<LocalNotificationStream, NodeError> {
async fn subscribe(&mut self, chains: Vec<ChainId>) -> Result<NotificationStream, NodeError> {
self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender))
.await
}
Expand Down Expand Up @@ -305,11 +308,11 @@ where
async fn do_subscribe(
self,
chains: Vec<ChainId>,
sender: oneshot::Sender<Result<LocalNotificationStream, NodeError>>,
) -> Result<(), Result<LocalNotificationStream, NodeError>> {
sender: oneshot::Sender<Result<NotificationStream, NodeError>>,
) -> Result<(), Result<NotificationStream, NodeError>> {
let validator = self.client.lock().await;
let rx = validator.notifier.subscribe(chains);
let stream: LocalNotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx));
sender.send(Ok(stream))
}
}
Expand Down
9 changes: 5 additions & 4 deletions linera-core/src/updater.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@

use crate::{
data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse},
node::{CrossChainMessageDelivery, NodeError, LocalValidatorNode},
node::{CrossChainMessageDelivery, LocalValidatorNode, NodeError},
};
use futures::{future, StreamExt};
use futures::{future, Future, StreamExt};
use linera_base::{
data_types::{BlockHeight, Round},
identifiers::ChainId,
Expand Down Expand Up @@ -73,15 +73,16 @@ pub enum CommunicationError<E: fmt::Debug> {
/// Tries to stop early when a quorum is reached. If `grace_period` is not zero, other validators
/// are given this much additional time to contribute to the result, as a fraction of how long it
/// took to reach the quorum.
pub async fn communicate_with_quorum<'a, A, V, K, F, G>(
pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>(
validator_clients: &'a [(ValidatorName, A)],
committee: &Committee,
group_by: G,
execute: F,
) -> Result<(K, Vec<V>), CommunicationError<NodeError>>
where
A: LocalValidatorNode + Clone + 'static,
F: Fn(ValidatorName, A) -> future::LocalBoxFuture<'a, Result<V, NodeError>> + Clone,
F: Clone + Fn(ValidatorName, A) -> R,
R: Future<Output = Result<V, NodeError>> + 'a,
G: Fn(&V) -> K,
K: Hash + PartialEq + Eq + Clone + 'static,
V: 'static,
Expand Down
1 change: 0 additions & 1 deletion linera-rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ tokio-util = { workspace = true, optional = true, features = ["codec"] }
tonic-health = { workspace = true, optional = true }
tower.workspace = true
tracing.workspace = true
trait-variant.workspace = true

[dev-dependencies]
linera-rpc = { path = ".", features = ["test"] }
Expand Down
2 changes: 2 additions & 0 deletions linera-rpc/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ impl From<simple::Client> for Client {
}
}

// TODO(TODO): deduplicate

#[cfg(web)]
impl LocalValidatorNode for Client {
type NotificationStream = LocalNotificationStream;
Expand Down
69 changes: 10 additions & 59 deletions linera-rpc/src/grpc/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use linera_core::node::LocalValidatorNode;
#[cfg(not(web))]
use linera_core::node::ValidatorNode;


use linera_version::VersionInfo;

use std::{future::Future, iter, time::Duration};
Expand Down Expand Up @@ -178,7 +177,10 @@ impl LocalValidatorNode for Client {
}

#[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
async fn subscribe(&mut self, chains: Vec<ChainId>) -> Result<LocalNotificationStream, NodeError> {
async fn subscribe(
&mut self,
chains: Vec<ChainId>,
) -> Result<LocalNotificationStream, NodeError> {
let notification_retry_delay = self.notification_retry_delay;
let notification_retries = self.notification_retries;
let mut retry_count = 0;
Expand Down Expand Up @@ -304,7 +306,10 @@ impl ValidatorNode for Client {
}

#[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))]
fn subscribe(&mut self, chains: Vec<ChainId>) -> impl Future<Output = Result<NotificationStream, NodeError>> {
fn subscribe(
&mut self,
chains: Vec<ChainId>,
) -> impl Future<Output = Result<NotificationStream, NodeError>> {
let notification_retry_delay = self.notification_retry_delay;
let notification_retries = self.notification_retries;
let mut retry_count = 0;
Expand Down Expand Up @@ -343,7 +348,7 @@ impl ValidatorNode for Client {
Some((stream, ()))
}
})
.flatten();
.flatten();

// The stream of `Notification`s that inserts increasing delays after retriable errors, and
// terminates after unexpected or fatal errors.
Expand Down Expand Up @@ -381,62 +386,8 @@ impl ValidatorNode for Client {
}
}

#[cfg(web)]
impl mass_client::LocalMassClient for Client {
#[tracing::instrument(skip_all, err)]
async fn send(
&mut self,
requests: Vec<RpcMessage>,
max_in_flight: usize,
) -> Result<Vec<RpcMessage>, mass_client::Error> {
let client = &mut self.client;
let responses = stream::iter(requests)
.map(|request| {
let mut client = client.clone();
async move {
let response = match request {
RpcMessage::BlockProposal(proposal) => {
let request = Request::new((*proposal).try_into()?);
client.handle_block_proposal(request).await?
}
RpcMessage::Certificate(request) => {
let request = Request::new((*request).try_into()?);
client.handle_certificate(request).await?
}
msg => panic!("attempted to send msg: {:?}", msg),
};
match response
.into_inner()
.inner
.ok_or(ProtoConversionError::MissingField)?
{
Inner::ChainInfoResponse(chain_info_response) => {
Ok(Some(RpcMessage::ChainInfoResponse(Box::new(
chain_info_response.try_into()?,
))))
}
Inner::Error(error) => {
let error = bincode::deserialize::<NodeError>(&error)
.map_err(ProtoConversionError::BincodeError)?;
tracing::error!(?error, "received error response");
Ok(None)
}
}
}
})
.buffer_unordered(max_in_flight)
.filter_map(|result: Result<Option<_>, mass_client::Error>| async move {
result.transpose()
})
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
Ok(responses)
}
}

#[cfg(not(web))]
#[async_trait::async_trait]
impl mass_client::MassClient for Client {
#[tracing::instrument(skip_all, err)]
async fn send(
Expand Down
2 changes: 1 addition & 1 deletion linera-rpc/src/grpc/node_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::Client;

use crate::{config::ValidatorPublicNetworkConfig, node_provider::NodeOptions};

use linera_core::node::{NodeError, LocalValidatorNodeProvider};
use linera_core::node::{LocalValidatorNodeProvider, NodeError};

use std::str::FromStr as _;

Expand Down
6 changes: 3 additions & 3 deletions linera-rpc/src/grpc/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ impl From<&'_ NodeOptions> for Options {
}

#[cfg(web)]
pub mod transport {
mod implementation {
use super::Options;

pub use tonic_web_wasm_client::{Client as Channel, Error};
Expand All @@ -31,7 +31,7 @@ pub mod transport {
}

#[cfg(not(web))]
mod transport {
mod implementation {
use super::Options;

pub use tonic::transport::{Channel, Error};
Expand All @@ -51,4 +51,4 @@ mod transport {
}
}

pub use transport::*;
pub use implementation::*;
4 changes: 2 additions & 2 deletions linera-rpc/src/mass_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ pub enum Error {
Rpc(#[from] tonic::Status),
}

#[trait_variant::make(MassClient: Send)]
pub trait LocalMassClient {
#[async_trait::async_trait]
pub trait MassClient {
async fn send(
&mut self,
requests: Vec<RpcMessage>,
Expand Down
2 changes: 1 addition & 1 deletion linera-rpc/src/node_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
use crate::simple;
use crate::{client::Client, grpc};

use linera_core::node::{NodeError, LocalValidatorNodeProvider};
use linera_core::node::{LocalValidatorNodeProvider, NodeError};

use std::time::Duration;

Expand Down
15 changes: 7 additions & 8 deletions linera-rpc/src/simple/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ use linera_core::{

use linera_version::VersionInfo;

use std::future::Future;
use std::time::Duration;
use std::{future::Future, time::Duration};
use tokio::time;

#[derive(Clone)]
Expand Down Expand Up @@ -117,13 +116,12 @@ impl ValidatorNode for Client {
self.query(query.into()).await
}

fn subscribe(&mut self, _chains: Vec<ChainId>) -> impl Future<Output = Result<NotificationStream, NodeError>> + Send {
fn subscribe(
&mut self,
_chains: Vec<ChainId>,
) -> impl Future<Output = Result<NotificationStream, NodeError>> + Send {
let transport = self.network.protocol.to_string();
async {
Err(NodeError::SubscriptionError {
transport,
})
}
async { Err(NodeError::SubscriptionError { transport }) }
}

async fn get_version_info(&mut self) -> Result<VersionInfo, NodeError> {
Expand Down Expand Up @@ -152,6 +150,7 @@ impl MassClient {
}
}

#[async_trait::async_trait]
impl mass_client::MassClient for MassClient {
async fn send(
&mut self,
Expand Down
2 changes: 1 addition & 1 deletion linera-rpc/src/simple/node_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use super::Client;

use crate::{config::ValidatorPublicNetworkPreConfig, node_provider::NodeOptions};

use linera_core::node::{NodeError, LocalValidatorNodeProvider};
use linera_core::node::{LocalValidatorNodeProvider, NodeError};

use std::str::FromStr as _;

Expand Down
Loading

0 comments on commit 54a87b3

Please sign in to comment.