Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Change best effort queue behaviour in dispute-coordinator (#6275)
Browse files Browse the repository at this point in the history
* Change best effort queue behaviour in `dispute-coordinator`

Use the same type of queue (`BTreeMap<CandidateComparator,
ParticipationRequest>`) for best effort and priority in
`dispute-coordinator`.

Rework `CandidateComparator` to handle unavailable parent
block numbers.

Best effort queue will order disputes the same way as priority does - by
parent's block height. Disputes on candidates for which the parent's
block number can't be obtained will be treated with the lowest priority.

* Fix tests: Handle `ChainApiMessage::BlockNumber` in `handle_sync_queries`

* Some tests are deadlocking on sending messages via overseer so change `SingleItemSink`to `mpsc::Sender` with a buffer of 1

* Fix a race in test after adding a buffered queue for overseer messages

* Fix the rest of the tests

* Guide update - best-effort queue

* Guide update: clarification about spam votes

* Fix tests in `availability-distribution`

* Update comments

* Add `make_buffered_subsystem_context` in `subsystem-test-helpers`

* Code review feedback

* Code review feedback

* Code review feedback

* Don't add best effort candidate if it is already in priority queue

* Remove an old comment

* Fix insert in best_effort
  • Loading branch information
tdimitrov authored Nov 17, 2022
1 parent 98df65b commit eed7f40
Show file tree
Hide file tree
Showing 7 changed files with 328 additions and 195 deletions.
185 changes: 88 additions & 97 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{
cmp::Ordering,
collections::{BTreeMap, HashMap},
};
use std::{cmp::Ordering, collections::BTreeMap};

use futures::channel::oneshot;
use polkadot_node_subsystem::{messages::ChainApiMessage, overseer};
Expand Down Expand Up @@ -50,25 +47,14 @@ const PRIORITY_QUEUE_SIZE: usize = 20_000;
#[cfg(test)]
const PRIORITY_QUEUE_SIZE: usize = 2;

/// Type for counting how often a candidate was added to the best effort queue.
type BestEffortCount = u32;

/// Queues for dispute participation.
/// In both queues we have a strict ordering of candidates and participation will
/// happen in that order. Refer to `CandidateComparator` for details on the ordering.
pub struct Queues {
/// Set of best effort participation requests.
///
/// Note that as size is limited to `BEST_EFFORT_QUEUE_SIZE` we simply do a linear search for
/// the entry with the highest `added_count` to determine what dispute to participate next in.
///
/// This mechanism leads to an amplifying effect - the more validators already participated,
/// the more likely it becomes that more validators will participate soon, which should lead to
/// a quick resolution of disputes, even in the best effort queue.
best_effort: HashMap<CandidateHash, BestEffortEntry>,
best_effort: BTreeMap<CandidateComparator, ParticipationRequest>,

/// Priority queue.
///
/// In the priority queue, we have a strict ordering of candidates and participation will
/// happen in that order.
priority: BTreeMap<CandidateComparator, ParticipationRequest>,
}

Expand Down Expand Up @@ -143,14 +129,13 @@ impl ParticipationRequest {
impl Queues {
/// Create new `Queues`.
pub fn new() -> Self {
Self { best_effort: HashMap::new(), priority: BTreeMap::new() }
Self { best_effort: BTreeMap::new(), priority: BTreeMap::new() }
}

/// Will put message in queue, either priority or best effort depending on priority.
///
/// If the message was already previously present on best effort, it will be moved to priority
/// if it considered priority now, otherwise the `added_count` on the best effort queue will be
/// bumped.
/// if it is considered priority now.
///
/// Returns error in case a queue was found full already.
pub async fn queue(
Expand All @@ -159,94 +144,76 @@ impl Queues {
priority: ParticipationPriority,
req: ParticipationRequest,
) -> Result<()> {
let comparator = match priority {
ParticipationPriority::BestEffort => None,
ParticipationPriority::Priority =>
CandidateComparator::new(sender, &req.candidate_receipt).await?,
};
self.queue_with_comparator(comparator, req)?;
let comparator = CandidateComparator::new(sender, &req.candidate_receipt).await?;

self.queue_with_comparator(comparator, priority, req)?;
Ok(())
}

/// Get the next best request for dispute participation
///
/// if any. Priority queue is always considered first, then the best effort queue based on
/// `added_count`.
/// Get the next best request for dispute participation if any.
/// First the priority queue is considered and then the best effort one.
pub fn dequeue(&mut self) -> Option<ParticipationRequest> {
if let Some(req) = self.pop_priority() {
// In case a candidate became best effort over time, we might have it also queued in
// the best effort queue - get rid of any such entry:
self.best_effort.remove(req.candidate_hash());
return Some(req)
return Some(req.1)
}
self.pop_best_effort()
self.pop_best_effort().map(|d| d.1)
}

fn queue_with_comparator(
&mut self,
comparator: Option<CandidateComparator>,
comparator: CandidateComparator,
priority: ParticipationPriority,
req: ParticipationRequest,
) -> std::result::Result<(), QueueError> {
if let Some(comparator) = comparator {
if priority.is_priority() {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
// Remove any best effort entry:
self.best_effort.remove(&req.candidate_hash);
self.best_effort.remove(&comparator);
self.priority.insert(comparator, req);
} else {
if self.priority.contains_key(&comparator) {
// The candidate is already in priority queue - don't
// add in in best effort too.
return Ok(())
}
if self.best_effort.len() >= BEST_EFFORT_QUEUE_SIZE {
return Err(QueueError::BestEffortFull)
}
// Note: The request might have been added to priority in a previous call already, we
// take care of that case in `dequeue` (more efficient).
self.best_effort
.entry(req.candidate_hash)
.or_insert(BestEffortEntry { req, added_count: 0 })
.added_count += 1;
self.best_effort.insert(comparator, req);
}
Ok(())
}

/// Get the next best from the best effort queue.
///
/// If there are multiple best - just pick one.
fn pop_best_effort(&mut self) -> Option<ParticipationRequest> {
let best = self.best_effort.iter().reduce(|(hash1, entry1), (hash2, entry2)| {
if entry1.added_count > entry2.added_count {
(hash1, entry1)
} else {
(hash2, entry2)
}
});
if let Some((best_hash, _)) = best {
let best_hash = best_hash.clone();
self.best_effort.remove(&best_hash).map(|e| e.req)
} else {
None
}
/// Get best from the best effort queue.
fn pop_best_effort(&mut self) -> Option<(CandidateComparator, ParticipationRequest)> {
return Self::pop_impl(&mut self.best_effort)
}

/// Get best priority queue entry.
fn pop_priority(&mut self) -> Option<ParticipationRequest> {
fn pop_priority(&mut self) -> Option<(CandidateComparator, ParticipationRequest)> {
return Self::pop_impl(&mut self.priority)
}

// `pop_best_effort` and `pop_priority` do the same but on different `BTreeMap`s. This function has
// the extracted implementation
fn pop_impl(
target: &mut BTreeMap<CandidateComparator, ParticipationRequest>,
) -> Option<(CandidateComparator, ParticipationRequest)> {
// Once https://github.com/rust-lang/rust/issues/62924 is there, we can use a simple:
// priority.pop_first().
if let Some((comparator, _)) = self.priority.iter().next() {
// target.pop_first().
if let Some((comparator, _)) = target.iter().next() {
let comparator = comparator.clone();
self.priority.remove(&comparator)
target
.remove(&comparator)
.map(|participation_request| (comparator, participation_request))
} else {
None
}
}
}

/// Entry for the best effort queue.
struct BestEffortEntry {
req: ParticipationRequest,
/// How often was the above request added to the queue.
added_count: BestEffortCount,
}

/// `Comparator` for ordering of disputes for candidates.
///
/// This `comparator` makes it possible to order disputes based on age and to ensure some fairness
Expand All @@ -266,9 +233,12 @@ struct BestEffortEntry {
#[derive(Copy, Clone)]
#[cfg_attr(test, derive(Debug))]
struct CandidateComparator {
/// Block number of the relay parent.
/// Block number of the relay parent. It's wrapped in an `Option<>` because there are cases when
/// it can't be obtained. For example when the node is lagging behind and new leaves are received
/// with a slight delay. Candidates with unknown relay parent are treated with the lowest priority.
///
/// Important, so we will be participating in oldest disputes first.
/// The order enforced by `CandidateComparator` is important because we want to participate in
/// the oldest disputes first.
///
/// Note: In theory it would make more sense to use the `BlockNumber` of the including
/// block, as inclusion time is the actual relevant event when it comes to ordering. The
Expand All @@ -277,8 +247,10 @@ struct CandidateComparator {
/// just using the lowest `BlockNumber` of all available including blocks - the problem is,
/// that is not stable. If a new fork appears after the fact, we would start ordering the same
/// candidate differently, which would result in the same candidate getting queued twice.
relay_parent_block_number: BlockNumber,
/// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates.
relay_parent_block_number: Option<BlockNumber>,
/// By adding the `CandidateHash`, we can guarantee a unique ordering across candidates with the
/// same relay parent block number. Candidates without `relay_parent_block_number` are ordered by
/// the `candidate_hash` (and treated with the lowest priority, as already mentioned).
candidate_hash: CandidateHash,
}

Expand All @@ -287,33 +259,35 @@ impl CandidateComparator {
///
/// Useful for testing.
#[cfg(test)]
pub fn new_dummy(block_number: BlockNumber, candidate_hash: CandidateHash) -> Self {
pub fn new_dummy(block_number: Option<BlockNumber>, candidate_hash: CandidateHash) -> Self {
Self { relay_parent_block_number: block_number, candidate_hash }
}

/// Create a candidate comparator for a given candidate.
///
/// Returns:
/// `Ok(None)` in case we could not lookup the candidate's relay parent, returns a
/// `FatalError` in case the chain API call fails with an unexpected error.
/// - `Ok(CandidateComparator{Some(relay_parent_block_number), candidate_hash})` when the
/// relay parent can be obtained. This is the happy case.
/// - `Ok(CandidateComparator{None, candidate_hash})` in case the candidate's relay parent
/// can't be obtained.
/// - `FatalError` in case the chain API call fails with an unexpected error.
pub async fn new(
sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
candidate: &CandidateReceipt,
) -> FatalResult<Option<Self>> {
) -> FatalResult<Self> {
let candidate_hash = candidate.hash();
let n = match get_block_number(sender, candidate.descriptor().relay_parent).await? {
None => {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
"Candidate's relay_parent could not be found via chain API - `CandidateComparator could not be provided!"
);
return Ok(None)
},
Some(n) => n,
};
let n = get_block_number(sender, candidate.descriptor().relay_parent).await?;

if n.is_none() {
gum::warn!(
target: LOG_TARGET,
candidate_hash = ?candidate_hash,
"Candidate's relay_parent could not be found via chain API - `CandidateComparator` \
with an empty relay parent block number will be provided!"
);
}

Ok(Some(CandidateComparator { relay_parent_block_number: n, candidate_hash }))
Ok(CandidateComparator { relay_parent_block_number: n, candidate_hash })
}
}

Expand All @@ -333,11 +307,28 @@ impl PartialOrd for CandidateComparator {

impl Ord for CandidateComparator {
fn cmp(&self, other: &Self) -> Ordering {
match self.relay_parent_block_number.cmp(&other.relay_parent_block_number) {
Ordering::Equal => (),
o => return o,
return match (self.relay_parent_block_number, other.relay_parent_block_number) {
(None, None) => {
// No relay parents for both -> compare hashes
self.candidate_hash.cmp(&other.candidate_hash)
},
(Some(self_relay_parent_block_num), Some(other_relay_parent_block_num)) => {
match self_relay_parent_block_num.cmp(&other_relay_parent_block_num) {
// if the relay parent is the same for both -> compare hashes
Ordering::Equal => self.candidate_hash.cmp(&other.candidate_hash),
// if not - return the result from comparing the relay parent block numbers
o => return o,
}
},
(Some(_), None) => {
// Candidates with known relay parents are always with priority
Ordering::Less
},
(None, Some(_)) => {
// Ditto
Ordering::Greater
},
}
self.candidate_hash.cmp(&other.candidate_hash)
}
}

Expand Down
Loading

0 comments on commit eed7f40

Please sign in to comment.