[testloop] Refactor builder to separate node state and shared state (#13041)

The big `setup_client` function in the builder no longer takes the builder itself as a parameter.

The common state has been extracted out of the builder into `SharedState`, and a `NodeState` is built for each individual client.

Given a `NodeState` and the `SharedState`, nodes can now be initialized independently, which is useful for implementing node restarts and for adding new nodes to the test environment outside the builder (see the restart sketch after the signatures below).

A couple of other refactoring changes are included.

Before
```
    fn setup_client(
        &mut self,
        idx: usize,
        tempdir: &TempDir,
        is_archival: bool,
        network_shared_state: &TestLoopNetworkSharedState,
    ) -> TestData {}
```

After
```
    fn setup_client(
        identifier: &str,
        test_loop: &mut TestLoopV2,
        node_handle: NodeState,
        shared_state: &SharedState,
    ) -> TestData {}
```
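
For illustration, here is a minimal sketch of the node-restart flow this enables. It is not part of the commit: `restart_node` is a hypothetical helper, and it assumes the caller has kept (or rebuilt) the node's `NodeState` after killing the node:

```
// Hypothetical helper (not in this commit): re-initialize a node given its
// saved per-node state and the environment's shared state.
fn restart_node(
    identifier: &str,
    test_loop: &mut TestLoopV2,
    node_state: NodeState,
    shared_state: &SharedState,
) -> TestData {
    // setup_client no longer borrows the builder, so it can be called at any
    // point after the environment has been built.
    setup_client(identifier, test_loop, node_state, shared_state)
}
```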
shreyan-gupta authored Mar 7, 2025
1 parent 9dc2234 commit 01373eb
Showing 38 changed files with 772 additions and 626 deletions.
3 changes: 1 addition & 2 deletions test-loop-tests/src/lib.rs
@@ -1,5 +1,4 @@
#![cfg(test)]
mod builder;
mod env;
mod setup;
mod tests;
mod utils;
656 changes: 348 additions & 308 deletions test-loop-tests/src/builder.rs → test-loop-tests/src/setup/builder.rs

Large diffs are not rendered by default.

89 changes: 14 additions & 75 deletions test-loop-tests/src/env.rs → test-loop-tests/src/setup/env.rs
@@ -1,33 +1,23 @@
use near_async::messaging::{CanSend, IntoMultiSender, IntoSender, LateBoundSender, Sender};
use near_async::messaging::{CanSend, LateBoundSender};
use near_async::test_loop::TestLoopV2;
use near_async::test_loop::data::{TestLoopData, TestLoopDataHandle};
use near_async::test_loop::data::TestLoopData;
use near_async::test_loop::sender::TestLoopSender;
use near_async::time::Duration;
use near_chunks::adapter::ShardsManagerRequestFromClient;
use near_chunks::shards_manager_actor::ShardsManagerActor;
use near_client::client_actor::ClientActorInner;
use near_client::{PartialWitnessActor, ViewClientActorInner};
use near_jsonrpc::ViewClientSenderForRpc;
use near_network::shards_manager::ShardsManagerRequestFromNetwork;
use near_network::state_witness::PartialWitnessSenderForNetwork;
use near_network::test_loop::{
ClientSenderForTestLoopNetwork, TestLoopPeerManagerActor, ViewClientSenderForTestLoopNetwork,
};
use near_primitives::network::PeerId;
use near_primitives::sharding::{ChunkHash, ShardChunkHeader};
use near_primitives::types::AccountId;
use near_primitives_core::types::BlockHeight;
use nearcore::state_sync::StateSyncDumper;
use std::collections::HashMap;
use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex};
use tempfile::TempDir;

const NETWORK_DELAY: Duration = Duration::milliseconds(10);
use super::state::{SharedState, TestData};

pub struct TestLoopEnv {
pub test_loop: TestLoopV2,
pub datas: Vec<TestData>,
pub tempdir: TempDir,
pub node_datas: Vec<TestData>,
pub shared_state: SharedState,
}

impl TestLoopEnv {
@@ -37,7 +27,11 @@ impl TestLoopEnv {
/// Needed because for smaller heights blocks may not get all chunks and/or
/// approvals.
pub fn warmup(self) -> Self {
let Self { mut test_loop, datas, tempdir } = self;
let Self { mut test_loop, node_datas: datas, shared_state } = self;

// This may happen if you're calling warmup twice or have set skip_warmup in builder.
assert!(shared_state.warmup_pending.load(Ordering::Relaxed), "warmup already done");
shared_state.warmup_pending.store(false, Ordering::Relaxed);

let client_handle = datas[0].client_sender.actor_handle();
let genesis_height = test_loop.data.get(&client_handle).client.chain.genesis().height();
@@ -60,8 +54,7 @@ impl TestLoopEnv {
test_loop.send_adhoc_event("assertions".to_owned(), Box::new(event));
}
test_loop.run_instant();

Self { test_loop, datas, tempdir }
Self { test_loop, node_datas: datas, shared_state }
}

pub fn kill_node(&mut self, identifier: &str) {
@@ -76,12 +69,12 @@
/// Returns the test loop data dir, if the caller wishes to reuse it for another test loop.
pub fn shutdown_and_drain_remaining_events(mut self, timeout: Duration) -> TempDir {
// State sync dumper is not an Actor, handle stopping separately.
for node_data in self.datas {
for node_data in self.node_datas {
self.test_loop.data.get_mut(&node_data.state_sync_dumper_handle).stop();
}

self.test_loop.shutdown_and_drain_remaining_events(timeout);
self.tempdir
self.shared_state.tempdir
}
}

@@ -142,57 +135,3 @@ impl CanSend<ShardsManagerRequestFromClient> for ClientToShardsManagerSender {
self.sender.send(message);
}
}

#[derive(Clone)]
pub struct TestData {
pub account_id: AccountId,
pub peer_id: PeerId,
pub client_sender: TestLoopSender<ClientActorInner>,
pub view_client_sender: TestLoopSender<ViewClientActorInner>,
pub shards_manager_sender: TestLoopSender<ShardsManagerActor>,
pub partial_witness_sender: TestLoopSender<PartialWitnessActor>,
pub peer_manager_sender: TestLoopSender<TestLoopPeerManagerActor>,
pub state_sync_dumper_handle: TestLoopDataHandle<StateSyncDumper>,
}

impl From<&TestData> for AccountId {
fn from(data: &TestData) -> AccountId {
data.account_id.clone()
}
}

impl From<&TestData> for PeerId {
fn from(data: &TestData) -> PeerId {
data.peer_id.clone()
}
}

impl From<&TestData> for ClientSenderForTestLoopNetwork {
fn from(data: &TestData) -> ClientSenderForTestLoopNetwork {
data.client_sender.clone().with_delay(NETWORK_DELAY).into_multi_sender()
}
}

impl From<&TestData> for ViewClientSenderForRpc {
fn from(data: &TestData) -> ViewClientSenderForRpc {
data.view_client_sender.clone().with_delay(NETWORK_DELAY).into_multi_sender()
}
}

impl From<&TestData> for ViewClientSenderForTestLoopNetwork {
fn from(data: &TestData) -> ViewClientSenderForTestLoopNetwork {
data.view_client_sender.clone().with_delay(NETWORK_DELAY).into_multi_sender()
}
}

impl From<&TestData> for PartialWitnessSenderForNetwork {
fn from(data: &TestData) -> PartialWitnessSenderForNetwork {
data.partial_witness_sender.clone().with_delay(NETWORK_DELAY).into_multi_sender()
}
}

impl From<&TestData> for Sender<ShardsManagerRequestFromNetwork> {
fn from(data: &TestData) -> Sender<ShardsManagerRequestFromNetwork> {
data.shards_manager_sender.clone().with_delay(NETWORK_DELAY).into_sender()
}
}
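
Taken together, the env.rs changes imply the following test-loop lifecycle. This is a sketch assembled from the test updates further down (the builder calls are copied from those diffs), not new API surface introduced here:

```
// `datas` is now `node_datas`, and the tempdir lives inside `shared_state`.
let TestLoopEnv { mut test_loop, node_datas, shared_state } = TestLoopBuilder::new()
    .genesis(genesis)
    .epoch_config_store(epoch_config_store)
    .clients(clients)
    .build()
    .warmup(); // the warmup_pending flag asserts this runs at most once

// ... drive the test via test_loop and node_datas ...

// Reassemble the env to shut down cleanly; this also stops the state sync
// dumpers, which are not actors.
TestLoopEnv { test_loop, node_datas, shared_state }
    .shutdown_and_drain_remaining_events(Duration::seconds(20));
```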
3 changes: 3 additions & 0 deletions test-loop-tests/src/setup/mod.rs
@@ -0,0 +1,3 @@
pub mod builder;
pub mod env;
pub mod state;
118 changes: 118 additions & 0 deletions test-loop-tests/src/setup/state.rs
@@ -0,0 +1,118 @@
use std::sync::atomic::AtomicBool;
use std::sync::{Arc, Mutex};

use near_async::messaging::{IntoMultiSender, IntoSender, Sender};
use near_async::test_loop::data::TestLoopDataHandle;
use near_async::test_loop::sender::TestLoopSender;
use near_async::time::Duration;
use near_chain_configs::{ClientConfig, Genesis};
use near_chunks::shards_manager_actor::ShardsManagerActor;
use near_client::client_actor::ClientActorInner;
use near_client::{PartialWitnessActor, ViewClientActorInner};
use near_jsonrpc::ViewClientSenderForRpc;
use near_network::shards_manager::ShardsManagerRequestFromNetwork;
use near_network::state_witness::PartialWitnessSenderForNetwork;
use near_network::test_loop::{
ClientSenderForTestLoopNetwork, TestLoopNetworkSharedState, TestLoopPeerManagerActor,
ViewClientSenderForTestLoopNetwork,
};
use near_parameters::RuntimeConfigStore;
use near_primitives::epoch_manager::EpochConfigStore;
use near_primitives::network::PeerId;
use near_primitives::types::AccountId;
use near_primitives::upgrade_schedule::ProtocolUpgradeVotingSchedule;
use near_store::Store;
use nearcore::state_sync::StateSyncDumper;
use tempfile::TempDir;

use super::builder::DropConditionKind;
use super::env::TestLoopChunksStorage;

const NETWORK_DELAY: Duration = Duration::milliseconds(10);

/// This is the state associated with the test loop environment.
/// This state is shared across all nodes and none of it belongs to a specific node.
pub struct SharedState {
pub genesis: Genesis,
/// Directory of the current test. This is automatically deleted once tempdir goes out of scope.
pub tempdir: TempDir,
pub epoch_config_store: EpochConfigStore,
pub runtime_config_store: Option<RuntimeConfigStore>,
/// Shared state across all the network actors. It handles the mapping between AccountId,
/// PeerId, and the route back CryptoHash, so that individual network actors can do routing.
pub network_shared_state: TestLoopNetworkSharedState,
pub upgrade_schedule: ProtocolUpgradeVotingSchedule,
/// Stores all chunks ever observed on chain. Used by drop conditions to simulate network drops.
pub chunks_storage: Arc<Mutex<TestLoopChunksStorage>>,
/// List of drop conditions that apply to all nodes in the network.
pub drop_condition_kinds: Vec<DropConditionKind>,
pub load_memtries_for_tracked_shards: bool,
/// Flag to indicate if warmup is pending. This is used to ensure that warmup is only done once.
pub warmup_pending: Arc<AtomicBool>,
}

/// This is the state associated with each node in the test loop environment before being built.
/// The setup_client function will be called for each node to build the node and return TestData.
pub struct NodeState {
pub account_id: AccountId,
pub client_config: ClientConfig,
pub store: Store,
pub split_store: Option<Store>,
}

/// This is the state associated with each node in the test loop environment after being built.
/// This state is specific to each node and is not shared across nodes.
/// We can access each of the individual actors and senders from this state.
#[derive(Clone)]
pub struct TestData {
pub account_id: AccountId,
pub peer_id: PeerId,
pub client_sender: TestLoopSender<ClientActorInner>,
pub view_client_sender: TestLoopSender<ViewClientActorInner>,
pub shards_manager_sender: TestLoopSender<ShardsManagerActor>,
pub partial_witness_sender: TestLoopSender<PartialWitnessActor>,
pub peer_manager_sender: TestLoopSender<TestLoopPeerManagerActor>,
pub state_sync_dumper_handle: TestLoopDataHandle<StateSyncDumper>,
}

impl From<&TestData> for AccountId {
fn from(data: &TestData) -> AccountId {
data.account_id.clone()
}
}

impl From<&TestData> for PeerId {
fn from(data: &TestData) -> PeerId {
data.peer_id.clone()
}
}

impl From<&TestData> for ClientSenderForTestLoopNetwork {
fn from(data: &TestData) -> ClientSenderForTestLoopNetwork {
data.client_sender.clone().with_delay(NETWORK_DELAY).into_multi_sender()
}
}

impl From<&TestData> for ViewClientSenderForRpc {
fn from(data: &TestData) -> ViewClientSenderForRpc {
data.view_client_sender.clone().with_delay(NETWORK_DELAY).into_multi_sender()
}
}

impl From<&TestData> for ViewClientSenderForTestLoopNetwork {
fn from(data: &TestData) -> ViewClientSenderForTestLoopNetwork {
data.view_client_sender.clone().with_delay(NETWORK_DELAY).into_multi_sender()
}
}

impl From<&TestData> for PartialWitnessSenderForNetwork {
fn from(data: &TestData) -> PartialWitnessSenderForNetwork {
data.partial_witness_sender.clone().with_delay(NETWORK_DELAY).into_multi_sender()
}
}

impl From<&TestData> for Sender<ShardsManagerRequestFromNetwork> {
fn from(data: &TestData) -> Sender<ShardsManagerRequestFromNetwork> {
data.shards_manager_sender.clone().with_delay(NETWORK_DELAY).into_sender()
}
}
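
One usage note on the `From<&TestData>` conversions above: they let call sites pull a typed, delay-wrapped sender for a node out of its `TestData` with a single `.into()`. A minimal sketch (the function name is illustrative, not from this commit):

```
// Illustrative only: obtain senders for one node. Both conversions clone the
// underlying TestLoopSender and attach the 10ms NETWORK_DELAY first.
fn node_senders(
    data: &TestData,
) -> (ClientSenderForTestLoopNetwork, Sender<ShardsManagerRequestFromNetwork>) {
    (data.into(), data.into())
}
```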
12 changes: 7 additions & 5 deletions test-loop-tests/src/tests/bandwidth_scheduler.rs
@@ -56,8 +56,9 @@ use testlib::bandwidth_scheduler::{
TestScenario, TestScenarioBuilder, TestSummary,
};

use crate::builder::TestLoopBuilder;
use crate::env::{TestData, TestLoopEnv};
use crate::setup::builder::TestLoopBuilder;
use crate::setup::env::TestLoopEnv;
use crate::setup::state::TestData;
use crate::utils::transactions::{TransactionRunner, run_txs_parallel};
use crate::utils::{ONE_NEAR, TGAS};

@@ -162,12 +163,13 @@ fn run_bandwidth_scheduler_test(scenario: TestScenario, tx_concurrency: usize) -
.build();
let epoch_config_store = TestEpochConfigBuilder::build_store_from_genesis(&genesis);

let TestLoopEnv { mut test_loop, datas: node_datas, tempdir } = TestLoopBuilder::new()
let TestLoopEnv { mut test_loop, node_datas, shared_state } = TestLoopBuilder::new()
.genesis(genesis)
.epoch_config_store(epoch_config_store)
.clients(vec![node_account])
.drop_chunks_by_height(missing_chunks_map)
.build();
.build()
.warmup();

// Initialize the workload generator.
let mut workload_generator = WorkloadGenerator::init(
@@ -219,7 +221,7 @@ fn run_bandwidth_scheduler_test(scenario: TestScenario, tx_concurrency: usize) -
let bandwidth_stats =
analyze_workload_blocks(first_height.unwrap(), last_height.unwrap(), client);

TestLoopEnv { test_loop, datas: node_datas, tempdir }
TestLoopEnv { test_loop, node_datas, shared_state }
.shutdown_and_drain_remaining_events(Duration::seconds(20));

let summary = bandwidth_stats.summarize(&active_links);
14 changes: 9 additions & 5 deletions test-loop-tests/src/tests/chunk_validator_kickout.rs
@@ -1,5 +1,5 @@
use crate::builder::TestLoopBuilder;
use crate::env::TestLoopEnv;
use crate::setup::builder::TestLoopBuilder;
use crate::setup::env::TestLoopEnv;
use crate::utils::ONE_NEAR;
use crate::utils::validators::get_epoch_all_validators;
use itertools::Itertools;
@@ -87,8 +92,12 @@ fn run_test_chunk_validator_kickout(accounts: Vec<AccountId>, test_case: TestCas
.target_validator_mandates_per_shard(num_validator_mandates_per_shard)
.build_store_for_genesis_protocol_version();

let TestLoopEnv { mut test_loop, datas: node_datas, tempdir } =
builder.genesis(genesis).epoch_config_store(epoch_config_store).clients(clients).build();
let TestLoopEnv { mut test_loop, node_datas, shared_state } = builder
.genesis(genesis)
.epoch_config_store(epoch_config_store)
.clients(clients)
.build()
.warmup();

// Run chain until our targeted chunk validator is (not) kicked out.
let client_handle = node_datas[0].client_sender.actor_handle();
@@ -136,7 +140,7 @@ fn run_test_chunk_validator_kickout(accounts: Vec<AccountId>, test_case: TestCas
Duration::seconds((5 * epoch_length) as i64),
);

TestLoopEnv { test_loop, datas: node_datas, tempdir }
TestLoopEnv { test_loop, node_datas, shared_state }
.shutdown_and_drain_remaining_events(Duration::seconds(20));
}

12 changes: 7 additions & 5 deletions test-loop-tests/src/tests/congestion_control.rs
@@ -10,8 +10,9 @@ use near_o11y::testonly::init_test_logger;
use near_primitives::shard_layout::ShardLayout;
use near_primitives::types::{AccountId, BlockHeight};

use crate::builder::TestLoopBuilder;
use crate::env::{TestData, TestLoopEnv};
use crate::setup::builder::TestLoopBuilder;
use crate::setup::env::TestLoopEnv;
use crate::setup::state::TestData;
use crate::utils::transactions::{call_contract, check_txs, deploy_contract, make_accounts};
use crate::utils::{ONE_NEAR, TGAS};

@@ -35,7 +36,7 @@ fn slow_test_congestion_control_simple() {
accounts.push(contract_id.clone());

let (env, rpc_id) = setup(&accounts);
let TestLoopEnv { mut test_loop, datas: node_datas, tempdir } = env;
let TestLoopEnv { mut test_loop, node_datas, shared_state } = env;

// Test

@@ -54,7 +55,7 @@

// Give the test a chance to finish off remaining events in the event loop, which can
// be important for properly shutting down the nodes.
TestLoopEnv { test_loop, datas: node_datas, tempdir }
TestLoopEnv { test_loop, node_datas, shared_state }
.shutdown_and_drain_remaining_events(Duration::seconds(20));
}

@@ -92,7 +93,8 @@ fn setup(accounts: &Vec<AccountId>) -> (TestLoopEnv, AccountId) {
.genesis(genesis)
.epoch_config_store(epoch_config_store)
.clients(clients)
.build();
.build()
.warmup();
(env, rpc_id.clone())
}
