diff --git a/polkadot/node/core/approval-voting/src/lib.rs b/polkadot/node/core/approval-voting/src/lib.rs index 456ae319787b..8cc16a6e1ec1 100644 --- a/polkadot/node/core/approval-voting/src/lib.rs +++ b/polkadot/node/core/approval-voting/src/lib.rs @@ -1253,13 +1253,20 @@ async fn handle_actions( Action::BecomeActive => { *mode = Mode::Active; - let messages = distribution_messages_for_activation( + let (messages, next_actions) = distribution_messages_for_activation( + ctx, overlayed_db, state, delayed_approvals_timers, - )?; + session_info_provider, + ) + .await?; ctx.send_messages(messages.into_iter()).await; + let next_actions: Vec = + next_actions.into_iter().map(|v| v.clone()).chain(actions_iter).collect(); + + actions_iter = next_actions.into_iter(); }, Action::Conclude => { conclude = true; @@ -1313,15 +1320,19 @@ fn get_assignment_core_indices( } } -fn distribution_messages_for_activation( +#[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] +async fn distribution_messages_for_activation( + ctx: &mut Context, db: &OverlayedBackend<'_, impl Backend>, state: &State, delayed_approvals_timers: &mut DelayedApprovalTimer, -) -> SubsystemResult> { + session_info_provider: &mut RuntimeInfo, +) -> SubsystemResult<(Vec, Vec)> { let all_blocks: Vec = db.load_all_blocks()?; let mut approval_meta = Vec::with_capacity(all_blocks.len()); let mut messages = Vec::new(); + let mut actions = Vec::new(); messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value. @@ -1396,16 +1407,60 @@ fn distribution_messages_for_activation( &claimed_core_indices, &block_entry, ) { - Ok(bitfield) => messages.push( - ApprovalDistributionMessage::DistributeAssignment( - IndirectAssignmentCertV2 { - block_hash, - validator: assignment.validator_index(), - cert: assignment.cert().clone(), - }, - bitfield, - ), - ), + Ok(bitfield) => { + gum::debug!( + target: LOG_TARGET, + candidate_hash = ?candidate_entry.candidate_receipt().hash(), + ?block_hash, + "Discovered, triggered assignment, not approved yet", + ); + + let indirect_cert = IndirectAssignmentCertV2 { + block_hash, + validator: assignment.validator_index(), + cert: assignment.cert().clone(), + }; + messages.push( + ApprovalDistributionMessage::DistributeAssignment( + indirect_cert.clone(), + bitfield.clone(), + ), + ); + + if !block_entry + .candidate_is_pending_signature(*candidate_hash) + { + let ExtendedSessionInfo { ref executor_params, .. } = + match get_extended_session_info( + session_info_provider, + ctx.sender(), + block_entry.block_hash(), + block_entry.session(), + ) + .await + { + Some(i) => i, + None => continue, + }; + + actions.push(Action::LaunchApproval { + claimed_candidate_indices: bitfield, + candidate_hash: candidate_entry + .candidate_receipt() + .hash(), + indirect_cert, + assignment_tranche: assignment.tranche(), + relay_block_hash: block_hash, + session: block_entry.session(), + executor_params: executor_params.clone(), + candidate: candidate_entry + .candidate_receipt() + .clone(), + backing_group: approval_entry.backing_group(), + distribute_assignment: false, + }); + } + }, Err(err) => { // Should never happen. If we fail here it means the // assignment is null (no cores claimed). @@ -1496,7 +1551,7 @@ fn distribution_messages_for_activation( } messages[0] = ApprovalDistributionMessage::NewBlocks(approval_meta); - Ok(messages) + Ok((messages, actions)) } // Handle an incoming signal from the overseer. Returns true if execution should conclude. diff --git a/polkadot/node/core/approval-voting/src/persisted_entries.rs b/polkadot/node/core/approval-voting/src/persisted_entries.rs index ef47bdb2213a..b924a1b52ccf 100644 --- a/polkadot/node/core/approval-voting/src/persisted_entries.rs +++ b/polkadot/node/core/approval-voting/src/persisted_entries.rs @@ -588,6 +588,13 @@ impl BlockEntry { !self.candidates_pending_signature.is_empty() } + /// Returns true if candidate hash is in the queue for a signature. + pub fn candidate_is_pending_signature(&self, candidate_hash: CandidateHash) -> bool { + self.candidates_pending_signature + .values() + .any(|context| context.candidate_hash == candidate_hash) + } + /// Candidate hashes for candidates pending signatures fn candidate_hashes_pending_signature(&self) -> Vec { self.candidates_pending_signature diff --git a/polkadot/node/core/approval-voting/src/tests.rs b/polkadot/node/core/approval-voting/src/tests.rs index 9220e84a2554..c9053232a4c8 100644 --- a/polkadot/node/core/approval-voting/src/tests.rs +++ b/polkadot/node/core/approval-voting/src/tests.rs @@ -78,6 +78,7 @@ struct TestSyncOracle { struct TestSyncOracleHandle { done_syncing_receiver: oneshot::Receiver<()>, + is_major_syncing: Arc, } impl TestSyncOracleHandle { @@ -108,8 +109,9 @@ impl SyncOracle for TestSyncOracle { fn make_sync_oracle(val: bool) -> (Box, TestSyncOracleHandle) { let (tx, rx) = oneshot::channel(); let flag = Arc::new(AtomicBool::new(val)); - let oracle = TestSyncOracle { flag, done_syncing_sender: Arc::new(Mutex::new(Some(tx))) }; - let handle = TestSyncOracleHandle { done_syncing_receiver: rx }; + let oracle = + TestSyncOracle { flag: flag.clone(), done_syncing_sender: Arc::new(Mutex::new(Some(tx))) }; + let handle = TestSyncOracleHandle { done_syncing_receiver: rx, is_major_syncing: flag }; (Box::new(oracle), handle) } @@ -465,6 +467,7 @@ struct HarnessConfigBuilder { clock: Option, backend: Option, assignment_criteria: Option>, + major_syncing: bool, } impl HarnessConfigBuilder { @@ -476,9 +479,19 @@ impl HarnessConfigBuilder { self } + pub fn major_syncing(&mut self, value: bool) -> &mut Self { + self.major_syncing = value; + self + } + + pub fn backend(&mut self, store: TestStore) -> &mut Self { + self.backend = Some(store); + self + } + pub fn build(&mut self) -> HarnessConfig { let (sync_oracle, sync_oracle_handle) = - self.sync_oracle.take().unwrap_or_else(|| make_sync_oracle(false)); + self.sync_oracle.take().unwrap_or_else(|| make_sync_oracle(self.major_syncing)); let assignment_criteria = self .assignment_criteria @@ -736,11 +749,13 @@ struct BlockConfig { slot: Slot, candidates: Option>, session_info: Option, + end_syncing: bool, } struct ChainBuilder { blocks_by_hash: HashMap, blocks_at_height: BTreeMap>, + is_major_syncing: Arc, } impl ChainBuilder { @@ -748,16 +763,28 @@ impl ChainBuilder { const GENESIS_PARENT_HASH: Hash = Hash::repeat_byte(0x00); pub fn new() -> Self { - let mut builder = - Self { blocks_by_hash: HashMap::new(), blocks_at_height: BTreeMap::new() }; + let mut builder = Self { + blocks_by_hash: HashMap::new(), + blocks_at_height: BTreeMap::new(), + is_major_syncing: Arc::new(AtomicBool::new(false)), + }; builder.add_block_inner( Self::GENESIS_HASH, Self::GENESIS_PARENT_HASH, 0, - BlockConfig { slot: Slot::from(0), candidates: None, session_info: None }, + BlockConfig { + slot: Slot::from(0), + candidates: None, + session_info: None, + end_syncing: false, + }, ); builder } + pub fn major_syncing(&mut self, major_syncing: Arc) -> &mut Self { + self.is_major_syncing = major_syncing; + self + } pub fn add_block( &mut self, @@ -808,8 +835,16 @@ impl ChainBuilder { } ancestry.reverse(); - import_block(overseer, ancestry.as_ref(), *number, block_config, false, i > 0) - .await; + import_block( + overseer, + ancestry.as_ref(), + *number, + block_config, + false, + i > 0, + self.is_major_syncing.clone(), + ) + .await; let _: Option<()> = future::pending().timeout(Duration::from_millis(100)).await; } } @@ -863,6 +898,7 @@ async fn import_block( config: &BlockConfig, gap: bool, fork: bool, + major_syncing: Arc, ) { let (new_head, new_header) = &hashes[hashes.len() - 1]; let candidates = config.candidates.clone().unwrap_or(vec![( @@ -891,6 +927,12 @@ async fn import_block( h_tx.send(Ok(Some(new_header.clone()))).unwrap(); } ); + + let is_major_syncing = major_syncing.load(Ordering::SeqCst); + if config.end_syncing { + major_syncing.store(false, Ordering::SeqCst); + } + if !fork { let mut _ancestry_step = 0; if gap { @@ -931,7 +973,7 @@ async fn import_block( } } - if number > 0 { + if number > 0 && !is_major_syncing { assert_matches!( overseer_recv(overseer).await, AllMessages::RuntimeApi( @@ -944,7 +986,6 @@ async fn import_block( c_tx.send(Ok(inclusion_events)).unwrap(); } ); - assert_matches!( overseer_recv(overseer).await, AllMessages::RuntimeApi( @@ -984,14 +1025,14 @@ async fn import_block( ); } - if number == 0 { + if number == 0 && !is_major_syncing { assert_matches!( overseer_recv(overseer).await, AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NewBlocks(v)) => { assert_eq!(v.len(), 0usize); } ); - } else { + } else if number > 0 && !is_major_syncing { if !fork { // SessionInfo won't be called for forks - it's already cached assert_matches!( @@ -1031,20 +1072,23 @@ async fn import_block( ); } - assert_matches!( - overseer_recv(overseer).await, - AllMessages::ApprovalDistribution( - ApprovalDistributionMessage::NewBlocks(mut approval_vec) - ) => { - assert_eq!(approval_vec.len(), 1); - let metadata = approval_vec.pop().unwrap(); - let hash = &hashes[number as usize]; - let parent_hash = &hashes[(number - 1) as usize]; - assert_eq!(metadata.hash, hash.0.clone()); - assert_eq!(metadata.parent_hash, parent_hash.0.clone()); - assert_eq!(metadata.slot, config.slot); - } - ); + if !is_major_syncing { + assert_matches!( + overseer_recv(overseer).await, + + AllMessages::ApprovalDistribution( + ApprovalDistributionMessage::NewBlocks(mut approval_vec) + ) => { + assert_eq!(approval_vec.len(), 1); + let metadata = approval_vec.pop().unwrap(); + let hash = &hashes[number as usize]; + let parent_hash = &hashes[(number - 1) as usize]; + assert_eq!(metadata.hash, hash.0.clone()); + assert_eq!(metadata.parent_hash, parent_hash.0.clone()); + assert_eq!(metadata.slot, config.slot); + } + ); + } } } @@ -1072,7 +1116,7 @@ fn subsystem_rejects_bad_assignment_ok_criteria() { block_hash, head, 1, - BlockConfig { slot, candidates: None, session_info: None }, + BlockConfig { slot, candidates: None, session_info: None, end_syncing: false }, ); builder.build(&mut virtual_overseer).await; @@ -1135,7 +1179,7 @@ fn subsystem_rejects_bad_assignment_err_criteria() { block_hash, head, 1, - BlockConfig { slot, candidates: None, session_info: None }, + BlockConfig { slot, candidates: None, session_info: None, end_syncing: false }, ); builder.build(&mut virtual_overseer).await; @@ -1240,6 +1284,7 @@ fn subsystem_rejects_approval_if_no_candidate_entry() { slot, candidates: Some(vec![(candidate_descriptor, CoreIndex(1), GroupIndex(1))]), session_info: None, + end_syncing: false, }, ); builder.build(&mut virtual_overseer).await; @@ -1345,7 +1390,12 @@ fn subsystem_rejects_approval_before_assignment() { block_hash, ChainBuilder::GENESIS_HASH, 1, - BlockConfig { slot: Slot::from(1), candidates: None, session_info: None }, + BlockConfig { + slot: Slot::from(1), + candidates: None, + session_info: None, + end_syncing: false, + }, ) .build(&mut virtual_overseer) .await; @@ -1398,7 +1448,12 @@ fn subsystem_rejects_assignment_in_future() { block_hash, ChainBuilder::GENESIS_HASH, 1, - BlockConfig { slot: Slot::from(0), candidates: None, session_info: None }, + BlockConfig { + slot: Slot::from(0), + candidates: None, + session_info: None, + end_syncing: false, + }, ) .build(&mut virtual_overseer) .await; @@ -1472,6 +1527,7 @@ fn subsystem_accepts_duplicate_assignment() { (candidate_receipt2, CoreIndex(1), GroupIndex(1)), ]), session_info: None, + end_syncing: false, }, ) .build(&mut virtual_overseer) @@ -1537,7 +1593,12 @@ fn subsystem_rejects_assignment_with_unknown_candidate() { block_hash, ChainBuilder::GENESIS_HASH, 1, - BlockConfig { slot: Slot::from(1), candidates: None, session_info: None }, + BlockConfig { + slot: Slot::from(1), + candidates: None, + session_info: None, + end_syncing: false, + }, ) .build(&mut virtual_overseer) .await; @@ -1582,7 +1643,12 @@ fn subsystem_rejects_oversized_bitfields() { block_hash, ChainBuilder::GENESIS_HASH, 1, - BlockConfig { slot: Slot::from(1), candidates: None, session_info: None }, + BlockConfig { + slot: Slot::from(1), + candidates: None, + session_info: None, + end_syncing: false, + }, ) .build(&mut virtual_overseer) .await; @@ -1650,7 +1716,12 @@ fn subsystem_accepts_and_imports_approval_after_assignment() { block_hash, ChainBuilder::GENESIS_HASH, 1, - BlockConfig { slot: Slot::from(1), candidates: None, session_info: None }, + BlockConfig { + slot: Slot::from(1), + candidates: None, + session_info: None, + end_syncing: false, + }, ) .build(&mut virtual_overseer) .await; @@ -1741,6 +1812,7 @@ fn subsystem_second_approval_import_only_schedules_wakeups() { slot: Slot::from(0), candidates: None, session_info: Some(session_info), + end_syncing: false, }, ) .build(&mut virtual_overseer) @@ -1824,7 +1896,12 @@ fn subsystem_assignment_import_updates_candidate_entry_and_schedules_wakeup() { block_hash, ChainBuilder::GENESIS_HASH, 1, - BlockConfig { slot: Slot::from(1), candidates: None, session_info: None }, + BlockConfig { + slot: Slot::from(1), + candidates: None, + session_info: None, + end_syncing: false, + }, ) .build(&mut virtual_overseer) .await; @@ -1873,7 +1950,12 @@ fn subsystem_process_wakeup_schedules_wakeup() { block_hash, ChainBuilder::GENESIS_HASH, 1, - BlockConfig { slot: Slot::from(1), candidates: None, session_info: None }, + BlockConfig { + slot: Slot::from(1), + candidates: None, + session_info: None, + end_syncing: false, + }, ) .build(&mut virtual_overseer) .await; @@ -1925,7 +2007,7 @@ fn linear_import_act_on_leaf() { hash, head, i, - BlockConfig { slot, candidates: None, session_info: None }, + BlockConfig { slot, candidates: None, session_info: None, end_syncing: false }, ); head = hash; } @@ -1983,7 +2065,7 @@ fn forkful_import_at_same_height_act_on_leaf() { hash, head, i, - BlockConfig { slot, candidates: None, session_info: None }, + BlockConfig { slot, candidates: None, session_info: None, end_syncing: false }, ); head = hash; } @@ -1997,7 +2079,7 @@ fn forkful_import_at_same_height_act_on_leaf() { hash, head, session, - BlockConfig { slot, candidates: None, session_info: None }, + BlockConfig { slot, candidates: None, session_info: None, end_syncing: false }, ); } builder.build(&mut virtual_overseer).await; @@ -2168,6 +2250,7 @@ fn import_checked_approval_updates_entries_and_schedules() { slot, candidates: Some(vec![(candidate_descriptor, CoreIndex(0), GroupIndex(0))]), session_info: Some(session_info), + end_syncing: false, }, ); builder.build(&mut virtual_overseer).await; @@ -2323,6 +2406,7 @@ fn subsystem_import_checked_approval_sets_one_block_bit_at_a_time() { (candidate_receipt2, CoreIndex(1), GroupIndex(1)), ]), session_info: Some(session_info), + end_syncing: false, }, ) .build(&mut virtual_overseer) @@ -2445,6 +2529,7 @@ fn approved_ancestor_test( slot: Slot::from(i as u64), candidates: Some(vec![(candidate_receipt, CoreIndex(0), GroupIndex(0))]), session_info: None, + end_syncing: false, }, ); } @@ -2623,13 +2708,19 @@ fn subsystem_validate_approvals_cache() { slot, candidates: candidates.clone(), session_info: Some(session_info.clone()), + end_syncing: false, }, ) .add_block( fork_block_hash, ChainBuilder::GENESIS_HASH, 1, - BlockConfig { slot, candidates, session_info: Some(session_info) }, + BlockConfig { + slot, + candidates, + session_info: Some(session_info), + end_syncing: false, + }, ) .build(&mut virtual_overseer) .await; @@ -2740,6 +2831,7 @@ fn subsystem_doesnt_distribute_duplicate_compact_assignments() { (candidate_receipt2.clone(), CoreIndex(1), GroupIndex(1)), ]), session_info: None, + end_syncing: false, }, ) .build(&mut virtual_overseer) @@ -2997,6 +3089,7 @@ where slot, candidates: Some(vec![(candidate_receipt, CoreIndex(0), GroupIndex(2))]), session_info: Some(session_info), + end_syncing: false, }, ) .build(&mut virtual_overseer) @@ -3314,6 +3407,7 @@ fn pre_covers_dont_stall_approval() { slot, candidates: Some(vec![(candidate_descriptor, CoreIndex(0), GroupIndex(0))]), session_info: Some(session_info), + end_syncing: false, }, ); builder.build(&mut virtual_overseer).await; @@ -3491,6 +3585,7 @@ fn waits_until_approving_assignments_are_old_enough() { slot, candidates: Some(vec![(candidate_descriptor, CoreIndex(0), GroupIndex(0))]), session_info: Some(session_info), + end_syncing: false, }, ); builder.build(&mut virtual_overseer).await; @@ -3705,6 +3800,7 @@ fn test_approval_is_sent_on_max_approval_coalesce_count() { slot, candidates: candidates.clone(), session_info: Some(session_info.clone()), + end_syncing: false, }, ) .build(&mut virtual_overseer) @@ -3943,8 +4039,7 @@ fn test_approval_is_sent_on_max_approval_coalesce_wait() { let store = config.backend(); test_harness(config, |test_harness| async move { - let TestHarness { mut virtual_overseer, clock, sync_oracle_handle: _sync_oracle_handle } = - test_harness; + let TestHarness { mut virtual_overseer, clock, sync_oracle_handle: _ } = test_harness; assert_matches!( overseer_recv(&mut virtual_overseer).await, @@ -4006,6 +4101,7 @@ fn test_approval_is_sent_on_max_approval_coalesce_wait() { slot, candidates: candidates.clone(), session_info: Some(session_info.clone()), + end_syncing: false, }, ) .build(&mut virtual_overseer) @@ -4040,3 +4136,569 @@ fn test_approval_is_sent_on_max_approval_coalesce_wait() { virtual_overseer }); } + +// Builds a chain with a fork where both relay blocks include the same candidate. +async fn build_chain_with_two_blocks_with_one_candidate_each( + block_hash1: Hash, + block_hash2: Hash, + slot: Slot, + sync_oracle_handle: TestSyncOracleHandle, + candidate_receipt: CandidateReceipt, +) -> (ChainBuilder, SessionInfo) { + let validators = vec![ + Sr25519Keyring::Alice, + Sr25519Keyring::Bob, + Sr25519Keyring::Charlie, + Sr25519Keyring::Dave, + Sr25519Keyring::Eve, + ]; + let session_info = SessionInfo { + validator_groups: IndexedVec::>::from(vec![ + vec![ValidatorIndex(0), ValidatorIndex(1)], + vec![ValidatorIndex(2)], + vec![ValidatorIndex(3), ValidatorIndex(4)], + ]), + ..session_info(&validators) + }; + + let candidates = Some(vec![(candidate_receipt.clone(), CoreIndex(0), GroupIndex(0))]); + let mut chain_builder = ChainBuilder::new(); + + chain_builder + .major_syncing(sync_oracle_handle.is_major_syncing.clone()) + .add_block( + block_hash1, + ChainBuilder::GENESIS_HASH, + 1, + BlockConfig { + slot, + candidates: candidates.clone(), + session_info: Some(session_info.clone()), + end_syncing: false, + }, + ) + .add_block( + block_hash2, + ChainBuilder::GENESIS_HASH, + 1, + BlockConfig { + slot, + candidates, + session_info: Some(session_info.clone()), + end_syncing: true, + }, + ); + (chain_builder, session_info) +} + +async fn setup_overseer_with_two_blocks_each_with_one_assignment_triggered( + virtual_overseer: &mut VirtualOverseer, + store: TestStore, + clock: &Box, + sync_oracle_handle: TestSyncOracleHandle, +) { + assert_matches!( + overseer_recv(virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => { + rx.send(Ok(0)).unwrap(); + } + ); + + let block_hash = Hash::repeat_byte(0x01); + let fork_block_hash = Hash::repeat_byte(0x02); + let candidate_commitments = CandidateCommitments::default(); + let mut candidate_receipt = dummy_candidate_receipt(block_hash); + candidate_receipt.commitments_hash = candidate_commitments.hash(); + let candidate_hash = candidate_receipt.hash(); + let slot = Slot::from(1); + let (chain_builder, _session_info) = build_chain_with_two_blocks_with_one_candidate_each( + block_hash, + fork_block_hash, + slot, + sync_oracle_handle, + candidate_receipt, + ) + .await; + chain_builder.build(virtual_overseer).await; + + assert!(!clock.inner.lock().current_wakeup_is(1)); + clock.inner.lock().wakeup_all(1); + + assert!(clock.inner.lock().current_wakeup_is(slot_to_tick(slot))); + clock.inner.lock().wakeup_all(slot_to_tick(slot)); + + futures_timer::Delay::new(Duration::from_millis(200)).await; + + clock.inner.lock().wakeup_all(slot_to_tick(slot + 2)); + + assert_eq!(clock.inner.lock().wakeups.len(), 0); + + futures_timer::Delay::new(Duration::from_millis(200)).await; + + let candidate_entry = store.load_candidate_entry(&candidate_hash).unwrap().unwrap(); + let our_assignment = + candidate_entry.approval_entry(&block_hash).unwrap().our_assignment().unwrap(); + assert!(our_assignment.triggered()); +} + +// Tests that for candidates that we did not approve yet, for which we triggered the assignment and +// the approval work we restart the work to approve it. +#[test] +fn subsystem_relaunches_approval_work_on_restart() { + let assignment_criteria = Box::new(MockAssignmentCriteria( + || { + let mut assignments = HashMap::new(); + let _ = assignments.insert( + CoreIndex(0), + approval_db::v2::OurAssignment { + cert: garbage_assignment_cert(AssignmentCertKind::RelayVRFModulo { sample: 0 }) + .into(), + tranche: 0, + validator_index: ValidatorIndex(0), + triggered: false, + } + .into(), + ); + + let _ = assignments.insert( + CoreIndex(0), + approval_db::v2::OurAssignment { + cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFModuloCompact { + core_bitfield: vec![CoreIndex(0), CoreIndex(1), CoreIndex(2)] + .try_into() + .unwrap(), + }), + tranche: 0, + validator_index: ValidatorIndex(0), + triggered: false, + } + .into(), + ); + assignments + }, + |_| Ok(0), + )); + let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build(); + let store = config.backend(); + let store_clone = config.backend(); + + test_harness(config, |test_harness| async move { + let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness; + + setup_overseer_with_two_blocks_each_with_one_assignment_triggered( + &mut virtual_overseer, + store, + &clock, + sync_oracle_handle, + ) + .await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + _, + )) => { + } + ); + + recover_available_data(&mut virtual_overseer).await; + fetch_validation_code(&mut virtual_overseer).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + _ + )) => { + } + ); + + // Bail early after the assignment has been distributed but before we answer with the mocked + // approval from CandidateValidation. + virtual_overseer + }); + + // Restart a new approval voting subsystem with the same database and major syncing true untill + // the last leaf. + let config = HarnessConfigBuilder::default().backend(store_clone).major_syncing(true).build(); + + test_harness(config, |test_harness| async move { + let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => { + rx.send(Ok(0)).unwrap(); + } + ); + + let block_hash = Hash::repeat_byte(0x01); + let fork_block_hash = Hash::repeat_byte(0x02); + let candidate_commitments = CandidateCommitments::default(); + let mut candidate_receipt = dummy_candidate_receipt(block_hash); + candidate_receipt.commitments_hash = candidate_commitments.hash(); + let slot = Slot::from(1); + clock.inner.lock().set_tick(slot_to_tick(slot + 2)); + let (chain_builder, session_info) = build_chain_with_two_blocks_with_one_candidate_each( + block_hash, + fork_block_hash, + slot, + sync_oracle_handle, + candidate_receipt, + ) + .await; + + chain_builder.build(&mut virtual_overseer).await; + + futures_timer::Delay::new(Duration::from_millis(2000)).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionInfo(_, si_tx), + ) + ) => { + si_tx.send(Ok(Some(session_info.clone()))).unwrap(); + } + ); + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionExecutorParams(_, si_tx), + ) + ) => { + // Make sure all SessionExecutorParams calls are not made for the leaf (but for its relay parent) + si_tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::NodeFeatures(_, si_tx), ) + ) => { + si_tx.send(Ok(NodeFeatures::EMPTY)).unwrap(); + } + ); + + // On major syncing ending Approval voting should send all the necessary messages for a + // candidate to be approved. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NewBlocks( + _, + )) => { + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + _, + )) => { + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + _, + )) => { + } + ); + + // Guarantees the approval work has been relaunched. + recover_available_data(&mut virtual_overseer).await; + fetch_validation_code(&mut virtual_overseer).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive { + exec_kind, + response_sender, + .. + }) if exec_kind == PvfExecKind::Approval => { + response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default()))) + .unwrap(); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 1, + })); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_)) + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 1, + })); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_)) + ); + + // Assert that there are no more messages being sent by the subsystem + assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none()); + + virtual_overseer + }); +} + +// Test that cached approvals, which are candidates that we approved but we didn't issue +// the signature yet because we want to coalesce it with more candidate are sent after restart. +#[test] +fn subsystem_sends_pending_approvals_on_approval_restart() { + let assignment_criteria = Box::new(MockAssignmentCriteria( + || { + let mut assignments = HashMap::new(); + let _ = assignments.insert( + CoreIndex(0), + approval_db::v2::OurAssignment { + cert: garbage_assignment_cert(AssignmentCertKind::RelayVRFModulo { sample: 0 }) + .into(), + tranche: 0, + validator_index: ValidatorIndex(0), + triggered: false, + } + .into(), + ); + + let _ = assignments.insert( + CoreIndex(0), + approval_db::v2::OurAssignment { + cert: garbage_assignment_cert_v2(AssignmentCertKindV2::RelayVRFModuloCompact { + core_bitfield: vec![CoreIndex(0), CoreIndex(1), CoreIndex(2)] + .try_into() + .unwrap(), + }), + tranche: 0, + validator_index: ValidatorIndex(0), + triggered: false, + } + .into(), + ); + assignments + }, + |_| Ok(0), + )); + let config = HarnessConfigBuilder::default().assignment_criteria(assignment_criteria).build(); + let store = config.backend(); + let store_clone = config.backend(); + + test_harness(config, |test_harness| async move { + let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness; + + setup_overseer_with_two_blocks_each_with_one_assignment_triggered( + &mut virtual_overseer, + store, + &clock, + sync_oracle_handle, + ) + .await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + _, + )) => { + } + ); + + recover_available_data(&mut virtual_overseer).await; + fetch_validation_code(&mut virtual_overseer).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + _ + )) => { + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::CandidateValidation(CandidateValidationMessage::ValidateFromExhaustive { + exec_kind, + response_sender, + .. + }) if exec_kind == PvfExecKind::Approval => { + response_sender.send(Ok(ValidationResult::Valid(Default::default(), Default::default()))) + .unwrap(); + } + ); + + // Configure a big coalesce number, so that the signature is cached instead of being sent to + // approval-distribution. + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 6, + })); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 6, + })); + } + ); + + // Assert that there are no more messages being sent by the subsystem + assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none()); + + virtual_overseer + }); + + let config = HarnessConfigBuilder::default().backend(store_clone).major_syncing(true).build(); + // On restart signatures should be sent to approval-distribution without relaunching the + // approval work. + test_harness(config, |test_harness| async move { + let TestHarness { mut virtual_overseer, clock, sync_oracle_handle } = test_harness; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ChainApi(ChainApiMessage::FinalizedBlockNumber(rx)) => { + rx.send(Ok(0)).unwrap(); + } + ); + + let block_hash = Hash::repeat_byte(0x01); + let fork_block_hash = Hash::repeat_byte(0x02); + let candidate_commitments = CandidateCommitments::default(); + let mut candidate_receipt = dummy_candidate_receipt(block_hash); + candidate_receipt.commitments_hash = candidate_commitments.hash(); + let slot = Slot::from(1); + + clock.inner.lock().set_tick(slot_to_tick(slot + 2)); + let (chain_builder, session_info) = build_chain_with_two_blocks_with_one_candidate_each( + block_hash, + fork_block_hash, + slot, + sync_oracle_handle, + candidate_receipt, + ) + .await; + chain_builder.build(&mut virtual_overseer).await; + + futures_timer::Delay::new(Duration::from_millis(2000)).await; + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::NewBlocks( + _, + )) => { + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + _, + )) => { + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeAssignment( + _, + _, + )) => { + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 1, + })); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionInfo(_, si_tx), + ) + ) => { + si_tx.send(Ok(Some(session_info.clone()))).unwrap(); + } + ); + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request( + _, + RuntimeApiRequest::SessionExecutorParams(_, si_tx), + ) + ) => { + // Make sure all SessionExecutorParams calls are not made for the leaf (but for its relay parent) + si_tx.send(Ok(Some(ExecutorParams::default()))).unwrap(); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi( + RuntimeApiMessage::Request(_, RuntimeApiRequest::NodeFeatures(_, si_tx), ) + ) => { + si_tx.send(Ok(NodeFeatures::EMPTY)).unwrap(); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_)) + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::RuntimeApi(RuntimeApiMessage::Request(_, RuntimeApiRequest::ApprovalVotingParams(_, sender))) => { + let _ = sender.send(Ok(ApprovalVotingParams { + max_approval_coalesce_count: 1, + })); + } + ); + + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::ApprovalDistribution(ApprovalDistributionMessage::DistributeApproval(_)) + ); + + // Assert that there are no more messages being sent by the subsystem + assert!(overseer_recv(&mut virtual_overseer).timeout(TIMEOUT / 2).await.is_none()); + + virtual_overseer + }); +}