Skip to content
This repository has been archived by the owner on Nov 15, 2023. It is now read-only.

Commit

Permalink
Backing and collator protocol traces including para-id (#2620)
Browse files Browse the repository at this point in the history
* improve backing/provisioner spans

* span for collation requests

* add para_id to unbacked candidate spans

* differentiate validation-construction and find-assignment in selection

* better find-assignment spans

* organize unbacked-candidate spans directly under job root

* Update node/core/provisioner/src/lib.rs

Co-authored-by: Andronik Ordian <[email protected]>

Co-authored-by: Andronik Ordian <[email protected]>
  • Loading branch information
rphmeier and ordian authored Mar 14, 2021
1 parent a38eea6 commit fb3c6c8
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 32 deletions.
80 changes: 58 additions & 22 deletions node/core/backing/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,9 @@ async fn validate_and_make_available(
let v = {
let _span = span.as_ref().map(|s| {
s.child_builder("request-validation")
.with_pov(&pov)
.build()
.with_pov(&pov)
.with_para_id(candidate.descriptor().para_id)
.build()
});
request_candidate_validation(&mut tx_from, candidate.descriptor.clone(), pov.clone()).await?
};
Expand Down Expand Up @@ -611,6 +612,7 @@ impl CandidateBackingJob {
async fn validate_and_second(
&mut self,
parent_span: &jaeger::Span,
root_span: &jaeger::Span,
candidate: &CandidateReceipt,
pov: Arc<PoV>,
) -> Result<(), Error> {
Expand All @@ -623,7 +625,13 @@ impl CandidateBackingJob {
}

let candidate_hash = candidate.hash();
let span = self.get_unbacked_validation_child(parent_span, candidate_hash);
let mut span = self.get_unbacked_validation_child(
root_span,
candidate_hash,
candidate.descriptor().para_id,
);

span.as_mut().map(|s| s.add_follows_from(parent_span));

tracing::debug!(
target: LOG_TARGET,
Expand Down Expand Up @@ -745,17 +753,17 @@ impl CandidateBackingJob {
Ok(summary)
}

#[tracing::instrument(level = "trace", skip(self, span), fields(subsystem = LOG_TARGET))]
async fn process_msg(&mut self, span: &jaeger::Span, msg: CandidateBackingMessage) -> Result<(), Error> {
#[tracing::instrument(level = "trace", skip(self, root_span), fields(subsystem = LOG_TARGET))]
async fn process_msg(&mut self, root_span: &jaeger::Span, msg: CandidateBackingMessage) -> Result<(), Error> {
match msg {
CandidateBackingMessage::Second(_relay_parent, candidate, pov) => {
CandidateBackingMessage::Second(relay_parent, candidate, pov) => {
let _timer = self.metrics.time_process_second();

let span = span.child_builder("second")
let span = root_span.child_builder("second")
.with_stage(jaeger::Stage::CandidateBacking)
.with_pov(&pov)
.with_candidate(&candidate.hash())
.with_relay_parent(&_relay_parent)
.with_relay_parent(&relay_parent)
.build();

// Sanity check that candidate is from our assignment.
Expand All @@ -772,20 +780,20 @@ impl CandidateBackingJob {

if !self.issued_statements.contains(&candidate_hash) {
let pov = Arc::new(pov);
self.validate_and_second(&span, &candidate, pov).await?;
self.validate_and_second(&span, &root_span, &candidate, pov).await?;
}
}
}
CandidateBackingMessage::Statement(_relay_parent, statement) => {
let _timer = self.metrics.time_process_statement();
let span = span.child_builder("statement")
let span = root_span.child_builder("statement")
.with_stage(jaeger::Stage::CandidateBacking)
.with_candidate(&statement.payload().candidate_hash())
.with_relay_parent(&_relay_parent)
.build();

self.check_statement_signature(&statement)?;
match self.maybe_validate_and_import(&span, statement).await {
match self.maybe_validate_and_import(&span, &root_span, statement).await {
Err(Error::ValidationFailed(_)) => return Ok(()),
Err(e) => return Err(e),
Ok(()) => (),
Expand Down Expand Up @@ -865,12 +873,17 @@ impl CandidateBackingJob {
async fn maybe_validate_and_import(
&mut self,
parent_span: &jaeger::Span,
root_span: &jaeger::Span,
statement: SignedFullStatement,
) -> Result<(), Error> {
if let Some(summary) = self.import_statement(&statement, parent_span).await? {
if let Statement::Seconded(_) = statement.payload() {
if Some(summary.group_id) == self.assignment {
let span = self.get_unbacked_validation_child(parent_span, summary.candidate);
let span = self.get_unbacked_validation_child(
root_span,
summary.candidate,
summary.group_id,
);

self.kick_off_validation_work(summary, span).await?;
}
Expand Down Expand Up @@ -910,20 +923,37 @@ impl CandidateBackingJob {
}

/// Insert or get the unbacked-span for the given candidate hash.
fn insert_or_get_unbacked_span(&mut self, parent_span: &jaeger::Span, hash: CandidateHash) -> Option<&jaeger::Span> {
fn insert_or_get_unbacked_span(
&mut self,
parent_span: &jaeger::Span,
hash: CandidateHash,
para_id: Option<ParaId>
) -> Option<&jaeger::Span> {
if !self.backed.contains(&hash) {
// only add if we don't consider this backed.
let span = self.unbacked_candidates.entry(hash).or_insert_with(|| {
parent_span.child_with_candidate("unbacked-candidate", &hash)
let s = parent_span.child_builder("unbacked-candidate").with_candidate(&hash);
let s = if let Some(para_id) = para_id {
s.with_para_id(para_id)
} else {
s
};

s.build()
});
Some(span)
} else {
None
}
}

fn get_unbacked_validation_child(&mut self, parent_span: &jaeger::Span, hash: CandidateHash) -> Option<jaeger::Span> {
self.insert_or_get_unbacked_span(parent_span, hash)
fn get_unbacked_validation_child(
&mut self,
parent_span: &jaeger::Span,
hash: CandidateHash,
para_id: ParaId,
) -> Option<jaeger::Span> {
self.insert_or_get_unbacked_span(parent_span, hash, Some(para_id))
.map(|span| {
span.child_builder("validation")
.with_candidate(&hash)
Expand All @@ -938,7 +968,7 @@ impl CandidateBackingJob {
hash: CandidateHash,
validator: ValidatorIndex,
) -> Option<jaeger::Span> {
self.insert_or_get_unbacked_span(parent_span, hash).map(|span| {
self.insert_or_get_unbacked_span(parent_span, hash, None).map(|span| {
span.child_builder("import-statement")
.with_candidate(&hash)
.with_validator_index(validator)
Expand Down Expand Up @@ -1055,8 +1085,7 @@ impl util::JobTrait for CandidateBackingJob {
};

drop(_span);
let _span = span.child("calc-validator-groups");

let mut assignments_span = span.child("compute-assignments");

let mut groups = HashMap::new();

Expand Down Expand Up @@ -1085,11 +1114,18 @@ impl util::JobTrait for CandidateBackingJob {
};

let (assignment, required_collator) = match assignment {
None => (None, None),
Some((assignment, required_collator)) => (Some(assignment), required_collator),
None => {
assignments_span.add_string_tag("assigned", "false");
(None, None)
}
Some((assignment, required_collator)) => {
assignments_span.add_string_tag("assigned", "true");
assignments_span.add_para_id(assignment);
(Some(assignment), required_collator)
}
};

drop(_span);
drop(assignments_span);
let _span = span.child("wait-for-job");

let (background_tx, background_rx) = mpsc::channel(16);
Expand Down
21 changes: 17 additions & 4 deletions node/core/candidate-selection/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ impl JobTrait for CandidateSelectionJob {
let cores = try_runtime_api!(cores);

drop(_span);
let _span = span.child_builder("find-assignment")
let _span = span.child_builder("validator-construction")
.with_relay_parent(&relay_parent)
.with_stage(jaeger::Stage::CandidateSelection)
.build();
Expand All @@ -134,6 +134,11 @@ impl JobTrait for CandidateSelectionJob {
Err(err) => return Err(Error::Util(err)),
};

let mut assignment_span = span.child_builder("find-assignment")
.with_relay_parent(&relay_parent)
.with_stage(jaeger::Stage::CandidateSelection)
.build();

let mut assignment = None;

for (idx, core) in cores.into_iter().enumerate() {
Expand All @@ -151,11 +156,19 @@ impl JobTrait for CandidateSelectionJob {
}

let assignment = match assignment {
Some(assignment) => assignment,
None => return Ok(()),
Some(assignment) => {
assignment_span.add_string_tag("assigned", "true");
assignment_span.add_para_id(assignment);

assignment
}
None => {
assignment_span.add_string_tag("assigned", "false");
return Ok(())
}
};

drop(_span);
drop(assignment_span);

CandidateSelectionJob::new(assignment, metrics, sender, receiver).run_loop(&span).await
}.boxed()
Expand Down
8 changes: 5 additions & 3 deletions node/core/provisioner/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,10 @@ impl ProvisioningJob {
}
}
Some(ProvisionableData(_, data)) => {
let _span = span.child("provisionable-data");
let span = span.child("provisionable-data");
let _timer = self.metrics.time_provisionable_data();

self.note_provisionable_data(data);
self.note_provisionable_data(&span, data);
}
None => break,
},
Expand Down Expand Up @@ -239,12 +239,14 @@ impl ProvisioningJob {
}

#[tracing::instrument(level = "trace", skip(self), fields(subsystem = LOG_TARGET))]
fn note_provisionable_data(&mut self, provisionable_data: ProvisionableData) {
fn note_provisionable_data(&mut self, span: &jaeger::Span, provisionable_data: ProvisionableData) {
match provisionable_data {
ProvisionableData::Bitfield(_, signed_bitfield) => {
self.signed_bitfields.push(signed_bitfield)
}
ProvisionableData::BackedCandidate(backed_candidate) => {
let mut span = span.child("provisionable-backed");
span.add_para_id(backed_candidate.descriptor().para_id);
self.backed_candidates.push(backed_candidate)
}
_ => {}
Expand Down
15 changes: 14 additions & 1 deletion node/jaeger/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
//! ```
use sp_core::traits::SpawnNamed;
use polkadot_primitives::v1::{CandidateHash, Hash, PoV, ValidatorIndex, BlakeTwo256, HashT};
use polkadot_primitives::v1::{CandidateHash, Hash, PoV, ValidatorIndex, BlakeTwo256, HashT, Id as ParaId};
use parity_scale_codec::Encode;
use sc_network::PeerId;

Expand Down Expand Up @@ -204,6 +204,14 @@ impl SpanBuilder {
self.span.add_string_tag("candidate-hash", &format!("{:?}", candidate_hash.0));
self
}

/// Attach a para-id to the span.
#[inline(always)]
pub fn with_para_id(mut self, para_id: ParaId) -> Self {
self.span.add_para_id(para_id);
self
}

/// Attach a candidate stage.
/// Should always come with a `CandidateHash`.
#[inline(always)]
Expand Down Expand Up @@ -305,6 +313,11 @@ impl Span {
}
}

/// Add the para-id to the span.
pub fn add_para_id(&mut self, para_id: ParaId) {
self.add_int_tag("para-id", u32::from(para_id) as i64);
}

/// Add an additional tag to the span.
pub fn add_string_tag(&mut self, tag: &str, value: &str) {
match self {
Expand Down
19 changes: 17 additions & 2 deletions node/network/collator-protocol/src/validator_side.rs
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,9 @@ struct CollationRequest {

// The id of this request.
request_id: RequestId,

// A jaeger span corresponding to the lifetime of the request.
span: Option<jaeger::Span>,
}

impl CollationRequest {
Expand All @@ -147,11 +150,18 @@ impl CollationRequest {
received,
timeout,
request_id,
mut span,
} = self;

match received.timeout(timeout).await {
None => Timeout(request_id),
Some(_) => Received(request_id),
None => {
span.as_mut().map(|s| s.add_string_tag("success", "false"));
Timeout(request_id)
}
Some(_) => {
span.as_mut().map(|s| s.add_string_tag("success", "true"));
Received(request_id)
}
}
}
}
Expand Down Expand Up @@ -466,6 +476,11 @@ where
received: rx,
timeout: state.request_timeout,
request_id,
span: state.span_per_relay_parent.get(&relay_parent).map(|s| {
s.child_builder("collation-request")
.with_para_id(para_id)
.build()
}),
};

state.requested_collations.insert((relay_parent, para_id.clone(), peer_id.clone()), request_id);
Expand Down

0 comments on commit fb3c6c8

Please sign in to comment.