diff --git a/node/core/dispute-coordinator/src/participation/queues/mod.rs b/node/core/dispute-coordinator/src/participation/queues/mod.rs
index 3ec217628625..d2fcab1ba258 100644
--- a/node/core/dispute-coordinator/src/participation/queues/mod.rs
+++ b/node/core/dispute-coordinator/src/participation/queues/mod.rs
@@ -14,10 +14,7 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
-use std::{
-	cmp::Ordering,
-	collections::{BTreeMap, HashMap},
-};
+use std::{cmp::Ordering, collections::BTreeMap};
 
 use futures::channel::oneshot;
 use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
@@ -50,25 +47,14 @@ const PRIORITY_QUEUE_SIZE: usize = 20_000;
 #[cfg(test)]
 const PRIORITY_QUEUE_SIZE: usize = 2;
 
-/// Type for counting how often a candidate was added to the best effort queue.
-type BestEffortCount = u32;
-
 /// Queues for dispute participation.
+/// In both queues we have a strict ordering of candidates and participation will
+/// happen in that order. Refer to `CandidateComparator` for details on the ordering.
 pub struct Queues {
 	/// Set of best effort participation requests.
-	///
-	/// Note that as size is limited to `BEST_EFFORT_QUEUE_SIZE` we simply do a linear search for
-	/// the entry with the highest `added_count` to determine what dispute to participate next in.
-	///
-	/// This mechanism leads to an amplifying effect - the more validators already participated,
-	/// the more likely it becomes that more validators will participate soon, which should lead to
-	/// a quick resolution of disputes, even in the best effort queue.
-	best_effort: HashMap<CandidateHash, BestEffortEntry>,
+	best_effort: BTreeMap<CandidateComparator, ParticipationRequest>,
 
 	/// Priority queue.
-	///
-	/// In the priority queue, we have a strict ordering of candidates and participation will
-	/// happen in that order.
 	priority: BTreeMap<CandidateComparator, ParticipationRequest>,
 }
 
@@ -143,14 +129,13 @@ impl ParticipationRequest {
 impl Queues {
 	/// Create new `Queues`.
 	pub fn new() -> Self {
-		Self { best_effort: HashMap::new(), priority: BTreeMap::new() }
+		Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() }
 	}
 
 	/// Will put message in queue, either priority or best effort depending on priority.
 	///
 	/// If the message was already previously present on best effort, it will be moved to priority
-	/// if it considered priority now, otherwise the `added_count` on the best effort queue will be
-	/// bumped.
+	/// if it is considered priority now.
 	///
 	/// Returns error in case a queue was found full already.
 	pub async fn queue(
@@ -159,94 +144,76 @@ impl Queues {
 		priority: ParticipationPriority,
 		req: ParticipationRequest,
 	) -> Result<()> {
-		let comparator = match priority {
-			ParticipationPriority::BestEffort => None,
-			ParticipationPriority::Priority =>
-				CandidateComparator::new(sender, &req.candidate_receipt).await?,
-		};
-		self.queue_with_comparator(comparator, req)?;
+		let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?;
+
+		self.queue_with_comparator(comparator, priority, req)?;
 		Ok(())
 	}
 
-	/// Get the next best request for dispute participation
-	///
-	/// if any. Priority queue is always considered first, then the best effort queue based on
-	/// `added_count`.
+	/// Get the next best request for dispute participation, if any.
+	/// First the priority queue is considered and then the best effort one.
 	pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
 		if let Some(req) = self.pop_priority() {
-			// In case a candidate became best effort over time, we might have it also queued in
-			// the best effort queue - get rid of any such entry:
-			self.best_effort.remove(req.candidate_hash());
-			return Some(req)
+			return Some(req.1)
 		}
-		self.pop_best_effort()
+		self.pop_best_effort().map(|d| d.1)
 	}
 
 	fn queue_with_comparator(
 		&mut self,
-		comparator: Option<CandidateComparator>,
+		comparator: CandidateComparator,
+		priority: ParticipationPriority,
 		req: ParticipationRequest,
 	) -> std::result::Result<(), QueueError> {
-		if let Some(comparator) = comparator {
+		if priority.is_priority() {
 			if self.priority.len() >= PRIORITY_QUEUE_SIZE {
 				return Err(QueueError::PriorityFull)
 			}
 			// Remove any best effort entry:
-			self.best_effort.remove(&req.candidate_hash);
+			self.best_effort.remove(&comparator);
 			self.priority.insert(comparator, req);
 		} else {
+			if self.priority.contains_key(&comparator) {
+				// The candidate is already in the priority queue - don't
+				// add it to best effort too.
+				return Ok(())
+			}
 			if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE {
 				return Err(QueueError::BestEffortFull)
 			}
-			// Note: The request might have been added to priority in a previous call already, we
-			// take care of that case in `dequeue` (more efficient).
-			self.best_effort
-				.entry(req.candidate_hash)
-				.or_insert(BestEffortEntry { req, added_count: 0 })
-				.added_count += 1;
+			self.best_effort.insert(comparator, req);
 		}
 		Ok(())
 	}
 
-	/// Get the next best from the best effort queue.
-	///
-	/// If there are multiple best - just pick one.
-	fn pop_best_effort(&mut self) -> Option<ParticipationRequest> {
-		let best = self.best_effort.iter().reduce(|(hash1, entry1), (hash2, entry2)| {
-			if entry1.added_count > entry2.added_count {
-				(hash1, entry1)
-			} else {
-				(hash2, entry2)
-			}
-		});
-		if let Some((best_hash, _)) = best {
-			let best_hash = best_hash.clone();
-			self.best_effort.remove(&best_hash).map(|e| e.req)
-		} else {
-			None
-		}
+	/// Get the best entry from the best effort queue.
+	fn pop_best_effort(&mut self) -> Option<(CandidateComparator, ParticipationRequest)> {
+		return Self::pop_impl(&mut self.best_effort)
 	}
 
 	/// Get best priority queue entry.
-	fn pop_priority(&mut self) -> Option<ParticipationRequest> {
+	fn pop_priority(&mut self) -> Option<(CandidateComparator, ParticipationRequest)> {
+		return Self::pop_impl(&mut self.priority)
+	}
+
+	// `pop_best_effort` and `pop_priority` do the same but on different `BTreeMap`s. This
+	// function holds the extracted implementation.
+	fn pop_impl(
+		target: &mut BTreeMap<CandidateComparator, ParticipationRequest>,
+	) -> Option<(CandidateComparator, ParticipationRequest)> {
 		// Once https://github.com/rust-lang/rust/issues/62924 is there, we can use a simple:
-		// priority.pop_first().
-		if let Some((comparator, _)) = self.priority.iter().next() {
+		// target.pop_first().
+		if let Some((comparator, _)) = target.iter().next() {
 			let comparator = comparator.clone();
-			self.priority.remove(&comparator)
+			target
+				.remove(&comparator)
+				.map(|participation_request| (comparator, participation_request))
 		} else {
 			None
 		}
 	}
 }
 
-/// Entry for the best effort queue.
-struct BestEffortEntry {
-	req: ParticipationRequest,
-	/// How often was the above request added to the queue.
-	added_count: BestEffortCount,
-}
-
 /// `Comparator` for ordering of disputes for candidates.
 ///
 /// This `comparator` makes it possible to order disputes based on age and to ensure some fairness
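For reference, the `pop_impl` trick above - emulating the then-unstable `BTreeMap::pop_first` by cloning the smallest key and removing its entry - can be sketched standalone. The simplified key/value types below are assumptions of this sketch, not the patch's real types:

```rust
use std::collections::BTreeMap;

// Simplified stand-ins (assumptions of this sketch, not the real types):
// the comparator is (relay parent block number, candidate hash).
type Comparator = (u32, u64);
type Request = &'static str;

/// Emulates the then-unstable `BTreeMap::pop_first`: clone the smallest
/// key, then remove its entry, returning key and value together.
fn pop_impl(target: &mut BTreeMap<Comparator, Request>) -> Option<(Comparator, Request)> {
    let comparator = target.keys().next().cloned()?;
    target.remove(&comparator).map(|req| (comparator, req))
}

fn main() {
    let mut queue = BTreeMap::new();
    queue.insert((2, 7), "newer dispute");
    queue.insert((1, 3), "older dispute");

    // A BTreeMap iterates in ascending key order, so the dispute with the
    // oldest relay parent is always popped first.
    assert_eq!(pop_impl(&mut queue), Some(((1, 3), "older dispute")));
    assert_eq!(pop_impl(&mut queue), Some(((2, 7), "newer dispute")));
    assert_eq!(pop_impl(&mut queue), None);
}
```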
@@ -266,9 +233,12 @@ struct BestEffortEntry {
 #[derive(Copy, Clone)]
 #[cfg_attr(test, derive(Debug))]
 struct CandidateComparator {
-	/// Block number of the relay parent.
+	/// Block number of the relay parent. It's wrapped in an `Option<>` because there are cases when
+	/// it can't be obtained, for example when the node is lagging behind and new leaves are received
+	/// with a slight delay. Candidates with an unknown relay parent are treated with the lowest priority.
 	///
-	/// Important, so we will be participating in oldest disputes first.
+	/// The order enforced by `CandidateComparator` is important because we want to participate in
+	/// the oldest disputes first.
 	///
 	/// Note: In theory it would make more sense to use the `BlockNumber` of the including
 	/// block, as inclusion time is the actual relevant event when it comes to ordering. The
@@ -277,8 +247,10 @@ struct CandidateComparator {
 	/// just using the lowest `BlockNumber` of all available including blocks - the problem is,
 	/// that is not stable. If a new fork appears after the fact, we would start ordering the same
 	/// candidate differently, which would result in the same candidate getting queued twice.
-	relay_parent_block_number: BlockNumber,
-	/// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates.
+	relay_parent_block_number: Option<BlockNumber>,
+	/// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates with the
+	/// same relay parent block number. Candidates without a `relay_parent_block_number` are ordered
+	/// by the `candidate_hash` (and treated with the lowest priority, as already mentioned).
 	candidate_hash: CandidateHash,
 }
 
@@ -287,33 +259,35 @@ impl CandidateComparator {
 	///
 	/// Useful for testing.
 	#[cfg(test)]
-	pub fn new_dummy(block_number: BlockNumber, candidate_hash: CandidateHash) -> Self {
+	pub fn new_dummy(block_number: Option<BlockNumber>, candidate_hash: CandidateHash) -> Self {
 		Self { relay_parent_block_number: block_number, candidate_hash }
 	}
 
 	/// Create a candidate comparator for a given candidate.
 	///
 	/// Returns:
-	/// `Ok(None)` in case we could not lookup the candidate's relay parent, returns a
-	/// `FatalError` in case the chain API call fails with an unexpected error.
+	/// - `Ok(CandidateComparator{Some(relay_parent_block_number), candidate_hash})` when the
+	///   relay parent can be obtained. This is the happy case.
+	/// - `Ok(CandidateComparator{None, candidate_hash})` in case the candidate's relay parent
+	///   can't be obtained.
+	/// - `FatalError` in case the chain API call fails with an unexpected error.
 	pub async fn new(
 		sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
 		candidate: &CandidateReceipt,
-	) -> FatalResult<Option<CandidateComparator>> {
+	) -> FatalResult<CandidateComparator> {
 		let candidate_hash = candidate.hash();
-		let n = match get_block_number(sender, candidate.descriptor().relay_parent).await? {
-			None => {
-				gum::warn!(
-					target: LOG_TARGET,
-					candidate_hash = ?candidate_hash,
-					"Candidate's relay_parent could not be found via chain API - `CandidateComparator could not be provided!"
-				);
-				return Ok(None)
-			},
-			Some(n) => n,
-		};
+		let n = get_block_number(sender, candidate.descriptor().relay_parent).await?;
+
+		if n.is_none() {
+			gum::warn!(
+				target: LOG_TARGET,
+				candidate_hash = ?candidate_hash,
+				"Candidate's relay_parent could not be found via chain API - `CandidateComparator` \
+				with an empty relay parent block number will be provided!"
+			);
+		}
 
-		Ok(Some(CandidateComparator { relay_parent_block_number: n, candidate_hash }))
+		Ok(CandidateComparator { relay_parent_block_number: n, candidate_hash })
 	}
 }
 
@@ -333,11 +307,28 @@ impl PartialOrd for CandidateComparator {
 impl Ord for CandidateComparator {
 	fn cmp(&self, other: &Self) -> Ordering {
-		match self.relay_parent_block_number.cmp(&other.relay_parent_block_number) {
-			Ordering::Equal => (),
-			o => return o,
+		return match (self.relay_parent_block_number, other.relay_parent_block_number) {
+			(None, None) => {
+				// No relay parent for either one -> compare hashes
+				self.candidate_hash.cmp(&other.candidate_hash)
+			},
+			(Some(self_relay_parent_block_num), Some(other_relay_parent_block_num)) => {
+				match self_relay_parent_block_num.cmp(&other_relay_parent_block_num) {
+					// if the relay parent is the same for both -> compare hashes
+					Ordering::Equal => self.candidate_hash.cmp(&other.candidate_hash),
+					// if not - return the result from comparing the relay parent block numbers
+					o => return o,
+				}
+			},
+			(Some(_), None) => {
+				// Candidates with a known relay parent always have priority
+				Ordering::Less
+			},
+			(None, Some(_)) => {
+				// Ditto
+				Ordering::Greater
+			},
 		}
-		self.candidate_hash.cmp(&other.candidate_hash)
 	}
 }
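This manual `Ord` is the crux of the change: a derived ordering on `Option` would sort `None` first, while here candidates with a known relay parent must sort first. A self-contained sketch of the same comparison logic, with `u32`/`u64` stand-ins for the real `BlockNumber`/`CandidateHash` types:

```rust
use std::cmp::Ordering;

// Simplified stand-in for `CandidateComparator`.
#[derive(PartialEq, Eq)]
struct Cmp {
    relay_parent_block_number: Option<u32>,
    candidate_hash: u64,
}

impl Ord for Cmp {
    fn cmp(&self, other: &Self) -> Ordering {
        match (self.relay_parent_block_number, other.relay_parent_block_number) {
            // Both known: older relay parents come first, the hash breaks ties.
            (Some(a), Some(b)) =>
                a.cmp(&b).then_with(|| self.candidate_hash.cmp(&other.candidate_hash)),
            // A known relay parent always sorts before an unknown one...
            (Some(_), None) => Ordering::Less,
            (None, Some(_)) => Ordering::Greater,
            // ...and two unknowns fall back to the hash alone.
            (None, None) => self.candidate_hash.cmp(&other.candidate_hash),
        }
    }
}

impl PartialOrd for Cmp {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

fn main() {
    // Deriving `Ord` on an `Option` would sort `None` first:
    assert!(None::<u32> < Some(0));
    // The manual impl inverts that, so candidates with an unknown relay
    // parent end up last, i.e. with the lowest participation priority.
    let known = Cmp { relay_parent_block_number: Some(100), candidate_hash: 9 };
    let unknown = Cmp { relay_parent_block_number: None, candidate_hash: 1 };
    assert!(known < unknown);
}
```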
diff --git a/node/core/dispute-coordinator/src/participation/queues/tests.rs b/node/core/dispute-coordinator/src/participation/queues/tests.rs
index 4e9019ebb499..b6af4bd2b55a 100644
--- a/node/core/dispute-coordinator/src/participation/queues/tests.rs
+++ b/node/core/dispute-coordinator/src/participation/queues/tests.rs
@@ -14,6 +14,7 @@
 // You should have received a copy of the GNU General Public License
 // along with Polkadot. If not, see <http://www.gnu.org/licenses/>.
 
+use crate::ParticipationPriority;
 use ::test_helpers::{dummy_candidate_receipt, dummy_hash};
 use assert_matches::assert_matches;
 use polkadot_primitives::v2::{BlockNumber, Hash};
@@ -31,15 +32,16 @@ fn make_participation_request(hash: Hash) -> ParticipationRequest {
 /// Make dummy comparator for request, based on the given block number.
 fn make_dummy_comparator(
 	req: &ParticipationRequest,
-	relay_parent: BlockNumber,
+	relay_parent: Option<BlockNumber>,
 ) -> CandidateComparator {
 	CandidateComparator::new_dummy(relay_parent, *req.candidate_hash())
 }
 
 /// Check that dequeuing acknowledges order.
 ///
-/// Any priority item will be dequeued before any best effort items, priority items will be
-/// processed in order. Best effort items, based on how often they have been added.
+/// Any priority item will be dequeued before any best effort item. Within each queue, items with
+/// a known parent block number will be processed in order. Best effort items without a known
+/// parent block number are treated with the lowest priority.
#[test] fn ordering_works_as_expected() { let mut queue = Queues::new(); @@ -47,36 +49,69 @@ fn ordering_works_as_expected() { let req_prio = make_participation_request(Hash::repeat_byte(0x02)); let req3 = make_participation_request(Hash::repeat_byte(0x03)); let req_prio_2 = make_participation_request(Hash::repeat_byte(0x04)); - let req5 = make_participation_request(Hash::repeat_byte(0x05)); + let req5_unknown_parent = make_participation_request(Hash::repeat_byte(0x05)); let req_full = make_participation_request(Hash::repeat_byte(0x06)); let req_prio_full = make_participation_request(Hash::repeat_byte(0x07)); - queue.queue_with_comparator(None, req1.clone()).unwrap(); queue - .queue_with_comparator(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone()) + .queue_with_comparator( + make_dummy_comparator(&req1, Some(1)), + ParticipationPriority::BestEffort, + req1.clone(), + ) + .unwrap(); + queue + .queue_with_comparator( + make_dummy_comparator(&req_prio, Some(1)), + ParticipationPriority::Priority, + req_prio.clone(), + ) .unwrap(); - queue.queue_with_comparator(None, req3.clone()).unwrap(); queue - .queue_with_comparator(Some(make_dummy_comparator(&req_prio_2, 2)), req_prio_2.clone()) + .queue_with_comparator( + make_dummy_comparator(&req3, Some(2)), + ParticipationPriority::BestEffort, + req3.clone(), + ) + .unwrap(); + queue + .queue_with_comparator( + make_dummy_comparator(&req_prio_2, Some(2)), + ParticipationPriority::Priority, + req_prio_2.clone(), + ) + .unwrap(); + queue + .queue_with_comparator( + make_dummy_comparator(&req5_unknown_parent, None), + ParticipationPriority::BestEffort, + req5_unknown_parent.clone(), + ) .unwrap(); - queue.queue_with_comparator(None, req3.clone()).unwrap(); - queue.queue_with_comparator(None, req5.clone()).unwrap(); assert_matches!( - queue.queue_with_comparator(Some(make_dummy_comparator(&req_prio_full, 3)), req_prio_full), + queue.queue_with_comparator( + make_dummy_comparator(&req_prio_full, Some(3)), + ParticipationPriority::Priority, + req_prio_full + ), Err(QueueError::PriorityFull) ); - assert_matches!(queue.queue_with_comparator(None, req_full), Err(QueueError::BestEffortFull)); + assert_matches!( + queue.queue_with_comparator( + make_dummy_comparator(&req_full, Some(3)), + ParticipationPriority::BestEffort, + req_full + ), + Err(QueueError::BestEffortFull) + ); + // Prioritized queue is ordered correctly assert_eq!(queue.dequeue(), Some(req_prio)); assert_eq!(queue.dequeue(), Some(req_prio_2)); + // So is the best-effort + assert_eq!(queue.dequeue(), Some(req1)); assert_eq!(queue.dequeue(), Some(req3)); - assert_matches!( - queue.dequeue(), - Some(r) => { assert!(r == req1 || r == req5) } - ); - assert_matches!( - queue.dequeue(), - Some(r) => { assert!(r == req1 || r == req5) } - ); + assert_eq!(queue.dequeue(), Some(req5_unknown_parent)); + assert_matches!(queue.dequeue(), None); } @@ -89,23 +124,50 @@ fn candidate_is_only_dequeued_once() { let req_best_effort_then_prio = make_participation_request(Hash::repeat_byte(0x03)); let req_prio_then_best_effort = make_participation_request(Hash::repeat_byte(0x04)); - queue.queue_with_comparator(None, req1.clone()).unwrap(); queue - .queue_with_comparator(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone()) + .queue_with_comparator( + make_dummy_comparator(&req1, None), + ParticipationPriority::BestEffort, + req1.clone(), + ) + .unwrap(); + queue + .queue_with_comparator( + make_dummy_comparator(&req_prio, Some(1)), + ParticipationPriority::Priority, + req_prio.clone(), + ) 
.unwrap(); // Insert same best effort again: - queue.queue_with_comparator(None, req1.clone()).unwrap(); + queue + .queue_with_comparator( + make_dummy_comparator(&req1, None), + ParticipationPriority::BestEffort, + req1.clone(), + ) + .unwrap(); // insert same prio again: queue - .queue_with_comparator(Some(make_dummy_comparator(&req_prio, 1)), req_prio.clone()) + .queue_with_comparator( + make_dummy_comparator(&req_prio, Some(1)), + ParticipationPriority::Priority, + req_prio.clone(), + ) .unwrap(); // Insert first as best effort: - queue.queue_with_comparator(None, req_best_effort_then_prio.clone()).unwrap(); + queue + .queue_with_comparator( + make_dummy_comparator(&req_best_effort_then_prio, Some(2)), + ParticipationPriority::BestEffort, + req_best_effort_then_prio.clone(), + ) + .unwrap(); // Then as prio: queue .queue_with_comparator( - Some(make_dummy_comparator(&req_best_effort_then_prio, 2)), + make_dummy_comparator(&req_best_effort_then_prio, Some(2)), + ParticipationPriority::Priority, req_best_effort_then_prio.clone(), ) .unwrap(); @@ -116,12 +178,19 @@ fn candidate_is_only_dequeued_once() { // Insert first as prio: queue .queue_with_comparator( - Some(make_dummy_comparator(&req_prio_then_best_effort, 3)), + make_dummy_comparator(&req_prio_then_best_effort, Some(3)), + ParticipationPriority::Priority, req_prio_then_best_effort.clone(), ) .unwrap(); // Then as best effort: - queue.queue_with_comparator(None, req_prio_then_best_effort.clone()).unwrap(); + queue + .queue_with_comparator( + make_dummy_comparator(&req_prio_then_best_effort, Some(3)), + ParticipationPriority::BestEffort, + req_prio_then_best_effort.clone(), + ) + .unwrap(); assert_eq!(queue.dequeue(), Some(req_best_effort_then_prio)); assert_eq!(queue.dequeue(), Some(req_prio_then_best_effort)); diff --git a/node/core/dispute-coordinator/src/participation/tests.rs b/node/core/dispute-coordinator/src/participation/tests.rs index 03772b1918dc..bf149a87286f 100644 --- a/node/core/dispute-coordinator/src/participation/tests.rs +++ b/node/core/dispute-coordinator/src/participation/tests.rs @@ -29,7 +29,10 @@ use parity_scale_codec::Encode; use polkadot_node_primitives::{AvailableData, BlockData, InvalidCandidate, PoV}; use polkadot_node_subsystem::{ jaeger, - messages::{AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest}, + messages::{ + AllMessages, ChainApiMessage, DisputeCoordinatorMessage, RuntimeApiMessage, + RuntimeApiRequest, + }, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, SpawnGlue, }; use polkadot_node_subsystem_test_helpers::{ @@ -221,9 +224,9 @@ fn same_req_wont_get_queued_if_participation_is_already_running() { #[test] fn reqs_get_queued_when_out_of_capacity() { - futures::executor::block_on(async { - let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); + let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new()); + let test = async { let (sender, mut worker_receiver) = mpsc::channel(1); let mut participation = Participation::new(sender); activate_leaf(&mut ctx, &mut participation, 10).await.unwrap(); @@ -239,43 +242,81 @@ fn reqs_get_queued_when_out_of_capacity() { } for _ in 0..MAX_PARALLEL_PARTICIPATIONS + 1 { - assert_matches!( - ctx_handle.recv().await, - AllMessages::AvailabilityRecovery( - AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, tx) - ) => { - tx.send(Err(RecoveryError::Unavailable)).unwrap(); - }, - "overseer did not receive recover available data message", - ); - let result = participation 
 				.get_participation_result(&mut ctx, worker_receiver.next().await.unwrap())
 				.await
 				.unwrap();
-
 			assert_matches!(
 				result.outcome,
 				ParticipationOutcome::Unavailable => {}
 			);
 		}
+		// we should not have any further recovery requests:
+		assert_matches!(worker_receiver.next().timeout(Duration::from_millis(10)).await, None);
+	};
+
+	let request_handler = async {
+		let mut recover_available_data_msg_count = 0;
+		let mut block_number_msg_count = 0;
+
+		while recover_available_data_msg_count < MAX_PARALLEL_PARTICIPATIONS + 1 ||
+			block_number_msg_count < 1
+		{
+			match ctx_handle.recv().await {
+				AllMessages::AvailabilityRecovery(
+					AvailabilityRecoveryMessage::RecoverAvailableData(_, _, _, tx),
+				) => {
+					tx.send(Err(RecoveryError::Unavailable)).unwrap();
+					recover_available_data_msg_count += 1;
+				},
+				AllMessages::ChainApi(ChainApiMessage::BlockNumber(_, tx)) => {
+					tx.send(Ok(None)).unwrap();
+					block_number_msg_count += 1;
+				},
+				_ => assert!(false, "Received unexpected message"),
+			}
+		}
 
-		// we should not have any further results nor recovery requests:
+		// we should not have any further results:
 		assert_matches!(ctx_handle.recv().timeout(Duration::from_millis(10)).await, None);
-		assert_matches!(worker_receiver.next().timeout(Duration::from_millis(10)).await, None);
-	})
+	};
+
+	futures::executor::block_on(async {
+		futures::join!(test, request_handler);
+	});
 }
 
 #[test]
 fn reqs_get_queued_on_no_recent_block() {
-	futures::executor::block_on(async {
-		let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
-
+	let (mut ctx, mut ctx_handle) = make_our_subsystem_context(TaskExecutor::new());
+	let (mut unblock_test, mut wait_for_verification) = mpsc::channel(0);
+	let test = async {
 		let (sender, _worker_receiver) = mpsc::channel(1);
 		let mut participation = Participation::new(sender);
 		participate(&mut ctx, &mut participation).await.unwrap();
-		assert!(ctx_handle.recv().timeout(Duration::from_millis(10)).await.is_none());
+
+		// We have initiated participation, but we'll block `activate_leaf` so that we can check
+		// that the participation is queued in a race-free way:
+		let _ = wait_for_verification.next().await.unwrap();
+
 		activate_leaf(&mut ctx, &mut participation, 10).await.unwrap();
+	};
+
+	// Responds to messages from the test and verifies its behaviour.
+	let request_handler = async {
+		// If we receive a `BlockNumber` request, this implicitly proves that the participation
+		// is queued.
+		assert_matches!(
+			ctx_handle.recv().await,
+			AllMessages::ChainApi(ChainApiMessage::BlockNumber(_, tx)) => {
+				tx.send(Ok(None)).unwrap();
+			},
+			"overseer did not receive `ChainApiMessage::BlockNumber` message",
+		);
+
+		assert!(ctx_handle.recv().timeout(Duration::from_millis(10)).await.is_none());
+
+		// No activity, so the participation is queued => unblock the test:
+		unblock_test.send(()).await.unwrap();
 
 		// after activating at least one leaf the recent block
 		// state should be available which should lead to trying
@@ -288,7 +329,11 @@ fn reqs_get_queued_on_no_recent_block() {
 			)),
 			"overseer did not receive recover available data message",
 		);
-	})
+	};
+
+	futures::executor::block_on(async {
+		futures::join!(test, request_handler);
+	});
 }
 
 #[test]
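The reworked tests above all follow one pattern: the test body and a request handler run as two futures joined with `futures::join!`, so requests issued by the code under test are answered concurrently instead of sequentially. A toy, self-contained model of that pattern using plain `futures` channels (nothing here is subsystem API):

```rust
use futures::{channel::mpsc, join, SinkExt, StreamExt};

fn main() {
    futures::executor::block_on(async {
        // Zero buffer: every send must be matched by a concurrent receive.
        let (mut tx, mut rx) = mpsc::channel::<u32>(0);

        // Stands in for the test body driving the subsystem: it issues
        // requests and would deadlock if nobody answered them.
        let test = async move {
            for i in 0..3u32 {
                tx.send(i).await.unwrap();
            }
            // `tx` is dropped here, which terminates the handler's loop.
        };

        // Stands in for the `request_handler` future answering requests.
        let request_handler = async move {
            let mut seen = 0;
            while let Some(_msg) = rx.next().await {
                seen += 1;
            }
            assert_eq!(seen, 3);
        };

        // Polling both futures together is what prevents the deadlock.
        join!(test, request_handler);
    });
}
```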
diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs
index d44f46ec3442..4d32620946e0 100644
--- a/node/core/dispute-coordinator/src/tests.rs
+++ b/node/core/dispute-coordinator/src/tests.rs
@@ -57,7 +57,9 @@ use polkadot_node_subsystem::{
 	messages::{AllMessages, BlockDescription, RuntimeApiMessage, RuntimeApiRequest},
 	ActivatedLeaf, ActiveLeavesUpdate, LeafStatus,
 };
-use polkadot_node_subsystem_test_helpers::{make_subsystem_context, TestSubsystemContextHandle};
+use polkadot_node_subsystem_test_helpers::{
+	make_buffered_subsystem_context, TestSubsystemContextHandle,
+};
 use polkadot_primitives::v2::{
 	ApprovalVote, BlockNumber, CandidateCommitments, CandidateEvent, CandidateHash,
 	CandidateReceipt, CoreIndex, DisputeStatement, GroupIndex, Hash, HeadData, Header, IndexedVec,
@@ -382,6 +384,10 @@ impl TestState {
 					);
 					gum::trace!("After answering runtime API request (votes)");
 				},
+				AllMessages::ChainApi(ChainApiMessage::BlockNumber(hash, tx)) => {
+					let block_num = self.headers.get(&hash).map(|header| header.number);
+					tx.send(Ok(block_num)).unwrap();
+				},
 				msg => {
 					panic!("Received unexpected message in `handle_sync_queries`: {:?}", msg);
 				},
@@ -521,7 +527,7 @@ impl TestState {
 		F: FnOnce(TestState, VirtualOverseer) -> BoxFuture<'static, TestState>,
 	{
 		self.known_session = None;
-		let (ctx, ctx_handle) = make_subsystem_context(TaskExecutor::new());
+		let (ctx, ctx_handle) = make_buffered_subsystem_context(TaskExecutor::new(), 1);
 		let subsystem = DisputeCoordinatorSubsystem::new(
 			self.db.clone(),
 			self.config.clone(),
@@ -2838,6 +2844,12 @@ fn negative_issue_local_statement_only_triggers_import() {
 			})
 			.await;
 
+			// Assert that subsystem is not participating.
+			assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
+
+			virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
+			assert!(virtual_overseer.try_recv().await.is_none());
+
 			let backend = DbBackend::new(
 				test_state.db.clone(),
 				test_state.config.column_config(),
@@ -2851,12 +2863,6 @@ fn negative_issue_local_statement_only_triggers_import() {
 			let disputes = backend.load_recent_disputes().unwrap();
 			assert_eq!(disputes, None);
 
-			// Assert that subsystem is not participating.
-			assert!(virtual_overseer.recv().timeout(TEST_TIMEOUT).await.is_none());
-
-			virtual_overseer.send(FromOrchestra::Signal(OverseerSignal::Conclude)).await;
-			assert!(virtual_overseer.try_recv().await.is_none());
-
 			test_state
 		})
 	});
diff --git a/node/network/availability-distribution/src/tests/state.rs b/node/network/availability-distribution/src/tests/state.rs
index c021f1bfb81b..be77aa2d023c 100644
--- a/node/network/availability-distribution/src/tests/state.rs
+++ b/node/network/availability-distribution/src/tests/state.rs
@@ -51,7 +51,7 @@ use polkadot_primitives::v2::{
 	CandidateHash, CoreState, GroupIndex, Hash, Id as ParaId, ScheduledCore, SessionInfo,
 	ValidatorIndex,
 };
-use test_helpers::{mock::make_ferdie_keystore, SingleItemSink};
+use test_helpers::mock::make_ferdie_keystore;
 
 use super::mock::{make_session_info, OccupiedCoreBuilder};
 use crate::LOG_TARGET;
@@ -295,7 +295,7 @@ impl TestState {
 	}
 
 	async fn overseer_signal(
-		mut tx: SingleItemSink<FromOrchestra<AvailabilityDistributionMessage>>,
+		mut tx: mpsc::Sender<FromOrchestra<AvailabilityDistributionMessage>>,
 		msg: impl Into<OverseerSignal>,
 	) {
 		let msg = msg.into();
diff --git a/node/subsystem-test-helpers/src/lib.rs b/node/subsystem-test-helpers/src/lib.rs
index 79f833b7558c..dd207039091a 100644
--- a/node/subsystem-test-helpers/src/lib.rs
+++ b/node/subsystem-test-helpers/src/lib.rs
@@ -177,7 +177,7 @@ where
 /// A test subsystem context.
 pub struct TestSubsystemContext<M, S> {
 	tx: TestSubsystemSender,
-	rx: SingleItemStream<FromOrchestra<M>>,
+	rx: mpsc::Receiver<FromOrchestra<M>>,
 	spawn: S,
 }
 
@@ -239,7 +239,7 @@ pub struct TestSubsystemContextHandle<M> {
 	///
 	/// Useful for shared ownership situations (one can have multiple senders, but only one
 	/// receiver.
-	pub tx: SingleItemSink<FromOrchestra<M>>,
+	pub tx: mpsc::Sender<FromOrchestra<M>>,
 
 	/// Direct access to the receiver.
 	pub rx: mpsc::UnboundedReceiver<AllMessages>,
@@ -280,11 +280,22 @@ impl<M> TestSubsystemContextHandle<M> {
 	}
 }
 
-/// Make a test subsystem context.
+/// Make a test subsystem context with `buffer_size == 0`. This is used by most
+/// of the tests.
 pub fn make_subsystem_context<M, S>(
 	spawner: S,
 ) -> (TestSubsystemContext<M, SpawnGlue<S>>, TestSubsystemContextHandle<M>) {
-	let (overseer_tx, overseer_rx) = single_item_sink();
+	make_buffered_subsystem_context(spawner, 0)
+}
+
+/// Make a test subsystem context with a buffered overseer channel. Some tests (e.g.
+/// `dispute-coordinator`) create too many parallel operations and deadlock unless
+/// the channel is buffered. Usually `buffer_size = 1` is enough.
+pub fn make_buffered_subsystem_context<M, S>(
+	spawner: S,
+	buffer_size: usize,
+) -> (TestSubsystemContext<M, SpawnGlue<S>>, TestSubsystemContextHandle<M>) {
+	let (overseer_tx, overseer_rx) = mpsc::channel(buffer_size);
 	let (all_messages_tx, all_messages_rx) = mpsc::unbounded();
 
 	(
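The `buffer_size` parameter leans on the documented capacity rule of `futures::channel::mpsc`: a bounded channel holds `buffer` messages plus one guaranteed slot per sender. A small sketch of why `buffer_size = 1` already gives a lone subsystem sender some slack (the exact counts below assume that documented rule; everything else is illustrative):

```rust
use futures::{channel::mpsc, FutureExt, SinkExt};

fn main() {
    futures::executor::block_on(async {
        // `buffer_size = 1`, as used by the dispute-coordinator tests.
        let (mut tx, _rx) = mpsc::channel::<&str>(1);

        // Two sends complete without the receiver being polled at all:
        // one fills the buffer slot, one the sender's guaranteed slot.
        assert!(tx.send("first").now_or_never().is_some());
        assert!(tx.send("second").now_or_never().is_some());

        // The third send is not ready until the receiver drains something.
        assert!(tx.send("third").now_or_never().is_none());
    });
}
```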
diff --git a/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md b/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md
index 3d44e210db0e..07fc647a711c 100644
--- a/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md
+++ b/roadmap/implementers-guide/src/node/disputes/dispute-coordinator.md
@@ -9,7 +9,7 @@ In particular the dispute-coordinator is responsible for:
 
 - Ensuring that the node is able to raise a dispute in case an invalid candidate
   is found during approval checking.
-- Ensuring lazy approval votes (votes given without running the parachain 
+- Ensuring lazy approval votes (votes given without running the parachain
   validation function) will be recorded, so lazy nodes can get slashed properly.
 - Coordinating actual participation in a dispute, ensuring that the node
   participates in any justified dispute in a way that ensures resolution of
@@ -84,18 +84,18 @@ While there is no need to record approval votes in the dispute coordinator
 preemptively, we do need to make sure they are recorded when a dispute
 actually happens. This is because only votes recorded by the dispute
 coordinator will be considered for slashing. It is sufficient for our
-threat model that malicious backers are slashed as opposed to both backers and 
-approval checkers. However, we still must import approval votes from the approvals 
-process into the disputes process to ensure that lazy approval checkers 
-actually run the parachain validation function. Slashing lazy approval checkers is necessary, else we risk a useless approvals process where every approval 
-checker blindly votes valid for every candidate. If we did not import approval 
-votes, lazy nodes would likely cast a properly checked explicit vote as part 
-of the dispute in addition to their blind approval vote and thus avoid a slash. 
-With the 2/3rd honest assumption it seems unrealistic that lazy approval voters 
-will keep sending unchecked approval votes once they became aware of a raised 
-dispute. Hence the most crucial approval votes to import are the early ones 
-(tranche 0), to take into account network latencies and such we still want to 
-import approval votes at a later point in time as well (in particular we need 
+threat model that malicious backers are slashed as opposed to both backers and
+approval checkers. However, we still must import approval votes from the approvals
+process into the disputes process to ensure that lazy approval checkers
+actually run the parachain validation function. Slashing lazy approval checkers is necessary, else we risk a useless approvals process where every approval
+checker blindly votes valid for every candidate. If we did not import approval
+votes, lazy nodes would likely cast a properly checked explicit vote as part
+of the dispute in addition to their blind approval vote and thus avoid a slash.
+With the 2/3rd honest assumption it seems unrealistic that lazy approval voters
+will keep sending unchecked approval votes once they become aware of a raised
+dispute. Hence the most crucial approval votes to import are the early ones
+(tranche 0), to take into account network latencies and such we still want to
+import approval votes at a later point in time as well (in particular we need
 to make sure the dispute can conclude, but more on that later).
 
 As mentioned already previously, importing votes is most efficient when batched.
@@ -202,11 +202,11 @@ time participation is faster than approval, a node would do double work.
 ### Ensuring Chain Import
 
 While in the previous section we discussed means for nodes to ensure relevant
-votes are recorded so lazy approval checkers get slashed properly, it is crucial 
-to also discuss the actual chain import. Only if we guarantee that recorded votes 
-will also get imported on chain (on all potential chains really) we will succeed 
-in executing slashes. Particularly we need to make sure backing votes end up on 
-chain consistantly. In contrast recording and slashing lazy approval voters only 
+votes are recorded so lazy approval checkers get slashed properly, it is crucial
+to also discuss the actual chain import. Only if we guarantee that recorded votes
+will also get imported on chain (on all potential chains really) we will succeed
+in executing slashes. Particularly we need to make sure backing votes end up on
+chain consistently. In contrast recording and slashing lazy approval voters only
 needs to be likely, not certain.
 
 Dispute distribution will make sure all explicit dispute votes get distributed
@@ -227,14 +227,14 @@ production in the current set - they might only exist on an already abandoned
 fork. This means a block producer that just joined the set, might not have seen
 any of them.
 
-For approvals it is even more tricky and less necessary: Approval voting together 
-with finalization is a completely off-chain process therefore those protocols 
-don't care about block production at all. Approval votes only have a guarantee of 
-being propagated between the nodes that are responsible for finalizing the 
-concerned blocks. This implies that on an era change the current authority set, 
-will not necessarily get informed about any approval votes for the previous era. 
-Hence even if all validators of the previous era successfully recorded all approval 
-votes in the dispute coordinator, they won't get a chance to put them on chain, 
+For approvals it is even more tricky and less necessary: Approval voting together
+with finalization is a completely off-chain process therefore those protocols
+don't care about block production at all. Approval votes only have a guarantee of
+being propagated between the nodes that are responsible for finalizing the
+concerned blocks. This implies that on an era change the current authority set,
+will not necessarily get informed about any approval votes for the previous era.
+Hence even if all validators of the previous era successfully recorded all approval
+votes in the dispute coordinator, they won't get a chance to put them on chain,
 hence they won't be considered for slashing.
 
 It is important to note, that the essential properties of the system still hold:
@@ -359,14 +359,19 @@ times instead of just once to the oldest offender. This is obviously a good
 idea, in particular it makes it impossible for an attacker to prevent rolling
 back a very old candidate, by keeping raising disputes for newer candidates.
 
-For candidates we have not seen included, but we have our availability piece
-available we put participation on a best-effort queue, which at the moment is
-processed on the basis how often we requested participation locally, which
-equals the number of times we imported votes for that dispute. The idea is, if
-we have not seen the candidate included, but the dispute is valid, other nodes
-will have seen it included - so the more votes there are, the more likely it is
-a valid dispute and we should implicitly arrive at a similar ordering as the
-nodes that are able to sort based on the relay parent block height.
+For candidates we have not seen included, but which we know are backed (thanks to
+chain scraping), or for which we have seen a dispute with 1/3+1 participation
+(a confirmed dispute) - we put participation on a best-effort queue. It has the
+same ordering as the priority one - by block height of the relay parent, with
+older blocks having priority. It can happen that the relay parent's block number
+cannot be obtained when we are inserting the dispute into the queue. The reason
+for this is either that the dispute is completely made up, or that we are out of
+sync with the other nodes in terms of the last finalized block. The former is
+very unlikely: for a dispute to be added to best-effort, it should already be
+either confirmed or the candidate backed. In the latter case we will promote the
+dispute to the priority queue once we learn about the new block. NOTE: this is
+still work in progress and is tracked by
+[this issue](https://github.com/paritytech/polkadot/issues/5875).
 
 #### Import
 
@@ -381,6 +386,12 @@ dispute coordinator level (dispute-distribution also has its own), which is
 spam slots. For each import, where we don't know whether it might be spam or
 not we increment a counter for each signing participant of explicit `invalid`
 votes.
+What votes do we treat as potential spam? A vote will increase a spam slot if
+and only if all of the following conditions are satisfied:
+* the candidate under dispute is not included on any chain
+* the dispute is not confirmed
+* we haven't cast a vote for the dispute
+
 The reason this works is because we only need to worry about actual dispute
 votes. Import of backing votes are already rate limited and concern only real
 candidates for approval votes a similar argument holds (if they come from