Skip to content

Commit

Permalink
Allow TaskExecutor to be used in async tests (#3178)
Browse files Browse the repository at this point in the history
# Description

Since the `TaskExecutor` currently requires a `Weak<Runtime>`, it's impossible to use it in an async test where the `Runtime` is created outside our scope. Whilst we *could* create a new `Runtime` instance inside the async test, dropping that `Runtime` would cause a panic (you can't drop a `Runtime` in an async context).

To address this issue, this PR creates the `enum Handle`, which supports either:

- A `Weak<Runtime>` (for use in our production code)
- A `Handle` to a runtime (for use in testing)

In theory, there should be no change to the behaviour of our production code (beyond some slightly different descriptions in HTTP 500 errors), or even our tests. If there is no change, you might ask *"why bother?"*. There are two PRs (#3070 and #3175) that are waiting on these fixes to introduce some new tests. Since we've added the EL to the `BeaconChain` (for the merge), we are now doing more async stuff in tests.

I've also added a `RuntimeExecutor` to the `BeaconChainTestHarness`. Whilst that's not immediately useful, it will become useful in the near future with all the new async testing.
  • Loading branch information
paulhauner committed May 16, 2022
1 parent 3f9e83e commit 38050fa
Show file tree
Hide file tree
Showing 20 changed files with 284 additions and 203 deletions.
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions beacon_node/beacon_chain/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ execution_layer = { path = "../execution_layer" }
sensitive_url = { path = "../../common/sensitive_url" }
superstruct = "0.5.0"
hex = "0.4.2"
exit-future = "0.2.0"

[[test]]
name = "beacon_chain_tests"
Expand Down
14 changes: 13 additions & 1 deletion beacon_node/beacon_chain/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use store::{Error as StoreError, HotColdDB, ItemStore, KeyValueStoreOp};
use task_executor::ShutdownReason;
use task_executor::{ShutdownReason, TaskExecutor};
use types::{
BeaconBlock, BeaconState, ChainSpec, Checkpoint, EthSpec, Graffiti, Hash256, PublicKeyBytes,
Signature, SignedBeaconBlock, Slot,
Expand Down Expand Up @@ -91,6 +91,7 @@ pub struct BeaconChainBuilder<T: BeaconChainTypes> {
// Pending I/O batch that is constructed during building and should be executed atomically
// alongside `PersistedBeaconChain` storage when `BeaconChainBuilder::build` is called.
pending_io_batch: Vec<KeyValueStoreOp>,
task_executor: Option<TaskExecutor>,
}

impl<TSlotClock, TEth1Backend, TEthSpec, THotStore, TColdStore>
Expand Down Expand Up @@ -129,6 +130,7 @@ where
slasher: None,
validator_monitor: None,
pending_io_batch: vec![],
task_executor: None,
}
}

Expand Down Expand Up @@ -182,6 +184,13 @@ where
self.log = Some(log);
self
}

/// Sets the task executor.
pub fn task_executor(mut self, task_executor: TaskExecutor) -> Self {
self.task_executor = Some(task_executor);
self
}

/// Attempt to load an existing eth1 cache from the builder's `Store`.
pub fn get_persisted_eth1_backend(&self) -> Result<Option<SszEth1>, String> {
let store = self
Expand Down Expand Up @@ -919,6 +928,7 @@ mod test {
use std::time::Duration;
use store::config::StoreConfig;
use store::{HotColdDB, MemoryStore};
use task_executor::test_utils::TestRuntime;
use types::{EthSpec, MinimalEthSpec, Slot};

type TestEthSpec = MinimalEthSpec;
Expand Down Expand Up @@ -952,10 +962,12 @@ mod test {
.expect("should create interop genesis state");

let (shutdown_tx, _) = futures::channel::mpsc::channel(1);
let runtime = TestRuntime::default();

let chain = BeaconChainBuilder::new(MinimalEthSpec)
.logger(log.clone())
.store(Arc::new(store))
.task_executor(runtime.task_executor.clone())
.genesis_state(genesis_state)
.expect("should build state using recent genesis")
.dummy_eth1_backend()
Expand Down
31 changes: 15 additions & 16 deletions beacon_node/beacon_chain/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,12 @@ use crate::{
};
use bls::get_withdrawal_credentials;
use execution_layer::{
test_utils::{
ExecutionBlockGenerator, ExecutionLayerRuntime, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK,
},
test_utils::{ExecutionBlockGenerator, MockExecutionLayer, DEFAULT_TERMINAL_BLOCK},
ExecutionLayer,
};
use futures::channel::mpsc::Receiver;
pub use genesis::{interop_genesis_state, DEFAULT_ETH1_BLOCK_HASH};
use int_to_bytes::int_to_bytes32;
use logging::test_logger;
use merkle_proof::MerkleTree;
use parking_lot::Mutex;
use parking_lot::RwLockWriteGuard;
Expand All @@ -41,7 +38,7 @@ use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use store::{config::StoreConfig, HotColdDB, ItemStore, LevelDB, MemoryStore};
use task_executor::ShutdownReason;
use task_executor::{test_utils::TestRuntime, ShutdownReason};
use tree_hash::TreeHash;
use types::sync_selection_proof::SyncSelectionProof;
pub use types::test_utils::generate_deterministic_keypairs;
Expand Down Expand Up @@ -151,8 +148,8 @@ pub struct Builder<T: BeaconChainTypes> {
initial_mutator: Option<BoxedMutator<T::EthSpec, T::HotStore, T::ColdStore>>,
store_mutator: Option<BoxedMutator<T::EthSpec, T::HotStore, T::ColdStore>>,
execution_layer: Option<ExecutionLayer>,
execution_layer_runtime: Option<ExecutionLayerRuntime>,
mock_execution_layer: Option<MockExecutionLayer<T::EthSpec>>,
runtime: TestRuntime,
log: Logger,
}

Expand Down Expand Up @@ -255,6 +252,9 @@ where
Cold: ItemStore<E>,
{
pub fn new(eth_spec_instance: E) -> Self {
let runtime = TestRuntime::default();
let log = runtime.log.clone();

Self {
eth_spec_instance,
spec: None,
Expand All @@ -266,8 +266,8 @@ where
store_mutator: None,
execution_layer: None,
mock_execution_layer: None,
execution_layer_runtime: None,
log: test_logger(),
runtime,
log,
}
}

Expand Down Expand Up @@ -330,8 +330,6 @@ where
"execution layer already defined"
);

let el_runtime = ExecutionLayerRuntime::default();

let urls: Vec<SensitiveUrl> = urls
.iter()
.map(|s| SensitiveUrl::parse(*s))
Expand All @@ -346,19 +344,19 @@ where
};
let execution_layer = ExecutionLayer::from_config(
config,
el_runtime.task_executor.clone(),
el_runtime.log.clone(),
self.runtime.task_executor.clone(),
self.log.clone(),
)
.unwrap();

self.execution_layer = Some(execution_layer);
self.execution_layer_runtime = Some(el_runtime);
self
}

pub fn mock_execution_layer(mut self) -> Self {
let spec = self.spec.clone().expect("cannot build without spec");
let mock = MockExecutionLayer::new(
self.runtime.task_executor.clone(),
spec.terminal_total_difficulty,
DEFAULT_TERMINAL_BLOCK,
spec.terminal_block_hash,
Expand All @@ -383,7 +381,7 @@ where
pub fn build(self) -> BeaconChainHarness<BaseHarnessType<E, Hot, Cold>> {
let (shutdown_tx, shutdown_receiver) = futures::channel::mpsc::channel(1);

let log = test_logger();
let log = self.log;
let spec = self.spec.expect("cannot build without spec");
let seconds_per_slot = spec.seconds_per_slot;
let validator_keypairs = self
Expand All @@ -395,6 +393,7 @@ where
.custom_spec(spec)
.store(self.store.expect("cannot build without store"))
.store_migrator_config(MigratorConfig::default().blocking())
.task_executor(self.runtime.task_executor.clone())
.execution_layer(self.execution_layer)
.dummy_eth1_backend()
.expect("should build dummy backend")
Expand Down Expand Up @@ -434,8 +433,8 @@ where
chain: Arc::new(chain),
validator_keypairs,
shutdown_receiver: Arc::new(Mutex::new(shutdown_receiver)),
runtime: self.runtime,
mock_execution_layer: self.mock_execution_layer,
execution_layer_runtime: self.execution_layer_runtime,
rng: make_rng(),
}
}
Expand All @@ -451,9 +450,9 @@ pub struct BeaconChainHarness<T: BeaconChainTypes> {
pub chain: Arc<BeaconChain<T>>,
pub spec: ChainSpec,
pub shutdown_receiver: Arc<Mutex<Receiver<ShutdownReason>>>,
pub runtime: TestRuntime,

pub mock_execution_layer: Option<MockExecutionLayer<T::EthSpec>>,
pub execution_layer_runtime: Option<ExecutionLayerRuntime>,

pub rng: Mutex<StdRng>,
}
Expand Down
1 change: 1 addition & 0 deletions beacon_node/client/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ where
let builder = BeaconChainBuilder::new(eth_spec_instance)
.logger(context.log().clone())
.store(store)
.task_executor(context.executor.clone())
.custom_spec(spec.clone())
.chain_config(chain_config)
.graffiti(graffiti)
Expand Down
28 changes: 13 additions & 15 deletions beacon_node/execution_layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,11 +304,7 @@ impl ExecutionLayer {
T: Fn(&'a Self) -> U,
U: Future<Output = Result<V, Error>>,
{
let runtime = self
.executor()
.runtime()
.upgrade()
.ok_or(Error::ShuttingDown)?;
let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?;
// TODO(merge): respect the shutdown signal.
runtime.block_on(generate_future(self))
}
Expand All @@ -322,11 +318,7 @@ impl ExecutionLayer {
T: Fn(&'a Self) -> U,
U: Future<Output = V>,
{
let runtime = self
.executor()
.runtime()
.upgrade()
.ok_or(Error::ShuttingDown)?;
let runtime = self.executor().handle().ok_or(Error::ShuttingDown)?;
// TODO(merge): respect the shutdown signal.
Ok(runtime.block_on(generate_future(self)))
}
Expand Down Expand Up @@ -1263,13 +1255,15 @@ impl ExecutionLayer {
mod test {
use super::*;
use crate::test_utils::MockExecutionLayer as GenericMockExecutionLayer;
use task_executor::test_utils::TestRuntime;
use types::MainnetEthSpec;

type MockExecutionLayer = GenericMockExecutionLayer<MainnetEthSpec>;

#[tokio::test]
async fn produce_three_valid_pos_execution_blocks() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_terminal_block()
.produce_valid_execution_payload_on_head()
.await
Expand All @@ -1281,7 +1275,8 @@ mod test {

#[tokio::test]
async fn finds_valid_terminal_block_hash() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_block_prior_to_terminal_block()
.with_terminal_block(|spec, el, _| async move {
el.engines().upcheck_not_synced(Logging::Disabled).await;
Expand All @@ -1300,7 +1295,8 @@ mod test {

#[tokio::test]
async fn verifies_valid_terminal_block_hash() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_terminal_block()
.with_terminal_block(|spec, el, terminal_block| async move {
el.engines().upcheck_not_synced(Logging::Disabled).await;
Expand All @@ -1316,7 +1312,8 @@ mod test {

#[tokio::test]
async fn rejects_invalid_terminal_block_hash() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_terminal_block()
.with_terminal_block(|spec, el, terminal_block| async move {
el.engines().upcheck_not_synced(Logging::Disabled).await;
Expand All @@ -1334,7 +1331,8 @@ mod test {

#[tokio::test]
async fn rejects_unknown_terminal_block_hash() {
MockExecutionLayer::default_params()
let runtime = TestRuntime::default();
MockExecutionLayer::default_params(runtime.task_executor.clone())
.move_to_terminal_block()
.with_terminal_block(|spec, el, _| async move {
el.engines().upcheck_not_synced(Logging::Disabled).await;
Expand Down
Loading

0 comments on commit 38050fa

Please sign in to comment.