From 4f6af088ae5473e76731f82fb06f7791964497c8 Mon Sep 17 00:00:00 2001
From: Davide Galassi
Date: Wed, 23 Feb 2022 19:58:33 +0100
Subject: [PATCH] Clean obsolete BABE's weight data (#10748)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

* Clean obsolete BABE weight data

* Take out test assertion from check closure

* Optimize metadata access using `HeaderMetadata` trait

* Apply suggestions from code review

* Introduce finalize and import pre-commit synchronous actions

* Do not hold locks between internal method calls

* Remove unused generic bound

* Apply suggestions from code review

* Register BABE's pre-commit actions on `block_import` instead of `start_babe`

* PreCommit actions should be `Fn` instead of `FnMut`

* More robust safety net in case of malformed finality notifications

Co-authored-by: Bastian Köcher
Co-authored-by: André Silva <123550+andresilva@users.noreply.github.com>
---
 client/api/src/backend.rs                    |   2 +
 client/api/src/client.rs                     |  55 +++++++-
 client/consensus/babe/src/lib.rs             | 105 +++++++++++++--
 client/consensus/babe/src/tests.rs           | 132 ++++++++++++++-----
 client/service/src/client/client.rs          | 129 +++++++++++-------
 primitives/blockchain/src/header_metadata.rs |   2 +-
 6 files changed, 330 insertions(+), 95 deletions(-)

diff --git a/client/api/src/backend.rs b/client/api/src/backend.rs
index bb2579e4a420e..e96616d5416e6 100644
--- a/client/api/src/backend.rs
+++ b/client/api/src/backend.rs
@@ -75,6 +75,8 @@ pub struct ImportSummary<Block: BlockT> {
 /// Contains information about the block that just got finalized,
 /// including tree heads that became stale at the moment of finalization.
 pub struct FinalizeSummary<Block: BlockT> {
+	/// Last finalized block header.
+	pub header: Block::Header,
 	/// Blocks that were finalized.
 	/// The last entry is the one that has been explicitly finalized.
 	pub finalized: Vec<Block::Hash>,
diff --git a/client/api/src/client.rs b/client/api/src/client.rs
index 9bb212099565b..949cc42c1c04b 100644
--- a/client/api/src/client.rs
+++ b/client/api/src/client.rs
@@ -27,7 +27,8 @@ use sp_runtime::{
 };
 use std::{collections::HashSet, convert::TryFrom, fmt, sync::Arc};
 
-use crate::{blockchain::Info, notifications::StorageEventStream};
+use crate::{blockchain::Info, notifications::StorageEventStream, FinalizeSummary, ImportSummary};
+
 use sc_transaction_pool_api::ChainEvent;
 use sc_utils::mpsc::TracingUnboundedReceiver;
 use sp_blockchain;
@@ -76,6 +77,34 @@ pub trait BlockchainEvents<Block: BlockT> {
 	) -> sp_blockchain::Result<StorageEventStream<Block::Hash>>;
 }
 
+/// List of operations to be performed on storage aux data.
+/// First tuple element is the encoded data key.
+/// Second tuple element is the encoded optional data to write.
+/// If `None`, the key and the associated data are deleted from storage.
+pub type AuxDataOperations = Vec<(Vec<u8>, Option<Vec<u8>>)>;
+
+/// Callback invoked before committing the operations created during block import.
+/// This gives the opportunity to perform auxiliary pre-commit actions and optionally
+/// enqueue further storage write operations to be atomically performed on commit.
+pub type OnImportAction<Block> =
+	Box<dyn (Fn(&BlockImportNotification<Block>) -> AuxDataOperations) + Send>;
+
+/// Callback invoked before committing the operations created during block finalization.
+/// This gives the opportunity to perform auxiliary pre-commit actions and optionally
+/// enqueue further storage write operations to be atomically performed on commit.
+pub type OnFinalityAction<Block> =
+	Box<dyn (Fn(&FinalityNotification<Block>) -> AuxDataOperations) + Send>;
+
+/// Interface to perform auxiliary actions before committing a block import or
+/// finality operation.
+pub trait PreCommitActions<Block: BlockT> {
+	/// Actions to be performed on block import.
+	fn register_import_action(&self, op: OnImportAction<Block>);
+
+	/// Actions to be performed on block finalization.
+	fn register_finality_action(&self, op: OnFinalityAction<Block>);
+}
+
 /// Interface for fetching block data.
 pub trait BlockBackend<Block: BlockT> {
 	/// Get block body by ID. Returns `None` if the body is not stored.
@@ -300,3 +329,27 @@ impl<B: BlockT> From<FinalityNotification<B>> for ChainEvent<B> {
 		Self::Finalized { hash: n.hash, tree_route: n.tree_route }
 	}
 }
+
+impl<B: BlockT> From<FinalizeSummary<B>> for FinalityNotification<B> {
+	fn from(mut summary: FinalizeSummary<B>) -> Self {
+		let hash = summary.finalized.pop().unwrap_or_default();
+		FinalityNotification {
+			hash,
+			header: summary.header,
+			tree_route: Arc::new(summary.finalized),
+			stale_heads: Arc::new(summary.stale_heads),
+		}
+	}
+}
+
+impl<B: BlockT> From<ImportSummary<B>> for BlockImportNotification<B> {
+	fn from(summary: ImportSummary<B>) -> Self {
+		BlockImportNotification {
+			hash: summary.hash,
+			origin: summary.origin,
+			header: summary.header,
+			is_new_best: summary.is_new_best,
+			tree_route: summary.tree_route.map(Arc::new),
+		}
+	}
+}
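For illustration, here is how a consumer of the new API can hook into finality. This is a minimal sketch, not part of the patch: the `b"example"` key prefix is invented, and `client` is only assumed to implement `PreCommitActions<Block>`. The action returns one delete operation (a `None` value) per block in the finalized tree route, and those deletes are committed atomically with the finality operation itself:

use codec::Encode;
use sc_client_api::{AuxDataOperations, FinalityNotification, PreCommitActions};
use sp_runtime::traits::Block as BlockT;

fn register_example_cleanup<Block: BlockT, C: PreCommitActions<Block>>(client: &C) {
	client.register_finality_action(Box::new(|notification: &FinalityNotification<Block>| {
		notification
			.tree_route
			.iter()
			// Pairing a key with `None` asks the backend to delete the entry on commit.
			.map(|hash| ((b"example", hash).encode(), None))
			.collect::<AuxDataOperations>()
	}));
}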
diff --git a/client/consensus/babe/src/lib.rs b/client/consensus/babe/src/lib.rs
index 78fe506dbf341..0e9f943eca307 100644
--- a/client/consensus/babe/src/lib.rs
+++ b/client/consensus/babe/src/lib.rs
@@ -66,7 +66,16 @@
 #![forbid(unsafe_code)]
 #![warn(missing_docs)]
-use std::{borrow::Cow, collections::HashMap, convert::TryInto, pin::Pin, sync::Arc, u64};
+use std::{
+	borrow::Cow,
+	collections::{HashMap, HashSet},
+	convert::TryInto,
+	future::Future,
+	pin::Pin,
+	sync::Arc,
+	task::{Context, Poll},
+	time::Duration,
+};
 
 use codec::{Decode, Encode};
 use futures::{
@@ -82,7 +91,10 @@ use prometheus_endpoint::Registry;
 use retain_mut::RetainMut;
 use schnorrkel::SignatureError;
 
-use sc_client_api::{backend::AuxStore, BlockchainEvents, ProvideUncles, UsageProvider};
+use sc_client_api::{
+	backend::AuxStore, AuxDataOperations, BlockchainEvents, FinalityNotification, PreCommitActions,
+	ProvideUncles, UsageProvider,
+};
 use sc_consensus::{
 	block_import::{
 		BlockCheckParams, BlockImport, BlockImportParams, ForkChoiceStrategy, ImportResult,
@@ -98,7 +110,7 @@ use sc_consensus_slots::{
 	SlotInfo, StorageChanges,
 };
 use sc_telemetry::{telemetry, TelemetryHandle, CONSENSUS_DEBUG, CONSENSUS_TRACE};
-use sp_api::{ApiExt, NumberFor, ProvideRuntimeApi};
+use sp_api::{ApiExt, ProvideRuntimeApi};
 use sp_application_crypto::AppKey;
 use sp_block_builder::BlockBuilder as BlockBuilderApi;
 use sp_blockchain::{Error as ClientError, HeaderBackend, HeaderMetadata, Result as ClientResult};
@@ -113,7 +125,7 @@ use sp_inherents::{CreateInherentDataProviders, InherentData, InherentDataProvider};
 use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr};
 use sp_runtime::{
 	generic::{BlockId, OpaqueDigestItemId},
-	traits::{Block as BlockT, Header, Zero},
+	traits::{Block as BlockT, Header, NumberFor, One, SaturatedConversion, Saturating, Zero},
 	DigestItem,
 };
@@ -458,6 +470,7 @@ where
 	C: ProvideRuntimeApi<B>
 		+ ProvideUncles<B>
 		+ BlockchainEvents<B>
+		+ PreCommitActions<B>
 		+ HeaderBackend<B>
 		+ HeaderMetadata<B, Error = ClientError>
 		+ Send
@@ -501,7 +514,8 @@ where
 	};
 
 	info!(target: "babe", "👶 Starting BABE Authorship worker");
-	let inner = sc_consensus_slots::start_slot_worker(
+
+	let slot_worker = sc_consensus_slots::start_slot_worker(
 		babe_link.config.slot_duration(),
 		select_chain,
 		worker,
@@ -515,13 +529,69 @@ where
 	let answer_requests =
 		answer_requests(worker_rx, babe_link.config, client, babe_link.epoch_changes.clone());
+
+	let inner = future::select(Box::pin(slot_worker), Box::pin(answer_requests));
 	Ok(BabeWorker {
-		inner: Box::pin(future::join(inner, answer_requests).map(|_| ())),
+		inner: Box::pin(inner.map(|_| ())),
 		slot_notification_sinks,
 		handle: BabeWorkerHandle(worker_tx),
 	})
 }
 
+// Remove obsolete blocks' weight data by leveraging finality notifications.
+// This includes data for all finalized blocks (excluding the most recent one)
+// and all stale branches.
+fn aux_storage_cleanup<C: HeaderMetadata<Block>, Block: BlockT>(
+	client: &C,
+	notification: &FinalityNotification<Block>,
+) -> AuxDataOperations {
+	let mut aux_keys = HashSet::new();
+
+	// Clean data for the finalized block's ancestors down to, and including,
+	// the previously finalized one.
+
+	let first_new_finalized = notification.tree_route.get(0).unwrap_or(&notification.hash);
+	match client.header_metadata(*first_new_finalized) {
+		Ok(meta) => {
+			aux_keys.insert(aux_schema::block_weight_key(meta.parent));
+		},
+		Err(err) => {
+			warn!(target: "babe", "header lookup failed while cleaning data for block {}: {}", first_new_finalized, err);
+		},
+	}
+
+	aux_keys.extend(notification.tree_route.iter().map(aux_schema::block_weight_key));
+
+	// Clean data for stale branches.
+
+	// A safety net in case of a malformed notification.
+	let height_limit = notification.header.number().saturating_sub(
+		notification.tree_route.len().saturated_into::<NumberFor<Block>>() + One::one(),
+	);
+	for head in notification.stale_heads.iter() {
+		let mut hash = *head;
+		// Insert stale block hashes until the canonical chain is reached.
+		// Sooner or later we should hit an element already present in the `aux_keys` set.
+		while aux_keys.insert(aux_schema::block_weight_key(hash)) {
+			match client.header_metadata(hash) {
+				Ok(meta) => {
+					// This should never happen and must be considered a bug.
+					if meta.number <= height_limit {
+						warn!(target: "babe", "unexpected canonical chain state or malformed finality notification");
+						break
+					}
+					hash = meta.parent;
+				},
+				Err(err) => {
+					warn!(target: "babe", "header lookup failed while cleaning data for block {}: {}", head, err);
+					break
+				},
+			}
+		}
+	}
+
+	aux_keys.into_iter().map(|val| (val, None)).collect()
+}
+
 async fn answer_requests<B: BlockT, C>(
 	mut request_rx: Receiver<BabeRequest<B>>,
 	config: Config,
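To make the safety net arithmetic concrete, a worked example with assumed numbers: suppose the notification finalizes block #100 and the previously finalized block was #97, so `tree_route` holds the two intermediate ancestors #98 and #99. Then `height_limit = 100 - (2 + 1) = 97`, i.e. the height of the previously finalized block. Every stale branch must fork off strictly above that height, so a walk from a stale head that descends to #97 or below without first meeting a key already recorded in `aux_keys` can only mean the notification was malformed, and the loop bails out with a warning instead of walking into the finalized chain.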
@@ -604,7 +674,7 @@ impl<B: BlockT> BabeWorkerHandle<B> {
 
 /// Worker for Babe which implements `Future<Output = ()>`. This must be polled.
 #[must_use]
 pub struct BabeWorker<B: BlockT> {
-	inner: Pin<Box<dyn futures::Future<Output = ()> + Send + 'static>>,
+	inner: Pin<Box<dyn Future<Output = ()> + Send + 'static>>,
 	slot_notification_sinks: SlotNotificationSinks<B>,
 	handle: BabeWorkerHandle<B>,
 }
@@ -628,13 +698,10 @@ impl<B: BlockT> BabeWorker<B> {
 	}
 }
 
-impl<B: BlockT> futures::Future for BabeWorker<B> {
+impl<B: BlockT> Future for BabeWorker<B> {
 	type Output = ();
 
-	fn poll(
-		mut self: Pin<&mut Self>,
-		cx: &mut futures::task::Context,
-	) -> futures::task::Poll<Self::Output> {
+	fn poll(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
 		self.inner.as_mut().poll(cx)
 	}
 }
@@ -857,7 +924,7 @@ where
 		self.telemetry.clone()
 	}
 
-	fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> std::time::Duration {
+	fn proposing_remaining_duration(&self, slot_info: &SlotInfo<B>) -> Duration {
 		let parent_slot = find_pre_digest::<B>(&slot_info.chain_head).ok().map(|d| d.slot());
 
 		sc_consensus_slots::proposing_remaining_duration(
@@ -1683,7 +1750,11 @@ pub fn block_import<Client, Block: BlockT, I>(
 	client: Arc<Client>,
 ) -> ClientResult<(BabeBlockImport<Block, Client, I>, BabeLink<Block>)>
 where
-	Client: AuxStore + HeaderBackend<Block> + HeaderMetadata<Block, Error = sp_blockchain::Error>,
+	Client: AuxStore
+		+ HeaderBackend<Block>
+		+ HeaderMetadata<Block, Error = sp_blockchain::Error>
+		+ PreCommitActions<Block>
+		+ 'static,
 {
 	let epoch_changes =
 		aux_schema::load_epoch_changes::<Block, _>(&*client, &config.genesis_config)?;
@@ -1694,6 +1765,12 @@ where
 	// startup rather than waiting until importing the next epoch change block.
 	prune_finalized(client.clone(), &mut epoch_changes.shared_data())?;
 
+	let client_clone = client.clone();
+	let on_finality = move |summary: &FinalityNotification<Block>| {
+		aux_storage_cleanup(client_clone.as_ref(), summary)
+	};
+	client.register_finality_action(Box::new(on_finality));
+
 	let import = BabeBlockImport::new(client, epoch_changes, wrapped_block_import, config);
 
 	Ok((import, link))
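The keys being deleted here are the per-block weight entries written by BABE's aux schema at import time. As a sketch of the scheme (assuming the `aux_schema` key layout is a static prefix paired with the block hash, SCALE-encoded; the real definition lives in `client/consensus/babe/src/aux_schema.rs`):

use codec::Encode;

// Assumed shape of `aux_schema::block_weight_key`.
fn block_weight_key<H: Encode>(block_hash: H) -> Vec<u8> {
	(b"block_weight", block_hash).encode()
}

Under the `AuxDataOperations` contract, deleting one of these entries is then just the pair `(block_weight_key(hash), None)`, which is exactly what `aux_storage_cleanup` collects and returns.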
diff --git a/client/consensus/babe/src/tests.rs b/client/consensus/babe/src/tests.rs
index d38d57b84f5e1..d2de05bc91952 100644
--- a/client/consensus/babe/src/tests.rs
+++ b/client/consensus/babe/src/tests.rs
@@ -28,7 +28,7 @@ use log::debug;
 use rand::RngCore;
 use rand_chacha::{rand_core::SeedableRng, ChaChaRng};
 use sc_block_builder::{BlockBuilder, BlockBuilderProvider};
-use sc_client_api::{backend::TransactionFor, BlockchainEvents};
+use sc_client_api::{backend::TransactionFor, BlockchainEvents, Finalizer};
 use sc_consensus::{BoxBlockImport, BoxJustificationImport};
 use sc_consensus_slots::BackoffAuthoringOnFinalizedHeadLagging;
 use sc_keystore::LocalKeystore;
@@ -608,8 +608,8 @@ fn propose_and_import_block<Transaction: Send + 'static>(
 	slot: Option<Slot>,
 	proposer_factory: &mut DummyFactory,
 	block_import: &mut BoxBlockImport<TestBlock, Transaction>,
-) -> sp_core::H256 {
-	let mut proposer = futures::executor::block_on(proposer_factory.init(parent)).unwrap();
+) -> Hash {
+	let mut proposer = block_on(proposer_factory.init(parent)).unwrap();
 
 	let slot = slot.unwrap_or_else(|| {
 		let parent_pre_digest = find_pre_digest::<TestBlock>(parent).unwrap();
@@ -625,7 +625,7 @@ fn propose_and_import_block<Transaction: Send + 'static>(
 
 	let parent_hash = parent.hash();
 
-	let mut block = futures::executor::block_on(proposer.propose_with(pre_digest)).unwrap().block;
+	let mut block = block_on(proposer.propose_with(pre_digest)).unwrap().block;
 
 	let epoch_descriptor = proposer_factory
 		.epoch_changes
@@ -673,6 +673,29 @@ fn propose_and_import_block<Transaction: Send + 'static>(
 	post_hash
 }
 
+// Propose and import `n` valid BABE blocks that are built on top of the given parent.
+// The proposer takes care of producing epoch change digests according to the epoch
+// duration (which is set to 6 slots in the test runtime).
+fn propose_and_import_blocks<Transaction: Send + 'static>(
+	client: &PeersFullClient,
+	proposer_factory: &mut DummyFactory,
+	block_import: &mut BoxBlockImport<TestBlock, Transaction>,
+	parent_id: BlockId<TestBlock>,
+	n: usize,
+) -> Vec<Hash> {
+	let mut hashes = Vec::with_capacity(n);
+	let mut parent_header = client.header(&parent_id).unwrap().unwrap();
+
+	for _ in 0..n {
+		let block_hash =
+			propose_and_import_block(&parent_header, None, proposer_factory, block_import);
+		hashes.push(block_hash);
+		parent_header = client.header(&BlockId::Hash(block_hash)).unwrap().unwrap();
+	}
+
+	hashes
+}
+
 #[test]
 fn importing_block_one_sets_genesis_epoch() {
 	let mut net = BabeTestNet::new(1);
@@ -714,8 +737,6 @@ fn importing_block_one_sets_genesis_epoch() {
 
 #[test]
 fn importing_epoch_change_block_prunes_tree() {
-	use sc_client_api::Finalizer;
-
 	let mut net = BabeTestNet::new(1);
 
 	let peer = net.peer(0);
@@ -732,26 +753,8 @@ fn importing_epoch_change_block_prunes_tree() {
 		mutator: Arc::new(|_, _| ()),
 	};
 
-	// This is just boilerplate code for proposing and importing n valid BABE
-	// blocks that are built on top of the given parent. The proposer takes care
-	// of producing epoch change digests according to the epoch duration (which
-	// is set to 6 slots in the test runtime).
-	let mut propose_and_import_blocks = |parent_id, n| {
-		let mut hashes = Vec::new();
-		let mut parent_header = client.header(&parent_id).unwrap().unwrap();
-
-		for _ in 0..n {
-			let block_hash = propose_and_import_block(
-				&parent_header,
-				None,
-				&mut proposer_factory,
-				&mut block_import,
-			);
-			hashes.push(block_hash);
-			parent_header = client.header(&BlockId::Hash(block_hash)).unwrap().unwrap();
-		}
-
-		hashes
+	let mut propose_and_import_blocks_wrap = |parent_id, n| {
+		propose_and_import_blocks(&client, &mut proposer_factory, &mut block_import, parent_id, n)
 	};
 
 	// This is the block tree that we're going to use in this test. Each node
 
 	// Create and import the canon chain and keep track of fork blocks (A, C, D)
 	// from the diagram above.
-	let canon_hashes = propose_and_import_blocks(BlockId::Number(0), 30);
+	let canon_hashes = propose_and_import_blocks_wrap(BlockId::Number(0), 30);
 
 	// Create the forks
-	let fork_1 = propose_and_import_blocks(BlockId::Hash(canon_hashes[0]), 10);
-	let fork_2 = propose_and_import_blocks(BlockId::Hash(canon_hashes[12]), 15);
-	let fork_3 = propose_and_import_blocks(BlockId::Hash(canon_hashes[18]), 10);
+	let fork_1 = propose_and_import_blocks_wrap(BlockId::Hash(canon_hashes[0]), 10);
+	let fork_2 = propose_and_import_blocks_wrap(BlockId::Hash(canon_hashes[12]), 15);
+	let fork_3 = propose_and_import_blocks_wrap(BlockId::Hash(canon_hashes[18]), 10);
 
 	// We should be tracking a total of 9 epochs in the fork tree
 	assert_eq!(epoch_changes.shared_data().tree().iter().count(), 9);
@@ -782,7 +785,7 @@ fn importing_epoch_change_block_prunes_tree() {
 
 	// We finalize block #13 from the canon chain, so on the next epoch
 	// change the tree should be pruned, to not contain F (#7).
 	client.finalize_block(BlockId::Hash(canon_hashes[12]), None, false).unwrap();
-	propose_and_import_blocks(BlockId::Hash(client.chain_info().best_hash), 7);
+	propose_and_import_blocks_wrap(BlockId::Hash(client.chain_info().best_hash), 7);
 
 	// at this point no hashes from the first fork must exist on the tree
 	assert!(!epoch_changes
@@ -809,7 +812,7 @@ fn importing_epoch_change_block_prunes_tree() {
 
 	// finalizing block #25 from the canon chain should prune out the second fork
 	client.finalize_block(BlockId::Hash(canon_hashes[24]), None, false).unwrap();
-	propose_and_import_blocks(BlockId::Hash(client.chain_info().best_hash), 8);
+	propose_and_import_blocks_wrap(BlockId::Hash(client.chain_info().best_hash), 8);
 
 	// at this point no hashes from the second fork must exist on the tree
 	assert!(!epoch_changes
@@ -894,3 +897,68 @@ fn babe_transcript_generation_match() {
 	};
 	debug_assert!(test(orig_transcript) == test(transcript_from_data(new_transcript)));
 }
+
+#[test]
+fn obsolete_blocks_aux_data_cleanup() {
+	let mut net = BabeTestNet::new(1);
+
+	let peer = net.peer(0);
+	let data = peer.data.as_ref().expect("babe link set up during initialization");
+	let client = peer.client().as_client();
+
+	// Register the handler (as done by `babe_start`)
+	let client_clone = client.clone();
+	let on_finality = move |summary: &FinalityNotification<TestBlock>| {
+		aux_storage_cleanup(client_clone.as_ref(), summary)
+	};
+	client.register_finality_action(Box::new(on_finality));
+
+	let mut proposer_factory = DummyFactory {
+		client: client.clone(),
+		config: data.link.config.clone(),
+		epoch_changes: data.link.epoch_changes.clone(),
+		mutator: Arc::new(|_, _| ()),
+	};
+
+	let mut block_import = data.block_import.lock().take().expect("import set up during init");
+
+	let mut propose_and_import_blocks_wrap = |parent_id, n| {
+		propose_and_import_blocks(&client, &mut proposer_factory, &mut block_import, parent_id, n)
+	};
+
+	let aux_data_check = |hashes: &[Hash], expected: bool| {
+		hashes.iter().all(|hash| {
+			aux_schema::load_block_weight(&*peer.client().as_backend(), hash)
+				.unwrap()
+				.is_some() == expected
+		})
+	};
+
+	// Create the following test scenario:
+	//
+	//                /-----B3 --- B4          ( < fork2 )
+	// G --- A1 --- A2 --- A3 --- A4           ( < fork1 )
+	//                      \-----C4 --- C5    ( < fork3 )
+
+	let fork1_hashes = propose_and_import_blocks_wrap(BlockId::Number(0), 4);
+	let fork2_hashes = propose_and_import_blocks_wrap(BlockId::Number(2), 2);
+	let fork3_hashes = propose_and_import_blocks_wrap(BlockId::Number(3), 2);
+
+	// Check that aux data is present for all but the genesis block.
+	assert!(aux_data_check(&[client.chain_info().genesis_hash], false));
+	assert!(aux_data_check(&fork1_hashes, true));
+	assert!(aux_data_check(&fork2_hashes, true));
+	assert!(aux_data_check(&fork3_hashes, true));
+
+	// Finalize A3
+	client.finalize_block(BlockId::Number(3), None, true).unwrap();
+
+	// Wiped: A1, A2
+	assert!(aux_data_check(&fork1_hashes[..2], false));
+	// Present: A3, A4
+	assert!(aux_data_check(&fork1_hashes[2..], true));
+	// Wiped: B3, B4
+	assert!(aux_data_check(&fork2_hashes, false));
+	// Present: C4, C5
+	assert!(aux_data_check(&fork3_hashes, true));
+}
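Tracing the expected outcome of the scenario above: finalizing A3 yields a notification with `tree_route = [A1, A2]` and `stale_heads = [B4]`; the C branch descends from A3 itself, so it is still viable and not stale. The cleanup therefore deletes the weights of A1 and A2 (plus the key of A1's parent, the genesis block, which never had a weight entry to begin with), then walks the stale branch B4 → B3 and stops as soon as it reaches A2, already present in the key set. A3, A4, C4 and C5 keep their weights, which is precisely what the test's assertions verify.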
diff --git a/client/service/src/client/client.rs b/client/service/src/client/client.rs
index 071af36a23f96..8497c34f5abbb 100644
--- a/client/service/src/client/client.rs
+++ b/client/service/src/client/client.rs
@@ -36,11 +36,12 @@ use sc_client_api::{
 	client::{
 		BadBlocks, BlockBackend, BlockImportNotification, BlockOf, BlockchainEvents, ClientInfo,
 		FinalityNotification, FinalityNotifications, ForkBlocks, ImportNotifications,
-		ProvideUncles,
+		PreCommitActions, ProvideUncles,
 	},
 	execution_extensions::ExecutionExtensions,
 	notifications::{StorageEventStream, StorageNotifications},
-	CallExecutor, ExecutorProvider, KeyIterator, ProofProvider, UsageProvider,
+	CallExecutor, ExecutorProvider, KeyIterator, OnFinalityAction, OnImportAction, ProofProvider,
+	UsageProvider,
 };
 use sc_consensus::{
 	BlockCheckParams, BlockImportParams, ForkChoiceStrategy, ImportResult, StateAction,
@@ -76,8 +77,9 @@ use sp_runtime::{
 };
 use sp_state_machine::{
 	prove_child_read, prove_range_read_with_child_with_size, prove_read,
-	read_range_proof_check_with_child_on_proving_backend, Backend as StateBackend, KeyValueStates,
-	KeyValueStorageLevel, MAX_NESTED_TRIE_DEPTH,
+	read_range_proof_check_with_child_on_proving_backend, Backend as StateBackend,
+	ChildStorageCollection, KeyValueStates, KeyValueStorageLevel, StorageCollection,
+	MAX_NESTED_TRIE_DEPTH,
 };
 use sp_trie::{CompactProof, StorageProof};
 use std::{
@@ -108,7 +110,13 @@ where
 	storage_notifications: Mutex<StorageNotifications<Block>>,
 	import_notification_sinks: NotificationSinks<BlockImportNotification<Block>>,
 	finality_notification_sinks: NotificationSinks<FinalityNotification<Block>>,
-	// holds the block hash currently being imported. TODO: replace this with block queue
+	// Collects auxiliary operations to be performed atomically together with
+	// block import operations.
+	import_actions: Mutex<Vec<OnImportAction<Block>>>,
+	// Collects auxiliary operations to be performed atomically together with
+	// block finalization operations.
+	finality_actions: Mutex<Vec<OnFinalityAction<Block>>>,
+	// Holds the block hash currently being imported. TODO: replace this with block queue.
 	importing_block: RwLock<Option<Block::Hash>>,
 	block_rules: BlockRules<Block>,
 	execution_extensions: ExecutionExtensions<Block>,
@@ -279,11 +287,32 @@ where
 
 		let r = f(&mut op)?;
 
-		let ClientImportOperation { op, notify_imported, notify_finalized } = op;
+		let ClientImportOperation { mut op, notify_imported, notify_finalized } = op;
+
+		let finality_notification = notify_finalized.map(|summary| summary.into());
+		let (import_notification, storage_changes) = match notify_imported {
+			Some(mut summary) => {
+				let storage_changes = summary.storage_changes.take();
+				(Some(summary.into()), storage_changes)
+			},
+			None => (None, None),
+		};
+
+		if let Some(ref notification) = finality_notification {
+			for action in self.finality_actions.lock().iter_mut() {
+				op.insert_aux(action(notification))?;
+			}
+		}
+		if let Some(ref notification) = import_notification {
+			for action in self.import_actions.lock().iter_mut() {
+				op.insert_aux(action(notification))?;
+			}
+		}
+
 		self.backend.commit_operation(op)?;
 
-		self.notify_finalized(notify_finalized)?;
-		self.notify_imported(notify_imported)?;
+		self.notify_finalized(finality_notification)?;
+		self.notify_imported(import_notification, storage_changes)?;
 
 		Ok(r)
 	};
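The `op.insert_aux(action(notification))?` calls above feed each action's result into the pre-existing aux-write hook of the block import operation, which accepts an iterator of key/optional-value pairs; paraphrased (not verbatim) from `sc_client_api::backend`:

// fn insert_aux<I>(&mut self, ops: I) -> sp_blockchain::Result<()>
// where
//     I: IntoIterator<Item = (Vec<u8>, Option<Vec<u8>>)>;

Since `AuxDataOperations` is exactly `Vec<(Vec<u8>, Option<Vec<u8>>)>`, an action's output can be passed through unchanged, and its writes and deletes land in the same backend transaction as the block import or finality operation itself. That atomicity is the point of running the actions before `commit_operation` rather than in a notification stream subscriber, which could observe a crash window between the block commit and the aux update.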
@@ -367,6 +396,8 @@ where
 			storage_notifications: Mutex::new(StorageNotifications::new(prometheus_registry)),
 			import_notification_sinks: Default::default(),
 			finality_notification_sinks: Default::default(),
+			import_actions: Default::default(),
+			finality_actions: Default::default(),
 			importing_block: Default::default(),
 			block_rules: BlockRules::new(fork_blocks, bad_blocks),
 			execution_extensions,
@@ -686,12 +717,21 @@ where
 		// We only notify when we are already synced to the tip of the chain
 		// or if this import triggers a re-org
 		if make_notifications || tree_route.is_some() {
+			let header = import_headers.into_post();
 			if finalized {
 				let mut summary = match operation.notify_finalized.take() {
-					Some(summary) => summary,
-					None => FinalizeSummary { finalized: Vec::new(), stale_heads: Vec::new() },
+					Some(mut summary) => {
+						summary.header = header.clone();
+						summary.finalized.push(hash);
+						summary
+					},
+					None => FinalizeSummary {
+						header: header.clone(),
+						finalized: vec![hash],
+						stale_heads: Vec::new(),
+					},
 				};
-				summary.finalized.push(hash);
+
 				if parent_exists {
 					// Add to the stale list all heads that are branching from parent besides our
 					// current `head`.
@@ -718,7 +758,7 @@ where
 			operation.notify_imported = Some(ImportSummary {
 				hash,
 				origin,
-				header: import_headers.into_post(),
+				header,
 				is_new_best,
 				storage_changes,
 				tree_route,
@@ -863,7 +903,7 @@ where
 			.backend
 			.blockchain()
 			.number(last_finalized)?
-			.expect("Finalized block expected to be onchain; qed");
+			.expect("Previous finalized block expected to be onchain; qed");
 		let mut stale_heads = Vec::new();
 		for head in self.backend.blockchain().leaves()? {
 			let route_from_finalized =
@@ -884,7 +924,12 @@ where
 				stale_heads.push(head);
 			}
 		}
-		operation.notify_finalized = Some(FinalizeSummary { finalized, stale_heads });
+		let header = self
+			.backend
+			.blockchain()
+			.header(BlockId::Hash(block))?
+ .expect("Finalized block expected to be onchain; qed"); + operation.notify_finalized = Some(FinalizeSummary { header, finalized, stale_heads }); } Ok(()) @@ -892,11 +937,11 @@ where fn notify_finalized( &self, - notify_finalized: Option>, + notification: Option>, ) -> sp_blockchain::Result<()> { let mut sinks = self.finality_notification_sinks.lock(); - let mut notify_finalized = match notify_finalized { + let notification = match notification { Some(notify_finalized) => notify_finalized, None => { // Cleanup any closed finality notification sinks @@ -907,30 +952,14 @@ where }, }; - let last = notify_finalized.finalized.pop().expect( - "At least one finalized block shall exist within a valid finalization summary; qed", - ); - - let header = self.header(&BlockId::Hash(last))?.expect( - "Header already known to exist in DB because it is indicated in the tree route; \ - qed", - ); - telemetry!( self.telemetry; SUBSTRATE_INFO; "notify.finalized"; - "height" => format!("{}", header.number()), - "best" => ?last, + "height" => format!("{}", notification.header.number()), + "best" => ?notification.hash, ); - let notification = FinalityNotification { - hash: last, - header, - tree_route: Arc::new(notify_finalized.finalized), - stale_heads: Arc::new(notify_finalized.stale_heads), - }; - sinks.retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); Ok(()) @@ -938,12 +967,13 @@ where fn notify_imported( &self, - notify_import: Option>, + notification: Option>, + storage_changes: Option<(StorageCollection, ChildStorageCollection)>, ) -> sp_blockchain::Result<()> { - let notify_import = match notify_import { + let notification = match notification { Some(notify_import) => notify_import, None => { - // cleanup any closed import notification sinks since we won't + // Cleanup any closed import notification sinks since we won't // be sending any notifications below which would remove any // closed sinks. this is necessary since during initial sync we // won't send any import notifications which could lead to a @@ -954,23 +984,15 @@ where }, }; - if let Some(storage_changes) = notify_import.storage_changes { + if let Some(storage_changes) = storage_changes { // TODO [ToDr] How to handle re-orgs? Should we re-emit all storage changes? self.storage_notifications.lock().trigger( - ¬ify_import.hash, + ¬ification.hash, storage_changes.0.into_iter(), storage_changes.1.into_iter().map(|(sk, v)| (sk, v.into_iter())), ); } - let notification = BlockImportNotification:: { - hash: notify_import.hash, - origin: notify_import.origin, - header: notify_import.header, - is_new_best: notify_import.is_new_best, - tree_route: notify_import.tree_route.map(Arc::new), - }; - self.import_notification_sinks .lock() .retain(|sink| sink.unbounded_send(notification.clone()).is_ok()); @@ -1892,6 +1914,19 @@ where } } +impl PreCommitActions for Client +where + Block: BlockT, +{ + fn register_import_action(&self, action: OnImportAction) { + self.import_actions.lock().push(action); + } + + fn register_finality_action(&self, action: OnFinalityAction) { + self.finality_actions.lock().push(action); + } +} + impl BlockchainEvents for Client where E: CallExecutor, diff --git a/primitives/blockchain/src/header_metadata.rs b/primitives/blockchain/src/header_metadata.rs index 6e8dc562473d8..c21c82b9fbc23 100644 --- a/primitives/blockchain/src/header_metadata.rs +++ b/primitives/blockchain/src/header_metadata.rs @@ -206,7 +206,7 @@ impl TreeRoute { /// Handles header metadata: hash, number, parent hash, etc. 
diff --git a/primitives/blockchain/src/header_metadata.rs b/primitives/blockchain/src/header_metadata.rs
index 6e8dc562473d8..c21c82b9fbc23 100644
--- a/primitives/blockchain/src/header_metadata.rs
+++ b/primitives/blockchain/src/header_metadata.rs
@@ -206,7 +206,7 @@ impl<Block: BlockT> TreeRoute<Block> {
 /// Handles header metadata: hash, number, parent hash, etc.
 pub trait HeaderMetadata<Block: BlockT> {
 	/// Error used in case the header metadata is not found.
-	type Error;
+	type Error: std::error::Error;
 
 	fn header_metadata(
 		&self,
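The new bound is what allows callers to log lookup failures generically with `{}`, as `aux_storage_cleanup` does in its `warn!` messages. A minimal sketch of a conforming error type for a custom `HeaderMetadata` implementation (the type name is hypothetical):

use std::{error::Error, fmt};

#[derive(Debug)]
struct HeaderMetadataMissing;

impl fmt::Display for HeaderMetadataMissing {
	fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
		write!(f, "header metadata not found")
	}
}

// `Error` requires nothing beyond `Debug + Display` here; no methods overridden.
impl Error for HeaderMetadataMissing {}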