Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Cache SessionInfo on new activated leaf in dispute-distribution #6993

Merged
merged 4 commits into from
Apr 3, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions node/network/dispute-distribution/src/sender/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,12 @@ async fn get_active_session_indices<Context>(
// Iterate all heads we track as active and fetch the child' session indices.
for head in active_heads {
let session_index = runtime.get_session_index_for_child(ctx.sender(), *head).await?;
// Cache session info
if let Err(err) =
runtime.get_session_info_by_index(ctx.sender(), *head, session_index).await
{
gum::debug!(target: LOG_TARGET, ?err, ?session_index, "Can't cache SessionInfo");
}
indeces.insert(session_index, *head);
}
Ok(indeces)
Expand Down
90 changes: 16 additions & 74 deletions node/network/dispute-distribution/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ fn send_dispute_sends_dispute() {

let relay_parent = Hash::random();
let candidate = make_candidate_receipt(relay_parent);
send_dispute(&mut handle, candidate, true).await;
send_dispute(&mut handle, candidate).await;
conclude(&mut handle).await;
};
test_harness(test);
Expand All @@ -96,15 +96,15 @@ fn send_honors_rate_limit() {
let relay_parent = Hash::random();
let candidate = make_candidate_receipt(relay_parent);
let before_request = Instant::now();
send_dispute(&mut handle, candidate, true).await;
send_dispute(&mut handle, candidate).await;
// First send should not be rate limited:
gum::trace!("Passed time: {:#?}", Instant::now().saturating_duration_since(before_request));
// This test would likely be flaky on CI:
//assert!(Instant::now().saturating_duration_since(before_request) < SEND_RATE_LIMIT);

let relay_parent = Hash::random();
let candidate = make_candidate_receipt(relay_parent);
send_dispute(&mut handle, candidate, false).await;
send_dispute(&mut handle, candidate).await;
// Second send should be rate limited:
gum::trace!(
"Passed time for send_dispute: {:#?}",
Expand All @@ -120,7 +120,6 @@ fn send_honors_rate_limit() {
async fn send_dispute(
handle: &mut TestSubsystemContextHandle<DisputeDistributionMessage>,
candidate: CandidateReceipt,
needs_session_info: bool,
) {
let before_request = Instant::now();
let message = make_dispute_message(candidate.clone(), ALICE_INDEX, FERDIE_INDEX);
Expand All @@ -138,25 +137,6 @@ async fn send_dispute(
"Passed time for sending message: {:#?}",
Instant::now().saturating_duration_since(before_request)
);
if needs_session_info {
// Requests needed session info:
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::SessionInfo(session_index, tx)
)
) => {
assert_eq!(session_index, MOCK_SESSION_INDEX);
assert_eq!(
hash,
message.candidate_receipt().descriptor.relay_parent
);
tx.send(Ok(Some(MOCK_SESSION_INFO.clone()))).expect("Receiver should stay alive.");
}
);
}

let expected_receivers = {
let info = &MOCK_SESSION_INFO;
Expand Down Expand Up @@ -492,23 +472,6 @@ fn send_dispute_gets_cleaned_up() {
msg: DisputeDistributionMessage::SendDispute(message.clone()),
})
.await;
// Requests needed session info:
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::SessionInfo(session_index, tx)
)
) => {
assert_eq!(session_index, MOCK_SESSION_INDEX);
assert_eq!(
hash,
message.candidate_receipt().descriptor.relay_parent
);
tx.send(Ok(Some(MOCK_SESSION_INFO.clone()))).expect("Receiver should stay alive.");
}
);

let expected_receivers = {
let info = &MOCK_SESSION_INFO;
Expand Down Expand Up @@ -558,23 +521,6 @@ fn dispute_retries_and_works_across_session_boundaries() {
msg: DisputeDistributionMessage::SendDispute(message.clone()),
})
.await;
// Requests needed session info:
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(
RuntimeApiMessage::Request(
hash,
RuntimeApiRequest::SessionInfo(session_index, tx)
)
) => {
assert_eq!(session_index, MOCK_SESSION_INDEX);
assert_eq!(
hash,
message.candidate_receipt().descriptor.relay_parent
);
tx.send(Ok(Some(MOCK_SESSION_INFO.clone()))).expect("Receiver should stay alive.");
}
);

let expected_receivers: HashSet<_> = {
let info = &MOCK_SESSION_INFO;
Expand Down Expand Up @@ -776,7 +722,6 @@ async fn activate_leaf(
// Currently active disputes to send to the subsystem.
active_disputes: Vec<(SessionIndex, CandidateHash, DisputeStatus)>,
) {
let has_active_disputes = !active_disputes.is_empty();
handle
.send(FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate {
activated: Some(ActivatedLeaf {
Expand All @@ -798,27 +743,24 @@ async fn activate_leaf(
tx.send(Ok(session_index)).expect("Receiver should stay alive.");
}
);
assert_matches!(
handle.recv().await,
AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ActiveDisputes(tx)) => {
tx.send(active_disputes).expect("Receiver should stay alive.");
}
);

let new_session = match (new_session, has_active_disputes) {
(Some(new_session), true) => new_session,
_ => return,
};

assert_matches!(
if let Some(session_info) = new_session {
assert_matches!(
handle.recv().await,
AllMessages::RuntimeApi(RuntimeApiMessage::Request(
h,
RuntimeApiRequest::SessionInfo(i, tx)
h,
RuntimeApiRequest::SessionInfo(session_idx, tx)
)) => {
assert_eq!(h, activate);
assert_eq!(i, session_index);
tx.send(Ok(Some(new_session))).expect("Receiver should stay alive.");
assert_eq!(session_index, session_idx);
tx.send(Ok(Some(session_info))).expect("Receiver should stay alive.");
});
}

assert_matches!(
handle.recv().await,
AllMessages::DisputeCoordinator(DisputeCoordinatorMessage::ActiveDisputes(tx)) => {
tx.send(active_disputes).expect("Receiver should stay alive.");
}
);
}
Expand Down