diff --git a/Cargo.lock b/Cargo.lock index 5847bf63f600..b9f15ad0ed18 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6022,6 +6022,7 @@ dependencies = [ "polkadot-node-subsystem-test-helpers", "polkadot-node-subsystem-util", "polkadot-primitives", + "polkadot-primitives-test-helpers", "rand 0.8.5", "rand_chacha 0.3.1", "rand_core 0.5.1", @@ -6924,6 +6925,7 @@ dependencies = [ "polkadot-primitives", "rand 0.8.5", "sp-application-crypto", + "sp-core", "sp-keyring", "sp-runtime", ] diff --git a/Cargo.toml b/Cargo.toml index 4983b346225b..be2e04291370 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -199,7 +199,6 @@ try-runtime = [ "polkadot-cli/try-runtime" ] fast-runtime = [ "polkadot-cli/fast-runtime" ] runtime-metrics = [ "polkadot-cli/runtime-metrics" ] pyroscope = ["polkadot-cli/pyroscope"] -staging-client = ["polkadot-cli/staging-client"] # Configuration for building a .deb package - for use with `cargo-deb` [package.metadata.deb] diff --git a/cli/Cargo.toml b/cli/Cargo.toml index 4cc97cf9af8d..1e770cd8715b 100644 --- a/cli/Cargo.toml +++ b/cli/Cargo.toml @@ -74,4 +74,3 @@ rococo-native = ["service/rococo-native"] malus = ["full-node", "service/malus"] runtime-metrics = ["service/runtime-metrics", "polkadot-node-metrics/runtime-metrics"] -staging-client = ["service/staging-client"] diff --git a/node/core/dispute-coordinator/src/db/v1.rs b/node/core/dispute-coordinator/src/db/v1.rs index 4d33949db644..2c643d341de2 100644 --- a/node/core/dispute-coordinator/src/db/v1.rs +++ b/node/core/dispute-coordinator/src/db/v1.rs @@ -16,6 +16,7 @@ //! `V1` database for the dispute coordinator. +use polkadot_node_primitives::DisputeStatus; use polkadot_node_subsystem::{SubsystemError, SubsystemResult}; use polkadot_node_subsystem_util::database::{DBTransaction, Database}; use polkadot_primitives::v2::{ @@ -31,7 +32,6 @@ use crate::{ backend::{Backend, BackendWriteOp, OverlayedBackend}, error::{FatalError, FatalResult}, metrics::Metrics, - status::DisputeStatus, DISPUTE_WINDOW, LOG_TARGET, }; diff --git a/node/core/dispute-coordinator/src/initialized.rs b/node/core/dispute-coordinator/src/initialized.rs index 075cfbb33c27..5f29245f33f8 100644 --- a/node/core/dispute-coordinator/src/initialized.rs +++ b/node/core/dispute-coordinator/src/initialized.rs @@ -26,8 +26,8 @@ use futures::{ use sc_keystore::LocalKeystore; use polkadot_node_primitives::{ - CandidateVotes, DisputeMessage, DisputeMessageCheckError, SignedDisputeStatement, - DISPUTE_WINDOW, + CandidateVotes, DisputeMessage, DisputeMessageCheckError, DisputeStatus, + SignedDisputeStatement, Timestamp, DISPUTE_WINDOW, }; use polkadot_node_subsystem::{ messages::{ @@ -49,7 +49,7 @@ use crate::{ error::{log_error, Error, FatalError, FatalResult, JfyiError, JfyiResult, Result}, import::{CandidateEnvironment, CandidateVoteState}, metrics::Metrics, - status::{get_active_with_status, Clock, DisputeStatus, Timestamp}, + status::{get_active_with_status, Clock}, DisputeCoordinatorSubsystem, LOG_TARGET, }; @@ -599,7 +599,9 @@ impl Initialized { }; gum::trace!(target: LOG_TARGET, "Loaded recent disputes from db"); - let _ = tx.send(recent_disputes.keys().cloned().collect()); + let _ = tx.send( + recent_disputes.into_iter().map(|(k, v)| (k.0, k.1, v)).collect::>(), + ); }, DisputeCoordinatorMessage::ActiveDisputes(tx) => { // Return error if session information is missing. diff --git a/node/core/dispute-coordinator/src/status.rs b/node/core/dispute-coordinator/src/status.rs index d2ad551bd9ad..6332c3653274 100644 --- a/node/core/dispute-coordinator/src/status.rs +++ b/node/core/dispute-coordinator/src/status.rs @@ -14,125 +14,18 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -use std::time::{SystemTime, UNIX_EPOCH}; - -use parity_scale_codec::{Decode, Encode}; +use polkadot_node_primitives::{dispute_is_inactive, DisputeStatus, Timestamp}; use polkadot_primitives::v2::{CandidateHash, SessionIndex}; +use std::time::{SystemTime, UNIX_EPOCH}; use crate::LOG_TARGET; -/// The choice here is fairly arbitrary. But any dispute that concluded more than a few minutes ago -/// is not worth considering anymore. Changing this value has little to no bearing on consensus, -/// and really only affects the work that the node might do on startup during periods of many -/// disputes. -pub const ACTIVE_DURATION_SECS: Timestamp = 180; - -/// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots. -pub type Timestamp = u64; - -/// The status of dispute. This is a state machine which can be altered by the -/// helper methods. -#[derive(Debug, Clone, Copy, Encode, Decode, PartialEq)] -pub enum DisputeStatus { - /// The dispute is active and unconcluded. - #[codec(index = 0)] - Active, - /// The dispute has been concluded in favor of the candidate - /// since the given timestamp. - #[codec(index = 1)] - ConcludedFor(Timestamp), - /// The dispute has been concluded against the candidate - /// since the given timestamp. - /// - /// This takes precedence over `ConcludedFor` in the case that - /// both are true, which is impossible unless a large amount of - /// validators are participating on both sides. - #[codec(index = 2)] - ConcludedAgainst(Timestamp), - /// Dispute has been confirmed (more than `byzantine_threshold` have already participated/ or - /// we have seen the candidate included already/participated successfully ourselves). - #[codec(index = 3)] - Confirmed, -} - -impl DisputeStatus { - /// Initialize the status to the active state. - pub fn active() -> DisputeStatus { - DisputeStatus::Active - } - - /// Move status to confirmed status, if not yet concluded/confirmed already. - pub fn confirm(self) -> DisputeStatus { - match self { - DisputeStatus::Active => DisputeStatus::Confirmed, - DisputeStatus::Confirmed => DisputeStatus::Confirmed, - DisputeStatus::ConcludedFor(_) | DisputeStatus::ConcludedAgainst(_) => self, - } - } - - /// Check whether the dispute is not a spam dispute. - pub fn is_confirmed_concluded(&self) -> bool { - match self { - &DisputeStatus::Confirmed | - &DisputeStatus::ConcludedFor(_) | - DisputeStatus::ConcludedAgainst(_) => true, - &DisputeStatus::Active => false, - } - } - - /// Transition the status to a new status after observing the dispute has concluded for the candidate. - /// This may be a no-op if the status was already concluded. - pub fn concluded_for(self, now: Timestamp) -> DisputeStatus { - match self { - DisputeStatus::Active | DisputeStatus::Confirmed => DisputeStatus::ConcludedFor(now), - DisputeStatus::ConcludedFor(at) => DisputeStatus::ConcludedFor(std::cmp::min(at, now)), - against => against, - } - } - - /// Transition the status to a new status after observing the dispute has concluded against the candidate. - /// This may be a no-op if the status was already concluded. - pub fn concluded_against(self, now: Timestamp) -> DisputeStatus { - match self { - DisputeStatus::Active | DisputeStatus::Confirmed => - DisputeStatus::ConcludedAgainst(now), - DisputeStatus::ConcludedFor(at) => - DisputeStatus::ConcludedAgainst(std::cmp::min(at, now)), - DisputeStatus::ConcludedAgainst(at) => - DisputeStatus::ConcludedAgainst(std::cmp::min(at, now)), - } - } - - /// Whether the disputed candidate is possibly invalid. - pub fn is_possibly_invalid(&self) -> bool { - match self { - DisputeStatus::Active | - DisputeStatus::Confirmed | - DisputeStatus::ConcludedAgainst(_) => true, - DisputeStatus::ConcludedFor(_) => false, - } - } - - /// Yields the timestamp this dispute concluded at, if any. - pub fn concluded_at(&self) -> Option { - match self { - DisputeStatus::Active | DisputeStatus::Confirmed => None, - DisputeStatus::ConcludedFor(at) | DisputeStatus::ConcludedAgainst(at) => Some(*at), - } - } -} - /// Get active disputes as iterator, preserving its `DisputeStatus`. pub fn get_active_with_status( recent_disputes: impl Iterator, now: Timestamp, ) -> impl Iterator { - recent_disputes.filter_map(move |(disputed, status)| { - status - .concluded_at() - .filter(|at| *at + ACTIVE_DURATION_SECS < now) - .map_or(Some((disputed, status)), |_| None) - }) + recent_disputes.filter(move |(_, status)| !dispute_is_inactive(status, &now)) } pub trait Clock: Send + Sync { diff --git a/node/core/dispute-coordinator/src/tests.rs b/node/core/dispute-coordinator/src/tests.rs index 39fdc3a037e5..ff85319599ce 100644 --- a/node/core/dispute-coordinator/src/tests.rs +++ b/node/core/dispute-coordinator/src/tests.rs @@ -49,6 +49,7 @@ use sp_keyring::Sr25519Keyring; use sp_keystore::{SyncCryptoStore, SyncCryptoStorePtr}; use ::test_helpers::{dummy_candidate_receipt_bad_sig, dummy_digest, dummy_hash}; +use polkadot_node_primitives::{Timestamp, ACTIVE_DURATION_SECS}; use polkadot_node_subsystem::{ jaeger, messages::{AllMessages, BlockDescription, RuntimeApiMessage, RuntimeApiRequest}, @@ -66,7 +67,7 @@ use crate::{ backend::Backend, metrics::Metrics, participation::{participation_full_happy_path, participation_missing_availability}, - status::{Clock, Timestamp, ACTIVE_DURATION_SECS}, + status::Clock, Config, DisputeCoordinatorSubsystem, }; diff --git a/node/core/provisioner/Cargo.toml b/node/core/provisioner/Cargo.toml index 4f18c10aba35..77d0794754a6 100644 --- a/node/core/provisioner/Cargo.toml +++ b/node/core/provisioner/Cargo.toml @@ -13,8 +13,8 @@ polkadot-primitives = { path = "../../../primitives" } polkadot-node-primitives = { path = "../../primitives" } polkadot-node-subsystem = { path = "../../subsystem" } polkadot-node-subsystem-util = { path = "../../subsystem-util" } -futures-timer = "3.0.2" rand = "0.8.5" +futures-timer = "3.0.2" fatality = "0.0.6" [dev-dependencies] @@ -22,6 +22,3 @@ sp-application-crypto = { git = "https://github.com/paritytech/substrate", branc sp-keystore = { git = "https://github.com/paritytech/substrate", branch = "master" } polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" } - -[features] -staging-client = [] diff --git a/node/core/provisioner/src/disputes/mod.rs b/node/core/provisioner/src/disputes/mod.rs new file mode 100644 index 000000000000..404e800702b1 --- /dev/null +++ b/node/core/provisioner/src/disputes/mod.rs @@ -0,0 +1,53 @@ +// Copyright 2017-2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! The disputes module is responsible for selecting dispute votes to be sent with the inherent data. It contains two +//! different implementations, extracted in two separate modules - `random_selection` and `prioritized_selection`. Which +//! implementation will be executed depends on the version of the runtime. Runtime v2 supports `random_selection`. Runtime +//! v3 and above - `prioritized_selection`. The entrypoint to these implementations is the `select_disputes` function. +//! prioritized_selection` is considered superior and will be the default one in the future. Refer to the documentation of +//! the modules for more details about each implementation. + +use crate::LOG_TARGET; +use futures::channel::oneshot; +use polkadot_node_primitives::CandidateVotes; +use polkadot_node_subsystem::{messages::DisputeCoordinatorMessage, overseer}; +use polkadot_primitives::v2::{CandidateHash, SessionIndex}; + +/// Request the relevant dispute statements for a set of disputes identified by `CandidateHash` and the `SessionIndex`. +async fn request_votes( + sender: &mut impl overseer::ProvisionerSenderTrait, + disputes_to_query: Vec<(SessionIndex, CandidateHash)>, +) -> Vec<(SessionIndex, CandidateHash, CandidateVotes)> { + let (tx, rx) = oneshot::channel(); + // Bounded by block production - `ProvisionerMessage::RequestInherentData`. + sender.send_unbounded_message(DisputeCoordinatorMessage::QueryCandidateVotes( + disputes_to_query, + tx, + )); + + match rx.await { + Ok(v) => v, + Err(oneshot::Canceled) => { + gum::warn!(target: LOG_TARGET, "Unable to query candidate votes"); + Vec::new() + }, + } +} + +pub(crate) mod prioritized_selection; + +pub(crate) mod random_selection; diff --git a/node/core/provisioner/src/disputes/prioritized_selection/mod.rs b/node/core/provisioner/src/disputes/prioritized_selection/mod.rs new file mode 100644 index 000000000000..6582f0a612ff --- /dev/null +++ b/node/core/provisioner/src/disputes/prioritized_selection/mod.rs @@ -0,0 +1,470 @@ +// Copyright 2017-2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! This module uses different approach for selecting dispute votes. It queries the Runtime +//! about the votes already known onchain and tries to select only relevant votes. Refer to +//! the documentation of `select_disputes` for more details about the actual implementation. + +use crate::{error::GetOnchainDisputesError, metrics, LOG_TARGET}; +use futures::channel::oneshot; +use polkadot_node_primitives::{dispute_is_inactive, CandidateVotes, DisputeStatus, Timestamp}; +use polkadot_node_subsystem::{ + errors::RuntimeApiError, + messages::{DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest}, + overseer, ActivatedLeaf, +}; +use polkadot_primitives::v2::{ + supermajority_threshold, CandidateHash, DisputeState, DisputeStatement, DisputeStatementSet, + Hash, MultiDisputeStatementSet, SessionIndex, ValidatorIndex, +}; +use std::{ + collections::{BTreeMap, HashMap}, + time::{SystemTime, UNIX_EPOCH}, +}; + +#[cfg(test)] +mod tests; + +/// The maximum number of disputes Provisioner will include in the inherent data. +/// Serves as a protection not to flood the Runtime with excessive data. +#[cfg(not(test))] +pub const MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME: usize = 200_000; +#[cfg(test)] +pub const MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME: usize = 200; + +/// Controls how much dispute votes to be fetched from the runtime per iteration in `fn vote_selection`. +/// The purpose is to fetch the votes in batches until `MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME` is +/// reached. This value should definitely be less than `MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME`. +/// +/// The ratio `MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME` / `VOTES_SELECTION_BATCH_SIZE` gives an +/// approximation about how many runtime requests will be issued to fetch votes from the runtime in +/// a single `select_disputes` call. Ideally we don't want to make more than 2-3 calls. In practice +/// it's hard to predict this number because we can't guess how many new votes (for the runtime) a +/// batch will contain. +/// +/// The value below is reached by: `MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME` / 2 + 10% +/// The 10% makes approximately means '10% new votes'. Tweak this if provisioner makes excessive +/// number of runtime calls. +#[cfg(not(test))] +const VOTES_SELECTION_BATCH_SIZE: usize = 1_100; +#[cfg(test)] +const VOTES_SELECTION_BATCH_SIZE: usize = 11; // Just a small value for tests. Doesn't follow the rules above + +/// Implements the `select_disputes` function which selects dispute votes which should +/// be sent to the Runtime. +/// +/// # How the prioritization works +/// +/// Generally speaking disputes can be described as: +/// * Active vs Inactive +/// * Known vs Unknown onchain +/// * Offchain vs Onchain +/// * Concluded onchain vs Unconcluded onchain +/// +/// Provisioner fetches all disputes from `dispute-coordinator` and separates them in multiple partitions. +/// Please refer to `struct PartitionedDisputes` for details about the actual partitions. +/// Each partition has got a priority implicitly assigned to it and the disputes are selected based on this +/// priority (e.g. disputes in partition 1, then if there is space - disputes from partition 2 and so on). +/// +/// # Votes selection +/// +/// Besides the prioritization described above the votes in each partition are filtered too. Provisioner +/// fetches all onchain votes and filters them out from all partitions. As a result the Runtime receives +/// only fresh votes (votes it didn't know about). +/// +/// # How the onchain votes are fetched +/// +/// The logic outlined above relies on `RuntimeApiRequest::Disputes` message from the Runtime. The user +/// check the Runtime version before calling `select_disputes`. If the function is used with old runtime +/// an error is logged and the logic will continue with empty onchain votes HashMap. +pub async fn select_disputes( + sender: &mut Sender, + metrics: &metrics::Metrics, + leaf: &ActivatedLeaf, +) -> MultiDisputeStatementSet +where + Sender: overseer::ProvisionerSenderTrait, +{ + gum::trace!( + target: LOG_TARGET, + ?leaf, + "Selecting disputes for inherent data using prioritized selection" + ); + + // Fetch the onchain disputes. We'll do a prioritization based on them. + let onchain = match get_onchain_disputes(sender, leaf.hash.clone()).await { + Ok(r) => r, + Err(GetOnchainDisputesError::NotSupported(runtime_api_err, relay_parent)) => { + // Runtime version is checked before calling this method, so the error below should never happen! + gum::error!( + target: LOG_TARGET, + ?runtime_api_err, + ?relay_parent, + "Can't fetch onchain disputes, because ParachainHost runtime api version is old. Will continue with empty onchain disputes set.", + ); + HashMap::new() + }, + Err(GetOnchainDisputesError::Channel) => { + // This error usually means the node is shutting down. Log just in case. + gum::debug!( + target: LOG_TARGET, + "Channel error occurred while fetching onchain disputes. Will continue with empty onchain disputes set.", + ); + HashMap::new() + }, + Err(GetOnchainDisputesError::Execution(runtime_api_err, parent_hash)) => { + gum::warn!( + target: LOG_TARGET, + ?runtime_api_err, + ?parent_hash, + "Unexpected execution error occurred while fetching onchain votes. Will continue with empty onchain disputes set.", + ); + HashMap::new() + }, + }; + + let recent_disputes = request_disputes(sender).await; + gum::trace!( + target: LOG_TARGET, + ?leaf, + "Got {} recent disputes and {} onchain disputes.", + recent_disputes.len(), + onchain.len(), + ); + + let partitioned = partition_recent_disputes(recent_disputes, &onchain); + metrics.on_partition_recent_disputes(&partitioned); + + if partitioned.inactive_unknown_onchain.len() > 0 { + gum::warn!( + target: LOG_TARGET, + ?leaf, + "Got {} inactive unknown onchain disputes. This should not happen!", + partitioned.inactive_unknown_onchain.len() + ); + } + let result = vote_selection(sender, partitioned, &onchain).await; + + make_multi_dispute_statement_set(metrics, result) +} + +/// Selects dispute votes from `PartitionedDisputes` which should be sent to the runtime. Votes which +/// are already onchain are filtered out. Result should be sorted by `(SessionIndex, CandidateHash)` +/// which is enforced by the `BTreeMap`. This is a requirement from the runtime. +async fn vote_selection( + sender: &mut Sender, + partitioned: PartitionedDisputes, + onchain: &HashMap<(SessionIndex, CandidateHash), DisputeState>, +) -> BTreeMap<(SessionIndex, CandidateHash), CandidateVotes> +where + Sender: overseer::ProvisionerSenderTrait, +{ + // fetch in batches until there are enough votes + let mut disputes = partitioned.into_iter().collect::>(); + let mut total_votes_len = 0; + let mut result = BTreeMap::new(); + let mut request_votes_counter = 0; + while !disputes.is_empty() { + let batch_size = std::cmp::min(VOTES_SELECTION_BATCH_SIZE, disputes.len()); + let batch = Vec::from_iter(disputes.drain(0..batch_size)); + + // Filter votes which are already onchain + request_votes_counter += 1; + let votes = super::request_votes(sender, batch) + .await + .into_iter() + .map(|(session_index, candidate_hash, mut votes)| { + let onchain_state = + if let Some(onchain_state) = onchain.get(&(session_index, candidate_hash)) { + onchain_state + } else { + // onchain knows nothing about this dispute - add all votes + return (session_index, candidate_hash, votes) + }; + + votes.valid.retain(|validator_idx, (statement_kind, _)| { + is_vote_worth_to_keep( + validator_idx, + DisputeStatement::Valid(*statement_kind), + &onchain_state, + ) + }); + votes.invalid.retain(|validator_idx, (statement_kind, _)| { + is_vote_worth_to_keep( + validator_idx, + DisputeStatement::Invalid(*statement_kind), + &onchain_state, + ) + }); + (session_index, candidate_hash, votes) + }) + .collect::>(); + + // Check if votes are within the limit + for (session_index, candidate_hash, selected_votes) in votes { + let votes_len = selected_votes.valid.len() + selected_votes.invalid.len(); + if votes_len + total_votes_len > MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME { + // we are done - no more votes can be added + return result + } + result.insert((session_index, candidate_hash), selected_votes); + total_votes_len += votes_len + } + } + + gum::trace!( + target: LOG_TARGET, + ?request_votes_counter, + "vote_selection DisputeCoordinatorMessage::QueryCandidateVotes counter", + ); + + result +} + +/// Contains disputes by partitions. Check the field comments for further details. +#[derive(Default)] +pub(crate) struct PartitionedDisputes { + /// Concluded and inactive disputes which are completely unknown for the Runtime. + /// Hopefully this should never happen. + /// Will be sent to the Runtime with FIRST priority. + pub inactive_unknown_onchain: Vec<(SessionIndex, CandidateHash)>, + /// Disputes which are INACTIVE locally but they are unconcluded for the Runtime. + /// A dispute can have enough local vote to conclude and at the same time the + /// Runtime knows nothing about them at treats it as unconcluded. This discrepancy + /// should be treated with high priority. + /// Will be sent to the Runtime with SECOND priority. + pub inactive_unconcluded_onchain: Vec<(SessionIndex, CandidateHash)>, + /// Active disputes completely unknown onchain. + /// Will be sent to the Runtime with THIRD priority. + pub active_unknown_onchain: Vec<(SessionIndex, CandidateHash)>, + /// Active disputes unconcluded onchain. + /// Will be sent to the Runtime with FOURTH priority. + pub active_unconcluded_onchain: Vec<(SessionIndex, CandidateHash)>, + /// Active disputes concluded onchain. New votes are not that important for + /// this partition. + /// Will be sent to the Runtime with FIFTH priority. + pub active_concluded_onchain: Vec<(SessionIndex, CandidateHash)>, + /// Inactive disputes which has concluded onchain. These are not interesting and + /// won't be sent to the Runtime. + /// Will be DROPPED + pub inactive_concluded_onchain: Vec<(SessionIndex, CandidateHash)>, +} + +impl PartitionedDisputes { + fn new() -> PartitionedDisputes { + Default::default() + } + + fn into_iter(self) -> impl Iterator { + self.inactive_unknown_onchain + .into_iter() + .chain(self.inactive_unconcluded_onchain.into_iter()) + .chain(self.active_unknown_onchain.into_iter()) + .chain(self.active_unconcluded_onchain.into_iter()) + .chain(self.active_concluded_onchain.into_iter()) + // inactive_concluded_onchain is dropped on purpose + } +} + +fn secs_since_epoch() -> Timestamp { + match SystemTime::now().duration_since(UNIX_EPOCH) { + Ok(d) => d.as_secs(), + Err(e) => { + gum::warn!( + target: LOG_TARGET, + err = ?e, + "Error getting system time." + ); + 0 + }, + } +} + +fn concluded_onchain(onchain_state: &DisputeState) -> bool { + // Check if there are enough onchain votes for or against to conclude the dispute + let supermajority = supermajority_threshold(onchain_state.validators_for.len()); + + onchain_state.validators_for.count_ones() >= supermajority || + onchain_state.validators_against.count_ones() >= supermajority +} + +fn partition_recent_disputes( + recent: Vec<(SessionIndex, CandidateHash, DisputeStatus)>, + onchain: &HashMap<(SessionIndex, CandidateHash), DisputeState>, +) -> PartitionedDisputes { + let mut partitioned = PartitionedDisputes::new(); + + // Drop any duplicates + let unique_recent = recent + .into_iter() + .map(|(session_index, candidate_hash, dispute_state)| { + ((session_index, candidate_hash), dispute_state) + }) + .collect::>(); + + // Split recent disputes in ACTIVE and INACTIVE + let time_now = &secs_since_epoch(); + let (active, inactive): ( + Vec<(SessionIndex, CandidateHash, DisputeStatus)>, + Vec<(SessionIndex, CandidateHash, DisputeStatus)>, + ) = unique_recent + .into_iter() + .map(|((session_index, candidate_hash), dispute_state)| { + (session_index, candidate_hash, dispute_state) + }) + .partition(|(_, _, status)| !dispute_is_inactive(status, time_now)); + + // Split ACTIVE in three groups... + for (session_index, candidate_hash, _) in active { + match onchain.get(&(session_index, candidate_hash)) { + Some(d) => match concluded_onchain(d) { + true => partitioned.active_concluded_onchain.push((session_index, candidate_hash)), + false => + partitioned.active_unconcluded_onchain.push((session_index, candidate_hash)), + }, + None => partitioned.active_unknown_onchain.push((session_index, candidate_hash)), + }; + } + + // ... and INACTIVE in three more + for (session_index, candidate_hash, _) in inactive { + match onchain.get(&(session_index, candidate_hash)) { + Some(onchain_state) => + if concluded_onchain(onchain_state) { + partitioned.inactive_concluded_onchain.push((session_index, candidate_hash)); + } else { + partitioned.inactive_unconcluded_onchain.push((session_index, candidate_hash)); + }, + None => partitioned.inactive_unknown_onchain.push((session_index, candidate_hash)), + } + } + + partitioned +} + +/// Determines if a vote is worth to be kept, based on the onchain disputes +fn is_vote_worth_to_keep( + validator_index: &ValidatorIndex, + dispute_statement: DisputeStatement, + onchain_state: &DisputeState, +) -> bool { + let offchain_vote = match dispute_statement { + DisputeStatement::Valid(_) => true, + DisputeStatement::Invalid(_) => false, + }; + let in_validators_for = onchain_state + .validators_for + .get(validator_index.0 as usize) + .as_deref() + .copied() + .unwrap_or(false); + let in_validators_against = onchain_state + .validators_against + .get(validator_index.0 as usize) + .as_deref() + .copied() + .unwrap_or(false); + + if in_validators_for && in_validators_against { + // The validator has double voted and runtime knows about this. Ignore this vote. + return false + } + + if offchain_vote && in_validators_against || !offchain_vote && in_validators_for { + // offchain vote differs from the onchain vote + // we need this vote to punish the offending validator + return true + } + + // The vote is valid. Return true if it is not seen onchain. + !in_validators_for && !in_validators_against +} + +/// Request disputes identified by `CandidateHash` and the `SessionIndex`. +async fn request_disputes( + sender: &mut impl overseer::ProvisionerSenderTrait, +) -> Vec<(SessionIndex, CandidateHash, DisputeStatus)> { + let (tx, rx) = oneshot::channel(); + let msg = DisputeCoordinatorMessage::RecentDisputes(tx); + + // Bounded by block production - `ProvisionerMessage::RequestInherentData`. + sender.send_unbounded_message(msg); + + let recent_disputes = rx.await.unwrap_or_else(|err| { + gum::warn!(target: LOG_TARGET, err=?err, "Unable to gather recent disputes"); + Vec::new() + }); + recent_disputes +} + +// This function produces the return value for `pub fn select_disputes()` +fn make_multi_dispute_statement_set( + metrics: &metrics::Metrics, + dispute_candidate_votes: BTreeMap<(SessionIndex, CandidateHash), CandidateVotes>, +) -> MultiDisputeStatementSet { + // Transform all `CandidateVotes` into `MultiDisputeStatementSet`. + dispute_candidate_votes + .into_iter() + .map(|((session_index, candidate_hash), votes)| { + let valid_statements = votes + .valid + .into_iter() + .map(|(i, (s, sig))| (DisputeStatement::Valid(s), i, sig)); + + let invalid_statements = votes + .invalid + .into_iter() + .map(|(i, (s, sig))| (DisputeStatement::Invalid(s), i, sig)); + + metrics.inc_valid_statements_by(valid_statements.len()); + metrics.inc_invalid_statements_by(invalid_statements.len()); + metrics.inc_dispute_statement_sets_by(1); + + DisputeStatementSet { + candidate_hash, + session: session_index, + statements: valid_statements.chain(invalid_statements).collect(), + } + }) + .collect() +} + +/// Gets the on-chain disputes at a given block number and returns them as a `HashMap` so that searching in them is cheap. +pub async fn get_onchain_disputes( + sender: &mut Sender, + relay_parent: Hash, +) -> Result, GetOnchainDisputesError> +where + Sender: overseer::ProvisionerSenderTrait, +{ + gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching on-chain disputes"); + let (tx, rx) = oneshot::channel(); + sender + .send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Disputes(tx))) + .await; + + rx.await + .map_err(|_| GetOnchainDisputesError::Channel) + .and_then(|res| { + res.map_err(|e| match e { + RuntimeApiError::Execution { .. } => + GetOnchainDisputesError::Execution(e, relay_parent), + RuntimeApiError::NotSupported { .. } => + GetOnchainDisputesError::NotSupported(e, relay_parent), + }) + }) + .map(|v| v.into_iter().map(|e| ((e.0, e.1), e.2)).collect()) +} diff --git a/node/core/provisioner/src/disputes/prioritized_selection/tests.rs b/node/core/provisioner/src/disputes/prioritized_selection/tests.rs new file mode 100644 index 000000000000..f76107dc65d4 --- /dev/null +++ b/node/core/provisioner/src/disputes/prioritized_selection/tests.rs @@ -0,0 +1,722 @@ +// Copyright 2017-2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use super::super::{ + super::{tests::common::test_harness, *}, + prioritized_selection::*, +}; +use bitvec::prelude::*; +use futures::channel::mpsc; +use polkadot_node_primitives::{CandidateVotes, DisputeStatus, ACTIVE_DURATION_SECS}; +use polkadot_node_subsystem::messages::{ + AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest, +}; +use polkadot_node_subsystem_test_helpers::TestSubsystemSender; +use polkadot_primitives::v2::{ + CandidateHash, DisputeState, InvalidDisputeStatementKind, SessionIndex, + ValidDisputeStatementKind, ValidatorSignature, +}; +use std::sync::Arc; +use test_helpers; + +// +// Unit tests for various functions +// +#[test] +fn should_keep_vote_behaves() { + let onchain_state = DisputeState { + validators_for: bitvec![u8, Lsb0; 1, 0, 1, 0, 1], + validators_against: bitvec![u8, Lsb0; 0, 1, 0, 0, 1], + start: 1, + concluded_at: None, + }; + + let local_valid_known = (ValidatorIndex(0), ValidDisputeStatementKind::Explicit); + let local_valid_unknown = (ValidatorIndex(3), ValidDisputeStatementKind::Explicit); + + let local_invalid_known = (ValidatorIndex(1), InvalidDisputeStatementKind::Explicit); + let local_invalid_unknown = (ValidatorIndex(3), InvalidDisputeStatementKind::Explicit); + + assert_eq!( + is_vote_worth_to_keep( + &local_valid_known.0, + DisputeStatement::Valid(local_valid_known.1), + &onchain_state + ), + false + ); + assert_eq!( + is_vote_worth_to_keep( + &local_valid_unknown.0, + DisputeStatement::Valid(local_valid_unknown.1), + &onchain_state + ), + true + ); + assert_eq!( + is_vote_worth_to_keep( + &local_invalid_known.0, + DisputeStatement::Invalid(local_invalid_known.1), + &onchain_state + ), + false + ); + assert_eq!( + is_vote_worth_to_keep( + &local_invalid_unknown.0, + DisputeStatement::Invalid(local_invalid_unknown.1), + &onchain_state + ), + true + ); + + //double voting - onchain knows + let local_double_vote_onchain_knows = + (ValidatorIndex(4), InvalidDisputeStatementKind::Explicit); + assert_eq!( + is_vote_worth_to_keep( + &local_double_vote_onchain_knows.0, + DisputeStatement::Invalid(local_double_vote_onchain_knows.1), + &onchain_state + ), + false + ); + + //double voting - onchain doesn't know + let local_double_vote_onchain_doesnt_knows = + (ValidatorIndex(0), InvalidDisputeStatementKind::Explicit); + assert_eq!( + is_vote_worth_to_keep( + &local_double_vote_onchain_doesnt_knows.0, + DisputeStatement::Invalid(local_double_vote_onchain_doesnt_knows.1), + &onchain_state + ), + true + ); + + // empty onchain state + let empty_onchain_state = DisputeState { + validators_for: BitVec::new(), + validators_against: BitVec::new(), + start: 1, + concluded_at: None, + }; + assert_eq!( + is_vote_worth_to_keep( + &local_double_vote_onchain_doesnt_knows.0, + DisputeStatement::Invalid(local_double_vote_onchain_doesnt_knows.1), + &empty_onchain_state + ), + true + ); +} + +#[test] +fn partitioning_happy_case() { + let mut input = Vec::<(SessionIndex, CandidateHash, DisputeStatus)>::new(); + let mut onchain = HashMap::<(u32, CandidateHash), DisputeState>::new(); + let time_now = secs_since_epoch(); + + // Create one dispute for each partition + let inactive_unknown_onchain = ( + 0, + CandidateHash(Hash::random()), + DisputeStatus::ConcludedFor(time_now - ACTIVE_DURATION_SECS * 2), + ); + input.push(inactive_unknown_onchain.clone()); + + let inactive_unconcluded_onchain = ( + 1, + CandidateHash(Hash::random()), + DisputeStatus::ConcludedFor(time_now - ACTIVE_DURATION_SECS * 2), + ); + input.push(inactive_unconcluded_onchain.clone()); + onchain.insert( + (inactive_unconcluded_onchain.0, inactive_unconcluded_onchain.1.clone()), + DisputeState { + validators_for: bitvec![u8, Lsb0; 1, 1, 1, 0, 0, 0, 0, 0, 0], + validators_against: bitvec![u8, Lsb0; 0, 0, 0, 0, 0, 0, 0, 0, 0], + start: 1, + concluded_at: None, + }, + ); + + let active_unknown_onchain = (2, CandidateHash(Hash::random()), DisputeStatus::Active); + input.push(active_unknown_onchain.clone()); + + let active_unconcluded_onchain = (3, CandidateHash(Hash::random()), DisputeStatus::Active); + input.push(active_unconcluded_onchain.clone()); + onchain.insert( + (active_unconcluded_onchain.0, active_unconcluded_onchain.1.clone()), + DisputeState { + validators_for: bitvec![u8, Lsb0; 1, 1, 1, 0, 0, 0, 0, 0, 0], + validators_against: bitvec![u8, Lsb0; 0, 0, 0, 0, 0, 0, 0, 0, 0], + start: 1, + concluded_at: None, + }, + ); + + let active_concluded_onchain = (4, CandidateHash(Hash::random()), DisputeStatus::Active); + input.push(active_concluded_onchain.clone()); + onchain.insert( + (active_concluded_onchain.0, active_concluded_onchain.1.clone()), + DisputeState { + validators_for: bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1, 0], + validators_against: bitvec![u8, Lsb0; 0, 0, 0, 0, 0, 0, 0, 0, 0], + start: 1, + concluded_at: Some(3), + }, + ); + + let inactive_concluded_onchain = ( + 5, + CandidateHash(Hash::random()), + DisputeStatus::ConcludedFor(time_now - ACTIVE_DURATION_SECS * 2), + ); + input.push(inactive_concluded_onchain.clone()); + onchain.insert( + (inactive_concluded_onchain.0, inactive_concluded_onchain.1.clone()), + DisputeState { + validators_for: bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 0, 0], + validators_against: bitvec![u8, Lsb0; 0, 0, 0, 0, 0, 0, 0, 0, 0], + start: 1, + concluded_at: Some(3), + }, + ); + + let result = partition_recent_disputes(input, &onchain); + + // Check results + assert_eq!(result.inactive_unknown_onchain.len(), 1); + assert_eq!( + result.inactive_unknown_onchain.get(0).unwrap(), + &(inactive_unknown_onchain.0, inactive_unknown_onchain.1) + ); + + assert_eq!(result.inactive_unconcluded_onchain.len(), 1); + assert_eq!( + result.inactive_unconcluded_onchain.get(0).unwrap(), + &(inactive_unconcluded_onchain.0, inactive_unconcluded_onchain.1) + ); + + assert_eq!(result.active_unknown_onchain.len(), 1); + assert_eq!( + result.active_unknown_onchain.get(0).unwrap(), + &(active_unknown_onchain.0, active_unknown_onchain.1) + ); + + assert_eq!(result.active_unconcluded_onchain.len(), 1); + assert_eq!( + result.active_unconcluded_onchain.get(0).unwrap(), + &(active_unconcluded_onchain.0, active_unconcluded_onchain.1) + ); + + assert_eq!(result.active_concluded_onchain.len(), 1); + assert_eq!( + result.active_concluded_onchain.get(0).unwrap(), + &(active_concluded_onchain.0, active_concluded_onchain.1) + ); + + assert_eq!(result.inactive_concluded_onchain.len(), 1); + assert_eq!( + result.inactive_concluded_onchain.get(0).unwrap(), + &(inactive_concluded_onchain.0, inactive_concluded_onchain.1) + ); +} + +// This test verifies the double voting behavior. Currently we don't care if a supermajority is achieved with or +// without the 'help' of a double vote (a validator voting for and against at the same time). This makes the test +// a bit pointless but anyway I'm leaving it here to make this decision explicit and have the test code ready in +// case this behavior needs to be further tested in the future. +// Link to the PR with the discussions: https://github.com/paritytech/polkadot/pull/5567 +#[test] +fn partitioning_doubled_onchain_vote() { + let mut input = Vec::<(SessionIndex, CandidateHash, DisputeStatus)>::new(); + let mut onchain = HashMap::<(u32, CandidateHash), DisputeState>::new(); + + // Dispute A relies on a 'double onchain vote' to conclude. Validator with index 0 has voted both `for` and `against`. + // Despite that this dispute should be considered 'can conclude onchain'. + let dispute_a = (3, CandidateHash(Hash::random()), DisputeStatus::Active); + // Dispute B has supermajority + 1 votes, so the doubled onchain vote doesn't affect it. It should be considered + // as 'can conclude onchain'. + let dispute_b = (4, CandidateHash(Hash::random()), DisputeStatus::Active); + input.push(dispute_a.clone()); + input.push(dispute_b.clone()); + onchain.insert( + (dispute_a.0, dispute_a.1.clone()), + DisputeState { + validators_for: bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 0, 0], + validators_against: bitvec![u8, Lsb0; 1, 0, 0, 0, 0, 0, 0, 0, 0], + start: 1, + concluded_at: None, + }, + ); + onchain.insert( + (dispute_b.0, dispute_b.1.clone()), + DisputeState { + validators_for: bitvec![u8, Lsb0; 1, 1, 1, 1, 1, 1, 1, 1, 0], + validators_against: bitvec![u8, Lsb0; 1, 0, 0, 0, 0, 0, 0, 0, 0], + start: 1, + concluded_at: None, + }, + ); + + let result = partition_recent_disputes(input, &onchain); + + assert_eq!(result.active_unconcluded_onchain.len(), 0); + assert_eq!(result.active_concluded_onchain.len(), 2); +} + +#[test] +fn partitioning_duplicated_dispute() { + let mut input = Vec::<(SessionIndex, CandidateHash, DisputeStatus)>::new(); + let mut onchain = HashMap::<(u32, CandidateHash), DisputeState>::new(); + + let some_dispute = (3, CandidateHash(Hash::random()), DisputeStatus::Active); + input.push(some_dispute.clone()); + input.push(some_dispute.clone()); + onchain.insert( + (some_dispute.0, some_dispute.1.clone()), + DisputeState { + validators_for: bitvec![u8, Lsb0; 1, 1, 1, 0, 0, 0, 0, 0, 0], + validators_against: bitvec![u8, Lsb0; 0, 0, 0, 0, 0, 0, 0, 0, 0], + start: 1, + concluded_at: None, + }, + ); + + let result = partition_recent_disputes(input, &onchain); + + assert_eq!(result.active_unconcluded_onchain.len(), 1); + assert_eq!( + result.active_unconcluded_onchain.get(0).unwrap(), + &(some_dispute.0, some_dispute.1) + ); +} + +// +// end-to-end tests for select_disputes() +// + +async fn mock_overseer( + mut receiver: mpsc::UnboundedReceiver, + disputes_db: &mut TestDisputes, + vote_queries_count: &mut usize, +) { + while let Some(from_job) = receiver.next().await { + match from_job { + AllMessages::RuntimeApi(RuntimeApiMessage::Request( + _, + RuntimeApiRequest::Disputes(sender), + )) => { + let _ = sender.send(Ok(disputes_db + .onchain_disputes + .clone() + .into_iter() + .map(|(k, v)| (k.0, k.1, v)) + .collect::>())); + }, + AllMessages::RuntimeApi(_) => panic!("Unexpected RuntimeApi request"), + AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::RecentDisputes(sender)) => { + let _ = sender.send(disputes_db.local_disputes.clone()); + }, + AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::QueryCandidateVotes( + disputes, + sender, + )) => { + *vote_queries_count += 1; + let mut res = Vec::new(); + for d in disputes.iter() { + let v = disputes_db.votes_db.get(d).unwrap().clone(); + res.push((d.0, d.1, v)); + } + + let _ = sender.send(res); + }, + _ => panic!("Unexpected message: {:?}", from_job), + } + } +} + +fn leaf() -> ActivatedLeaf { + ActivatedLeaf { + hash: Hash::repeat_byte(0xAA), + number: 0xAA, + status: LeafStatus::Fresh, + span: Arc::new(jaeger::Span::Disabled), + } +} + +struct TestDisputes { + pub local_disputes: Vec<(SessionIndex, CandidateHash, DisputeStatus)>, + pub votes_db: HashMap<(SessionIndex, CandidateHash), CandidateVotes>, + pub onchain_disputes: HashMap<(u32, CandidateHash), DisputeState>, + validators_count: usize, +} + +impl TestDisputes { + pub fn new(validators_count: usize) -> TestDisputes { + TestDisputes { + local_disputes: Vec::<(SessionIndex, CandidateHash, DisputeStatus)>::new(), + votes_db: HashMap::<(SessionIndex, CandidateHash), CandidateVotes>::new(), + onchain_disputes: HashMap::<(u32, CandidateHash), DisputeState>::new(), + validators_count, + } + } + + // Offchain disputes are on node side + fn add_offchain_dispute( + &mut self, + dispute: (SessionIndex, CandidateHash, DisputeStatus), + local_votes_count: usize, + dummy_receipt: CandidateReceipt, + ) { + self.local_disputes.push(dispute.clone()); + self.votes_db.insert( + (dispute.0, dispute.1), + CandidateVotes { + candidate_receipt: dummy_receipt, + valid: TestDisputes::generate_local_votes( + ValidDisputeStatementKind::Explicit, + 0, + local_votes_count, + ), + invalid: BTreeMap::new(), + }, + ); + } + + fn add_onchain_dispute( + &mut self, + dispute: (SessionIndex, CandidateHash, DisputeStatus), + onchain_votes_count: usize, + ) { + let concluded_at = match dispute.2 { + DisputeStatus::Active | DisputeStatus::Confirmed => None, + DisputeStatus::ConcludedAgainst(_) | DisputeStatus::ConcludedFor(_) => Some(1), + }; + self.onchain_disputes.insert( + (dispute.0, dispute.1.clone()), + DisputeState { + validators_for: TestDisputes::generate_bitvec( + self.validators_count, + 0, + onchain_votes_count, + ), + validators_against: bitvec![u8, Lsb0; 0; self.validators_count], + start: 1, + concluded_at, + }, + ); + } + + pub fn add_unconfirmed_disputes_concluded_onchain( + &mut self, + dispute_count: usize, + ) -> (u32, usize) { + let local_votes_count = self.validators_count * 90 / 100; + let onchain_votes_count = self.validators_count * 80 / 100; + let session_idx = 0; + let lf = leaf(); + let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone()); + for _ in 0..dispute_count { + let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active); + self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone()); + self.add_onchain_dispute(d, onchain_votes_count); + } + + (session_idx, (local_votes_count - onchain_votes_count) * dispute_count) + } + + pub fn add_unconfirmed_disputes_unconcluded_onchain( + &mut self, + dispute_count: usize, + ) -> (u32, usize) { + let local_votes_count = self.validators_count * 90 / 100; + let onchain_votes_count = self.validators_count * 40 / 100; + let session_idx = 1; + let lf = leaf(); + let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone()); + for _ in 0..dispute_count { + let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active); + self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone()); + self.add_onchain_dispute(d, onchain_votes_count); + } + + (session_idx, (local_votes_count - onchain_votes_count) * dispute_count) + } + + pub fn add_unconfirmed_disputes_unknown_onchain( + &mut self, + dispute_count: usize, + ) -> (u32, usize) { + let local_votes_count = self.validators_count * 90 / 100; + let session_idx = 2; + let lf = leaf(); + let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone()); + for _ in 0..dispute_count { + let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::Active); + self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone()); + } + (session_idx, local_votes_count * dispute_count) + } + + pub fn add_concluded_disputes_known_onchain(&mut self, dispute_count: usize) -> (u32, usize) { + let local_votes_count = self.validators_count * 90 / 100; + let onchain_votes_count = self.validators_count * 75 / 100; + let session_idx = 3; + let lf = leaf(); + let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone()); + for _ in 0..dispute_count { + let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::ConcludedFor(0)); + self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone()); + self.add_onchain_dispute(d, onchain_votes_count); + } + (session_idx, (local_votes_count - onchain_votes_count) * dispute_count) + } + + pub fn add_concluded_disputes_unknown_onchain(&mut self, dispute_count: usize) -> (u32, usize) { + let local_votes_count = self.validators_count * 90 / 100; + let session_idx = 4; + let lf = leaf(); + let dummy_receipt = test_helpers::dummy_candidate_receipt(lf.hash.clone()); + for _ in 0..dispute_count { + let d = (session_idx, CandidateHash(Hash::random()), DisputeStatus::ConcludedFor(0)); + self.add_offchain_dispute(d.clone(), local_votes_count, dummy_receipt.clone()); + } + (session_idx, local_votes_count * dispute_count) + } + + fn generate_local_votes( + statement_kind: T, + start_idx: usize, + count: usize, + ) -> BTreeMap { + assert!(start_idx < count); + (start_idx..count) + .map(|idx| { + ( + ValidatorIndex(idx as u32), + (statement_kind.clone(), test_helpers::dummy_signature()), + ) + }) + .collect::>() + } + + fn generate_bitvec( + validator_count: usize, + start_idx: usize, + count: usize, + ) -> BitVec { + assert!(start_idx < count); + assert!(start_idx + count < validator_count); + let mut res = bitvec![u8, Lsb0; 0; validator_count]; + for idx in start_idx..count { + res.set(idx, true); + } + + res + } +} + +#[test] +fn normal_flow() { + const VALIDATOR_COUNT: usize = 10; + const DISPUTES_PER_BATCH: usize = 2; + const ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT: usize = 1; + + let mut input = TestDisputes::new(VALIDATOR_COUNT); + + // active, concluded onchain + let (third_idx, third_votes) = + input.add_unconfirmed_disputes_concluded_onchain(DISPUTES_PER_BATCH); + + // active unconcluded onchain + let (first_idx, first_votes) = + input.add_unconfirmed_disputes_unconcluded_onchain(DISPUTES_PER_BATCH); + + //concluded disputes unknown onchain + let (fifth_idx, fifth_votes) = input.add_concluded_disputes_unknown_onchain(DISPUTES_PER_BATCH); + + // concluded disputes known onchain - these should be ignored + let (_, _) = input.add_concluded_disputes_known_onchain(DISPUTES_PER_BATCH); + + // active disputes unknown onchain + let (second_idx, second_votes) = + input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_BATCH); + + let metrics = metrics::Metrics::new_dummy(); + let mut vote_queries: usize = 0; + test_harness( + |r| mock_overseer(r, &mut input, &mut vote_queries), + |mut tx: TestSubsystemSender| async move { + let lf = leaf(); + let result = select_disputes(&mut tx, &metrics, &lf).await; + + assert!(!result.is_empty()); + + assert_eq!(result.len(), 4 * DISPUTES_PER_BATCH); + + // Naive checks that the result is partitioned correctly + let (first_batch, rest): (Vec, Vec) = + result.into_iter().partition(|d| d.session == first_idx); + assert_eq!(first_batch.len(), DISPUTES_PER_BATCH); + + let (second_batch, rest): (Vec, Vec) = + rest.into_iter().partition(|d| d.session == second_idx); + assert_eq!(second_batch.len(), DISPUTES_PER_BATCH); + + let (third_batch, rest): (Vec, Vec) = + rest.into_iter().partition(|d| d.session == third_idx); + assert_eq!(third_batch.len(), DISPUTES_PER_BATCH); + + let (fifth_batch, rest): (Vec, Vec) = + rest.into_iter().partition(|d| d.session == fifth_idx); + assert_eq!(fifth_batch.len(), DISPUTES_PER_BATCH); + + // Ensure there are no more disputes - fourth_batch should be dropped + assert_eq!(rest.len(), 0); + + assert_eq!( + first_batch.iter().map(|d| d.statements.len()).fold(0, |acc, v| acc + v), + first_votes + ); + assert_eq!( + second_batch.iter().map(|d| d.statements.len()).fold(0, |acc, v| acc + v), + second_votes + ); + assert_eq!( + third_batch.iter().map(|d| d.statements.len()).fold(0, |acc, v| acc + v), + third_votes + ); + assert_eq!( + fifth_batch.iter().map(|d| d.statements.len()).fold(0, |acc, v| acc + v), + fifth_votes + ); + }, + ); + assert!(vote_queries <= ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT); +} + +#[test] +fn many_batches() { + const VALIDATOR_COUNT: usize = 10; + const DISPUTES_PER_PARTITION: usize = 10; + + // 10 disputes per partition * 4 partitions = 40 disputes + // BATCH_SIZE = 11 + // => There should be no more than 40 / 11 queries ( ~4 ) + const ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT: usize = 4; + + let mut input = TestDisputes::new(VALIDATOR_COUNT); + + // active which can conclude onchain + input.add_unconfirmed_disputes_concluded_onchain(DISPUTES_PER_PARTITION); + + // active which can't conclude onchain + input.add_unconfirmed_disputes_unconcluded_onchain(DISPUTES_PER_PARTITION); + + //concluded disputes unknown onchain + input.add_concluded_disputes_unknown_onchain(DISPUTES_PER_PARTITION); + + // concluded disputes known onchain + input.add_concluded_disputes_known_onchain(DISPUTES_PER_PARTITION); + + // active disputes unknown onchain + input.add_unconfirmed_disputes_unknown_onchain(DISPUTES_PER_PARTITION); + + let metrics = metrics::Metrics::new_dummy(); + let mut vote_queries: usize = 0; + test_harness( + |r| mock_overseer(r, &mut input, &mut vote_queries), + |mut tx: TestSubsystemSender| async move { + let lf = leaf(); + let result = select_disputes(&mut tx, &metrics, &lf).await; + + assert!(!result.is_empty()); + + let vote_count = result.iter().map(|d| d.statements.len()).fold(0, |acc, v| acc + v); + + assert!( + MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME - VALIDATOR_COUNT <= vote_count && + vote_count <= MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME, + "vote_count: {}", + vote_count + ); + }, + ); + + assert!( + vote_queries <= ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT, + "vote_queries: {} ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT: {}", + vote_queries, + ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT + ); +} + +#[test] +fn votes_above_limit() { + const VALIDATOR_COUNT: usize = 10; + const DISPUTES_PER_PARTITION: usize = 50; + const ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT: usize = 4; + + let mut input = TestDisputes::new(VALIDATOR_COUNT); + + // active which can conclude onchain + let (_, second_votes) = + input.add_unconfirmed_disputes_concluded_onchain(DISPUTES_PER_PARTITION); + + // active which can't conclude onchain + let (_, first_votes) = + input.add_unconfirmed_disputes_unconcluded_onchain(DISPUTES_PER_PARTITION); + + //concluded disputes unknown onchain + let (_, third_votes) = input.add_concluded_disputes_unknown_onchain(DISPUTES_PER_PARTITION); + + assert!( + first_votes + second_votes + third_votes > 3 * MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME, + "Total relevant votes generated: {}", + first_votes + second_votes + third_votes + ); + + let metrics = metrics::Metrics::new_dummy(); + let mut vote_queries: usize = 0; + test_harness( + |r| mock_overseer(r, &mut input, &mut vote_queries), + |mut tx: TestSubsystemSender| async move { + let lf = leaf(); + let result = select_disputes(&mut tx, &metrics, &lf).await; + + assert!(!result.is_empty()); + + let vote_count = result.iter().map(|d| d.statements.len()).fold(0, |acc, v| acc + v); + + assert!( + MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME - VALIDATOR_COUNT <= vote_count && + vote_count <= MAX_DISPUTE_VOTES_FORWARDED_TO_RUNTIME, + "vote_count: {}", + vote_count + ); + }, + ); + + assert!( + vote_queries <= ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT, + "vote_queries: {} ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT: {}", + vote_queries, + ACCEPTABLE_RUNTIME_VOTES_QUERIES_COUNT + ); +} diff --git a/node/core/provisioner/src/disputes/random_selection/mod.rs b/node/core/provisioner/src/disputes/random_selection/mod.rs new file mode 100644 index 000000000000..7af025700bae --- /dev/null +++ b/node/core/provisioner/src/disputes/random_selection/mod.rs @@ -0,0 +1,194 @@ +// Copyright 2017-2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +//! This module selects all RECENT disputes, fetches the votes for them from dispute-coordinator and +//! returns them as MultiDisputeStatementSet. If the RECENT disputes are more than +//! `MAX_DISPUTES_FORWARDED_TO_RUNTIME` constant - the ACTIVE disputes plus a random selection of +//! RECENT disputes (up to `MAX_DISPUTES_FORWARDED_TO_RUNTIME`) are returned instead. +//! If the ACTIVE disputes are also above `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit - a random selection +//! of them is generated. + +use crate::{metrics, LOG_TARGET}; +use futures::channel::oneshot; +use polkadot_node_subsystem::{messages::DisputeCoordinatorMessage, overseer}; +use polkadot_primitives::v2::{ + CandidateHash, DisputeStatement, DisputeStatementSet, MultiDisputeStatementSet, SessionIndex, +}; +use std::collections::HashSet; + +/// The maximum number of disputes Provisioner will include in the inherent data. +/// Serves as a protection not to flood the Runtime with excessive data. +const MAX_DISPUTES_FORWARDED_TO_RUNTIME: usize = 1_000; + +#[derive(Debug)] +enum RequestType { + /// Query recent disputes, could be an excessive amount. + Recent, + /// Query the currently active and very recently concluded disputes. + Active, +} + +/// Request open disputes identified by `CandidateHash` and the `SessionIndex`. +async fn request_disputes( + sender: &mut impl overseer::ProvisionerSenderTrait, + active_or_recent: RequestType, +) -> Vec<(SessionIndex, CandidateHash)> { + let disputes = match active_or_recent { + RequestType::Recent => { + let (tx, rx) = oneshot::channel(); + let msg = DisputeCoordinatorMessage::RecentDisputes(tx); + sender.send_unbounded_message(msg); + let recent_disputes = match rx.await { + Ok(r) => r, + Err(oneshot::Canceled) => { + gum::warn!( + target: LOG_TARGET, + "Channel closed: unable to gather {:?} disputes", + active_or_recent + ); + Vec::new() + }, + }; + recent_disputes + .into_iter() + .map(|(sesion_idx, candodate_hash, _)| (sesion_idx, candodate_hash)) + .collect::>() + }, + RequestType::Active => { + let (tx, rx) = oneshot::channel(); + let msg = DisputeCoordinatorMessage::ActiveDisputes(tx); + sender.send_unbounded_message(msg); + let active_disputes = match rx.await { + Ok(r) => r, + Err(oneshot::Canceled) => { + gum::warn!( + target: LOG_TARGET, + "Unable to gather {:?} disputes", + active_or_recent + ); + Vec::new() + }, + }; + active_disputes + }, + }; + + disputes +} + +/// Extend `acc` by `n` random, picks of not-yet-present in `acc` items of `recent` without repetition and additions of recent. +fn extend_by_random_subset_without_repetition( + acc: &mut Vec<(SessionIndex, CandidateHash)>, + extension: Vec<(SessionIndex, CandidateHash)>, + n: usize, +) { + use rand::Rng; + + let lut = acc.iter().cloned().collect::>(); + + let mut unique_new = + extension.into_iter().filter(|recent| !lut.contains(recent)).collect::>(); + + // we can simply add all + if unique_new.len() <= n { + acc.extend(unique_new) + } else { + acc.reserve(n); + let mut rng = rand::thread_rng(); + for _ in 0..n { + let idx = rng.gen_range(0..unique_new.len()); + acc.push(unique_new.swap_remove(idx)); + } + } + // assure sorting stays candid according to session index + acc.sort_unstable_by(|a, b| a.0.cmp(&b.0)); +} + +pub async fn select_disputes( + sender: &mut Sender, + metrics: &metrics::Metrics, +) -> MultiDisputeStatementSet +where + Sender: overseer::ProvisionerSenderTrait, +{ + gum::trace!(target: LOG_TARGET, "Selecting disputes for inherent data using random selection"); + + // We use `RecentDisputes` instead of `ActiveDisputes` because redundancy is fine. + // It's heavier than `ActiveDisputes` but ensures that everything from the dispute + // window gets on-chain, unlike `ActiveDisputes`. + // In case of an overload condition, we limit ourselves to active disputes, and fill up to the + // upper bound of disputes to pass to wasm `fn create_inherent_data`. + // If the active ones are already exceeding the bounds, randomly select a subset. + let recent = request_disputes(sender, RequestType::Recent).await; + let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { + gum::warn!( + target: LOG_TARGET, + "Recent disputes are excessive ({} > {}), reduce to active ones, and selected", + recent.len(), + MAX_DISPUTES_FORWARDED_TO_RUNTIME + ); + let mut active = request_disputes(sender, RequestType::Active).await; + let n_active = active.len(); + let active = if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { + let mut picked = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME); + extend_by_random_subset_without_repetition( + &mut picked, + active, + MAX_DISPUTES_FORWARDED_TO_RUNTIME, + ); + picked + } else { + extend_by_random_subset_without_repetition( + &mut active, + recent, + MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active), + ); + active + }; + active + } else { + recent + }; + + // Load all votes for all disputes from the coordinator. + let dispute_candidate_votes = super::request_votes(sender, disputes).await; + + // Transform all `CandidateVotes` into `MultiDisputeStatementSet`. + dispute_candidate_votes + .into_iter() + .map(|(session_index, candidate_hash, votes)| { + let valid_statements = votes + .valid + .into_iter() + .map(|(i, (s, sig))| (DisputeStatement::Valid(s), i, sig)); + + let invalid_statements = votes + .invalid + .into_iter() + .map(|(i, (s, sig))| (DisputeStatement::Invalid(s), i, sig)); + + metrics.inc_valid_statements_by(valid_statements.len()); + metrics.inc_invalid_statements_by(invalid_statements.len()); + metrics.inc_dispute_statement_sets_by(1); + + DisputeStatementSet { + candidate_hash, + session: session_index, + statements: valid_statements.chain(invalid_statements).collect(), + } + }) + .collect() +} diff --git a/node/core/provisioner/src/error.rs b/node/core/provisioner/src/error.rs index 05e437854eac..9fb958c4f339 100644 --- a/node/core/provisioner/src/error.rs +++ b/node/core/provisioner/src/error.rs @@ -88,9 +88,7 @@ pub enum GetOnchainDisputesError { #[error("runtime execution error occurred while fetching onchain disputes for parent {1}")] Execution(#[source] RuntimeApiError, Hash), - #[error( - "runtime doesn't support RuntimeApiRequest::Disputes/RuntimeApiRequest::StagingDisputes for parent {1}" - )] + #[error("runtime doesn't support RuntimeApiRequest::Disputes for parent {1}")] NotSupported(#[source] RuntimeApiError, Hash), } diff --git a/node/core/provisioner/src/lib.rs b/node/core/provisioner/src/lib.rs index 0f3099c7df33..301aec32c15b 100644 --- a/node/core/provisioner/src/lib.rs +++ b/node/core/provisioner/src/lib.rs @@ -25,29 +25,27 @@ use futures::{ }; use futures_timer::Delay; -use polkadot_node_primitives::CandidateVotes; use polkadot_node_subsystem::{ jaeger, messages::{ - CandidateBackingMessage, ChainApiMessage, DisputeCoordinatorMessage, ProvisionableData, - ProvisionerInherentData, ProvisionerMessage, + CandidateBackingMessage, ChainApiMessage, ProvisionableData, ProvisionerInherentData, + ProvisionerMessage, RuntimeApiMessage, RuntimeApiRequest, }, overseer, ActivatedLeaf, ActiveLeavesUpdate, FromOrchestra, LeafStatus, OverseerSignal, - PerLeafSpan, SpawnedSubsystem, SubsystemError, + PerLeafSpan, RuntimeApiError, SpawnedSubsystem, SubsystemError, }; use polkadot_node_subsystem_util::{ request_availability_cores, request_persisted_validation_data, TimeoutExt, }; use polkadot_primitives::v2::{ - BackedCandidate, BlockNumber, CandidateHash, CandidateReceipt, CoreState, DisputeState, - DisputeStatement, DisputeStatementSet, Hash, MultiDisputeStatementSet, OccupiedCoreAssumption, - SessionIndex, SignedAvailabilityBitfield, ValidatorIndex, + BackedCandidate, BlockNumber, CandidateReceipt, CoreState, Hash, OccupiedCoreAssumption, + SignedAvailabilityBitfield, ValidatorIndex, }; -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashMap}; +mod disputes; mod error; mod metrics; -mod onchain_disputes; pub use self::metrics::*; use error::{Error, FatalResult}; @@ -62,6 +60,9 @@ const SEND_INHERENT_DATA_TIMEOUT: std::time::Duration = core::time::Duration::fr const LOG_TARGET: &str = "parachain::provisioner"; +const PRIORITIZED_SELECTION_RUNTIME_VERSION_REQUIREMENT: u32 = + RuntimeApiRequest::DISPUTES_RUNTIME_REQUIREMENT; + /// The provisioner subsystem. pub struct ProvisionerSubsystem { metrics: Metrics, @@ -361,7 +362,18 @@ async fn send_inherent_data( relay_parent = ?leaf.hash, "Selecting disputes" ); - let disputes = select_disputes(from_job, metrics, leaf).await?; + + let disputes = match has_required_runtime( + from_job, + leaf.hash.clone(), + PRIORITIZED_SELECTION_RUNTIME_VERSION_REQUIREMENT, + ) + .await + { + true => disputes::prioritized_selection::select_disputes(from_job, metrics, leaf).await, + false => disputes::random_selection::select_disputes(from_job, metrics).await, + }; + gum::trace!( target: LOG_TARGET, relay_parent = ?leaf.hash, @@ -677,275 +689,55 @@ fn bitfields_indicate_availability( 3 * availability.count_ones() >= 2 * availability.len() } -#[derive(Debug)] -enum RequestType { - /// Query recent disputes, could be an excessive amount. - Recent, - /// Query the currently active and very recently concluded disputes. - Active, -} - -/// Request open disputes identified by `CandidateHash` and the `SessionIndex`. -async fn request_disputes( - sender: &mut impl overseer::ProvisionerSenderTrait, - active_or_recent: RequestType, -) -> Vec<(SessionIndex, CandidateHash)> { - let (tx, rx) = oneshot::channel(); - let msg = match active_or_recent { - RequestType::Recent => DisputeCoordinatorMessage::RecentDisputes(tx), - RequestType::Active => DisputeCoordinatorMessage::ActiveDisputes(tx), - }; - // Bounded by block production - `ProvisionerMessage::RequestInherentData`. - sender.send_unbounded_message(msg); - - let recent_disputes = match rx.await { - Ok(r) => r, - Err(oneshot::Canceled) => { - gum::warn!(target: LOG_TARGET, "Unable to gather {:?} disputes", active_or_recent); - Vec::new() - }, - }; - recent_disputes -} - -/// Request the relevant dispute statements for a set of disputes identified by `CandidateHash` and the `SessionIndex`. -async fn request_votes( +// If we have to be absolutely precise here, this method gets the version of the `ParachainHost` api. +// For brevity we'll just call it 'runtime version'. +async fn has_required_runtime( sender: &mut impl overseer::ProvisionerSenderTrait, - disputes_to_query: Vec<(SessionIndex, CandidateHash)>, -) -> Vec<(SessionIndex, CandidateHash, CandidateVotes)> { - // No need to send dummy request, if nothing to request: - if disputes_to_query.is_empty() { - gum::trace!(target: LOG_TARGET, "No disputes, nothing to request - returning empty `Vec`."); + relay_parent: Hash, + required_runtime_version: u32, +) -> bool { + gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching ParachainHost runtime api version"); - return Vec::new() - } let (tx, rx) = oneshot::channel(); - // Bounded by block production - `ProvisionerMessage::RequestInherentData`. - sender.send_unbounded_message(DisputeCoordinatorMessage::QueryCandidateVotes( - disputes_to_query, - tx, - )); + sender + .send_message(RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::Version(tx))) + .await; match rx.await { - Ok(v) => v, - Err(oneshot::Canceled) => { - gum::warn!(target: LOG_TARGET, "Unable to query candidate votes"); - Vec::new() + Result::Ok(Ok(runtime_version)) => { + gum::trace!( + target: LOG_TARGET, + ?relay_parent, + ?runtime_version, + ?required_runtime_version, + "Fetched ParachainHost runtime api version" + ); + runtime_version >= required_runtime_version }, - } -} - -/// Extend `acc` by `n` random, picks of not-yet-present in `acc` items of `recent` without repetition and additions of recent. -fn extend_by_random_subset_without_repetition( - acc: &mut Vec<(SessionIndex, CandidateHash)>, - extension: Vec<(SessionIndex, CandidateHash)>, - n: usize, -) { - use rand::Rng; - - let lut = acc.iter().cloned().collect::>(); - - let mut unique_new = - extension.into_iter().filter(|recent| !lut.contains(recent)).collect::>(); - - // we can simply add all - if unique_new.len() <= n { - acc.extend(unique_new) - } else { - acc.reserve(n); - let mut rng = rand::thread_rng(); - for _ in 0..n { - let idx = rng.gen_range(0..unique_new.len()); - acc.push(unique_new.swap_remove(idx)); - } - } - // assure sorting stays candid according to session index - acc.sort_unstable_by(|a, b| a.0.cmp(&b.0)); -} - -/// The maximum number of disputes Provisioner will include in the inherent data. -/// Serves as a protection not to flood the Runtime with excessive data. -const MAX_DISPUTES_FORWARDED_TO_RUNTIME: usize = 1_000; - -async fn select_disputes( - sender: &mut impl overseer::ProvisionerSenderTrait, - metrics: &metrics::Metrics, - _leaf: &ActivatedLeaf, -) -> Result { - // Helper lambda - // Gets the active disputes as input and partitions it in seen and unseen disputes by the Runtime - // Returns as much unseen disputes as possible and optionally some seen disputes up to `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit. - let generate_unseen_active_subset = - |active: Vec<(SessionIndex, CandidateHash)>, - onchain: HashMap<(SessionIndex, CandidateHash), DisputeState>| - -> Vec<(SessionIndex, CandidateHash)> { - let (seen_onchain, mut unseen_onchain): ( - Vec<(SessionIndex, CandidateHash)>, - Vec<(SessionIndex, CandidateHash)>, - ) = active.into_iter().partition(|d| onchain.contains_key(d)); - - if unseen_onchain.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { - // Even unseen on-chain don't fit within the limit. Add as many as possible. - let mut unseen_subset = Vec::with_capacity(MAX_DISPUTES_FORWARDED_TO_RUNTIME); - extend_by_random_subset_without_repetition( - &mut unseen_subset, - unseen_onchain, - MAX_DISPUTES_FORWARDED_TO_RUNTIME, - ); - unseen_subset - } else { - // Add all unseen onchain disputes and as much of the seen ones as there is space. - let n_unseen_onchain = unseen_onchain.len(); - extend_by_random_subset_without_repetition( - &mut unseen_onchain, - seen_onchain, - MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_unseen_onchain), - ); - unseen_onchain - } - }; - - // Helper lambda - // Extends the active disputes with recent ones up to `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit. Unseen recent disputes are prioritised. - let generate_active_and_unseen_recent_subset = - |recent: Vec<(SessionIndex, CandidateHash)>, - mut active: Vec<(SessionIndex, CandidateHash)>, - onchain: HashMap<(SessionIndex, CandidateHash), DisputeState>| - -> Vec<(SessionIndex, CandidateHash)> { - let mut n_active = active.len(); - // All active disputes can be sent. Fill the rest of the space with recent ones. - // We assume there is not enough space for all recent disputes. So we prioritise the unseen ones. - let (seen_onchain, unseen_onchain): ( - Vec<(SessionIndex, CandidateHash)>, - Vec<(SessionIndex, CandidateHash)>, - ) = recent.into_iter().partition(|d| onchain.contains_key(d)); - - extend_by_random_subset_without_repetition( - &mut active, - unseen_onchain, - MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active), + Result::Ok(Err(RuntimeApiError::Execution { source: error, .. })) => { + gum::trace!( + target: LOG_TARGET, + ?relay_parent, + ?error, + "Execution error while fetching ParachainHost runtime api version" ); - n_active = active.len(); - - if n_active < MAX_DISPUTES_FORWARDED_TO_RUNTIME { - // Looks like we can add some of the seen disputes too - extend_by_random_subset_without_repetition( - &mut active, - seen_onchain, - MAX_DISPUTES_FORWARDED_TO_RUNTIME.saturating_sub(n_active), - ); - } - active - }; - - gum::trace!( - target: LOG_TARGET, - relay_parent = ?_leaf.hash, - "Request recent disputes" - ); - - // We use `RecentDisputes` instead of `ActiveDisputes` because redundancy is fine. - // It's heavier than `ActiveDisputes` but ensures that everything from the dispute - // window gets on-chain, unlike `ActiveDisputes`. - // In case of an overload condition, we limit ourselves to active disputes, and fill up to the - // upper bound of disputes to pass to wasm `fn create_inherent_data`. - // If the active ones are already exceeding the bounds, randomly select a subset. - let recent = request_disputes(sender, RequestType::Recent).await; - - gum::trace!( - target: LOG_TARGET, - relay_paent = ?_leaf.hash, - "Received recent disputes" - ); - - gum::trace!( - target: LOG_TARGET, - relay_paent = ?_leaf.hash, - "Request on chain disputes" - ); - - // On chain disputes are fetched from the runtime. We want to prioritise the inclusion of unknown - // disputes in the inherent data. The call relies on staging Runtime API. If the staging API is not - // enabled in the binary an empty set is generated which doesn't affect the rest of the logic. - let onchain = match onchain_disputes::get_onchain_disputes(sender, _leaf.hash.clone()).await { - Ok(r) => r, - Err(e) => { - gum::debug!( + false + }, + Result::Ok(Err(RuntimeApiError::NotSupported { .. })) => { + gum::trace!( target: LOG_TARGET, - ?e, - "Can't fetch onchain disputes. Will continue with empty onchain disputes set.", + ?relay_parent, + "NotSupported error while fetching ParachainHost runtime api version" ); - HashMap::new() + false }, - }; - - gum::trace!( - target: LOG_TARGET, - relay_paent = ?_leaf.hash, - "Received on chain disputes" - ); - - gum::trace!( - target: LOG_TARGET, - relay_paent = ?_leaf.hash, - "Filtering disputes" - ); - - let disputes = if recent.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { - gum::warn!( - target: LOG_TARGET, - "Recent disputes are excessive ({} > {}), reduce to active ones, and selected", - recent.len(), - MAX_DISPUTES_FORWARDED_TO_RUNTIME - ); - let active = request_disputes(sender, RequestType::Active).await; - if active.len() > MAX_DISPUTES_FORWARDED_TO_RUNTIME { - generate_unseen_active_subset(active, onchain) - } else { - generate_active_and_unseen_recent_subset(recent, active, onchain) - } - } else { - recent - }; - - gum::trace!( - target: LOG_TARGET, - relay_paent = ?_leaf.hash, - "Calling `request_votes`" - ); - - // Load all votes for all disputes from the coordinator. - let dispute_candidate_votes = request_votes(sender, disputes).await; - - gum::trace!( - target: LOG_TARGET, - relay_paent = ?_leaf.hash, - "Finished `request_votes`" - ); - - // Transform all `CandidateVotes` into `MultiDisputeStatementSet`. - Ok(dispute_candidate_votes - .into_iter() - .map(|(session_index, candidate_hash, votes)| { - let valid_statements = votes - .valid - .into_iter() - .map(|(i, (s, sig))| (DisputeStatement::Valid(s), i, sig)); - - let invalid_statements = votes - .invalid - .into_iter() - .map(|(i, (s, sig))| (DisputeStatement::Invalid(s), i, sig)); - - metrics.inc_valid_statements_by(valid_statements.len()); - metrics.inc_invalid_statements_by(invalid_statements.len()); - metrics.inc_dispute_statement_sets_by(1); - - DisputeStatementSet { - candidate_hash, - session: session_index, - statements: valid_statements.chain(invalid_statements).collect(), - } - }) - .collect()) + Result::Err(_) => { + gum::trace!( + target: LOG_TARGET, + ?relay_parent, + "Cancelled error while fetching ParachainHost runtime api version" + ); + false + }, + } } diff --git a/node/core/provisioner/src/metrics.rs b/node/core/provisioner/src/metrics.rs index 508c668f6e24..8b6bb37284cb 100644 --- a/node/core/provisioner/src/metrics.rs +++ b/node/core/provisioner/src/metrics.rs @@ -14,6 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use crate::disputes::prioritized_selection::PartitionedDisputes; use polkadot_node_subsystem_util::metrics::{self, prometheus}; #[derive(Clone)] @@ -32,6 +33,9 @@ struct MetricsInner { /// 4 hours on Polkadot. The metrics are updated only when the node authors a block, so values vary across nodes. inherent_data_dispute_statement_sets: prometheus::Counter, inherent_data_dispute_statements: prometheus::CounterVec, + + /// The disputes received from `disputes-coordinator` by partition + partitioned_disputes: prometheus::CounterVec, } /// Provisioner metrics. @@ -101,6 +105,44 @@ impl Metrics { .inc_by(disputes.try_into().unwrap_or(0)); } } + + pub(crate) fn on_partition_recent_disputes(&self, disputes: &PartitionedDisputes) { + if let Some(metrics) = &self.0 { + let PartitionedDisputes { + inactive_unknown_onchain, + inactive_unconcluded_onchain: inactive_unconcluded_known_onchain, + active_unknown_onchain, + active_unconcluded_onchain, + active_concluded_onchain, + inactive_concluded_onchain: inactive_concluded_known_onchain, + } = disputes; + + metrics + .partitioned_disputes + .with_label_values(&["inactive_unknown_onchain"]) + .inc_by(inactive_unknown_onchain.len().try_into().unwrap_or(0)); + metrics + .partitioned_disputes + .with_label_values(&["inactive_unconcluded_known_onchain"]) + .inc_by(inactive_unconcluded_known_onchain.len().try_into().unwrap_or(0)); + metrics + .partitioned_disputes + .with_label_values(&["active_unknown_onchain"]) + .inc_by(active_unknown_onchain.len().try_into().unwrap_or(0)); + metrics + .partitioned_disputes + .with_label_values(&["active_unconcluded_onchain"]) + .inc_by(active_unconcluded_onchain.len().try_into().unwrap_or(0)); + metrics + .partitioned_disputes + .with_label_values(&["active_concluded_onchain"]) + .inc_by(active_concluded_onchain.len().try_into().unwrap_or(0)); + metrics + .partitioned_disputes + .with_label_values(&["inactive_concluded_known_onchain"]) + .inc_by(inactive_concluded_known_onchain.len().try_into().unwrap_or(0)); + } + } } impl metrics::Metrics for Metrics { @@ -156,6 +198,16 @@ impl metrics::Metrics for Metrics { )?, registry, )?, + partitioned_disputes: prometheus::register( + prometheus::CounterVec::new( + prometheus::Opts::new( + "polkadot_parachain_provisioner_partitioned_disputes", + "some fancy description", + ), + &["partition"], + )?, + ®istry, + )?, }; Ok(Metrics(Some(metrics))) } diff --git a/node/core/provisioner/src/onchain_disputes.rs b/node/core/provisioner/src/onchain_disputes.rs deleted file mode 100644 index 6810f512173f..000000000000 --- a/node/core/provisioner/src/onchain_disputes.rs +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2017-2022 Parity Technologies (UK) Ltd. -// This file is part of Polkadot. - -// Polkadot is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. - -// Polkadot is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. - -// You should have received a copy of the GNU General Public License -// along with Polkadot. If not, see . - -use crate::error::GetOnchainDisputesError; -use polkadot_node_subsystem::overseer; -use polkadot_primitives::v2::{CandidateHash, DisputeState, Hash, SessionIndex}; -use std::collections::HashMap; - -pub async fn get_onchain_disputes( - _sender: &mut Sender, - _relay_parent: Hash, -) -> Result, GetOnchainDisputesError> -where - Sender: overseer::ProvisionerSenderTrait, -{ - let _onchain = Result::< - HashMap<(SessionIndex, CandidateHash), DisputeState>, - GetOnchainDisputesError, - >::Ok(HashMap::new()); - #[cfg(feature = "staging-client")] - let _onchain = self::staging_impl::get_onchain_disputes(_sender, _relay_parent).await; - - _onchain -} - -// Merge this module with the outer (current one) when promoting to stable -#[cfg(feature = "staging-client")] -mod staging_impl { - use super::*; // remove this when promoting to stable - use crate::LOG_TARGET; - use futures::channel::oneshot; - use polkadot_node_subsystem::{ - errors::RuntimeApiError, - messages::{RuntimeApiMessage, RuntimeApiRequest}, - SubsystemSender, - }; - - /// Gets the on-chain disputes at a given block number and returns them as a `HashSet` so that searching in them is cheap. - pub async fn get_onchain_disputes( - sender: &mut impl SubsystemSender, - relay_parent: Hash, - ) -> Result, GetOnchainDisputesError> { - gum::trace!(target: LOG_TARGET, ?relay_parent, "Fetching on-chain disputes"); - let (tx, rx) = oneshot::channel(); - sender - .send_message( - RuntimeApiMessage::Request(relay_parent, RuntimeApiRequest::StagingDisputes(tx)) - .into(), - ) - .await; - - rx.await - .map_err(|_| GetOnchainDisputesError::Channel) - .and_then(|res| { - res.map_err(|e| match e { - RuntimeApiError::Execution { .. } => - GetOnchainDisputesError::Execution(e, relay_parent), - RuntimeApiError::NotSupported { .. } => - GetOnchainDisputesError::NotSupported(e, relay_parent), - }) - }) - .map(|v| v.into_iter().map(|e| ((e.0, e.1), e.2)).collect()) - } -} diff --git a/node/core/provisioner/src/tests.rs b/node/core/provisioner/src/tests.rs index d0ca425210ed..08eba8eabe80 100644 --- a/node/core/provisioner/src/tests.rs +++ b/node/core/provisioner/src/tests.rs @@ -195,7 +195,7 @@ mod select_availability_bitfields { } } -mod common { +pub(crate) mod common { use super::super::*; use futures::channel::mpsc; use polkadot_node_subsystem::messages::AllMessages; @@ -497,403 +497,3 @@ mod select_candidates { ) } } - -mod select_disputes { - use super::{super::*, common::test_harness}; - use futures::channel::mpsc; - use polkadot_node_subsystem::{ - messages::{AllMessages, DisputeCoordinatorMessage, RuntimeApiMessage, RuntimeApiRequest}, - RuntimeApiError, - }; - use polkadot_node_subsystem_test_helpers::TestSubsystemSender; - use polkadot_primitives::v2::DisputeState; - use std::sync::Arc; - use test_helpers; - - // Global Test Data - fn recent_disputes(len: usize) -> Vec<(SessionIndex, CandidateHash)> { - let mut res = Vec::with_capacity(len); - for _ in 0..len { - res.push((0, CandidateHash(Hash::random()))); - } - - res - } - - // same as recent_disputes() but with SessionIndex set to 1 - fn active_disputes(len: usize) -> Vec<(SessionIndex, CandidateHash)> { - let mut res = Vec::with_capacity(len); - for _ in 0..len { - res.push((1, CandidateHash(Hash::random()))); - } - - res - } - - fn leaf() -> ActivatedLeaf { - ActivatedLeaf { - hash: Hash::repeat_byte(0xAA), - number: 0xAA, - status: LeafStatus::Fresh, - span: Arc::new(jaeger::Span::Disabled), - } - } - - async fn mock_overseer( - leaf: ActivatedLeaf, - mut receiver: mpsc::UnboundedReceiver, - onchain_disputes: Result, RuntimeApiError>, - recent_disputes: Vec<(SessionIndex, CandidateHash)>, - active_disputes: Vec<(SessionIndex, CandidateHash)>, - ) { - while let Some(from_job) = receiver.next().await { - match from_job { - AllMessages::RuntimeApi(RuntimeApiMessage::Request( - _, - RuntimeApiRequest::StagingDisputes(sender), - )) => { - let _ = sender.send(onchain_disputes.clone()); - }, - AllMessages::RuntimeApi(_) => panic!("Unexpected RuntimeApi request"), - AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::RecentDisputes( - sender, - )) => { - let _ = sender.send(recent_disputes.clone()); - }, - AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ActiveDisputes( - sender, - )) => { - let _ = sender.send(active_disputes.clone()); - }, - AllMessages::DisputeCoordinator( - DisputeCoordinatorMessage::QueryCandidateVotes(disputes, sender), - ) => { - let mut res = Vec::new(); - let v = CandidateVotes { - candidate_receipt: test_helpers::dummy_candidate_receipt(leaf.hash.clone()), - valid: BTreeMap::new(), - invalid: BTreeMap::new(), - }; - for r in disputes.iter() { - res.push((r.0, r.1, v.clone())); - } - - let _ = sender.send(res); - }, - _ => panic!("Unexpected message: {:?}", from_job), - } - } - } - - #[test] - fn recent_disputes_are_withing_onchain_limit() { - const RECENT_DISPUTES_SIZE: usize = 10; - let metrics = metrics::Metrics::new_dummy(); - let onchain_disputes = Ok(Vec::new()); - let active_disputes = Vec::new(); - let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE); - - let recent_disputes_overseer = recent_disputes.clone(); - test_harness( - |r| { - mock_overseer( - leaf(), - r, - onchain_disputes, - recent_disputes_overseer, - active_disputes, - ) - }, - |mut tx: TestSubsystemSender| async move { - let lf = leaf(); - let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap(); - - assert!(!disputes.is_empty()); - - let result = disputes.iter().zip(recent_disputes.iter()); - // We should get all recent disputes. - for (d, r) in result { - assert_eq!(d.session, r.0); - assert_eq!(d.candidate_hash, r.1); - } - }, - ) - } - - #[test] - fn recent_disputes_are_too_much_but_active_are_within_limit() { - const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10; - const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME; - let metrics = metrics::Metrics::new_dummy(); - let onchain_disputes = Ok(Vec::new()); - let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE); - let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE); - - let active_disputes_overseer = active_disputes.clone(); - test_harness( - |r| { - mock_overseer( - leaf(), - r, - onchain_disputes, - recent_disputes, - active_disputes_overseer, - ) - }, - |mut tx: TestSubsystemSender| async move { - let lf = leaf(); - let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap(); - - assert!(!disputes.is_empty()); - - let result = disputes.iter().zip(active_disputes.iter()); - // We should get all active disputes. - for (d, r) in result { - assert_eq!(d.session, r.0); - assert_eq!(d.candidate_hash, r.1); - } - }, - ) - } - - #[test] - fn recent_disputes_are_too_much_but_active_are_less_than_the_limit() { - // In this case all active disputes + a random set of recent disputes should be returned - const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10; - const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME - 10; - let metrics = metrics::Metrics::new_dummy(); - let onchain_disputes = Ok(Vec::new()); - let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE); - let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE); - - let active_disputes_overseer = active_disputes.clone(); - test_harness( - |r| { - mock_overseer( - leaf(), - r, - onchain_disputes, - recent_disputes, - active_disputes_overseer, - ) - }, - |mut tx: TestSubsystemSender| async move { - let lf = leaf(); - let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap(); - - assert!(!disputes.is_empty()); - - // Recent disputes are generated with `SessionIndex` = 0 - let (res_recent, res_active): (Vec, Vec) = - disputes.into_iter().partition(|d| d.session == 0); - - // It should be good enough the count the number of active disputes and not compare them one by one. Checking the exact values is already covered by the previous tests. - assert_eq!(res_active.len(), active_disputes.len()); // We have got all active disputes - assert_eq!(res_active.len() + res_recent.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME); - // And some recent ones. - }, - ) - } - - //These tests rely on staging Runtime functions so they are separated and compiled conditionally. - #[cfg(feature = "staging-client")] - mod staging_tests { - use super::*; - - fn dummy_dispute_state() -> DisputeState { - DisputeState { - validators_for: BitVec::new(), - validators_against: BitVec::new(), - start: 0, - concluded_at: None, - } - } - - #[test] - fn recent_disputes_are_too_much_active_fits_test_recent_prioritisation() { - // In this case recent disputes are above `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit and the active ones are below it. - // The expected behaviour is to send all active disputes and extend the set with recent ones. During the extension the disputes unknown for the Runtime are added with priority. - const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10; - const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME - 10; - const ONCHAIN_DISPUTE_SIZE: usize = RECENT_DISPUTES_SIZE - 9; - let metrics = metrics::Metrics::new_dummy(); - let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE); - let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE); - let onchain_disputes: Result< - Vec<(SessionIndex, CandidateHash, DisputeState)>, - RuntimeApiError, - > = Ok(Vec::from(&recent_disputes[0..ONCHAIN_DISPUTE_SIZE]) - .iter() - .map(|(session_index, candidate_hash)| { - (*session_index, candidate_hash.clone(), dummy_dispute_state()) - }) - .collect()); - let active_disputes_overseer = active_disputes.clone(); - let recent_disputes_overseer = recent_disputes.clone(); - test_harness( - |r| { - mock_overseer( - leaf(), - r, - onchain_disputes, - recent_disputes_overseer, - active_disputes_overseer, - ) - }, - |mut tx: TestSubsystemSender| async move { - let lf = leaf(); - let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap(); - - assert!(!disputes.is_empty()); - - // Recent disputes are generated with `SessionIndex` = 0 - let (res_recent, res_active): ( - Vec, - Vec, - ) = disputes.into_iter().partition(|d| d.session == 0); - - // It should be good enough the count the number of the disputes and not compare them one by one as this was already covered in other tests. - assert_eq!(res_active.len(), active_disputes.len()); // We've got all active disputes. - assert_eq!( - res_recent.len(), - MAX_DISPUTES_FORWARDED_TO_RUNTIME - active_disputes.len() - ); // And some recent ones. - - // Check if the recent disputes were unknown for the Runtime. - let expected_recent_disputes = - Vec::from(&recent_disputes[ONCHAIN_DISPUTE_SIZE..]); - let res_recent_set: HashSet<(SessionIndex, CandidateHash)> = HashSet::from_iter( - res_recent.iter().map(|d| (d.session, d.candidate_hash)), - ); - - // Explicitly check that all unseen disputes are sent to the Runtime. - for d in &expected_recent_disputes { - assert_eq!(res_recent_set.contains(d), true); - } - }, - ) - } - - #[test] - fn active_disputes_are_too_much_test_active_prioritisation() { - // In this case the active disputes are above the `MAX_DISPUTES_FORWARDED_TO_RUNTIME` limit so the unseen ones should be prioritised. - const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10; - const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10; - const ONCHAIN_DISPUTE_SIZE: usize = ACTIVE_DISPUTES_SIZE - 9; - - let metrics = metrics::Metrics::new_dummy(); - let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE); - let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE); - let onchain_disputes: Result< - Vec<(SessionIndex, CandidateHash, DisputeState)>, - RuntimeApiError, - > = Ok(Vec::from(&active_disputes[0..ONCHAIN_DISPUTE_SIZE]) - .iter() - .map(|(session_index, candidate_hash)| { - (*session_index, candidate_hash.clone(), dummy_dispute_state()) - }) - .collect()); - let active_disputes_overseer = active_disputes.clone(); - let recent_disputes_overseer = recent_disputes.clone(); - test_harness( - |r| { - mock_overseer( - leaf(), - r, - onchain_disputes, - recent_disputes_overseer, - active_disputes_overseer, - ) - }, - |mut tx: TestSubsystemSender| async move { - let lf = leaf(); - let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap(); - - assert!(!disputes.is_empty()); - - // Recent disputes are generated with `SessionIndex` = 0 - let (res_recent, res_active): ( - Vec, - Vec, - ) = disputes.into_iter().partition(|d| d.session == 0); - - // It should be good enough the count the number of the disputes and not compare them one by one - assert_eq!(res_recent.len(), 0); // We expect no recent disputes - assert_eq!(res_active.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME); - - let expected_active_disputes = - Vec::from(&active_disputes[ONCHAIN_DISPUTE_SIZE..]); - let res_active_set: HashSet<(SessionIndex, CandidateHash)> = HashSet::from_iter( - res_active.iter().map(|d| (d.session, d.candidate_hash)), - ); - - // Explicitly check that the unseen disputes are delivered to the Runtime. - for d in &expected_active_disputes { - assert_eq!(res_active_set.contains(d), true); - } - }, - ) - } - - #[test] - fn active_disputes_are_too_much_and_are_all_unseen() { - // In this case there are a lot of active disputes unseen by the Runtime. The focus of the test is to verify that in such cases known disputes are NOT sent to the Runtime. - const RECENT_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 10; - const ACTIVE_DISPUTES_SIZE: usize = MAX_DISPUTES_FORWARDED_TO_RUNTIME + 5; - const ONCHAIN_DISPUTE_SIZE: usize = 5; - - let metrics = metrics::Metrics::new_dummy(); - let recent_disputes = recent_disputes(RECENT_DISPUTES_SIZE); - let active_disputes = active_disputes(ACTIVE_DISPUTES_SIZE); - let onchain_disputes: Result< - Vec<(SessionIndex, CandidateHash, DisputeState)>, - RuntimeApiError, - > = Ok(Vec::from(&active_disputes[0..ONCHAIN_DISPUTE_SIZE]) - .iter() - .map(|(session_index, candidate_hash)| { - (*session_index, candidate_hash.clone(), dummy_dispute_state()) - }) - .collect()); - let active_disputes_overseer = active_disputes.clone(); - let recent_disputes_overseer = recent_disputes.clone(); - test_harness( - |r| { - mock_overseer( - leaf(), - r, - onchain_disputes, - recent_disputes_overseer, - active_disputes_overseer, - ) - }, - |mut tx: TestSubsystemSender| async move { - let lf = leaf(); - let disputes = select_disputes(&mut tx, &metrics, &lf).await.unwrap(); - assert!(!disputes.is_empty()); - - // Recent disputes are generated with `SessionIndex` = 0 - let (res_recent, res_active): ( - Vec, - Vec, - ) = disputes.into_iter().partition(|d| d.session == 0); - - // It should be good enough the count the number of the disputes and not compare them one by one - assert_eq!(res_recent.len(), 0); - assert_eq!(res_active.len(), MAX_DISPUTES_FORWARDED_TO_RUNTIME); - - // For sure we don't want to see any of this disputes in the result - let unexpected_active_disputes = - Vec::from(&active_disputes[0..ONCHAIN_DISPUTE_SIZE]); - let res_active_set: HashSet<(SessionIndex, CandidateHash)> = HashSet::from_iter( - res_active.iter().map(|d| (d.session, d.candidate_hash)), - ); - - // Verify that the result DOESN'T contain known disputes (because there is an excessive number of unknown onces). - for d in &unexpected_active_disputes { - assert_eq!(res_active_set.contains(d), false); - } - }, - ) - } - } -} diff --git a/node/core/runtime-api/src/cache.rs b/node/core/runtime-api/src/cache.rs index 6f5fdc5d4657..0fe9b74dc86d 100644 --- a/node/core/runtime-api/src/cache.rs +++ b/node/core/runtime-api/src/cache.rs @@ -463,5 +463,5 @@ pub(crate) enum RequestResult { SubmitPvfCheckStatement(Hash, PvfCheckStatement, ValidatorSignature, ()), ValidationCodeHash(Hash, ParaId, OccupiedCoreAssumption, Option), Version(Hash, u32), - StagingDisputes(Hash, Vec<(SessionIndex, CandidateHash, DisputeState)>), + Disputes(Hash, Vec<(SessionIndex, CandidateHash, DisputeState)>), } diff --git a/node/core/runtime-api/src/lib.rs b/node/core/runtime-api/src/lib.rs index a815b76a8d7c..36355b5759e6 100644 --- a/node/core/runtime-api/src/lib.rs +++ b/node/core/runtime-api/src/lib.rs @@ -153,7 +153,7 @@ where .cache_validation_code_hash((relay_parent, para_id, assumption), hash), Version(relay_parent, version) => self.requests_cache.cache_version(relay_parent, version), - StagingDisputes(relay_parent, disputes) => + Disputes(relay_parent, disputes) => self.requests_cache.cache_disputes(relay_parent, disputes), } } @@ -256,8 +256,8 @@ where Request::ValidationCodeHash(para, assumption, sender) => query!(validation_code_hash(para, assumption), sender) .map(|sender| Request::ValidationCodeHash(para, assumption, sender)), - Request::StagingDisputes(sender) => - query!(disputes(), sender).map(|sender| Request::StagingDisputes(sender)), + Request::Disputes(sender) => + query!(disputes(), sender).map(|sender| Request::Disputes(sender)), } } @@ -351,8 +351,9 @@ where let _timer = metrics.time_make_runtime_api_request(); macro_rules! query { - ($req_variant:ident, $api_name:ident ($($param:expr),*), ver = $version:literal, $sender:expr) => {{ + ($req_variant:ident, $api_name:ident ($($param:expr),*), ver = $version:expr, $sender:expr) => {{ let sender = $sender; + let version: u32 = $version; // enforce type for the version expression let runtime_version = client.api_version_parachain_host(relay_parent).await .unwrap_or_else(|e| { gum::warn!( @@ -370,7 +371,7 @@ where 0 }); - let res = if runtime_version >= $version { + let res = if runtime_version >= version { client.$api_name(relay_parent $(, $param.clone() )*).await .map_err(|e| RuntimeApiError::Execution { runtime_api_name: stringify!($api_name), @@ -499,7 +500,7 @@ where }, Request::ValidationCodeHash(para, assumption, sender) => query!(ValidationCodeHash, validation_code_hash(para, assumption), ver = 2, sender), - Request::StagingDisputes(sender) => - query!(StagingDisputes, staging_get_disputes(), ver = 2, sender), + Request::Disputes(sender) => + query!(Disputes, disputes(), ver = Request::DISPUTES_RUNTIME_REQUIREMENT, sender), } } diff --git a/node/core/runtime-api/src/tests.rs b/node/core/runtime-api/src/tests.rs index eccfbeaa17c4..2fab84179433 100644 --- a/node/core/runtime-api/src/tests.rs +++ b/node/core/runtime-api/src/tests.rs @@ -23,11 +23,11 @@ use polkadot_node_subsystem_test_helpers::make_subsystem_context; use polkadot_primitives::{ runtime_api::ParachainHost, v2::{ - AuthorityDiscoveryId, Block, BlockNumber, CandidateEvent, CandidateHash, - CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Id as ParaId, - InboundDownwardMessage, InboundHrmpMessage, OccupiedCoreAssumption, - PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, SessionIndex, SessionInfo, - ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, ValidatorSignature, + AuthorityDiscoveryId, Block, CandidateEvent, CommittedCandidateReceipt, CoreState, + GroupRotationInfo, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, + OccupiedCoreAssumption, PersistedValidationData, PvfCheckStatement, ScrapedOnChainVotes, + SessionIndex, SessionInfo, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, + ValidatorSignature, }, }; use sp_api::ProvideRuntimeApi; @@ -196,10 +196,6 @@ sp_api::mock_impl_runtime_apis! { ) -> Option { self.validation_code_hash.get(¶).map(|c| c.clone()) } - - fn staging_get_disputes() -> Vec<(SessionIndex, CandidateHash, DisputeState)> { - unimplemented!() - } } impl BabeApi for MockRuntimeApi { diff --git a/node/network/approval-distribution/Cargo.toml b/node/network/approval-distribution/Cargo.toml index ac34d57d586b..fa0e4fff2c91 100644 --- a/node/network/approval-distribution/Cargo.toml +++ b/node/network/approval-distribution/Cargo.toml @@ -21,6 +21,7 @@ sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", polkadot-node-subsystem-util = { path = "../../subsystem-util" } polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" } +polkadot-primitives-test-helpers = { path = "../../../primitives/test-helpers" } assert_matches = "1.4.0" schnorrkel = { version = "0.9.1", default-features = false } diff --git a/node/network/approval-distribution/src/tests.rs b/node/network/approval-distribution/src/tests.rs index b3d44bfe8c1e..a96a89bb58eb 100644 --- a/node/network/approval-distribution/src/tests.rs +++ b/node/network/approval-distribution/src/tests.rs @@ -25,6 +25,7 @@ use polkadot_node_subsystem::messages::{network_bridge_event, AllMessages, Appro use polkadot_node_subsystem_test_helpers as test_helpers; use polkadot_node_subsystem_util::TimeoutExt as _; use polkadot_primitives::v2::{AuthorityDiscoveryId, BlakeTwo256, HashT}; +use polkadot_primitives_test_helpers::dummy_signature; use rand::SeedableRng; use sp_authority_discovery::AuthorityPair as AuthorityDiscoveryPair; use sp_core::crypto::Pair as PairT; @@ -32,10 +33,6 @@ use std::time::Duration; type VirtualOverseer = test_helpers::TestSubsystemContextHandle; -fn dummy_signature() -> polkadot_primitives::v2::ValidatorSignature { - sp_core::crypto::UncheckedFrom::unchecked_from([1u8; 64]) -} - fn test_harness>( mut state: State, test_fn: impl FnOnce(VirtualOverseer) -> T, diff --git a/node/primitives/src/disputes/mod.rs b/node/primitives/src/disputes/mod.rs index ec7bb6abc3b7..ee047c7bcc22 100644 --- a/node/primitives/src/disputes/mod.rs +++ b/node/primitives/src/disputes/mod.rs @@ -30,6 +30,8 @@ use polkadot_primitives::v2::{ /// `DisputeMessage` and related types. mod message; pub use message::{DisputeMessage, Error as DisputeMessageCheckError, UncheckedDisputeMessage}; +mod status; +pub use status::{dispute_is_inactive, DisputeStatus, Timestamp, ACTIVE_DURATION_SECS}; /// A checked dispute statement from an associated validator. #[derive(Debug, Clone)] diff --git a/node/primitives/src/disputes/status.rs b/node/primitives/src/disputes/status.rs new file mode 100644 index 000000000000..44aed9b78e20 --- /dev/null +++ b/node/primitives/src/disputes/status.rs @@ -0,0 +1,125 @@ +// Copyright 2017-2022 Parity Technologies (UK) Ltd. +// This file is part of Polkadot. + +// Polkadot is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// Polkadot is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with Polkadot. If not, see . + +use parity_scale_codec::{Decode, Encode}; + +/// Timestamp based on the 1 Jan 1970 UNIX base, which is persistent across node restarts and OS reboots. +pub type Timestamp = u64; + +/// The status of dispute. This is a state machine which can be altered by the +/// helper methods. +#[derive(Debug, Clone, Copy, Encode, Decode, PartialEq)] +pub enum DisputeStatus { + /// The dispute is active and unconcluded. + #[codec(index = 0)] + Active, + /// The dispute has been concluded in favor of the candidate + /// since the given timestamp. + #[codec(index = 1)] + ConcludedFor(Timestamp), + /// The dispute has been concluded against the candidate + /// since the given timestamp. + /// + /// This takes precedence over `ConcludedFor` in the case that + /// both are true, which is impossible unless a large amount of + /// validators are participating on both sides. + #[codec(index = 2)] + ConcludedAgainst(Timestamp), + /// Dispute has been confirmed (more than `byzantine_threshold` have already participated/ or + /// we have seen the candidate included already/participated successfully ourselves). + #[codec(index = 3)] + Confirmed, +} + +impl DisputeStatus { + /// Initialize the status to the active state. + pub fn active() -> DisputeStatus { + DisputeStatus::Active + } + + /// Move status to confirmed status, if not yet concluded/confirmed already. + pub fn confirm(self) -> DisputeStatus { + match self { + DisputeStatus::Active => DisputeStatus::Confirmed, + DisputeStatus::Confirmed => DisputeStatus::Confirmed, + DisputeStatus::ConcludedFor(_) | DisputeStatus::ConcludedAgainst(_) => self, + } + } + + /// Check whether the dispute is not a spam dispute. + pub fn is_confirmed_concluded(&self) -> bool { + match self { + &DisputeStatus::Confirmed | + &DisputeStatus::ConcludedFor(_) | + DisputeStatus::ConcludedAgainst(_) => true, + &DisputeStatus::Active => false, + } + } + + /// Transition the status to a new status after observing the dispute has concluded for the candidate. + /// This may be a no-op if the status was already concluded. + pub fn concluded_for(self, now: Timestamp) -> DisputeStatus { + match self { + DisputeStatus::Active | DisputeStatus::Confirmed => DisputeStatus::ConcludedFor(now), + DisputeStatus::ConcludedFor(at) => DisputeStatus::ConcludedFor(std::cmp::min(at, now)), + against => against, + } + } + + /// Transition the status to a new status after observing the dispute has concluded against the candidate. + /// This may be a no-op if the status was already concluded. + pub fn concluded_against(self, now: Timestamp) -> DisputeStatus { + match self { + DisputeStatus::Active | DisputeStatus::Confirmed => + DisputeStatus::ConcludedAgainst(now), + DisputeStatus::ConcludedFor(at) => + DisputeStatus::ConcludedAgainst(std::cmp::min(at, now)), + DisputeStatus::ConcludedAgainst(at) => + DisputeStatus::ConcludedAgainst(std::cmp::min(at, now)), + } + } + + /// Whether the disputed candidate is possibly invalid. + pub fn is_possibly_invalid(&self) -> bool { + match self { + DisputeStatus::Active | + DisputeStatus::Confirmed | + DisputeStatus::ConcludedAgainst(_) => true, + DisputeStatus::ConcludedFor(_) => false, + } + } + + /// Yields the timestamp this dispute concluded at, if any. + pub fn concluded_at(&self) -> Option { + match self { + DisputeStatus::Active | DisputeStatus::Confirmed => None, + DisputeStatus::ConcludedFor(at) | DisputeStatus::ConcludedAgainst(at) => Some(*at), + } + } +} + +/// The choice here is fairly arbitrary. But any dispute that concluded more than a few minutes ago +/// is not worth considering anymore. Changing this value has little to no bearing on consensus, +/// and really only affects the work that the node might do on startup during periods of many +/// disputes. +pub const ACTIVE_DURATION_SECS: Timestamp = 180; + +/// Returns true if the dispute has concluded for longer than ACTIVE_DURATION_SECS +pub fn dispute_is_inactive(status: &DisputeStatus, now: &Timestamp) -> bool { + let at = status.concluded_at(); + + at.is_some() && at.unwrap() + ACTIVE_DURATION_SECS < *now +} diff --git a/node/primitives/src/lib.rs b/node/primitives/src/lib.rs index cbc2a132fc9a..4551ce9855e3 100644 --- a/node/primitives/src/lib.rs +++ b/node/primitives/src/lib.rs @@ -46,8 +46,9 @@ pub mod approval; /// Disputes related types. pub mod disputes; pub use disputes::{ - CandidateVotes, DisputeMessage, DisputeMessageCheckError, InvalidDisputeVote, - SignedDisputeStatement, UncheckedDisputeMessage, ValidDisputeVote, + dispute_is_inactive, CandidateVotes, DisputeMessage, DisputeMessageCheckError, DisputeStatus, + InvalidDisputeVote, SignedDisputeStatement, Timestamp, UncheckedDisputeMessage, + ValidDisputeVote, ACTIVE_DURATION_SECS, }; // For a 16-ary Merkle Prefix Trie, we can expect at most 16 32-byte hashes per node diff --git a/node/service/Cargo.toml b/node/service/Cargo.toml index 80c424e76e04..a9c9484b6eba 100644 --- a/node/service/Cargo.toml +++ b/node/service/Cargo.toml @@ -204,5 +204,3 @@ runtime-metrics = [ "polkadot-runtime?/runtime-metrics", "polkadot-runtime-parachains/runtime-metrics" ] - -staging-client = ["polkadot-node-core-provisioner/staging-client"] diff --git a/node/subsystem-types/src/messages.rs b/node/subsystem-types/src/messages.rs index 10a5201cc524..c37f773b3839 100644 --- a/node/subsystem-types/src/messages.rs +++ b/node/subsystem-types/src/messages.rs @@ -35,8 +35,8 @@ use polkadot_node_network_protocol::{ use polkadot_node_primitives::{ approval::{BlockApprovalMeta, IndirectAssignmentCert, IndirectSignedApprovalVote}, AvailableData, BabeEpoch, BlockWeight, CandidateVotes, CollationGenerationConfig, - CollationSecondedSignal, DisputeMessage, ErasureChunk, PoV, SignedDisputeStatement, - SignedFullStatement, ValidationResult, + CollationSecondedSignal, DisputeMessage, DisputeStatus, ErasureChunk, PoV, + SignedDisputeStatement, SignedFullStatement, ValidationResult, }; use polkadot_primitives::v2::{ AuthorityDiscoveryId, BackedCandidate, BlockNumber, CandidateEvent, CandidateHash, @@ -271,7 +271,7 @@ pub enum DisputeCoordinatorMessage { /// Fetch a list of all recent disputes the co-ordinator is aware of. /// These are disputes which have occurred any time in recent sessions, /// and which may have already concluded. - RecentDisputes(oneshot::Sender>), + RecentDisputes(oneshot::Sender>), /// Fetch a list of all active disputes that the coordinator is aware of. /// These disputes are either not yet concluded or recently concluded. ActiveDisputes(oneshot::Sender>), @@ -699,10 +699,15 @@ pub enum RuntimeApiRequest { OccupiedCoreAssumption, RuntimeApiSender>, ), - /// Returns all on-chain disputes at given block number. - StagingDisputes( - RuntimeApiSender)>>, - ), + /// Returns all on-chain disputes at given block number. Available in v3. + Disputes(RuntimeApiSender)>>), +} + +impl RuntimeApiRequest { + /// Runtime version requirements for each message + + /// `Disputes` + pub const DISPUTES_RUNTIME_REQUIREMENT: u32 = 3; } /// A message to the Runtime API subsystem. diff --git a/node/subsystem-types/src/runtime_client.rs b/node/subsystem-types/src/runtime_client.rs index 2aa9e2bffb82..259c94fd4e51 100644 --- a/node/subsystem-types/src/runtime_client.rs +++ b/node/subsystem-types/src/runtime_client.rs @@ -186,7 +186,7 @@ pub trait RuntimeApiSubsystemClient { /// Returns all onchain disputes. /// This is a staging method! Do not use on production runtimes! - async fn staging_get_disputes( + async fn disputes( &self, at: Hash, ) -> Result)>, ApiError>; @@ -375,10 +375,10 @@ where self.runtime_api().session_info_before_version_2(&BlockId::Hash(at), index) } - async fn staging_get_disputes( + async fn disputes( &self, at: Hash, ) -> Result)>, ApiError> { - self.runtime_api().staging_get_disputes(&BlockId::Hash(at)) + self.runtime_api().disputes(&BlockId::Hash(at)) } } diff --git a/primitives/src/lib.rs b/primitives/src/lib.rs index 121f7cb40d23..168b5795b040 100644 --- a/primitives/src/lib.rs +++ b/primitives/src/lib.rs @@ -22,9 +22,9 @@ // `v2` is currently the latest stable version of the runtime API. pub mod v2; -// The 'staging' version is special - while other versions are set in stone, -// the staging version is malleable. Once it's released, it gets the next -// version number. +// The 'staging' version is special - it contains primitives which are +// still in development. Once they are considered stable, they will be +// moved to a new versioned module. pub mod vstaging; // `runtime_api` contains the actual API implementation. It contains stable and diff --git a/primitives/src/runtime_api.rs b/primitives/src/runtime_api.rs index 84d2cf0ec4ca..d0d0b7220bb9 100644 --- a/primitives/src/runtime_api.rs +++ b/primitives/src/runtime_api.rs @@ -18,31 +18,97 @@ //! of the Runtime API exposed from the Runtime to the Host. //! //! The functions in trait ParachainHost` can be part of the stable API -//! (which is versioned) or they can be staging (aka unstable functions). +//! (which is versioned) or they can be staging (aka unstable/testing +//! functions). //! -//! All stable API functions should use primitives from the latest version. -//! In the time of writing of this document - this is v2. So for example: -//! ```ignore -//! fn validators() -> Vec; -//! ``` -//! indicates a function from the stable v2 API. +//! The separation outlined above is achieved with the versioned api feature +//! of `decl_runtime_apis!` and `impl_runtime_apis!`. Before moving on let's +//! see a quick example about how api versioning works. //! -//! On the other hand a staging function's name should be prefixed with -//! `staging_` like this: -//! ```ignore -//! fn staging_get_disputes() -> Vec<(vstaging::SessionIndex, vstaging::CandidateHash, vstaging::DisputeState)>; +//! # Runtime api versioning crash course +//! +//! The versioning is achieved with the `api_version` attribute. It can be +//! placed on: +//! * trait declaration - represents the base version of the api. +//! * method declaration (inside a trait declaration) - represents a versioned +//! method, which is not available in the base version. +//! * trait implementation - represents which version of the api is being +//! implemented. +//! +//! Let's see a quick example: +//! +//! ```rust(ignore) +//! sp_api::decl_runtime_apis! { +//! #[api_version(2)] +//! pub trait MyApi { +//! fn fn1(); +//! fn fn2(); +//! #[api_version(3)] +//! fn fn3(); +//! #[api_version(4)] +//! fn fn4(); +//! } +//! } +//! +//! struct Runtime {} +//! +//! sp_api::impl_runtime_apis! { +//! #[api_version(3)] +//! impl self::MyApi for Runtime { +//! fn fn1() {} +//! fn fn2() {} +//! fn fn3() {} +//! } +//! } //! ``` +//! A new api named `MyApi` is declared with `decl_runtime_apis!`. The trait declaration +//! has got an `api_version` attribute which represents its base version - 2 in this case. +//! +//! The api has got three methods - `fn1`, `fn2`, `fn3` and `fn4`. `fn3` and `fn4` has got +//! an `api_version` attribute which makes them versioned methods. These methods do not exist +//! in the base version of the api. Behind the scenes the declaration above creates three +//! runtime apis: +//! * MyApiV2 with `fn1` and `fn2` +//! * MyApiV3 with `fn1`, `fn2` and `fn3`. +//! * MyApiV4 with `fn1`, `fn2`, `fn3` and `fn4`. //! -//! How a staging function becomes stable? +//! Please note that v4 contains all methods from v3, v3 all methods from v2 and so on. //! -//! Once a staging function is ready to be versioned the `renamed` macro -//! should be used to rename it and version it. For the example above: +//! Back to our example. At the end runtime api is implemented for `struct Runtime` with +//! `impl_runtime_apis` macro. `api_version` attribute is attached to the impl block which +//! means that a version different from the base one is being implemented - in our case this +//! is v3. +//! +//! This version of the api contains three methods so the `impl` block has got definitions +//! for them. Note that `fn4` is not implemented as it is not part of this version of the api. +//! `impl_runtime_apis` generates a default implementation for it calling `unimplemented!()`. +//! +//! Hopefully this should be all you need to know in order to use versioned methods in the node. +//! For more details about how the api versioning works refer to `spi_api` +//! documentation [here](https://docs.substrate.io/rustdocs/latest/sp_api/macro.decl_runtime_apis.html). +//! +//! # How versioned methods are used for `ParachainHost` +//! +//! Let's introduce two types of `ParachainHost` api implementation: +//! * stable - used on stable production networks like Polkadot and Kusama. There is only one +//! stable api at a single point in time. +//! * staging - used on test networks like Westend or Rococo. Depending on the development needs +//! there can be zero, one or multiple staging apis. +//! +//! The stable version of `ParachainHost` is indicated by the base version of the api. Any staging +//! method must use `api_version` attribute so that it is assigned to a specific version of a +//! staging api. This way in a single declaration one can see what's the stable version of +//! `ParachainHost` and what staging versions/functions are available. +//! +//! All stable api functions should use primitives from the latest version. +//! In the time of writing of this document - this is v2. So for example: //! ```ignore -//! #[renamed("staging_get_session_disputes", 3)] -//! fn get_session_disputes() -> Vec<(v3::SessionIndex, v3::CandidateHash, v3::DisputeState)>; +//! fn validators() -> Vec; //! ``` -//! For more details about how the API versioning works refer to `spi_api` -//! documentation [here](https://docs.substrate.io/rustdocs/latest/sp_api/macro.decl_runtime_apis.html). +//! indicates a function from the stable v2 API. +//! +//! All staging api functions should use primitives from vstaging. They should be clearly separated +//! from the stable primitives. use crate::v2; use parity_scale_codec::{Decode, Encode}; @@ -153,7 +219,7 @@ sp_api::decl_runtime_apis! { /***** STAGING *****/ /// Returns all onchain disputes. - /// This is a staging method! Do not use on production runtimes! - fn staging_get_disputes() -> Vec<(v2::SessionIndex, v2::CandidateHash, v2::DisputeState)>; + #[api_version(3)] + fn disputes() -> Vec<(v2::SessionIndex, v2::CandidateHash, v2::DisputeState)>; } } diff --git a/primitives/src/vstaging/mod.rs b/primitives/src/vstaging/mod.rs index 2f29ffbe60b7..64671bd48a60 100644 --- a/primitives/src/vstaging/mod.rs +++ b/primitives/src/vstaging/mod.rs @@ -16,4 +16,4 @@ //! Staging Primitives. -// Put any primitives used by staging API functions here +// Put any primitives used by staging APIs functions here diff --git a/primitives/test-helpers/Cargo.toml b/primitives/test-helpers/Cargo.toml index ed086c87cfc9..bbd6f45a45bd 100644 --- a/primitives/test-helpers/Cargo.toml +++ b/primitives/test-helpers/Cargo.toml @@ -8,5 +8,6 @@ edition = "2021" sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master" } sp-application-crypto = { package = "sp-application-crypto", git = "https://github.com/paritytech/substrate", branch = "master", default-features = false } sp-runtime = { git = "https://github.com/paritytech/substrate", branch = "master" } +sp-core = { git = "https://github.com/paritytech/substrate", branch = "master", features = ["std"] } polkadot-primitives = { path = "../" } rand = "0.8.5" diff --git a/primitives/test-helpers/src/lib.rs b/primitives/test-helpers/src/lib.rs index 02ba009b13cc..8873d69cdb2f 100644 --- a/primitives/test-helpers/src/lib.rs +++ b/primitives/test-helpers/src/lib.rs @@ -255,3 +255,7 @@ impl rand::RngCore for AlwaysZeroRng { Ok(()) } } + +pub fn dummy_signature() -> polkadot_primitives::v2::ValidatorSignature { + sp_core::crypto::UncheckedFrom::unchecked_from([1u8; 64]) +} diff --git a/runtime/kusama/src/lib.rs b/runtime/kusama/src/lib.rs index b4e6a3427217..d889408504f9 100644 --- a/runtime/kusama/src/lib.rs +++ b/runtime/kusama/src/lib.rs @@ -22,11 +22,10 @@ use parity_scale_codec::{Decode, Encode, MaxEncodedLen}; use primitives::v2::{ - AccountId, AccountIndex, Balance, BlockNumber, CandidateEvent, CandidateHash, - CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Hash, Id as ParaId, - InboundDownwardMessage, InboundHrmpMessage, Moment, Nonce, OccupiedCoreAssumption, - PersistedValidationData, ScrapedOnChainVotes, SessionInfo, Signature, ValidationCode, - ValidationCodeHash, ValidatorId, ValidatorIndex, + AccountId, AccountIndex, Balance, BlockNumber, CandidateEvent, CommittedCandidateReceipt, + CoreState, GroupRotationInfo, Hash, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, + Moment, Nonce, OccupiedCoreAssumption, PersistedValidationData, ScrapedOnChainVotes, + SessionInfo, Signature, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, }; use runtime_common::{ auctions, claims, crowdloan, impl_runtime_weights, impls::DealWithFees, paras_registrar, @@ -1682,10 +1681,6 @@ sp_api::impl_runtime_apis! { { parachains_runtime_api_impl::validation_code_hash::(para_id, assumption) } - - fn staging_get_disputes() -> Vec<(SessionIndex, CandidateHash, DisputeState)> { - unimplemented!() - } } impl beefy_primitives::BeefyApi for Runtime { diff --git a/runtime/parachains/Cargo.toml b/runtime/parachains/Cargo.toml index 1feeb3540b1c..40f05ddda6a8 100644 --- a/runtime/parachains/Cargo.toml +++ b/runtime/parachains/Cargo.toml @@ -109,4 +109,3 @@ try-runtime = [ "pallet-vesting/try-runtime", ] runtime-metrics = ["sp-tracing/with-tracing", "polkadot-runtime-metrics/runtime-metrics"] -vstaging = [] diff --git a/runtime/parachains/src/runtime_api_impl/mod.rs b/runtime/parachains/src/runtime_api_impl/mod.rs index 603b6c4cb385..c045b4747868 100644 --- a/runtime/parachains/src/runtime_api_impl/mod.rs +++ b/runtime/parachains/src/runtime_api_impl/mod.rs @@ -17,9 +17,14 @@ //! Runtime API implementations for Parachains. //! //! These are exposed as different modules using different sets of primitives. -//! At the moment there is only a v2 module and it is not completely clear how migration -//! to a v2 would be done. - +//! At the moment there is a v2 module for the current stable api and +//! vstaging module for all staging methods. +//! When new version of the stable api is released it will be based on v2 and +//! will contain methods from vstaging. +//! The promotion consists of the following steps: +//! 1. Bump the version of the stable module (e.g. v2 becomes v3) +//! 2. Move methods from vstaging to v3. The new stable version should include +//! all methods from vstaging tagged with the new version number (e.g. all +//! v3 methods). pub mod v2; -#[cfg(feature = "vstaging")] pub mod vstaging; diff --git a/runtime/parachains/src/runtime_api_impl/vstaging.rs b/runtime/parachains/src/runtime_api_impl/vstaging.rs index 8715cdc53121..7ae235c8133a 100644 --- a/runtime/parachains/src/runtime_api_impl/vstaging.rs +++ b/runtime/parachains/src/runtime_api_impl/vstaging.rs @@ -14,7 +14,7 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . -// Put implementations of functions from staging API here. +//! Put implementations of functions from staging APIs here. use crate::disputes; use primitives::v2::{CandidateHash, DisputeState, SessionIndex}; diff --git a/runtime/polkadot/src/lib.rs b/runtime/polkadot/src/lib.rs index a97472d47a36..11330b51df21 100644 --- a/runtime/polkadot/src/lib.rs +++ b/runtime/polkadot/src/lib.rs @@ -51,11 +51,10 @@ use pallet_session::historical as session_historical; use pallet_transaction_payment::{FeeDetails, RuntimeDispatchInfo}; use parity_scale_codec::{Decode, Encode, MaxEncodedLen}; use primitives::v2::{ - AccountId, AccountIndex, Balance, BlockNumber, CandidateEvent, CandidateHash, - CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Hash, Id as ParaId, - InboundDownwardMessage, InboundHrmpMessage, Moment, Nonce, OccupiedCoreAssumption, - PersistedValidationData, ScrapedOnChainVotes, SessionInfo, Signature, ValidationCode, - ValidationCodeHash, ValidatorId, ValidatorIndex, + AccountId, AccountIndex, Balance, BlockNumber, CandidateEvent, CommittedCandidateReceipt, + CoreState, GroupRotationInfo, Hash, Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, + Moment, Nonce, OccupiedCoreAssumption, PersistedValidationData, ScrapedOnChainVotes, + SessionInfo, Signature, ValidationCode, ValidationCodeHash, ValidatorId, ValidatorIndex, }; use sp_core::OpaqueMetadata; use sp_mmr_primitives as mmr; @@ -1770,10 +1769,6 @@ sp_api::impl_runtime_apis! { { parachains_runtime_api_impl::validation_code_hash::(para_id, assumption) } - - fn staging_get_disputes() -> Vec<(SessionIndex, CandidateHash, DisputeState)> { - unimplemented!() - } } impl beefy_primitives::BeefyApi for Runtime { diff --git a/runtime/rococo/src/lib.rs b/runtime/rococo/src/lib.rs index 4ac03b638cd1..992d0ce05688 100644 --- a/runtime/rococo/src/lib.rs +++ b/runtime/rococo/src/lib.rs @@ -1552,6 +1552,7 @@ sp_api::impl_runtime_apis! { } } + #[api_version(3)] impl primitives::runtime_api::ParachainHost for Runtime { fn validators() -> Vec { parachains_runtime_api_impl::validators::() @@ -1650,8 +1651,8 @@ sp_api::impl_runtime_apis! { parachains_runtime_api_impl::validation_code_hash::(para_id, assumption) } - fn staging_get_disputes() -> Vec<(SessionIndex, CandidateHash, DisputeState)> { - unimplemented!() + fn disputes() -> Vec<(SessionIndex, CandidateHash, DisputeState)> { + runtime_parachains::runtime_api_impl::vstaging::get_session_disputes::() } } diff --git a/runtime/test-runtime/Cargo.toml b/runtime/test-runtime/Cargo.toml index 68c9b7116a45..c4f10e7d6db4 100644 --- a/runtime/test-runtime/Cargo.toml +++ b/runtime/test-runtime/Cargo.toml @@ -58,7 +58,7 @@ runtime-common = { package = "polkadot-runtime-common", path = "../common", defa primitives = { package = "polkadot-primitives", path = "../../primitives", default-features = false } pallet-xcm = { path = "../../xcm/pallet-xcm", default-features = false } polkadot-parachain = { path = "../../parachain", default-features = false } -polkadot-runtime-parachains = { path = "../parachains", default-features = false, features = ["vstaging"]} +polkadot-runtime-parachains = { path = "../parachains", default-features = false } xcm-builder = { path = "../../xcm/xcm-builder", default-features = false } xcm-executor = { path = "../../xcm/xcm-executor", default-features = false } xcm = { path = "../../xcm", default-features = false } diff --git a/runtime/test-runtime/src/lib.rs b/runtime/test-runtime/src/lib.rs index 7bacde8b57b5..00057014a9f3 100644 --- a/runtime/test-runtime/src/lib.rs +++ b/runtime/test-runtime/src/lib.rs @@ -45,12 +45,11 @@ use pallet_session::historical as session_historical; use pallet_transaction_payment::{FeeDetails, RuntimeDispatchInfo}; use polkadot_runtime_parachains::reward_points::RewardValidatorsWithEraPoints; use primitives::v2::{ - AccountId, AccountIndex, Balance, BlockNumber, CandidateEvent, CandidateHash, - CommittedCandidateReceipt, CoreState, DisputeState, GroupRotationInfo, Hash as HashT, - Id as ParaId, InboundDownwardMessage, InboundHrmpMessage, Moment, Nonce, - OccupiedCoreAssumption, PersistedValidationData, ScrapedOnChainVotes, - SessionInfo as SessionInfoData, Signature, ValidationCode, ValidationCodeHash, ValidatorId, - ValidatorIndex, + AccountId, AccountIndex, Balance, BlockNumber, CandidateEvent, CommittedCandidateReceipt, + CoreState, GroupRotationInfo, Hash as HashT, Id as ParaId, InboundDownwardMessage, + InboundHrmpMessage, Moment, Nonce, OccupiedCoreAssumption, PersistedValidationData, + ScrapedOnChainVotes, SessionInfo as SessionInfoData, Signature, ValidationCode, + ValidationCodeHash, ValidatorId, ValidatorIndex, }; use runtime_common::{ claims, impl_runtime_weights, paras_sudo_wrapper, BlockHashCount, BlockLength, @@ -906,10 +905,6 @@ sp_api::impl_runtime_apis! { { runtime_impl::validation_code_hash::(para_id, assumption) } - - fn staging_get_disputes() -> Vec<(SessionIndex, CandidateHash, DisputeState)> { - polkadot_runtime_parachains::runtime_api_impl::vstaging::get_session_disputes::() - } } impl beefy_primitives::BeefyApi for Runtime { diff --git a/runtime/westend/Cargo.toml b/runtime/westend/Cargo.toml index 892e812ae8d9..92ec42bfc98d 100644 --- a/runtime/westend/Cargo.toml +++ b/runtime/westend/Cargo.toml @@ -87,7 +87,7 @@ hex-literal = { version = "0.3.4", optional = true } runtime-common = { package = "polkadot-runtime-common", path = "../common", default-features = false } primitives = { package = "polkadot-primitives", path = "../../primitives", default-features = false } polkadot-parachain = { path = "../../parachain", default-features = false } -runtime-parachains = { package = "polkadot-runtime-parachains", path = "../parachains", default-features = false, features = ["vstaging"] } +runtime-parachains = { package = "polkadot-runtime-parachains", path = "../parachains", default-features = false } xcm = { package = "xcm", path = "../../xcm", default-features = false } xcm-executor = { package = "xcm-executor", path = "../../xcm/xcm-executor", default-features = false } diff --git a/runtime/westend/src/lib.rs b/runtime/westend/src/lib.rs index 1f8fc652209b..a5d79e89bf9f 100644 --- a/runtime/westend/src/lib.rs +++ b/runtime/westend/src/lib.rs @@ -1294,6 +1294,7 @@ sp_api::impl_runtime_apis! { } } + #[api_version(3)] impl primitives::runtime_api::ParachainHost for Runtime { fn validators() -> Vec { parachains_runtime_api_impl::validators::() @@ -1392,7 +1393,7 @@ sp_api::impl_runtime_apis! { parachains_runtime_api_impl::validation_code_hash::(para_id, assumption) } - fn staging_get_disputes() -> Vec<(SessionIndex, CandidateHash, DisputeState)> { + fn disputes() -> Vec<(SessionIndex, CandidateHash, DisputeState)> { runtime_parachains::runtime_api_impl::vstaging::get_session_disputes::() } }