Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Merge remote-tracking branch 'origin/master' into gav-better-fund-tra…
Browse files Browse the repository at this point in the history
…cking
  • Loading branch information
muharem committed Jan 9, 2023
2 parents 86c67a4 + 1c935f3 commit 0a18479
Show file tree
Hide file tree
Showing 60 changed files with 2,495 additions and 1,470 deletions.
1,616 changes: 1,314 additions & 302 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ tikv-jemallocator = "0.5.0"
assert_cmd = "2.0.4"
nix = "0.24.1"
tempfile = "3.2.0"
tokio = "1.22.0"
tokio = "1.24.1"
substrate-rpc-client = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-core-primitives = { path = "core-primitives" }

Expand Down
13 changes: 8 additions & 5 deletions node/client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use sp_api::{CallApiAt, Encode, NumberFor, ProvideRuntimeApi};
use sp_blockchain::{HeaderBackend, HeaderMetadata};
use sp_consensus::BlockStatus;
use sp_runtime::{
generic::{BlockId, SignedBlock},
generic::SignedBlock,
traits::{BlakeTwo256, Block as BlockT},
Justifications,
};
Expand Down Expand Up @@ -338,22 +338,25 @@ impl sc_client_api::BlockBackend<Block> for Client {
}
}

fn block(&self, id: &BlockId<Block>) -> sp_blockchain::Result<Option<SignedBlock<Block>>> {
fn block(
&self,
hash: <Block as BlockT>::Hash,
) -> sp_blockchain::Result<Option<SignedBlock<Block>>> {
with_client! {
self,
client,
{
client.block(id)
client.block(hash)
}
}
}

fn block_status(&self, id: &BlockId<Block>) -> sp_blockchain::Result<BlockStatus> {
fn block_status(&self, hash: <Block as BlockT>::Hash) -> sp_blockchain::Result<BlockStatus> {
with_client! {
self,
client,
{
client.block_status(id)
client.block_status(hash)
}
}
}
Expand Down
8 changes: 8 additions & 0 deletions node/core/av-store/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,14 @@ async fn run_iteration<Context>(
FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, number)) => {
let _timer = subsystem.metrics.time_process_block_finalized();

if !subsystem.known_blocks.is_known(&hash) {
// If we haven't processed this block yet,
// make sure we write the metadata about the
// candidates backed in this finalized block.
// Otherwise, we won't be able to store our chunk
// for these candidates.
process_block_activated(ctx, subsystem, hash).await?;
}
subsystem.finalized_number = Some(number);
subsystem.known_blocks.prune_finalized(number);
process_block_finalized(
Expand Down
45 changes: 43 additions & 2 deletions node/core/av-store/src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -732,8 +732,49 @@ fn we_dont_miss_anything_if_import_notifications_are_missed() {
let test_state = TestState::default();

test_harness(test_state.clone(), store.clone(), |mut virtual_overseer| async move {
overseer_signal(&mut virtual_overseer, OverseerSignal::BlockFinalized(Hash::zero(), 1))
.await;
let block_hash = Hash::repeat_byte(1);
overseer_signal(&mut virtual_overseer, OverseerSignal::BlockFinalized(block_hash, 1)).await;

let header = Header {
parent_hash: Hash::repeat_byte(0),
number: 1,
state_root: Hash::zero(),
extrinsics_root: Hash::zero(),
digest: Default::default(),
};

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::ChainApi(ChainApiMessage::BlockHeader(
relay_parent,
tx,
)) => {
assert_eq!(relay_parent, block_hash);
tx.send(Ok(Some(header))).unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::CandidateEvents(tx),
)) => {
assert_eq!(relay_parent, block_hash);
tx.send(Ok(Vec::new())).unwrap();
}
);

assert_matches!(
overseer_recv(&mut virtual_overseer).await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
relay_parent,
RuntimeApiRequest::Validators(tx),
)) => {
assert_eq!(relay_parent, Hash::zero());
tx.send(Ok(Vec::new())).unwrap();
}
);

let header = Header {
parent_hash: Hash::repeat_byte(3),
Expand Down
5 changes: 3 additions & 2 deletions node/core/candidate-validation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ parity-scale-codec = { version = "3.1.5", default-features = false, features = [
polkadot-primitives = { path = "../../../primitives" }
polkadot-parachain = { path = "../../../parachain" }
polkadot-node-primitives = { path = "../../primitives" }
polkadot-node-subsystem = {path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
polkadot-node-subsystem = { path = "../../subsystem" }
polkadot-node-metrics = { path = "../../metrics" }

[target.'cfg(not(any(target_os = "android", target_os = "unknown")))'.dependencies]
polkadot-node-core-pvf = { path = "../pvf" }
Expand All @@ -27,5 +27,6 @@ sp-keyring = { git = "https://github.com/paritytech/substrate", branch = "master
futures = { version = "0.3.21", features = ["thread-pool"] }
assert_matches = "1.4.0"
polkadot-node-subsystem-test-helpers = { path = "../../subsystem-test-helpers" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
sp-core = { git = "https://github.com/paritytech/substrate", branch = "master" }
test-helpers = { package = "polkadot-primitives-test-helpers", path = "../../../primitives/test-helpers" }
18 changes: 14 additions & 4 deletions node/core/candidate-validation/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -604,13 +604,16 @@ async fn validate_candidate_exhaustive(

#[async_trait]
trait ValidationBackend {
/// Tries executing a PVF a single time (no retries).
async fn validate_candidate(
&mut self,
pvf: Pvf,
timeout: Duration,
encoded_params: Vec<u8>,
) -> Result<WasmValidationResult, ValidationError>;

/// Tries executing a PVF. Will retry once if an error is encountered that may have been
/// transient.
async fn validate_candidate_with_retry(
&mut self,
raw_validation_code: Vec<u8>,
Expand All @@ -620,7 +623,7 @@ trait ValidationBackend {
// Construct the PVF a single time, since it is an expensive operation. Cloning it is cheap.
let pvf = Pvf::from_code(raw_validation_code);

let validation_result =
let mut validation_result =
self.validate_candidate(pvf.clone(), timeout, params.encode()).await;

// If we get an AmbiguousWorkerDeath error, retry once after a brief delay, on the
Expand All @@ -630,12 +633,19 @@ trait ValidationBackend {
{
// Wait a brief delay before retrying.
futures_timer::Delay::new(PVF_EXECUTION_RETRY_DELAY).await;

gum::debug!(
target: LOG_TARGET,
?pvf,
"Re-trying failed candidate validation due to AmbiguousWorkerDeath."
);

// Encode the params again when re-trying. We expect the retry case to be relatively
// rare, and we want to avoid unconditionally cloning data.
self.validate_candidate(pvf, timeout, params.encode()).await
} else {
validation_result
validation_result = self.validate_candidate(pvf, timeout, params.encode()).await;
}

validation_result
}

async fn precheck_pvf(&mut self, pvf: Pvf) -> Result<Duration, PrepareError>;
Expand Down
2 changes: 1 addition & 1 deletion node/core/candidate-validation/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use super::{ValidationFailed, ValidationResult};
use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_metrics::metrics::{self, prometheus};

#[derive(Clone)]
pub(crate) struct MetricsInner {
Expand Down
2 changes: 1 addition & 1 deletion node/core/chain-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ futures = "0.3.21"
gum = { package = "tracing-gum", path = "../../gum" }
sp-blockchain = { git = "https://github.com/paritytech/substrate", branch = "master" }
polkadot-primitives = { path = "../../../primitives" }
polkadot-node-metrics = { path = "../../metrics" }
polkadot-node-subsystem = {path = "../../subsystem" }
polkadot-node-subsystem-util = { path = "../../subsystem-util" }
sc-client-api = { git = "https://github.com/paritytech/substrate", branch = "master" }
sc-consensus-babe = { git = "https://github.com/paritytech/substrate", branch = "master" }

Expand Down
2 changes: 1 addition & 1 deletion node/core/chain-api/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
// You should have received a copy of the GNU General Public License
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use polkadot_node_subsystem_util::metrics::{self, prometheus};
use polkadot_node_metrics::metrics::{self, prometheus};

#[derive(Clone)]
pub(crate) struct MetricsInner {
Expand Down
35 changes: 22 additions & 13 deletions node/core/dispute-coordinator/src/initialized.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,8 +269,13 @@ impl Initialized {
update: ActiveLeavesUpdate,
now: u64,
) -> Result<()> {
let on_chain_votes =
let scraped_updates =
self.scraper.process_active_leaves_update(ctx.sender(), &update).await?;
log_error(
self.participation
.bump_to_priority_for_candidates(ctx, &scraped_updates.included_receipts)
.await,
)?;
self.participation.process_active_leaves_update(ctx, &update).await?;

if let Some(new_leaf) = update.activated {
Expand Down Expand Up @@ -308,7 +313,7 @@ impl Initialized {

// The `runtime-api` subsystem has an internal queue which serializes the execution,
// so there is no point in running these in parallel.
for votes in on_chain_votes {
for votes in scraped_updates.on_chain_votes {
let _ = self.process_on_chain_votes(ctx, overlay_db, votes, now).await.map_err(
|error| {
gum::warn!(
Expand Down Expand Up @@ -416,6 +421,8 @@ impl Initialized {
})
.collect();

// Importantly, handling import statements for backing votes also
// clears spam slots for any newly backed candidates
let import_result = self
.handle_import_statements(
ctx,
Expand Down Expand Up @@ -837,8 +844,15 @@ impl Initialized {
let new_state = import_result.new_state();

let is_included = self.scraper.is_candidate_included(&candidate_hash);

let potential_spam = !is_included && !new_state.is_confirmed() && !new_state.has_own_vote();
let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
let has_own_vote = new_state.has_own_vote();
let is_disputed = new_state.is_disputed();
let has_controlled_indices = !env.controlled_indices().is_empty();
let is_confirmed = new_state.is_confirmed();
let potential_spam =
!is_included && !is_backed && !new_state.is_confirmed() && !new_state.has_own_vote();
// We participate only in disputes which are included, backed or confirmed
let allow_participation = is_included || is_backed || is_confirmed;

gum::trace!(
target: LOG_TARGET,
Expand All @@ -851,8 +865,11 @@ impl Initialized {
"Is spam?"
);

// This check is responsible for all clearing of spam slots. It runs
// whenever a vote is imported from on or off chain, and decrements
// slots whenever a candidate is newly backed, confirmed, or has our
// own vote.
if !potential_spam {
// Former spammers have not been spammers after all:
self.spam_slots.clear(&(session, candidate_hash));

// Potential spam:
Expand Down Expand Up @@ -880,14 +897,6 @@ impl Initialized {
}
}

let has_own_vote = new_state.has_own_vote();
let is_disputed = new_state.is_disputed();
let has_controlled_indices = !env.controlled_indices().is_empty();
let is_backed = self.scraper.is_candidate_backed(&candidate_hash);
let is_confirmed = new_state.is_confirmed();
// We participate only in disputes which are included, backed or confirmed
let allow_participation = is_included || is_backed || is_confirmed;

// Participate in dispute if we did not cast a vote before and actually have keys to cast a
// local vote. Disputes should fall in one of the categories below, otherwise we will refrain
// from participation:
Expand Down
16 changes: 16 additions & 0 deletions node/core/dispute-coordinator/src/participation/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,10 @@ pub use queues::{ParticipationPriority, ParticipationRequest, QueueError};
/// This should be a relatively low value, while we might have a speedup once we fetched the data,
/// due to multi-core architectures, but the fetching itself can not be improved by parallel
/// requests. This means that higher numbers make it harder for a single dispute to resolve fast.
#[cfg(not(test))]
const MAX_PARALLEL_PARTICIPATIONS: usize = 3;
#[cfg(test)]
pub(crate) const MAX_PARALLEL_PARTICIPATIONS: usize = 1;

/// Keep track of disputes we need to participate in.
///
Expand Down Expand Up @@ -212,6 +215,19 @@ impl Participation {
Ok(())
}

/// Moving any request concerning the given candidates from best-effort to
/// priority, ignoring any candidates that don't have any queued participation requests.
pub async fn bump_to_priority_for_candidates<Context>(
&mut self,
ctx: &mut Context,
included_receipts: &Vec<CandidateReceipt>,
) -> Result<()> {
for receipt in included_receipts {
self.queue.prioritize_if_present(ctx.sender(), receipt).await?;
}
Ok(())
}

/// Dequeue until `MAX_PARALLEL_PARTICIPATIONS` is reached.
async fn dequeue_until_capacity<Context>(
&mut self,
Expand Down
25 changes: 25 additions & 0 deletions node/core/dispute-coordinator/src/participation/queues/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,31 @@ impl Queues {
self.pop_best_effort().map(|d| d.1)
}

/// Reprioritizes any participation requests pertaining to the
/// passed candidates from best effort to priority.
pub async fn prioritize_if_present(
&mut self,
sender: &mut impl overseer::DisputeCoordinatorSenderTrait,
receipt: &CandidateReceipt,
) -> Result<()> {
let comparator = CandidateComparator::new(sender, receipt).await?;
self.prioritize_with_comparator(comparator)?;
Ok(())
}

fn prioritize_with_comparator(
&mut self,
comparator: CandidateComparator,
) -> std::result::Result<(), QueueError> {
if self.priority.len() >= PRIORITY_QUEUE_SIZE {
return Err(QueueError::PriorityFull)
}
if let Some(request) = self.best_effort.remove(&comparator) {
self.priority.insert(comparator, request);
}
Ok(())
}

fn queue_with_comparator(
&mut self,
comparator: CandidateComparator,
Expand Down
Loading

0 comments on commit 0a18479

Please sign in to comment.