From 54a87b35deb8dcab1ca5addf8485491568cd3c9b Mon Sep 17 00:00:00 2001 From: James Kay Date: Wed, 6 Mar 2024 23:58:35 +0000 Subject: [PATCH] xxx fix Sendness issues --- examples/Cargo.lock | 12 ++++ linera-core/src/client.rs | 4 +- linera-core/src/local_node.rs | 4 +- linera-core/src/node.rs | 18 +++-- .../src/unit_tests/client_test_utils.rs | 17 +++-- linera-core/src/updater.rs | 9 +-- linera-rpc/Cargo.toml | 1 - linera-rpc/src/client.rs | 2 + linera-rpc/src/grpc/client.rs | 69 +++---------------- linera-rpc/src/grpc/node_provider.rs | 2 +- linera-rpc/src/grpc/transport.rs | 6 +- linera-rpc/src/mass_client.rs | 4 +- linera-rpc/src/node_provider.rs | 2 +- linera-rpc/src/simple/client.rs | 15 ++-- linera-rpc/src/simple/node_provider.rs | 2 +- linera-service/src/faucet.rs | 2 + linera-service/src/linera/client_context.rs | 4 +- linera-service/src/linera/main.rs | 2 +- linera-service/src/schema_export.rs | 8 ++- 19 files changed, 81 insertions(+), 102 deletions(-) diff --git a/examples/Cargo.lock b/examples/Cargo.lock index d5b85a59600d..b2580bd296d9 100644 --- a/examples/Cargo.lock +++ b/examples/Cargo.lock @@ -1892,6 +1892,7 @@ dependencies = [ "tokio-stream", "tonic", "tracing", + "trait-variant", ] [[package]] @@ -3869,6 +3870,17 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "trait-variant" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "45e1a477061e97925d81a2f89fb73b2b8038e6baa5a0023bad380ac23b5f4fa6" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.52", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/linera-core/src/client.rs b/linera-core/src/client.rs index acb5b897ddcb..7dec48e8c89e 100644 --- a/linera-core/src/client.rs +++ b/linera-core/src/client.rs @@ -8,8 +8,8 @@ use crate::{ }, local_node::{LocalNodeClient, LocalNodeError}, node::{ - CrossChainMessageDelivery, NodeError, NotificationStream, LocalValidatorNode, - LocalValidatorNodeProvider, + CrossChainMessageDelivery, LocalValidatorNode, LocalValidatorNodeProvider, NodeError, + NotificationStream, }, notifier::Notifier, updater::{communicate_with_quorum, CommunicateAction, CommunicationError, ValidatorUpdater}, diff --git a/linera-core/src/local_node.rs b/linera-core/src/local_node.rs index bafc4644ae49..0703e539c3dd 100644 --- a/linera-core/src/local_node.rs +++ b/linera-core/src/local_node.rs @@ -4,7 +4,7 @@ use crate::{ data_types::{BlockHeightRange, ChainInfo, ChainInfoQuery, ChainInfoResponse}, - node::{NotificationStream, LocalValidatorNode}, + node::{LocalValidatorNode, NotificationStream}, notifier::Notifier, worker::{Notification, ValidatorWorker, WorkerError, WorkerState}, }; @@ -446,7 +446,7 @@ where chain_id: ChainId, ) -> Result<(), LocalNodeError> where - A: LocalValidatorNode + Clone + 'static, + A: LocalValidatorNode + Clone + 'static, { let local_info = self.local_chain_info(chain_id).await?; let range = BlockHeightRange { diff --git a/linera-core/src/node.rs b/linera-core/src/node.rs index 20a6db1c88a6..fad242523e88 100644 --- a/linera-core/src/node.rs +++ b/linera-core/src/node.rs @@ -6,7 +6,7 @@ use crate::{ data_types::{ChainInfoQuery, ChainInfoResponse}, worker::{Notification, WorkerError}, }; -use futures::{Future, Stream, stream::{BoxStream, LocalBoxStream}}; +use futures::stream::{BoxStream, LocalBoxStream, Stream}; use linera_base::{ crypto::CryptoError, data_types::{ArithmeticError, BlockHeight}, @@ -74,7 +74,10 @@ pub trait LocalValidatorNode { async fn get_version_info(&mut self) -> Result; /// Subscribes to receiving notifications for a collection of chains. - async fn subscribe(&mut self, chains: Vec) -> Result; + async fn subscribe( + &mut self, + chains: Vec, + ) -> Result; } /// Turn an address into a validator node. @@ -106,11 +109,16 @@ pub trait LocalValidatorNodeProvider { } } -pub trait ValidatorNodeProvider: LocalValidatorNodeProvider::Node> { - type Node: ValidatorNode; +pub trait ValidatorNodeProvider: + LocalValidatorNodeProvider::Node> +{ + type Node: ValidatorNode + Send + Sync; } -impl ValidatorNodeProvider for T where T::Node: ValidatorNode { +impl ValidatorNodeProvider for T +where + T::Node: ValidatorNode + Send + Sync, +{ type Node = ::Node; } diff --git a/linera-core/src/unit_tests/client_test_utils.rs b/linera-core/src/unit_tests/client_test_utils.rs index a32621f54044..487343124923 100644 --- a/linera-core/src/unit_tests/client_test_utils.rs +++ b/linera-core/src/unit_tests/client_test_utils.rs @@ -4,7 +4,10 @@ use crate::{ client::{ChainClient, ChainClientBuilder}, data_types::*, - node::{CrossChainMessageDelivery, NodeError, LocalNotificationStream, LocalValidatorNode, LocalValidatorNodeProvider}, + node::{ + CrossChainMessageDelivery, LocalValidatorNodeProvider, NodeError, NotificationStream, + ValidatorNode, + }, notifier::Notifier, worker::{Notification, ValidatorWorker, WorkerState}, }; @@ -90,12 +93,12 @@ pub struct LocalValidatorClient { client: Arc>>, } -impl LocalValidatorNode for LocalValidatorClient +impl ValidatorNode for LocalValidatorClient where S: Storage + Clone + Send + Sync + 'static, ViewError: From, { - type NotificationStream = LocalNotificationStream; + type NotificationStream = NotificationStream; async fn handle_block_proposal( &mut self, @@ -141,7 +144,7 @@ where .await } - async fn subscribe(&mut self, chains: Vec) -> Result { + async fn subscribe(&mut self, chains: Vec) -> Result { self.spawn_and_receive(move |validator, sender| validator.do_subscribe(chains, sender)) .await } @@ -305,11 +308,11 @@ where async fn do_subscribe( self, chains: Vec, - sender: oneshot::Sender>, - ) -> Result<(), Result> { + sender: oneshot::Sender>, + ) -> Result<(), Result> { let validator = self.client.lock().await; let rx = validator.notifier.subscribe(chains); - let stream: LocalNotificationStream = Box::pin(UnboundedReceiverStream::new(rx)); + let stream: NotificationStream = Box::pin(UnboundedReceiverStream::new(rx)); sender.send(Ok(stream)) } } diff --git a/linera-core/src/updater.rs b/linera-core/src/updater.rs index f7005dd5e816..88e5bdddb456 100644 --- a/linera-core/src/updater.rs +++ b/linera-core/src/updater.rs @@ -4,9 +4,9 @@ use crate::{ data_types::{ChainInfo, ChainInfoQuery, ChainInfoResponse}, - node::{CrossChainMessageDelivery, NodeError, LocalValidatorNode}, + node::{CrossChainMessageDelivery, LocalValidatorNode, NodeError}, }; -use futures::{future, StreamExt}; +use futures::{future, Future, StreamExt}; use linera_base::{ data_types::{BlockHeight, Round}, identifiers::ChainId, @@ -73,7 +73,7 @@ pub enum CommunicationError { /// Tries to stop early when a quorum is reached. If `grace_period` is not zero, other validators /// are given this much additional time to contribute to the result, as a fraction of how long it /// took to reach the quorum. -pub async fn communicate_with_quorum<'a, A, V, K, F, G>( +pub async fn communicate_with_quorum<'a, A, V, K, F, R, G>( validator_clients: &'a [(ValidatorName, A)], committee: &Committee, group_by: G, @@ -81,7 +81,8 @@ pub async fn communicate_with_quorum<'a, A, V, K, F, G>( ) -> Result<(K, Vec), CommunicationError> where A: LocalValidatorNode + Clone + 'static, - F: Fn(ValidatorName, A) -> future::LocalBoxFuture<'a, Result> + Clone, + F: Clone + Fn(ValidatorName, A) -> R, + R: Future> + 'a, G: Fn(&V) -> K, K: Hash + PartialEq + Eq + Clone + 'static, V: 'static, diff --git a/linera-rpc/Cargo.toml b/linera-rpc/Cargo.toml index facfd5824db6..0d6512778cf5 100644 --- a/linera-rpc/Cargo.toml +++ b/linera-rpc/Cargo.toml @@ -68,7 +68,6 @@ tokio-util = { workspace = true, optional = true, features = ["codec"] } tonic-health = { workspace = true, optional = true } tower.workspace = true tracing.workspace = true -trait-variant.workspace = true [dev-dependencies] linera-rpc = { path = ".", features = ["test"] } diff --git a/linera-rpc/src/client.rs b/linera-rpc/src/client.rs index be96c5d89fa6..8eb74d97996b 100644 --- a/linera-rpc/src/client.rs +++ b/linera-rpc/src/client.rs @@ -36,6 +36,8 @@ impl From for Client { } } +// TODO(TODO): deduplicate + #[cfg(web)] impl LocalValidatorNode for Client { type NotificationStream = LocalNotificationStream; diff --git a/linera-rpc/src/grpc/client.rs b/linera-rpc/src/grpc/client.rs index 8cfc761d3172..449899b81eea 100644 --- a/linera-rpc/src/grpc/client.rs +++ b/linera-rpc/src/grpc/client.rs @@ -24,7 +24,6 @@ use linera_core::node::LocalValidatorNode; #[cfg(not(web))] use linera_core::node::ValidatorNode; - use linera_version::VersionInfo; use std::{future::Future, iter, time::Duration}; @@ -178,7 +177,10 @@ impl LocalValidatorNode for Client { } #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))] - async fn subscribe(&mut self, chains: Vec) -> Result { + async fn subscribe( + &mut self, + chains: Vec, + ) -> Result { let notification_retry_delay = self.notification_retry_delay; let notification_retries = self.notification_retries; let mut retry_count = 0; @@ -304,7 +306,10 @@ impl ValidatorNode for Client { } #[instrument(target = "grpc_client", skip_all, err, fields(address = self.address))] - fn subscribe(&mut self, chains: Vec) -> impl Future> { + fn subscribe( + &mut self, + chains: Vec, + ) -> impl Future> { let notification_retry_delay = self.notification_retry_delay; let notification_retries = self.notification_retries; let mut retry_count = 0; @@ -343,7 +348,7 @@ impl ValidatorNode for Client { Some((stream, ())) } }) - .flatten(); + .flatten(); // The stream of `Notification`s that inserts increasing delays after retriable errors, and // terminates after unexpected or fatal errors. @@ -381,62 +386,8 @@ impl ValidatorNode for Client { } } -#[cfg(web)] -impl mass_client::LocalMassClient for Client { - #[tracing::instrument(skip_all, err)] - async fn send( - &mut self, - requests: Vec, - max_in_flight: usize, - ) -> Result, mass_client::Error> { - let client = &mut self.client; - let responses = stream::iter(requests) - .map(|request| { - let mut client = client.clone(); - async move { - let response = match request { - RpcMessage::BlockProposal(proposal) => { - let request = Request::new((*proposal).try_into()?); - client.handle_block_proposal(request).await? - } - RpcMessage::Certificate(request) => { - let request = Request::new((*request).try_into()?); - client.handle_certificate(request).await? - } - msg => panic!("attempted to send msg: {:?}", msg), - }; - match response - .into_inner() - .inner - .ok_or(ProtoConversionError::MissingField)? - { - Inner::ChainInfoResponse(chain_info_response) => { - Ok(Some(RpcMessage::ChainInfoResponse(Box::new( - chain_info_response.try_into()?, - )))) - } - Inner::Error(error) => { - let error = bincode::deserialize::(&error) - .map_err(ProtoConversionError::BincodeError)?; - tracing::error!(?error, "received error response"); - Ok(None) - } - } - } - }) - .buffer_unordered(max_in_flight) - .filter_map(|result: Result, mass_client::Error>| async move { - result.transpose() - }) - .collect::>() - .await - .into_iter() - .collect::, _>>()?; - Ok(responses) - } -} - #[cfg(not(web))] +#[async_trait::async_trait] impl mass_client::MassClient for Client { #[tracing::instrument(skip_all, err)] async fn send( diff --git a/linera-rpc/src/grpc/node_provider.rs b/linera-rpc/src/grpc/node_provider.rs index 579b5484a0b5..8c36c33d7910 100644 --- a/linera-rpc/src/grpc/node_provider.rs +++ b/linera-rpc/src/grpc/node_provider.rs @@ -5,7 +5,7 @@ use super::Client; use crate::{config::ValidatorPublicNetworkConfig, node_provider::NodeOptions}; -use linera_core::node::{NodeError, LocalValidatorNodeProvider}; +use linera_core::node::{LocalValidatorNodeProvider, NodeError}; use std::str::FromStr as _; diff --git a/linera-rpc/src/grpc/transport.rs b/linera-rpc/src/grpc/transport.rs index da0ac93c419c..c1dc687e87cb 100644 --- a/linera-rpc/src/grpc/transport.rs +++ b/linera-rpc/src/grpc/transport.rs @@ -19,7 +19,7 @@ impl From<&'_ NodeOptions> for Options { } #[cfg(web)] -pub mod transport { +mod implementation { use super::Options; pub use tonic_web_wasm_client::{Client as Channel, Error}; @@ -31,7 +31,7 @@ pub mod transport { } #[cfg(not(web))] -mod transport { +mod implementation { use super::Options; pub use tonic::transport::{Channel, Error}; @@ -51,4 +51,4 @@ mod transport { } } -pub use transport::*; +pub use implementation::*; diff --git a/linera-rpc/src/mass_client.rs b/linera-rpc/src/mass_client.rs index 32019598c1b9..6f989964d493 100644 --- a/linera-rpc/src/mass_client.rs +++ b/linera-rpc/src/mass_client.rs @@ -16,8 +16,8 @@ pub enum Error { Rpc(#[from] tonic::Status), } -#[trait_variant::make(MassClient: Send)] -pub trait LocalMassClient { +#[async_trait::async_trait] +pub trait MassClient { async fn send( &mut self, requests: Vec, diff --git a/linera-rpc/src/node_provider.rs b/linera-rpc/src/node_provider.rs index 94262449947e..01147d4f8f2d 100644 --- a/linera-rpc/src/node_provider.rs +++ b/linera-rpc/src/node_provider.rs @@ -5,7 +5,7 @@ use crate::simple; use crate::{client::Client, grpc}; -use linera_core::node::{NodeError, LocalValidatorNodeProvider}; +use linera_core::node::{LocalValidatorNodeProvider, NodeError}; use std::time::Duration; diff --git a/linera-rpc/src/simple/client.rs b/linera-rpc/src/simple/client.rs index fb527b48f5b8..f7089758b840 100644 --- a/linera-rpc/src/simple/client.rs +++ b/linera-rpc/src/simple/client.rs @@ -17,8 +17,7 @@ use linera_core::{ use linera_version::VersionInfo; -use std::future::Future; -use std::time::Duration; +use std::{future::Future, time::Duration}; use tokio::time; #[derive(Clone)] @@ -117,13 +116,12 @@ impl ValidatorNode for Client { self.query(query.into()).await } - fn subscribe(&mut self, _chains: Vec) -> impl Future> + Send { + fn subscribe( + &mut self, + _chains: Vec, + ) -> impl Future> + Send { let transport = self.network.protocol.to_string(); - async { - Err(NodeError::SubscriptionError { - transport, - }) - } + async { Err(NodeError::SubscriptionError { transport }) } } async fn get_version_info(&mut self) -> Result { @@ -152,6 +150,7 @@ impl MassClient { } } +#[async_trait::async_trait] impl mass_client::MassClient for MassClient { async fn send( &mut self, diff --git a/linera-rpc/src/simple/node_provider.rs b/linera-rpc/src/simple/node_provider.rs index a6f09cbc491a..938029e63602 100644 --- a/linera-rpc/src/simple/node_provider.rs +++ b/linera-rpc/src/simple/node_provider.rs @@ -5,7 +5,7 @@ use super::Client; use crate::{config::ValidatorPublicNetworkPreConfig, node_provider::NodeOptions}; -use linera_core::node::{NodeError, LocalValidatorNodeProvider}; +use linera_core::node::{LocalValidatorNodeProvider, NodeError}; use std::str::FromStr as _; diff --git a/linera-service/src/faucet.rs b/linera-service/src/faucet.rs index 641a2c600954..cb82fb8abd33 100644 --- a/linera-service/src/faucet.rs +++ b/linera-service/src/faucet.rs @@ -113,6 +113,7 @@ where impl MutationRoot where P: ValidatorNodeProvider + Send + Sync + 'static, +

::Node: Sync, S: Storage + Clone + Send + Sync + 'static, C: ClientContext

+ Send + 'static, ViewError: From, @@ -126,6 +127,7 @@ where impl MutationRoot where P: ValidatorNodeProvider + Send + Sync + 'static, +

::Node: Sync, S: Storage + Clone + Send + Sync + 'static, C: ClientContext

+ Send + 'static, ViewError: From, diff --git a/linera-service/src/linera/client_context.rs b/linera-service/src/linera/client_context.rs index 2e0f72131179..73eaa1d1fe10 100644 --- a/linera-service/src/linera/client_context.rs +++ b/linera-service/src/linera/client_context.rs @@ -727,10 +727,10 @@ impl ClientContext { responses } - fn make_validator_mass_clients(&self) -> Vec> { + fn make_validator_mass_clients(&self) -> Vec> { let mut validator_clients = Vec::new(); for config in &self.wallet_state.genesis_config().committee.validators { - let client: Box = match config.network.protocol { + let client: Box = match config.network.protocol { NetworkProtocol::Simple(protocol) => { let network = config.network.clone_with_protocol(protocol); Box::new(simple::MassClient::new( diff --git a/linera-service/src/linera/main.rs b/linera-service/src/linera/main.rs index 9faddce3f2b2..b7b98add4225 100644 --- a/linera-service/src/linera/main.rs +++ b/linera-service/src/linera/main.rs @@ -20,7 +20,7 @@ use linera_core::{ client::{ChainClient, ChainClientError}, data_types::{ChainInfoQuery, ClientOutcome}, local_node::LocalNodeClient, - node::ValidatorNodeProvider, + node::LocalValidatorNodeProvider, notifier::Notifier, worker::WorkerState, }; diff --git a/linera-service/src/schema_export.rs b/linera-service/src/schema_export.rs index 60ed057b7d95..ce3e203785b8 100644 --- a/linera-service/src/schema_export.rs +++ b/linera-service/src/schema_export.rs @@ -8,8 +8,8 @@ use linera_core::{ client::ChainClient, data_types::{ChainInfoQuery, ChainInfoResponse}, node::{ - CrossChainMessageDelivery, NodeError, NotificationStream, ValidatorNode, - ValidatorNodeProvider, + CrossChainMessageDelivery, LocalValidatorNodeProvider, NodeError, NotificationStream, + ValidatorNode, }, }; use linera_execution::committee::Committee; @@ -29,6 +29,8 @@ use linera_views::{ struct DummyValidatorNode; impl ValidatorNode for DummyValidatorNode { + type NotificationStream = NotificationStream; + async fn handle_block_proposal( &mut self, _: BlockProposal, @@ -71,7 +73,7 @@ impl ValidatorNode for DummyValidatorNode { struct DummyValidatorNodeProvider; -impl ValidatorNodeProvider for DummyValidatorNodeProvider { +impl LocalValidatorNodeProvider for DummyValidatorNodeProvider { type Node = DummyValidatorNode; fn make_node(&self, _: &str) -> Result {