diff --git a/Cargo.lock b/Cargo.lock index 6f6d3024f2a2..92b6c18c0c49 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6559,6 +6559,7 @@ dependencies = [ "env_logger 0.9.0", "futures", "log", + "polkadot-node-jaeger", "polkadot-node-metrics", "polkadot-node-network-protocol", "polkadot-node-primitives", diff --git a/node/core/approval-voting/src/import.rs b/node/core/approval-voting/src/import.rs index f36b79c7a4e1..8e8ad818fe49 100644 --- a/node/core/approval-voting/src/import.rs +++ b/node/core/approval-voting/src/import.rs @@ -329,13 +329,17 @@ pub(crate) async fn handle_new_head( finalized_number: &Option, ) -> SubsystemResult> { const MAX_HEADS_LOOK_BACK: BlockNumber = MAX_FINALITY_LAG; - - let mut span = jaeger::Span::new(head, "approval-checking-import"); + let _handle_new_head_span = state + .spans + .get(&head) + .map(|span| span.child("handle-new-head")) + .unwrap_or_else(|| jaeger::Span::new(head, "handle-new-head")) + .with_string_tag("head", format!("{:?}", head)) + .with_stage(jaeger::Stage::ApprovalChecking); let header = { let (h_tx, h_rx) = oneshot::channel(); ctx.send_message(ChainApiMessage::BlockHeader(head, h_tx)).await; - match h_rx.await? { Err(e) => { gum::debug!( @@ -343,11 +347,12 @@ pub(crate) async fn handle_new_head( "Chain API subsystem temporarily unreachable {}", e, ); - + // May be a better way of handling errors here. return Ok(Vec::new()) }, Ok(None) => { gum::warn!(target: LOG_TARGET, "Missing header for new head {}", head); + // May be a better way of handling warnings here. return Ok(Vec::new()) }, Ok(Some(h)) => h, @@ -363,7 +368,6 @@ pub(crate) async fn handle_new_head( ?e, "Could not cache session info when processing head.", ); - return Ok(Vec::new()) }, Ok(Some(a @ SessionWindowUpdate::Advanced { .. })) => { @@ -391,8 +395,6 @@ pub(crate) async fn handle_new_head( .map_err(|e| SubsystemError::with_origin("approval-voting", e)) .await?; - span.add_uint_tag("new-blocks", new_blocks.len() as u64); - if new_blocks.is_empty() { return Ok(Vec::new()) } @@ -473,6 +475,7 @@ pub(crate) async fn handle_new_head( ); (block_tick, no_show_duration) }; + let needed_approvals = session_info.needed_approvals; let validator_group_lens: Vec = session_info.validator_groups.iter().map(|v| v.len()).collect(); @@ -507,11 +510,9 @@ pub(crate) async fn handle_new_head( result.len(), ); } - result } }; - // If all bits are already set, then send an approve message. if approved_bitfield.count_ones() == approved_bitfield.len() { ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await; @@ -602,7 +603,6 @@ pub(crate) async fn handle_new_head( ); ctx.send_unbounded_message(ApprovalDistributionMessage::NewBlocks(approval_meta)); - Ok(imported_candidates) } @@ -661,6 +661,7 @@ pub(crate) mod tests { assignment_criteria: Box::new(MockAssignmentCriteria), db, db_config: TEST_CONFIG, + spans: HashMap::new(), } } diff --git a/node/core/approval-voting/src/lib.rs b/node/core/approval-voting/src/lib.rs index 59db8732a429..a26ea4e59dab 100644 --- a/node/core/approval-voting/src/lib.rs +++ b/node/core/approval-voting/src/lib.rs @@ -21,6 +21,7 @@ //! of others. It uses this information to determine when candidates and blocks have //! been sufficiently approved to finalize. +use jaeger::{hash_to_trace_identifier, PerLeafSpan}; use polkadot_node_jaeger as jaeger; use polkadot_node_primitives::{ approval::{ @@ -478,7 +479,11 @@ impl Wakeups { self.wakeups.entry(tick).or_default().push((block_hash, candidate_hash)); } - fn prune_finalized_wakeups(&mut self, finalized_number: BlockNumber) { + fn prune_finalized_wakeups( + &mut self, + finalized_number: BlockNumber, + spans: &mut HashMap, + ) { let after = self.block_numbers.split_off(&(finalized_number + 1)); let pruned_blocks: HashSet<_> = std::mem::replace(&mut self.block_numbers, after) .into_iter() @@ -502,6 +507,9 @@ impl Wakeups { } } } + + // Remove all spans that are associated with pruned blocks. + spans.retain(|h, _| !pruned_blocks.contains(h)); } // Get the wakeup for a particular block/candidate combo, if any. @@ -639,6 +647,7 @@ struct State { // Require for `RollingSessionWindow`. db_config: DatabaseConfig, db: Arc, + spans: HashMap, } #[overseer::contextbounds(ApprovalVoting, prefix = self::overseer)] @@ -777,6 +786,7 @@ where assignment_criteria, db_config: subsystem.db_config, db: subsystem.db, + spans: HashMap::new(), }; let mut wakeups = Wakeups::default(); @@ -798,14 +808,13 @@ where loop { let mut overlayed_db = OverlayedBackend::new(&backend); let actions = futures::select! { - (tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => { + (_tick, woken_block, woken_candidate) = wakeups.next(&*state.clock).fuse() => { subsystem.metrics.on_wakeup(); process_wakeup( &mut state, &mut overlayed_db, woken_block, woken_candidate, - tick, &subsystem.metrics, )? } @@ -878,7 +887,6 @@ where if !overlayed_db.is_empty() { let _timer = subsystem.metrics.time_db_transaction(); - let ops = overlayed_db.into_write_ops(); backend.write(ops)?; } @@ -919,12 +927,12 @@ async fn handle_actions( actions: Vec, ) -> SubsystemResult { let mut conclude = false; - let mut actions_iter = actions.into_iter(); while let Some(action) = actions_iter.next() { match action { - Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick } => - wakeups.schedule(block_hash, block_number, candidate_hash, tick), + Action::ScheduleWakeup { block_hash, block_number, candidate_hash, tick } => { + wakeups.schedule(block_hash, block_number, candidate_hash, tick); + }, Action::IssueApproval(candidate_hash, approval_request) => { // Note that the IssueApproval action will create additional // actions that will need to all be processed before we can @@ -968,8 +976,18 @@ async fn handle_actions( continue } + let mut launch_approval_span = state + .spans + .get(&relay_block_hash) + .map(|span| span.child("launch-approval")) + .unwrap_or_else(|| jaeger::Span::new(candidate_hash, "launch-approval")) + .with_trace_id(candidate_hash) + .with_candidate(candidate_hash) + .with_stage(jaeger::Stage::ApprovalChecking); + metrics.on_assignment_produced(assignment_tranche); let block_hash = indirect_cert.block_hash; + launch_approval_span.add_string_tag("block-hash", format!("{:?}", block_hash)); let validator_index = indirect_cert.validator; ctx.send_unbounded_message(ApprovalDistributionMessage::DistributeAssignment( @@ -1004,6 +1022,7 @@ async fn handle_actions( validator_index, block_hash, backing_group, + &launch_approval_span, ) .await }, @@ -1014,12 +1033,21 @@ async fn handle_actions( } }, Action::NoteApprovedInChainSelection(block_hash) => { + let _span = state + .spans + .get(&block_hash) + .map(|span| span.child("note-approved-in-chain-selection")) + .unwrap_or_else(|| { + jaeger::Span::new(block_hash, "note-approved-in-chain-selection") + }) + .with_string_tag("block-hash", format!("{:?}", block_hash)) + .with_stage(jaeger::Stage::ApprovalChecking); ctx.send_message(ChainSelectionMessage::Approved(block_hash)).await; }, Action::BecomeActive => { *mode = Mode::Active; - let messages = distribution_messages_for_activation(overlayed_db)?; + let messages = distribution_messages_for_activation(overlayed_db, state)?; ctx.send_messages(messages.into_iter()).await; }, @@ -1034,6 +1062,7 @@ async fn handle_actions( fn distribution_messages_for_activation( db: &OverlayedBackend<'_, impl Backend>, + state: &mut State, ) -> SubsystemResult> { let all_blocks: Vec = db.load_all_blocks()?; @@ -1043,6 +1072,15 @@ fn distribution_messages_for_activation( messages.push(ApprovalDistributionMessage::NewBlocks(Vec::new())); // dummy value. for block_hash in all_blocks { + let mut distribution_message_span = state + .spans + .get(&block_hash) + .map(|span| span.child("distribution-messages-for-activation")) + .unwrap_or_else(|| { + jaeger::Span::new(block_hash, "distribution-messages-for-activation") + }) + .with_stage(jaeger::Stage::ApprovalChecking) + .with_string_tag("block-hash", format!("{:?}", block_hash)); let block_entry = match db.load_block_entry(&block_hash)? { Some(b) => b, None => { @@ -1051,6 +1089,10 @@ fn distribution_messages_for_activation( continue }, }; + + distribution_message_span.add_string_tag("block-hash", &block_hash.to_string()); + distribution_message_span + .add_string_tag("parent-hash", &block_entry.parent_hash().to_string()); approval_meta.push(BlockApprovalMeta { hash: block_hash, number: block_entry.block_number(), @@ -1061,6 +1103,8 @@ fn distribution_messages_for_activation( }); for (i, (_, candidate_hash)) in block_entry.candidates().iter().enumerate() { + let _candidate_span = + distribution_message_span.child("candidate").with_candidate(*candidate_hash); let candidate_entry = match db.load_candidate_entry(&candidate_hash)? { Some(c) => c, None => { @@ -1140,9 +1184,11 @@ async fn handle_from_overseer( let actions = match x { FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => { let mut actions = Vec::new(); - if let Some(activated) = update.activated { let head = activated.hash; + let approval_voting_span = + jaeger::PerLeafSpan::new(activated.span, "approval-voting"); + state.spans.insert(head, approval_voting_span); match import::handle_new_head(ctx, state, db, head, &*last_finalized_height).await { Err(e) => return Err(SubsystemError::with_origin("db", e)), Ok(block_imported_candidates) => { @@ -1199,7 +1245,12 @@ async fn handle_from_overseer( crate::ops::canonicalize(db, block_number, block_hash) .map_err(|e| SubsystemError::with_origin("db", e))?; - wakeups.prune_finalized_wakeups(block_number); + // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans accordingly. + wakeups.prune_finalized_wakeups(block_number, &mut state.spans); + + // // `prune_finalized_wakeups` prunes all finalized block hashes. We prune spans accordingly. + // let hash_set = wakeups.block_numbers.values().flatten().collect::>(); + // state.spans.retain(|hash, _| hash_set.contains(hash)); Vec::new() }, @@ -1220,7 +1271,23 @@ async fn handle_from_overseer( })? .0, ApprovalVotingMessage::ApprovedAncestor(target, lower_bound, res) => { - match handle_approved_ancestor(ctx, db, target, lower_bound, wakeups).await { + let mut approved_ancestor_span = state + .spans + .get(&target) + .map(|span| span.child("approved-ancestor")) + .unwrap_or_else(|| jaeger::Span::new(target, "approved-ancestor")) + .with_stage(jaeger::Stage::ApprovalChecking) + .with_string_tag("leaf", format!("{:?}", target)); + match handle_approved_ancestor( + ctx, + db, + target, + lower_bound, + wakeups, + &mut approved_ancestor_span, + ) + .await + { Ok(v) => { let _ = res.send(v); }, @@ -1342,15 +1409,15 @@ async fn handle_approved_ancestor( target: Hash, lower_bound: BlockNumber, wakeups: &Wakeups, + span: &mut jaeger::Span, ) -> SubsystemResult> { const MAX_TRACING_WINDOW: usize = 200; const ABNORMAL_DEPTH_THRESHOLD: usize = 5; - + let mut span = span + .child("handle-approved-ancestor") + .with_stage(jaeger::Stage::ApprovalChecking); use bitvec::{order::Lsb0, vec::BitVec}; - let mut span = - jaeger::Span::new(&target, "approved-ancestor").with_stage(jaeger::Stage::ApprovalChecking); - let mut all_approved_max = None; let target_number = { @@ -1365,13 +1432,12 @@ async fn handle_approved_ancestor( } }; + span.add_uint_tag("leaf-number", target_number as u64); + span.add_uint_tag("lower-bound", lower_bound as u64); if target_number <= lower_bound { return Ok(None) } - span.add_string_fmt_debug_tag("target-number", target_number); - span.add_string_fmt_debug_tag("target-hash", target); - // request ancestors up to but not including the lower bound, // as a vote on the lower bound is implied if we cannot find // anything else. @@ -1397,6 +1463,9 @@ async fn handle_approved_ancestor( let mut bits: BitVec = Default::default(); for (i, block_hash) in std::iter::once(target).chain(ancestry).enumerate() { + let mut entry_span = + span.child("load-block-entry").with_stage(jaeger::Stage::ApprovalChecking); + entry_span.add_string_tag("block-hash", format!("{:?}", block_hash)); // Block entries should be present as the assumption is that // nothing here is finalized. If we encounter any missing block // entries we can fail. @@ -1452,7 +1521,7 @@ async fn handle_approved_ancestor( unapproved.len(), entry.candidates().len(), ); - + entry_span.add_uint_tag("unapproved-candidates", unapproved.len() as u64); for candidate_hash in unapproved { match db.load_candidate_entry(&candidate_hash)? { None => { @@ -1575,8 +1644,8 @@ async fn handle_approved_ancestor( }); match all_approved_max { Some(HighestApprovedAncestorBlock { ref hash, ref number, .. }) => { - span.add_uint_tag("approved-number", *number as u64); - span.add_string_fmt_debug_tag("approved-hash", hash); + span.add_uint_tag("highest-approved-number", *number as u64); + span.add_string_fmt_debug_tag("highest-approved-hash", hash); }, None => { span.add_string_tag("reached-lower-bound", "true"); @@ -1677,6 +1746,15 @@ fn check_and_import_assignment( ) -> SubsystemResult<(AssignmentCheckResult, Vec)> { let tick_now = state.clock.tick_now(); + let mut check_and_import_assignment_span = state + .spans + .get(&assignment.block_hash) + .map(|span| span.child("check-and-import-assignment")) + .unwrap_or_else(|| jaeger::Span::new(assignment.block_hash, "check-and-import-assignment")) + .with_relay_parent(assignment.block_hash) + .with_uint_tag("candidate-index", candidate_index as u64) + .with_stage(jaeger::Stage::ApprovalChecking); + let block_entry = match db.load_block_entry(&assignment.block_hash)? { Some(b) => b, None => @@ -1711,6 +1789,13 @@ fn check_and_import_assignment( )), // no candidate at core. }; + check_and_import_assignment_span + .add_string_tag("candidate-hash", format!("{:?}", assigned_candidate_hash)); + check_and_import_assignment_span.add_string_tag( + "traceID", + format!("{:?}", jaeger::hash_to_trace_identifier(assigned_candidate_hash.0)), + ); + let mut candidate_entry = match db.load_candidate_entry(&assigned_candidate_hash)? { Some(c) => c, None => @@ -1769,6 +1854,8 @@ fn check_and_import_assignment( }, }; + check_and_import_assignment_span.add_uint_tag("tranche", tranche as u64); + let is_duplicate = approval_entry.is_assigned(assignment.validator); approval_entry.import_assignment(tranche, assignment.validator, tick_now); @@ -1822,6 +1909,15 @@ fn check_and_import_approval( }}; } + let mut span = state + .spans + .get(&approval.block_hash) + .map(|span| span.child("check-and-import-approval")) + .unwrap_or_else(|| jaeger::Span::new(approval.block_hash, "check-and-import-approval")) + .with_uint_tag("candidate-index", approval.candidate_index as u64) + .with_relay_parent(approval.block_hash) + .with_stage(jaeger::Stage::ApprovalChecking); + let block_entry = match db.load_block_entry(&approval.block_hash)? { Some(b) => b, None => { @@ -1847,6 +1943,12 @@ fn check_and_import_approval( )), }; + span.add_string_tag("candidate-hash", format!("{:?}", approved_candidate_hash)); + span.add_string_tag( + "traceID", + format!("{:?}", hash_to_trace_identifier(approved_candidate_hash.0)), + ); + let pubkey = match session_info.validators.get(approval.validator) { Some(k) => k, None => respond_early!(ApprovalCheckResult::Bad( @@ -2120,16 +2222,17 @@ fn process_wakeup( db: &mut OverlayedBackend<'_, impl Backend>, relay_block: Hash, candidate_hash: CandidateHash, - expected_tick: Tick, metrics: &Metrics, ) -> SubsystemResult> { - let _span = jaeger::Span::from_encodable( - (relay_block, candidate_hash, expected_tick), - "process-approval-wakeup", - ) - .with_relay_parent(relay_block) - .with_candidate(candidate_hash) - .with_stage(jaeger::Stage::ApprovalChecking); + let mut span = state + .spans + .get(&relay_block) + .map(|span| span.child("process-wakeup")) + .unwrap_or_else(|| jaeger::Span::new(candidate_hash, "process-wakeup")) + .with_trace_id(candidate_hash) + .with_relay_parent(relay_block) + .with_candidate(candidate_hash) + .with_stage(jaeger::Stage::ApprovalChecking); let block_entry = db.load_block_entry(&relay_block)?; let candidate_entry = db.load_candidate_entry(&candidate_hash)?; @@ -2159,9 +2262,8 @@ fn process_wakeup( state.slot_duration_millis, Slot::from(u64::from(session_info.no_show_slots)), ); - let tranche_now = state.clock.tranche_now(state.slot_duration_millis, block_entry.slot()); - + span.add_uint_tag("tranche", tranche_now as u64); gum::trace!( target: LOG_TARGET, tranche = tranche_now, @@ -2195,6 +2297,8 @@ fn process_wakeup( (should_trigger, approval_entry.backing_group()) }; + gum::trace!(target: LOG_TARGET, "Wakeup processed. Should trigger: {}", should_trigger); + let mut actions = Vec::new(); let candidate_receipt = candidate_entry.candidate_receipt().clone(); @@ -2243,7 +2347,6 @@ fn process_wakeup( }); } } - // Although we checked approval earlier in this function, // this wakeup might have advanced the state to approved via // a no-show that was immediately covered and therefore @@ -2275,6 +2378,7 @@ async fn launch_approval( validator_index: ValidatorIndex, block_hash: Hash, backing_group: GroupIndex, + span: &jaeger::Span, ) -> SubsystemResult> { let (a_tx, a_rx) = oneshot::channel(); let (code_tx, code_rx) = oneshot::channel(); @@ -2306,9 +2410,15 @@ async fn launch_approval( let candidate_hash = candidate.hash(); let para_id = candidate.descriptor.para_id; - gum::trace!(target: LOG_TARGET, ?candidate_hash, ?para_id, "Recovering data."); + let request_validation_data_span = span + .child("request-validation-data") + .with_trace_id(candidate_hash) + .with_candidate(candidate_hash) + .with_string_tag("block-hash", format!("{:?}", block_hash)) + .with_stage(jaeger::Stage::ApprovalChecking); + let timer = metrics.time_recover_and_approve(); ctx.send_message(AvailabilityRecoveryMessage::RecoverAvailableData( candidate.clone(), @@ -2318,6 +2428,13 @@ async fn launch_approval( )) .await; + let request_validation_result_span = span + .child("request-validation-result") + .with_trace_id(candidate_hash) + .with_candidate(candidate_hash) + .with_string_tag("block-hash", format!("{:?}", block_hash)) + .with_stage(jaeger::Stage::ApprovalChecking); + ctx.send_message(RuntimeApiMessage::Request( block_hash, RuntimeApiRequest::ValidationCodeByHash(candidate.descriptor.validation_code_hash, code_tx), @@ -2330,10 +2447,6 @@ async fn launch_approval( let background = async move { // Force the move of the timer into the background task. let _timer = timer; - let _span = jaeger::Span::from_encodable((block_hash, candidate_hash), "launch-approval") - .with_relay_parent(block_hash) - .with_candidate(candidate_hash) - .with_stage(jaeger::Stage::ApprovalChecking); let available_data = match a_rx.await { Err(_) => return ApprovalState::failed(validator_index, candidate_hash), @@ -2371,6 +2484,7 @@ async fn launch_approval( return ApprovalState::failed(validator_index, candidate_hash) }, }; + drop(request_validation_data_span); let validation_code = match code_rx.await { Err(_) => return ApprovalState::failed(validator_index, candidate_hash), @@ -2392,7 +2506,6 @@ async fn launch_approval( }; let (val_tx, val_rx) = oneshot::channel(); - sender .send_message(CandidateValidationMessage::ValidateFromExhaustive( available_data.validation_data, @@ -2430,7 +2543,6 @@ async fn launch_approval( candidate_hash, candidate.clone(), ); - metrics_guard.take().on_approval_invalid(); return ApprovalState::failed(validator_index, candidate_hash) }, @@ -2443,11 +2555,11 @@ async fn launch_approval( "Failed to validate candidate due to internal error", ); metrics_guard.take().on_approval_error(); + drop(request_validation_result_span); return ApprovalState::failed(validator_index, candidate_hash) }, } }; - let (background, remote_handle) = background.remote_handle(); ctx.spawn("approval-checks", Box::pin(background)).map(move |()| remote_handle) } @@ -2463,6 +2575,17 @@ async fn issue_approval( candidate_hash: CandidateHash, ApprovalVoteRequest { validator_index, block_hash }: ApprovalVoteRequest, ) -> SubsystemResult> { + let mut issue_approval_span = state + .spans + .get(&block_hash) + .map(|span| span.child("issue-approval")) + .unwrap_or_else(|| jaeger::Span::new(block_hash, "issue-approval")) + .with_trace_id(candidate_hash) + .with_string_tag("block-hash", format!("{:?}", block_hash)) + .with_candidate(candidate_hash) + .with_validator_index(validator_index) + .with_stage(jaeger::Stage::ApprovalChecking); + let block_entry = match db.load_block_entry(&block_hash)? { Some(b) => b, None => { @@ -2487,6 +2610,7 @@ async fn issue_approval( }, Some(idx) => idx, }; + issue_approval_span.add_int_tag("candidate_index", candidate_index as i64); let session_info = match state.session_info(block_entry.session()) { Some(s) => s, diff --git a/node/jaeger/src/spans.rs b/node/jaeger/src/spans.rs index de85867d169f..b67ca0f9f260 100644 --- a/node/jaeger/src/spans.rs +++ b/node/jaeger/src/spans.rs @@ -149,6 +149,7 @@ pub enum Stage { AvailabilityRecovery = 6, BitfieldDistribution = 7, ApprovalChecking = 8, + ApprovalDistribution = 9, // Expand as needed, numbers should be ascending according to the stage // through the inclusion pipeline, or according to the descriptions // in [the path of a para chain block] @@ -283,6 +284,13 @@ impl Span { } } + /// Attach a 'traceID' tag set to the decimal representation of the candidate hash. + #[inline(always)] + pub fn with_trace_id(mut self, candidate_hash: CandidateHash) -> Self { + self.add_string_tag("traceID", hash_to_trace_identifier(candidate_hash.0)); + self + } + #[inline(always)] pub fn with_string_tag(mut self, tag: &'static str, val: V) -> Self { self.add_string_tag::(tag, val); diff --git a/node/network/approval-distribution/Cargo.toml b/node/network/approval-distribution/Cargo.toml index 6df854072aa6..87e7d8456188 100644 --- a/node/network/approval-distribution/Cargo.toml +++ b/node/network/approval-distribution/Cargo.toml @@ -10,6 +10,7 @@ polkadot-node-network-protocol = { path = "../protocol" } polkadot-node-primitives = { path = "../../primitives" } polkadot-node-subsystem = { path = "../../subsystem" } polkadot-primitives = { path = "../../../primitives" } +polkadot-node-jaeger = { path = "../../jaeger" } rand = "0.8" futures = "0.3.21" diff --git a/node/network/approval-distribution/src/lib.rs b/node/network/approval-distribution/src/lib.rs index 3c6ed8661e0e..f0df22b559e6 100644 --- a/node/network/approval-distribution/src/lib.rs +++ b/node/network/approval-distribution/src/lib.rs @@ -21,6 +21,7 @@ #![warn(missing_docs)] use futures::{channel::oneshot, FutureExt as _}; +use polkadot_node_jaeger as jaeger; use polkadot_node_network_protocol::{ self as net_protocol, grid_topology::{RandomRouting, RequiredRouting, SessionGridTopologies, SessionGridTopology}, @@ -35,7 +36,7 @@ use polkadot_node_subsystem::{ ApprovalCheckResult, ApprovalDistributionMessage, ApprovalVotingMessage, AssignmentCheckResult, NetworkBridgeEvent, NetworkBridgeTxMessage, }, - overseer, ActiveLeavesUpdate, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, + overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; use polkadot_primitives::{ BlockNumber, CandidateIndex, Hash, SessionIndex, ValidatorIndex, ValidatorSignature, @@ -180,6 +181,9 @@ struct State { /// Config for aggression. aggression_config: AggressionConfig, + + /// HashMap from active leaves to spans + spans: HashMap, } #[derive(Debug, Clone, Copy, PartialEq, Eq)] @@ -390,9 +394,18 @@ impl State { ) { let mut new_hashes = HashSet::new(); for meta in &metas { + let mut span = self + .spans + .get(&meta.hash) + .map(|span| span.child(&"handle-new-blocks")) + .unwrap_or_else(|| jaeger::Span::new(meta.hash, &"handle-new-blocks")) + .with_string_tag("block-hash", format!("{:?}", meta.hash)) + .with_stage(jaeger::Stage::ApprovalDistribution); + match self.blocks.entry(meta.hash) { hash_map::Entry::Vacant(entry) => { let candidates_count = meta.candidates.len(); + span.add_uint_tag("candidates-count", candidates_count as u64); let mut candidates = Vec::with_capacity(candidates_count); candidates.resize_with(candidates_count, Default::default); @@ -690,6 +703,7 @@ impl State { if let Some(block_entry) = self.blocks.remove(relay_block) { self.topologies.dec_session_refs(block_entry.session); } + self.spans.remove(&relay_block); }); // If a block was finalized, this means we may need to move our aggression @@ -1230,6 +1244,14 @@ impl State { ) -> HashMap { let mut all_sigs = HashMap::new(); for (hash, index) in indices { + let _span = self + .spans + .get(&hash) + .map(|span| span.child("get-approval-signatures")) + .unwrap_or_else(|| jaeger::Span::new(&hash, "get-approval-signatures")) + .with_string_tag("block-hash", format!("{:?}", hash)) + .with_stage(jaeger::Stage::ApprovalDistribution); + let block_entry = match self.blocks.get(&hash) { None => { gum::debug!( @@ -1650,13 +1672,18 @@ impl ApprovalDistribution { match message { FromOrchestra::Communication { msg } => Self::handle_incoming(&mut ctx, state, msg, &self.metrics, rng).await, - FromOrchestra::Signal(OverseerSignal::ActiveLeaves(ActiveLeavesUpdate { - .. - })) => { + FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => { gum::trace!(target: LOG_TARGET, "active leaves signal (ignored)"); // the relay chain blocks relevant to the approval subsystems // are those that are available, but not finalized yet - // actived and deactivated heads hence are irrelevant to this subsystem + // actived and deactivated heads hence are irrelevant to this subsystem, other than + // for tracing purposes. + if let Some(activated) = update.activated { + let head = activated.hash; + let approval_distribution_span = + jaeger::PerLeafSpan::new(activated.span, "approval-distribution"); + state.spans.insert(head, approval_distribution_span); + } }, FromOrchestra::Signal(OverseerSignal::BlockFinalized(_hash, number)) => { gum::trace!(target: LOG_TARGET, number = %number, "finalized signal"); @@ -1682,6 +1709,14 @@ impl ApprovalDistribution { state.handle_new_blocks(ctx, metrics, metas, rng).await; }, ApprovalDistributionMessage::DistributeAssignment(cert, candidate_index) => { + let _span = state + .spans + .get(&cert.block_hash) + .map(|span| span.child("import-and-distribute-assignment")) + .unwrap_or_else(|| jaeger::Span::new(&cert.block_hash, "distribute-assignment")) + .with_string_tag("block-hash", format!("{:?}", cert.block_hash)) + .with_stage(jaeger::Stage::ApprovalDistribution); + gum::debug!( target: LOG_TARGET, "Distributing our assignment on candidate (block={}, index={})", @@ -1701,6 +1736,14 @@ impl ApprovalDistribution { .await; }, ApprovalDistributionMessage::DistributeApproval(vote) => { + let _span = state + .spans + .get(&vote.block_hash) + .map(|span| span.child("import-and-distribute-approval")) + .unwrap_or_else(|| jaeger::Span::new(&vote.block_hash, "distribute-approval")) + .with_string_tag("block-hash", format!("{:?}", vote.block_hash)) + .with_stage(jaeger::Stage::ApprovalDistribution); + gum::debug!( target: LOG_TARGET, "Distributing our approval vote on candidate (block={}, index={})", diff --git a/node/network/availability-distribution/src/lib.rs b/node/network/availability-distribution/src/lib.rs index 41702cd7a874..f961d95de96d 100644 --- a/node/network/availability-distribution/src/lib.rs +++ b/node/network/availability-distribution/src/lib.rs @@ -20,9 +20,11 @@ use sp_keystore::KeystorePtr; use polkadot_node_network_protocol::request_response::{v1, IncomingRequestReceiver}; use polkadot_node_subsystem::{ - messages::AvailabilityDistributionMessage, overseer, FromOrchestra, OverseerSignal, + jaeger, messages::AvailabilityDistributionMessage, overseer, FromOrchestra, OverseerSignal, SpawnedSubsystem, SubsystemError, }; +use polkadot_primitives::Hash; +use std::collections::HashMap; /// Error and [`Result`] type for this subsystem. mod error; @@ -91,6 +93,7 @@ impl AvailabilityDistributionSubsystem { /// Start processing work as passed on from the Overseer. async fn run(self, mut ctx: Context) -> std::result::Result<(), FatalError> { let Self { mut runtime, recvs, metrics } = self; + let mut spans: HashMap = HashMap::new(); let IncomingRequestReceivers { pov_req_receiver, chunk_req_receiver } = recvs; let mut requester = Requester::new(metrics.clone()).fuse(); @@ -131,15 +134,24 @@ impl AvailabilityDistributionSubsystem { }; match message { FromOrchestra::Signal(OverseerSignal::ActiveLeaves(update)) => { + let cloned_leaf = match update.activated.clone() { + Some(activated) => activated, + None => continue, + }; + let span = + jaeger::PerLeafSpan::new(cloned_leaf.span, "availability-distribution"); + spans.insert(cloned_leaf.hash, span); log_error( requester .get_mut() - .update_fetching_heads(&mut ctx, &mut runtime, update) + .update_fetching_heads(&mut ctx, &mut runtime, update, &spans) .await, "Error in Requester::update_fetching_heads", )?; }, - FromOrchestra::Signal(OverseerSignal::BlockFinalized(..)) => {}, + FromOrchestra::Signal(OverseerSignal::BlockFinalized(hash, _)) => { + spans.remove(&hash); + }, FromOrchestra::Signal(OverseerSignal::Conclude) => return Ok(()), FromOrchestra::Communication { msg: @@ -152,6 +164,15 @@ impl AvailabilityDistributionSubsystem { tx, }, } => { + let span = spans + .get(&relay_parent) + .map(|span| span.child("fetch-pov")) + .unwrap_or_else(|| jaeger::Span::new(&relay_parent, "fetch-pov")) + .with_trace_id(candidate_hash) + .with_candidate(candidate_hash) + .with_relay_parent(relay_parent) + .with_stage(jaeger::Stage::AvailabilityDistribution); + log_error( pov_requester::fetch_pov( &mut ctx, @@ -163,6 +184,7 @@ impl AvailabilityDistributionSubsystem { pov_hash, tx, metrics.clone(), + &span, ) .await, "pov_requester::fetch_pov", diff --git a/node/network/availability-distribution/src/pov_requester/mod.rs b/node/network/availability-distribution/src/pov_requester/mod.rs index 2d9f53e63f5f..ebe33f3b9423 100644 --- a/node/network/availability-distribution/src/pov_requester/mod.rs +++ b/node/network/availability-distribution/src/pov_requester/mod.rs @@ -52,7 +52,18 @@ pub async fn fetch_pov( pov_hash: Hash, tx: oneshot::Sender, metrics: Metrics, + span: &jaeger::Span, ) -> Result<()> { + let _span = span + .child("fetch-pov") + .with_trace_id(candidate_hash) + .with_validator_index(from_validator) + .with_candidate(candidate_hash) + .with_para_id(para_id) + .with_relay_parent(parent) + .with_string_tag("pov-hash", format!("{:?}", pov_hash)) + .with_stage(jaeger::Stage::AvailabilityDistribution); + let info = &runtime.get_session_info(ctx.sender(), parent).await?.session_info; let authority_id = info .discovery_keys @@ -71,13 +82,9 @@ pub async fn fetch_pov( )) .await; - let span = jaeger::Span::new(candidate_hash, "fetch-pov") - .with_validator_index(from_validator) - .with_relay_parent(parent) - .with_para_id(para_id); ctx.spawn( "pov-fetcher", - fetch_pov_job(para_id, pov_hash, authority_id, pending_response.boxed(), span, tx, metrics) + fetch_pov_job(para_id, pov_hash, authority_id, pending_response.boxed(), tx, metrics) .boxed(), ) .map_err(|e| FatalError::SpawnTask(e))?; @@ -90,11 +97,10 @@ async fn fetch_pov_job( pov_hash: Hash, authority_id: AuthorityDiscoveryId, pending_response: BoxFuture<'static, std::result::Result>, - span: jaeger::Span, tx: oneshot::Sender, metrics: Metrics, ) { - if let Err(err) = do_fetch_pov(pov_hash, pending_response, span, tx, metrics).await { + if let Err(err) = do_fetch_pov(pov_hash, pending_response, tx, metrics).await { gum::warn!(target: LOG_TARGET, ?err, ?para_id, ?pov_hash, ?authority_id, "fetch_pov_job"); } } @@ -103,7 +109,6 @@ async fn fetch_pov_job( async fn do_fetch_pov( pov_hash: Hash, pending_response: BoxFuture<'static, std::result::Result>, - _span: jaeger::Span, tx: oneshot::Sender, metrics: Metrics, ) -> Result<()> { @@ -182,6 +187,7 @@ mod tests { pov_hash, tx, Metrics::new_dummy(), + &jaeger::Span::Disabled, ) .await .expect("Should succeed"); diff --git a/node/network/availability-distribution/src/requester/fetch_task/mod.rs b/node/network/availability-distribution/src/requester/fetch_task/mod.rs index 09e12aece47f..adc2c32e30bd 100644 --- a/node/network/availability-distribution/src/requester/fetch_task/mod.rs +++ b/node/network/availability-distribution/src/requester/fetch_task/mod.rs @@ -140,7 +140,18 @@ impl FetchTaskConfig { sender: mpsc::Sender, metrics: Metrics, session_info: &SessionInfo, + span: jaeger::Span, ) -> Self { + let span = span + .child("fetch-task-config") + .with_trace_id(core.candidate_hash) + .with_string_tag("leaf", format!("{:?}", leaf)) + .with_validator_index(session_info.our_index) + .with_uint_tag("group-index", core.group_responsible.0 as u64) + .with_relay_parent(core.candidate_descriptor.relay_parent) + .with_string_tag("pov-hash", format!("{:?}", core.candidate_descriptor.pov_hash)) + .with_stage(jaeger::Stage::AvailabilityDistribution); + let live_in = vec![leaf].into_iter().collect(); // Don't run tasks for our backing group: @@ -148,9 +159,6 @@ impl FetchTaskConfig { return FetchTaskConfig { live_in, prepared_running: None } } - let span = jaeger::Span::new(core.candidate_hash, "availability-distribution") - .with_stage(jaeger::Stage::AvailabilityDistribution); - let prepared_running = RunningTask { session_index: session_info.session_index, group_index: core.group_responsible, @@ -251,20 +259,18 @@ impl RunningTask { let mut bad_validators = Vec::new(); let mut succeeded = false; let mut count: u32 = 0; - let mut _span = self - .span - .child("fetch-task") - .with_chunk_index(self.request.index.0) - .with_relay_parent(self.relay_parent); + let mut span = self.span.child("run-fetch-chunk-task").with_relay_parent(self.relay_parent); // Try validators in reverse order: while let Some(validator) = self.group.pop() { - let _try_span = _span.child("try"); // Report retries: if count > 0 { self.metrics.on_retry(); } count += 1; - + let _chunk_fetch_span = span + .child("fetch-chunk-request") + .with_chunk_index(self.request.index.0) + .with_stage(jaeger::Stage::AvailabilityDistribution); // Send request: let resp = match self.do_request(&validator).await { Ok(resp) => resp, @@ -281,6 +287,12 @@ impl RunningTask { continue }, }; + // We drop the span here, so that the span is not active while we recombine the chunk. + drop(_chunk_fetch_span); + let _chunk_recombine_span = span + .child("recombine-chunk") + .with_chunk_index(self.request.index.0) + .with_stage(jaeger::Stage::AvailabilityDistribution); let chunk = match resp { ChunkFetchingResponse::Chunk(resp) => resp.recombine_into_chunk(&self.request), ChunkFetchingResponse::NoSuchChunk => { @@ -298,6 +310,12 @@ impl RunningTask { continue }, }; + // We drop the span so that the span is not active whilst we validate and store the chunk. + drop(_chunk_recombine_span); + let _chunk_validate_and_store_span = span + .child("validate-and-store-chunk") + .with_chunk_index(self.request.index.0) + .with_stage(jaeger::Stage::AvailabilityDistribution); // Data genuine? if !self.validate_chunk(&validator, &chunk) { @@ -308,10 +326,9 @@ impl RunningTask { // Ok, let's store it and be happy: self.store_chunk(chunk).await; succeeded = true; - _span.add_string_tag("success", "true"); break } - _span.add_int_tag("tries", count as _); + span.add_int_tag("tries", count as _); if succeeded { self.metrics.on_fetch(SUCCEEDED); self.conclude(bad_validators).await; diff --git a/node/network/availability-distribution/src/requester/mod.rs b/node/network/availability-distribution/src/requester/mod.rs index 088937b6e995..58a04dbcbf76 100644 --- a/node/network/availability-distribution/src/requester/mod.rs +++ b/node/network/availability-distribution/src/requester/mod.rs @@ -33,6 +33,7 @@ use futures::{ }; use polkadot_node_subsystem::{ + jaeger, messages::{ChainApiMessage, RuntimeApiMessage}, overseer, ActivatedLeaf, ActiveLeavesUpdate, LeafStatus, }; @@ -100,14 +101,22 @@ impl Requester { ctx: &mut Context, runtime: &mut RuntimeInfo, update: ActiveLeavesUpdate, + spans: &HashMap, ) -> Result<()> { gum::trace!(target: LOG_TARGET, ?update, "Update fetching heads"); let ActiveLeavesUpdate { activated, deactivated } = update; // Stale leaves happen after a reversion - we don't want to re-run availability there. if let Some(leaf) = activated.filter(|leaf| leaf.status == LeafStatus::Fresh) { + let span = spans + .get(&leaf.hash) + .map(|span| span.child("update-fetching-heads")) + .unwrap_or_else(|| jaeger::Span::new(&leaf.hash, "update-fetching-heads")) + .with_string_tag("leaf", format!("{:?}", leaf.hash)) + .with_stage(jaeger::Stage::AvailabilityDistribution); + // Order important! We need to handle activated, prior to deactivated, otherwise we might // cancel still needed jobs. - self.start_requesting_chunks(ctx, runtime, leaf).await?; + self.start_requesting_chunks(ctx, runtime, leaf, &span).await?; } self.stop_requesting_chunks(deactivated.into_iter()); @@ -123,7 +132,13 @@ impl Requester { ctx: &mut Context, runtime: &mut RuntimeInfo, new_head: ActivatedLeaf, + span: &jaeger::Span, ) -> Result<()> { + let mut span = span + .child("request-chunks-new-head") + .with_string_tag("leaf", format!("{:?}", new_head.hash)) + .with_stage(jaeger::Stage::AvailabilityDistribution); + let sender = &mut ctx.sender().clone(); let ActivatedLeaf { hash: leaf, .. } = new_head; let (leaf_session_index, ancestors_in_session) = get_block_ancestors_in_same_session( @@ -133,8 +148,15 @@ impl Requester { Self::LEAF_ANCESTRY_LEN_WITHIN_SESSION, ) .await?; + span.add_uint_tag("ancestors-in-session", ancestors_in_session.len() as u64); + // Also spawn or bump tasks for candidates in ancestry in the same session. for hash in std::iter::once(leaf).chain(ancestors_in_session) { + let span = span + .child("request-chunks-ancestor") + .with_string_tag("leaf", format!("{:?}", hash.clone())) + .with_stage(jaeger::Stage::AvailabilityDistribution); + let cores = get_occupied_cores(sender, hash).await?; gum::trace!( target: LOG_TARGET, @@ -148,7 +170,7 @@ impl Requester { // The next time the subsystem receives leaf update, some of spawned task will be bumped // to be live in fresh relay parent, while some might get dropped due to the current leaf // being deactivated. - self.add_cores(ctx, runtime, leaf, leaf_session_index, cores).await?; + self.add_cores(ctx, runtime, leaf, leaf_session_index, cores, span).await?; } Ok(()) @@ -178,15 +200,24 @@ impl Requester { leaf: Hash, leaf_session_index: SessionIndex, cores: impl IntoIterator, + span: jaeger::Span, ) -> Result<()> { for core in cores { + let mut span = span + .child("check-fetch-candidate") + .with_trace_id(core.candidate_hash) + .with_string_tag("leaf", format!("{:?}", leaf)) + .with_candidate(core.candidate_hash) + .with_stage(jaeger::Stage::AvailabilityDistribution); match self.fetches.entry(core.candidate_hash) { Entry::Occupied(mut e) => // Just book keeping - we are already requesting that chunk: { + span.add_string_tag("already-requested-chunk", "true"); e.get_mut().add_leaf(leaf); }, Entry::Vacant(e) => { + span.add_string_tag("already-requested-chunk", "false"); let tx = self.tx.clone(); let metrics = self.metrics.clone(); @@ -201,7 +232,7 @@ impl Requester { // be fetchable by the state trie. leaf, leaf_session_index, - |info| FetchTaskConfig::new(leaf, &core, tx, metrics, info), + |info| FetchTaskConfig::new(leaf, &core, tx, metrics, info, span), ) .await .map_err(|err| { diff --git a/node/network/availability-distribution/src/requester/tests.rs b/node/network/availability-distribution/src/requester/tests.rs index 59db97176a76..abb81d28131f 100644 --- a/node/network/availability-distribution/src/requester/tests.rs +++ b/node/network/availability-distribution/src/requester/tests.rs @@ -14,6 +14,8 @@ // You should have received a copy of the GNU General Public License // along with Polkadot. If not, see . +use std::collections::HashMap; + use std::{future::Future, sync::Arc}; use futures::FutureExt; @@ -196,7 +198,7 @@ fn check_ancestry_lookup_in_same_session() { test_harness(test_state.clone(), |mut ctx| async move { let chain = &test_state.relay_chain; - + let spans: HashMap = HashMap::new(); let block_number = 1; let update = ActiveLeavesUpdate { activated: Some(ActivatedLeaf { @@ -209,7 +211,7 @@ fn check_ancestry_lookup_in_same_session() { }; requester - .update_fetching_heads(&mut ctx, &mut runtime, update) + .update_fetching_heads(&mut ctx, &mut runtime, update, &spans) .await .expect("Leaf processing failed"); let fetch_tasks = &requester.fetches; @@ -229,7 +231,7 @@ fn check_ancestry_lookup_in_same_session() { }; requester - .update_fetching_heads(&mut ctx, &mut runtime, update) + .update_fetching_heads(&mut ctx, &mut runtime, update, &spans) .await .expect("Leaf processing failed"); let fetch_tasks = &requester.fetches; @@ -255,7 +257,7 @@ fn check_ancestry_lookup_in_same_session() { deactivated: vec![chain[1], chain[2]].into(), }; requester - .update_fetching_heads(&mut ctx, &mut runtime, update) + .update_fetching_heads(&mut ctx, &mut runtime, update, &spans) .await .expect("Leaf processing failed"); let fetch_tasks = &requester.fetches; @@ -283,7 +285,7 @@ fn check_ancestry_lookup_in_different_sessions() { test_harness(test_state.clone(), |mut ctx| async move { let chain = &test_state.relay_chain; - + let spans: HashMap = HashMap::new(); let block_number = 3; let update = ActiveLeavesUpdate { activated: Some(ActivatedLeaf { @@ -296,7 +298,7 @@ fn check_ancestry_lookup_in_different_sessions() { }; requester - .update_fetching_heads(&mut ctx, &mut runtime, update) + .update_fetching_heads(&mut ctx, &mut runtime, update, &spans) .await .expect("Leaf processing failed"); let fetch_tasks = &requester.fetches; @@ -314,7 +316,7 @@ fn check_ancestry_lookup_in_different_sessions() { }; requester - .update_fetching_heads(&mut ctx, &mut runtime, update) + .update_fetching_heads(&mut ctx, &mut runtime, update, &spans) .await .expect("Leaf processing failed"); let fetch_tasks = &requester.fetches; @@ -332,7 +334,7 @@ fn check_ancestry_lookup_in_different_sessions() { }; requester - .update_fetching_heads(&mut ctx, &mut runtime, update) + .update_fetching_heads(&mut ctx, &mut runtime, update, &spans) .await .expect("Leaf processing failed"); let fetch_tasks = &requester.fetches; diff --git a/node/network/availability-distribution/src/responder.rs b/node/network/availability-distribution/src/responder.rs index daf0c3175bf3..d8e5a9c1a659 100644 --- a/node/network/availability-distribution/src/responder.rs +++ b/node/network/availability-distribution/src/responder.rs @@ -186,7 +186,10 @@ where { let span = jaeger::Span::new(req.payload.candidate_hash, "answer-chunk-request"); - let _child_span = span.child("answer-chunk-request").with_chunk_index(req.payload.index.0); + let _child_span = span + .child("answer-chunk-request") + .with_trace_id(req.payload.candidate_hash) + .with_chunk_index(req.payload.index.0); let chunk = query_chunk(sender, req.payload.candidate_hash, req.payload.index).await?;