Skip to content

Commit

Permalink
Prune the too old execution receipts while writing a new one
Browse files Browse the repository at this point in the history
We have to use the block number as key because the time is measured in
block number only.
  • Loading branch information
liuchengxu committed Jan 31, 2022
1 parent e0631d3 commit 67f06be
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 30 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 12 additions & 1 deletion cumulus/client/cirrus-executor/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ rand = "0.8.4"
rand_chacha = "0.3.1"
merkletree = "0.21.0"
parking_lot = "0.11.2"
sha2 = "0.10.0"
tracing = "0.1.25"
thiserror = "1.0.29"
tokio = "1.10"
Expand All @@ -48,5 +47,17 @@ sp-executor = { path = "../../../crates/sp-executor" }
subspace-core-primitives = { path = "../../../crates/subspace-core-primitives" }
subspace-runtime-primitives = { path = "../../../crates/subspace-runtime-primitives" }

# Ugly workaround for https://github.com/rust-lang/cargo/issues/1197
[target.'cfg(any(target_os = "linux", target_os = "macos", all(target_os = "windows", target_env = "gnu")))'.dependencies.sha2]
features = ["asm"]
version = "0.10.0"

# Ugly workaround for https://github.com/rust-lang/cargo/issues/1197
# `asm` feature is not supported on Windows except with GNU toolchain
[target.'cfg(not(any(target_os = "linux", target_os = "macos", all(target_os = "windows", target_env = "gnu"))))'.dependencies.sha2]
version = "0.10.0"

[dev-dependencies]
sp-keyring = { git = "https://github.com/paritytech/substrate", rev = "04c7c6abef1e0c6b4126427159090a44f3221333" }
substrate-test-runtime = { path = "../../../substrate/substrate-test-runtime" }
substrate-test-runtime-client = { path = "../../../substrate/substrate-test-runtime-client" }
135 changes: 126 additions & 9 deletions cumulus/client/cirrus-executor/src/aux_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,15 @@ use codec::{Decode, Encode};
use sc_client_api::backend::AuxStore;
use sp_blockchain::{Error as ClientError, Result as ClientResult};
use sp_executor::ExecutionReceipt;
use sp_runtime::traits::Block as BlockT;
use sp_runtime::{
traits::{Block as BlockT, Header as HeaderT, One, Saturating},
SaturatedConversion,
};

const EXECUTION_RECEIPT_KEY: &[u8] = b"execution_receipt";
const EXECUTION_RECEIPT_START: &[u8] = b"execution_receipt_start";
/// Prune the execution receipts when they reach this number.
const PRUNING_DEPTH: u64 = 1000;

fn load_decode<Backend: AuxStore, T: Decode>(
backend: &Backend,
Expand All @@ -22,29 +28,140 @@ fn load_decode<Backend: AuxStore, T: Decode>(
}
}

/// Write the execution receipt of a block to aux storage.
/// Write the execution receipt of a block to aux storage, optionally prune the receipts that are
/// too old.
pub(super) fn write_execution_receipt<Backend: AuxStore, Block: BlockT>(
backend: &Backend,
block_hash: Block::Hash,
block_number: <<Block as BlockT>::Header as HeaderT>::Number,
execution_receipt: &ExecutionReceipt<Block::Hash>,
) -> Result<(), sp_blockchain::Error> {
let key = (EXECUTION_RECEIPT_KEY, block_hash).encode();
backend.insert_aux(&[(key.as_slice(), execution_receipt.encode().as_slice())], [])
let first_saved_receipt = load_decode::<_, <<Block as BlockT>::Header as HeaderT>::Number>(
backend,
EXECUTION_RECEIPT_START,
)?
.unwrap_or(block_number);

let mut new_first_saved_receipt = first_saved_receipt;

if block_number - first_saved_receipt >= PRUNING_DEPTH.saturated_into() {
new_first_saved_receipt = block_number.saturating_sub((PRUNING_DEPTH - 1).saturated_into());

let mut keys_to_delete = vec![];
let mut to_delete_start = first_saved_receipt;
while to_delete_start < new_first_saved_receipt {
keys_to_delete.push((EXECUTION_RECEIPT_KEY, to_delete_start).encode());
to_delete_start = to_delete_start.saturating_add(One::one());
}

backend.insert_aux(
&[
(
(EXECUTION_RECEIPT_KEY, block_number).encode().as_slice(),
execution_receipt.encode().as_slice(),
),
((EXECUTION_RECEIPT_START, new_first_saved_receipt.encode().as_slice())),
],
&keys_to_delete.iter().map(|k| &k[..]).collect::<Vec<&[u8]>>()[..],
)
} else {
backend.insert_aux(
&[
(
(EXECUTION_RECEIPT_KEY, block_number).encode().as_slice(),
execution_receipt.encode().as_slice(),
),
((EXECUTION_RECEIPT_START, new_first_saved_receipt.encode().as_slice())),
],
[],
)
}
}

/// Load the execution receipt associated with a block.
pub(super) fn load_execution_receipt<Backend: AuxStore, Block: BlockT>(
backend: &Backend,
block_hash: Block::Hash,
block_number: <<Block as BlockT>::Header as HeaderT>::Number,
) -> ClientResult<Option<ExecutionReceipt<Block::Hash>>> {
load_decode(backend, (EXECUTION_RECEIPT_KEY, block_hash).encode().as_slice())
let key = (EXECUTION_RECEIPT_KEY, block_number).encode();
load_decode(backend, key.as_slice())
}

/// Remove the validated execution receipt.
pub(super) fn delete_execution_receipt<Backend: AuxStore, Block: BlockT>(
backend: &Backend,
block_hash: Block::Hash,
block_number: <<Block as BlockT>::Header as HeaderT>::Number,
) -> Result<(), sp_blockchain::Error> {
let key = (EXECUTION_RECEIPT_KEY, block_hash).encode();
let key = (EXECUTION_RECEIPT_KEY, block_number).encode();
backend.insert_aux([], &[(key.as_slice())])
}

pub(super) fn target_receipt_is_pruned<Block: BlockT>(
current_block: <<Block as BlockT>::Header as HeaderT>::Number,
target_block: <<Block as BlockT>::Header as HeaderT>::Number,
) -> bool {
current_block - target_block >= PRUNING_DEPTH.saturated_into()
}

#[cfg(test)]
mod tests {
use super::*;
use sp_core::hash::H256;
use substrate_test_runtime::{Block, BlockNumber, Hash};

type ExecutionReceipt = sp_executor::ExecutionReceipt<Hash>;

fn create_execution_receipt() -> ExecutionReceipt {
ExecutionReceipt {
primary_hash: H256::random(),
secondary_hash: H256::random(),
trace: Default::default(),
trace_root: Default::default(),
}
}

#[test]
fn prune_execution_receipt_works() {
let client = substrate_test_runtime_client::new();

let receipt_start = || {
load_decode::<_, BlockNumber>(&client, EXECUTION_RECEIPT_START.to_vec().as_slice())
.unwrap()
};

let receipt_at =
|number: BlockNumber| load_execution_receipt::<_, Block>(&client, number).unwrap();

let write_receipt_at = |number: BlockNumber| {
write_execution_receipt::<_, Block>(&client, number, &create_execution_receipt())
.unwrap()
};

assert_eq!(receipt_start(), None);

// Create PRUNING_DEPTH receipts.
(1..=PRUNING_DEPTH).for_each(|number| {
write_receipt_at(number);
assert!(receipt_at(number).is_some());
assert_eq!(receipt_start(), Some(1));
});

assert!(!target_receipt_is_pruned::<Block>(PRUNING_DEPTH, 1));

// Create PRUNING_DEPTH + 1 receipt.
write_receipt_at(PRUNING_DEPTH + 1);
assert!(receipt_at(PRUNING_DEPTH + 1).is_some());
// ER of block #1 should be pruned.
assert!(receipt_at(1).is_none());
assert!(target_receipt_is_pruned::<Block>(PRUNING_DEPTH + 1, 1));
assert_eq!(receipt_start(), Some(2));

// Create PRUNING_DEPTH + 2 receipt.
write_receipt_at(PRUNING_DEPTH + 2);
assert!(receipt_at(PRUNING_DEPTH + 2).is_some());
// ER of block #2 should be pruned.
assert!(receipt_at(2).is_none());
assert!(target_receipt_is_pruned::<Block>(PRUNING_DEPTH + 2, 2));
assert!(!target_receipt_is_pruned::<Block>(PRUNING_DEPTH + 2, 3));
assert_eq!(receipt_start(), Some(3));
}
}
23 changes: 16 additions & 7 deletions cumulus/client/cirrus-executor/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -404,14 +404,20 @@ where
) -> Result<Action, Self::Error> {
// TODO: validate the Proof-of-Election

let block_hash = execution_receipt.secondary_hash;
let block_number = self
.client
.expect_block_number_from_id(&BlockId::Hash(execution_receipt.secondary_hash))?;
let best_number = self.client.info().best_number;

// Just ignore it if the receipt is too old and has been pruned.
if aux_schema::target_receipt_is_pruned::<Block>(best_number, block_number) {
return Ok(Action::Empty)
}

// TODO: more efficient execution receipt checking strategy?
let local_receipt = if let Some(local_receipt) =
crate::aux_schema::load_execution_receipt::<_, Block>(
&*self.client,
execution_receipt.secondary_hash,
)? {
crate::aux_schema::load_execution_receipt::<_, Block>(&*self.client, block_number)?
{
local_receipt
} else {
// Wait for the local execution receipt until it's ready.
Expand All @@ -425,7 +431,8 @@ where
async move {
loop {
match crate::aux_schema::load_execution_receipt::<_, Block>(
&*client, block_hash,
&*client,
block_number,
) {
Ok(Some(local_receipt)) => {
let _ = tx.send(Ok(local_receipt));
Expand Down Expand Up @@ -471,7 +478,9 @@ where

Ok(Action::Empty)
} else {
crate::aux_schema::delete_execution_receipt::<_, Block>(&*self.client, block_hash)?;
// TODO: Problem comes if we somehow receive the same receipt again after deleting the local one
// from disk.
crate::aux_schema::delete_execution_receipt::<_, Block>(&*self.client, block_number)?;
Ok(Action::RebroadcastExecutionReceipt)
}
}
Expand Down
20 changes: 11 additions & 9 deletions cumulus/client/cirrus-executor/src/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,7 @@ where
let (header, body) = block.deconstruct();
let state_root = *header.state_root();
let header_hash = header.hash();
let header_number = *header.number();

let block_import_params = {
let mut import_block = BlockImportParams::new(BlockOrigin::Own, header);
Expand All @@ -192,15 +193,16 @@ where

let mut roots =
self.client.runtime_api().intermediate_roots(&BlockId::Hash(parent_hash))?;
roots.push(state_root.encode());
roots.push(
state_root
.encode()
.try_into()
.expect("State root uses the same Block hash type which can fit into [u8; 32]; qed"),
);

let trace_root = crate::merkle_tree::MerkleTree::new(
roots
.iter()
.map(|r| r.as_slice().try_into().expect("Storage root type is [u8; 32]; qed")),
)
.expect("Failed to construct merkle tree for execution trace")
.root();
let trace_root = crate::merkle_tree::MerkleTree::new(roots.clone())
.map_err(|e| sp_blockchain::Error::Application(e.into()))?
.root();

let trace = roots
.into_iter()
Expand All @@ -223,7 +225,7 @@ where

crate::aux_schema::write_execution_receipt::<_, Block>(
&*self.client,
header_hash,
header_number,
&execution_receipt,
)?;

Expand Down
7 changes: 5 additions & 2 deletions cumulus/pallets/executive/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,12 +74,15 @@ mod pallet {
/// Intermediate storage roots collected during the block execution.
#[pallet::storage]
#[pallet::getter(fn intermediate_roots)]
pub(super) type IntermediateRoots<T: Config> = StorageValue<_, Vec<Vec<u8>>, ValueQuery>;
pub(super) type IntermediateRoots<T: Config> = StorageValue<_, Vec<[u8; 32]>, ValueQuery>;
}

impl<T: Config> Pallet<T> {
pub(crate) fn push_root(root: Vec<u8>) {
IntermediateRoots::<T>::append(root);
IntermediateRoots::<T>::append(
TryInto::<[u8; 32]>::try_into(root)
.expect("root is a SCALE encoded hash which uses H256; qed"),
);
}
}

Expand Down
2 changes: 1 addition & 1 deletion cumulus/parachain-template/runtime/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ impl_runtime_apis! {
extrinsics.into_iter().map(|xt| (xt.signer(&lookup), xt)).collect()
}

fn intermediate_roots() -> Vec<Vec<u8>> {
fn intermediate_roots() -> Vec<[u8; 32]> {
ExecutivePallet::intermediate_roots()
}
}
Expand Down
2 changes: 1 addition & 1 deletion cumulus/primitives/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,6 @@ sp_api::decl_runtime_apis! {
) -> Vec<(Option<AccountId>, <Block as BlockT>::Extrinsic)>;

/// Returns the intermediate storage roots in an encoded form.
fn intermediate_roots() -> Vec<Vec<u8>>;
fn intermediate_roots() -> Vec<[u8; 32]>;
}
}

0 comments on commit 67f06be

Please sign in to comment.