diff --git a/primitives/src/parachain.rs b/primitives/src/parachain.rs index 6b9e95f9a885..327c4e4f59ce 100644 --- a/primitives/src/parachain.rs +++ b/primitives/src/parachain.rs @@ -28,7 +28,7 @@ use serde::{Serialize, Deserialize}; #[cfg(feature = "std")] use primitives::bytes; use primitives::RuntimeDebug; -use runtime_primitives::traits::{Block as BlockT}; +use runtime_primitives::traits::Block as BlockT; use inherents::InherentIdentifier; use application_crypto::KeyTypeId; diff --git a/runtime/common/src/parachains.rs b/runtime/common/src/parachains.rs index 55fa3b920a1e..131881882771 100644 --- a/runtime/common/src/parachains.rs +++ b/runtime/common/src/parachains.rs @@ -337,7 +337,7 @@ decl_storage! { pub RelayDispatchQueue: map hasher(twox_64_concat) ParaId => Vec; /// Size of the dispatch queues. Separated from actual data in order to avoid costly /// decoding when checking receipt validity. First item in tuple is the count of messages - /// second if the total length (in bytes) of the message payloads. + /// second if the total length (in bytes) of the message payloads. pub RelayDispatchQueueSize: map hasher(twox_64_concat) ParaId => (u32, u32); /// The ordered list of ParaIds that have a `RelayDispatchQueue` entry. NeedsDispatch: Vec; diff --git a/runtime/common/src/registrar.rs b/runtime/common/src/registrar.rs index 5722d9155d10..c60930f46620 100644 --- a/runtime/common/src/registrar.rs +++ b/runtime/common/src/registrar.rs @@ -498,7 +498,7 @@ decl_event!{ } impl Module { - /// Ensures that the given `ParaId` corresponds to a registered parathread, and returns a descriptor if so. + /// Ensures that the given `ParaId` corresponds to a registered parathread, and returns a descriptor if so. pub fn ensure_thread_id(id: ParaId) -> Option { Paras::get(id).and_then(|info| if let Scheduling::Dynamic = info.scheduling { Some(info) diff --git a/runtime/test-runtime/src/lib.rs b/runtime/test-runtime/src/lib.rs index 8230c4ad6581..fa22284e7630 100644 --- a/runtime/test-runtime/src/lib.rs +++ b/runtime/test-runtime/src/lib.rs @@ -53,7 +53,7 @@ use frame_support::{ weights::DispatchInfo, }; use pallet_transaction_payment_rpc_runtime_api::RuntimeDispatchInfo; -use session::{historical as session_historical}; +use session::historical as session_historical; #[cfg(feature = "std")] pub use staking::StakerStatus; diff --git a/validation/src/lib.rs b/validation/src/lib.rs index 65fb6c1d4684..438e95ae666d 100644 --- a/validation/src/lib.rs +++ b/validation/src/lib.rs @@ -54,7 +54,7 @@ pub use self::shared_table::{ pub use self::validation_service::{ServiceHandle, ServiceBuilder}; #[cfg(not(target_os = "unknown"))] -pub use parachain::wasm_executor::{run_worker as run_validation_worker}; +pub use parachain::wasm_executor::run_worker as run_validation_worker; mod dynamic_inclusion; mod error; @@ -108,6 +108,7 @@ pub trait Network { /// Instantiate a table router using the given shared table. /// Also pass through any outgoing messages to be broadcast to peers. + #[must_use] fn build_table_router( &self, table: Arc, diff --git a/validation/src/validation_service/mod.rs b/validation/src/validation_service/mod.rs index e4208d695ad9..987f0e063e82 100644 --- a/validation/src/validation_service/mod.rs +++ b/validation/src/validation_service/mod.rs @@ -26,21 +26,20 @@ //! //! These attestation sessions are kept live until they are periodically garbage-collected. 
-use std::{time::{Duration, Instant}, sync::Arc}; +use std::{time::{Duration, Instant}, sync::Arc, pin::Pin}; use std::collections::HashMap; +use crate::pipeline::FullOutput; use sc_client_api::{BlockchainEvents, BlockBackend}; -use sp_blockchain::HeaderBackend; -use block_builder::BlockBuilderApi; use consensus::SelectChain; -use futures::{future::ready, prelude::*, task::{Spawn, SpawnExt}}; +use futures::{prelude::*, task::{Spawn, SpawnExt}}; use polkadot_primitives::{Block, Hash, BlockId}; use polkadot_primitives::parachain::{ - Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, SigningContext, + Chain, ParachainHost, Id as ParaId, ValidatorIndex, ValidatorId, ValidatorPair, + CollationInfo, SigningContext, }; -use babe_primitives::BabeApi; use keystore::KeyStorePtr; -use sp_api::{ApiExt, ProvideRuntimeApi}; +use sp_api::{ProvideRuntimeApi, ApiExt}; use runtime_primitives::traits::HashFor; use availability_store::Store as AvailabilityStore; @@ -140,13 +139,10 @@ impl ServiceBuilder where C: Collators + Send + Sync + Unpin + 'static, C::Collation: Send + Unpin + 'static, P: BlockchainEvents + BlockBackend, - P: ProvideRuntimeApi + HeaderBackend + Send + Sync + 'static, - P::Api: ParachainHost + - BlockBuilderApi + - BabeApi + - ApiExt, + P: ProvideRuntimeApi + Send + Sync + 'static, + P::Api: ParachainHost, N: Network + Send + Sync + 'static, - N::TableRouter: Send + 'static, + N::TableRouter: Send + 'static + Sync, N::BuildTableRouter: Send + Unpin + 'static, ::SendLocalCollation: Send, SC: SelectChain + 'static, @@ -171,10 +167,10 @@ impl ServiceBuilder where let mut parachain_validation = ParachainValidationInstances { client: self.client.clone(), network: self.network, - collators: self.collators, spawner: self.spawner, availability_store: self.availability_store, live_instances: HashMap::new(), + collation_fetch: DefaultCollationFetch(self.collators), }; let client = self.client; @@ -236,6 +232,57 @@ impl ServiceBuilder where } } +/// Abstraction over `collation_fetch`. +pub(crate) trait CollationFetch { + /// Error type used by `collation_fetch`. + type Error: std::fmt::Debug; + + /// Fetch a collation for the given `parachain`. + fn collation_fetch

<P>(
+		self,
+		parachain: ParaId,
+		relay_parent: Hash,
+		client: Arc<P>,
+		max_block_data_size: Option<u64>,
+		n_validators: usize,
+	) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
+		where
+			P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
+			P: ProvideRuntimeApi<Block> + Send + Sync + 'static;
+}
+
+#[derive(Clone)]
+struct DefaultCollationFetch<C>(C);
+impl<C> CollationFetch for DefaultCollationFetch<C>
+	where
+		C: Collators + Send + Sync + Unpin + 'static,
+		C::Collation: Send + Unpin + 'static,
+{
+	type Error = C::Error;
+
+	fn collation_fetch<P>(
+		self,
+		parachain: ParaId,
+		relay_parent: Hash,
+		client: Arc<P>,
+		max_block_data_size: Option<u64>,
+		n_validators: usize,
+	) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
+		where
+			P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
+			P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
+	{
+		crate::collation::collation_fetch(
+			parachain,
+			relay_parent,
+			self.0,
+			client,
+			max_block_data_size,
+			n_validators,
+		).boxed()
+	}
+}
+
 // finds the first key we are capable of signing with out of the given set of validators,
 // if any.
 fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc<ValidatorPair>> {
@@ -248,13 +295,11 @@ fn signing_key(validators: &[ValidatorId], keystore: &KeyStorePtr) -> Option<Arc
 }
 
 /// Constructs parachain-agreement instances.
-pub(crate) struct ParachainValidationInstances<C, N, P, SP> {
+pub(crate) struct ParachainValidationInstances<N, P, SP, CF> {
 	/// The client instance.
 	client: Arc<P>
, /// The backing network handle. network: N, - /// Parachain collators. - collators: C, /// handle to spawner spawner: SP, /// Store for extrinsic data. @@ -262,18 +307,20 @@ pub(crate) struct ParachainValidationInstances { /// Live agreements. Maps relay chain parent hashes to attestation /// instances. live_instances: HashMap, + /// Used to fetch a collation. + collation_fetch: CF, } -impl ParachainValidationInstances where - C: Collators + Send + Unpin + 'static + Sync, +impl ParachainValidationInstances where N: Network, - P: ProvideRuntimeApi + HeaderBackend + BlockBackend + Send + Sync + 'static, - P::Api: ParachainHost + BlockBuilderApi + ApiExt, - C::Collation: Send + Unpin + 'static, - N::TableRouter: Send + 'static, + N::Error: 'static, + P: ProvideRuntimeApi + Send + Sync + 'static, + P::Api: ParachainHost, + N::TableRouter: Send + 'static + Sync, ::SendLocalCollation: Send, N::BuildTableRouter: Unpin + Send + 'static, SP: Spawn + Send + 'static, + CF: CollationFetch + Clone + Send + Sync + 'static, // Rust bug: https://github.com/rust-lang/rust/issues/24159 sp_api::StateBackendFor: sp_api::StateBackend>, { @@ -361,13 +408,41 @@ impl ParachainValidationInstances where max_block_data_size, )); - let router = self.network.build_table_router( + let build_router = self.network.build_table_router( table.clone(), &validators, ); - if let Some((Chain::Parachain(id), index)) = local_duty.as_ref().map(|d| (d.validation, d.index)) { - self.launch_work(parent_hash, id, router, max_block_data_size, validators.len(), index); + let availability_store = self.availability_store.clone(); + let client = self.client.clone(); + let collation_fetch = self.collation_fetch.clone(); + + let res = self.spawner.spawn(async move { + // It is important that we build the router as it launches tasks internally + // that are required to receive gossip messages. + let router = match build_router.await { + Ok(res) => res, + Err(e) => { + warn!(target: "validation", "Failed to build router: {:?}", e); + return + } + }; + + if let Some((Chain::Parachain(id), index)) = local_duty.map(|d| (d.validation, d.index)) { + let n_validators = validators.len(); + + launch_work( + move || collation_fetch.collation_fetch(id, parent_hash, client, max_block_data_size, n_validators), + availability_store, + router, + n_validators, + index, + ).await; + } + }); + + if let Err(e) = res { + error!(target: "validation", "Failed to create router and launch work: {:?}", e); } let tracker = ValidationInstanceHandle { @@ -384,98 +459,305 @@ impl ParachainValidationInstances where fn retain bool>(&mut self, mut pred: F) { self.live_instances.retain(|k, _| pred(k)) } +} - // launch parachain work asynchronously. - fn launch_work( - &self, - relay_parent: Hash, - validation_para: ParaId, - build_router: N::BuildTableRouter, - max_block_data_size: Option, - n_validators: usize, - local_id: ValidatorIndex, - ) { - let (collators, client) = (self.collators.clone(), self.client.clone()); - let availability_store = self.availability_store.clone(); +// launch parachain work asynchronously. +async fn launch_work( + collation_fetch: impl FnOnce() -> CFF, + availability_store: AvailabilityStore, + router: impl TableRouter, + n_validators: usize, + local_id: ValidatorIndex, +) where + E: std::fmt::Debug, + CFF: Future> + Send, +{ + // fetch a local collation from connected collators. 
+ let (collation_info, full_output) = match collation_fetch().await { + Ok(res) => res, + Err(e) => { + warn!(target: "validation", "Failed to collate candidate: {:?}", e); + return + } + }; + + let crate::pipeline::FullOutput { + commitments, + erasure_chunks, + available_data, + .. + } = full_output; + + let receipt = collation_info.into_receipt(commitments); + let pov_block = available_data.pov_block.clone(); + + if let Err(e) = availability_store.make_available( + receipt.hash(), + available_data, + ).await { + warn!( + target: "validation", + "Failed to make parachain block data available: {}", + e, + ); + } + + if let Err(e) = availability_store.clone().add_erasure_chunks( + receipt.clone(), + n_validators as _, + erasure_chunks.clone(), + ).await { + warn!(target: "validation", "Failed to add erasure chunks: {}", e); + } + + if let Err(e) = router.local_collation( + receipt, + pov_block, + (local_id, &erasure_chunks), + ).await { + warn!(target: "validation", "Failed to send local collation: {:?}", e); + } +} + +#[cfg(test)] +mod tests { + use super::*; + use futures::{executor::{ThreadPool, self}, future::ready, channel::mpsc}; + use availability_store::ErasureNetworking; + use polkadot_primitives::parachain::{ + PoVBlock, AbridgedCandidateReceipt, ErasureChunk, ValidatorIndex, + CollationInfo, DutyRoster, GlobalValidationSchedule, LocalValidationData, + Retriable, CollatorId, BlockData, Chain, AvailableData, SigningContext, + }; + use runtime_primitives::traits::Block as BlockT; + use std::pin::Pin; + use sp_keyring::sr25519::Keyring; + + /// Events fired while running mock implementations to follow execution. + enum Events { + BuildTableRouter, + CollationFetch, + LocalCollation, + } + + #[derive(Clone)] + struct MockNetwork(mpsc::UnboundedSender); + + impl Network for MockNetwork { + type Error = String; + type TableRouter = MockTableRouter; + type BuildTableRouter = Pin> + Send>>; + + fn build_table_router( + &self, + _: Arc, + _: &[ValidatorId], + ) -> Self::BuildTableRouter { + let event_sender = self.0.clone(); + async move { + event_sender.unbounded_send(Events::BuildTableRouter).expect("Send `BuildTableRouter`"); + + Ok(MockTableRouter(event_sender)) + }.boxed() + } + } + + #[derive(Clone)] + struct MockTableRouter(mpsc::UnboundedSender); + + impl TableRouter for MockTableRouter { + type Error = String; + type SendLocalCollation = Pin> + Send>>; + type FetchValidationProof = Box> + Unpin>; + + fn local_collation( + &self, + _: AbridgedCandidateReceipt, + _: PoVBlock, + _: (ValidatorIndex, &[ErasureChunk]), + ) -> Self::SendLocalCollation { + let sender = self.0.clone(); + + async move { + sender.unbounded_send(Events::LocalCollation).expect("Send `LocalCollation`"); + + Ok(()) + }.boxed() + } + + fn fetch_pov_block(&self, _: &AbridgedCandidateReceipt) -> Self::FetchValidationProof { + unimplemented!("Not required in tests") + } + } - let with_router = move |router: N::TableRouter| { - // fetch a local collation from connected collators. 
- let collation_work = crate::collation::collation_fetch( - validation_para, + #[derive(Clone)] + struct MockErasureNetworking; + + impl ErasureNetworking for MockErasureNetworking { + type Error = String; + + fn fetch_erasure_chunk( + &self, + _: &Hash, + _: u32, + ) -> Pin> + Send>> { + ready(Err("Not required in tests".to_string())).boxed() + } + + fn distribute_erasure_chunk(&self, _: Hash, _: ErasureChunk) { + unimplemented!("Not required in tests") + } + } + + #[derive(Clone)] + struct MockCollationFetch(mpsc::UnboundedSender); + + impl CollationFetch for MockCollationFetch { + type Error = (); + + fn collation_fetch

<P>(
+			self,
+			parachain: ParaId,
+			relay_parent: Hash,
+			_: Arc<P>
, + _: Option, + n_validators: usize, + ) -> Pin> + Send>> { + let info = CollationInfo { + parachain_index: parachain, relay_parent, - collators, - client.clone(), - max_block_data_size, + collator: Default::default(), + signature: Default::default(), + head_data: Default::default(), + pov_block_hash: Default::default(), + }; + + let available_data = AvailableData { + pov_block: PoVBlock { block_data: BlockData(Vec::new()) }, + omitted_validation: Default::default(), + }; + + let full_output = FullOutput { + available_data, + commitments: Default::default(), + erasure_chunks: Default::default(), n_validators, - ); + }; - collation_work.then(move |result| match result { - Ok((collation_info, full_output)) => { - let crate::pipeline::FullOutput { - commitments, - erasure_chunks, - available_data, - .. - } = full_output; - - let receipt = collation_info.into_receipt(commitments); - - // Apparently the `async move` block is the only way to convince - // the compiler that we are not moving values out of borrowed context. - let av_clone = availability_store.clone(); - let receipt_clone = receipt.clone(); - let erasure_chunks_clone = erasure_chunks.clone(); - let pov_block = available_data.pov_block.clone(); - - let res = async move { - if let Err(e) = av_clone.make_available( - receipt_clone.hash(), - available_data, - ).await { - warn!( - target: "validation", - "Failed to make parachain block data available: {}", - e, - ); - } - if let Err(e) = av_clone.clone().add_erasure_chunks( - receipt_clone, - n_validators as _, - erasure_chunks_clone, - ).await { - warn!(target: "validation", "Failed to add erasure chunks: {}", e); - } - } - .unit_error() - .then(move |_| { - router.local_collation( - receipt, - pov_block, - (local_id, &erasure_chunks), - ).map_err(|e| warn!(target: "validation", "Failed to send local collation: {:?}", e)) - }); - - res.boxed() - } - Err(e) => { - warn!(target: "validation", "Failed to collate candidate: {:?}", e); - Box::pin(ready(Ok(()))) - } - }) - }; + let sender = self.0; - let router_work = build_router - .map_ok(with_router) - .map_err(|e| { - warn!(target: "validation" , "Failed to build table router: {:?}", e); - }) - .and_then(|r| r) - .map(|_| ()); + async move { + sender.unbounded_send(Events::CollationFetch).expect("`CollationFetch` event send"); + Ok((info, full_output)) + }.boxed() + } + } + + #[derive(Clone)] + struct MockRuntimeApi { + validators: Vec, + duty_roster: DutyRoster, + } - // spawn onto thread pool. - if self.spawner.spawn(router_work).is_err() { - error!("Failed to spawn router work task"); + impl ProvideRuntimeApi for MockRuntimeApi { + type Api = Self; + + fn runtime_api<'a>(&'a self) -> sp_api::ApiRef<'a, Self::Api> { + self.clone().into() + } + } + + sp_api::mock_impl_runtime_apis! 
{ + impl ParachainHost for MockRuntimeApi { + type Error = sp_blockchain::Error; + + fn validators(&self) -> Vec { self.validators.clone() } + fn duty_roster(&self) -> DutyRoster { self.duty_roster.clone() } + fn active_parachains() -> Vec<(ParaId, Option<(CollatorId, Retriable)>)> { vec![(ParaId::from(1), None)] } + fn global_validation_schedule() -> GlobalValidationSchedule { Default::default() } + fn local_validation_data(_: ParaId) -> Option { None } + fn parachain_code(_: ParaId) -> Option> { None } + fn get_heads(_: Vec<::Extrinsic>) -> Option> { + None + } + fn signing_context() -> SigningContext { + Default::default() + } } } + + #[test] + fn launch_work_is_executed_properly() { + let executor = ThreadPool::new().unwrap(); + let keystore = keystore::Store::new_in_memory(); + + // Make sure `Bob` key is in the keystore, so this mocked node will be a parachain validator. + keystore.write().insert_ephemeral_from_seed::(&Keyring::Bob.to_seed()) + .expect("Insert key into keystore"); + + let validators = vec![ValidatorId::from(Keyring::Alice.public()), ValidatorId::from(Keyring::Bob.public())]; + let validator_duty = vec![Chain::Relay, Chain::Parachain(1.into())]; + let duty_roster = DutyRoster { validator_duty }; + + let (events_sender, events) = mpsc::unbounded(); + + let mut parachain_validation = ParachainValidationInstances { + client: Arc::new(MockRuntimeApi { validators, duty_roster }), + network: MockNetwork(events_sender.clone()), + collation_fetch: MockCollationFetch(events_sender.clone()), + spawner: executor.clone(), + availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking), + live_instances: HashMap::new(), + }; + + parachain_validation.get_or_instantiate(Default::default(), &keystore, None) + .expect("Creates new validation round"); + + let mut events = executor::block_on_stream(events); + + assert!(matches!(events.next().unwrap(), Events::BuildTableRouter)); + assert!(matches!(events.next().unwrap(), Events::CollationFetch)); + assert!(matches!(events.next().unwrap(), Events::LocalCollation)); + + drop(events_sender); + drop(parachain_validation); + assert!(events.next().is_none()); + } + + #[test] + fn router_is_built_on_relay_chain_validator() { + let executor = ThreadPool::new().unwrap(); + let keystore = keystore::Store::new_in_memory(); + + // Make sure `Alice` key is in the keystore, so this mocked node will be a relay-chain validator. 
+		keystore.write().insert_ephemeral_from_seed::<ValidatorPair>(&Keyring::Alice.to_seed())
+			.expect("Insert key into keystore");
+
+		let validators = vec![ValidatorId::from(Keyring::Alice.public()), ValidatorId::from(Keyring::Bob.public())];
+		let validator_duty = vec![Chain::Relay, Chain::Parachain(1.into())];
+		let duty_roster = DutyRoster { validator_duty };
+
+		let (events_sender, events) = mpsc::unbounded();
+
+		let mut parachain_validation = ParachainValidationInstances {
+			client: Arc::new(MockRuntimeApi { validators, duty_roster }),
+			network: MockNetwork(events_sender.clone()),
+			collation_fetch: MockCollationFetch(events_sender.clone()),
+			spawner: executor.clone(),
+			availability_store: AvailabilityStore::new_in_memory(MockErasureNetworking),
+			live_instances: HashMap::new(),
+		};
+
+		parachain_validation.get_or_instantiate(Default::default(), &keystore, None)
+			.expect("Creates new validation round");
+
+		let mut events = executor::block_on_stream(events);
+
+		assert!(matches!(events.next().unwrap(), Events::BuildTableRouter));
+
+		drop(events_sender);
+		drop(parachain_validation);
+		assert!(events.next().is_none());
+	}
+}
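For illustration only, not part of this diff: a minimal sketch of how the `CollationFetch` abstraction introduced above can be implemented by callers other than `DefaultCollationFetch` and the test mocks, assuming the trait signature as reconstructed here. The `LoggingCollationFetch` wrapper and its use of `log::debug!` are hypothetical; a real implementation only needs to mirror the trait's signature and return a boxed `Send` future.

// Hypothetical wrapper that delegates to an inner `CollationFetch` implementation
// and logs which parachain is being fetched. Illustrative sketch only.
#[derive(Clone)]
struct LoggingCollationFetch<Inner>(Inner);

impl<Inner: CollationFetch> CollationFetch for LoggingCollationFetch<Inner> {
	type Error = Inner::Error;

	fn collation_fetch<P>(
		self,
		parachain: ParaId,
		relay_parent: Hash,
		client: Arc<P>,
		max_block_data_size: Option<u64>,
		n_validators: usize,
	) -> Pin<Box<dyn Future<Output = Result<(CollationInfo, FullOutput), Self::Error>> + Send>>
		where
			P::Api: ParachainHost<Block, Error = sp_blockchain::Error>,
			P: ProvideRuntimeApi<Block> + Send + Sync + 'static,
	{
		log::debug!(target: "validation", "Fetching collation for parachain {:?}", parachain);
		// Delegate to the wrapped implementation; the returned future is already boxed and `Send`.
		self.0.collation_fetch(parachain, relay_parent, client, max_block_data_size, n_validators)
	}
}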