Skip to content

Commit

Permalink
Merge #2823
Browse files Browse the repository at this point in the history
2823: [All] OpenTracing to OpenTelemetry migration r=SaveTheRbtz a=SaveTheRbtz

Issue: #2796

The OpenTracing spec is deprecated (see opentracing/specification#163), so this change switches to the supported OpenTelemetry implementation.

Note that the new tracing code honors [OTEL env vars](https://github.com/open-telemetry/opentelemetry-specification/blob/v1.12.0/specification/protocol/exporter.md) instead of the JAEGER ones, e.g.:
`JAEGER_ENDPOINT` -> `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT`, `OTEL_EXPORTER_OTLP_TRACES_INSECURE`, etc

(cc: `@sjonpaulbrown` `@haroldsphinx`)

The new API is slightly slower and consumes more memory, but performs substantially fewer allocations (~ -50%):
```
name                              old time/op    new time/op    delta
StartSpanFromParent-8                485ns ± 6%     639ns ± 1%  +31.71%  (p=0.000 n=10+9)
StartTransactionSpan/cacheHit-8      993ns ± 4%     927ns ± 3%   -6.69%  (p=0.000 n=10+10)
StartTransactionSpan/cacheMiss-8    2.24µs ± 1%    2.19µs ± 1%   -2.42%  (p=0.000 n=10+10)

name                              old alloc/op   new alloc/op   delta
StartSpanFromParent-8                 577B ± 0%      864B ± 0%  +49.74%  (p=0.000 n=10+10)
StartTransactionSpan/cacheHit-8       776B ± 1%      944B ± 0%  +21.65%  (p=0.000 n=10+8)
StartTransactionSpan/cacheMiss-8    2.16kB ± 1%    2.39kB ± 0%  +10.62%  (p=0.000 n=10+10)

name                              old allocs/op  new allocs/op  delta
StartSpanFromParent-8                 7.00 ± 0%      4.00 ± 0%  -42.86%  (p=0.000 n=10+10)
StartTransactionSpan/cacheHit-8       10.3 ± 7%       6.0 ± 0%  -41.75%  (p=0.000 n=10+10)
StartTransactionSpan/cacheMiss-8      35.3 ± 2%      18.0 ± 0%  -49.01%  (p=0.000 n=10+10)
```

An example trace with both extensive tracing and Cadence tracing enabled:
<img width="1459" alt="Screen Shot 2022-07-25 at 10 21 07 PM" src="https://user-images.githubusercontent.com/169976/180929610-edbfdc54-e604-401f-aea5-501af2f861b2.png">

- [x] test against localnet's Tempo instance.
- [x] noop tracer.
- [x] log tracer.
- [x] merge cadence bits onflow/cadence#1824.
- [x] merge onflow/flow-core-contracts#298.
- [x] update cadence.

Co-authored-by: Alexey Ivanov <[email protected]>
  • Loading branch information
bors[bot] and SaveTheRbtz authored Jul 26, 2022
2 parents 888b34b + 93370c8 commit f9c16a5
Show file tree
Hide file tree
Showing 47 changed files with 847 additions and 945 deletions.
6 changes: 3 additions & 3 deletions engine/common/follower/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (e *Engine) onBlockResponse(originID flow.Identifier, res *messages.BlockRe
func (e *Engine) onBlockProposal(originID flow.Identifier, proposal *messages.BlockProposal, inRangeBlockResponse bool) error {

span, ctx, _ := e.tracer.StartBlockSpan(context.Background(), proposal.Header.ID(), trace.FollowerOnBlockProposal)
defer span.Finish()
defer span.End()

header := proposal.Header

Expand Down Expand Up @@ -356,7 +356,7 @@ func (e *Engine) onBlockProposal(originID flow.Identifier, proposal *messages.Bl
func (e *Engine) processBlockAndDescendants(ctx context.Context, proposal *messages.BlockProposal, inRangeBlockResponse bool) error {

span, ctx := e.tracer.StartSpanFromContext(ctx, trace.FollowerProcessBlockProposal)
defer span.Finish()
defer span.End()

header := proposal.Header

Expand Down Expand Up @@ -431,7 +431,7 @@ func (e *Engine) processBlockAndDescendants(ctx context.Context, proposal *messa
func (e *Engine) processPendingChildren(ctx context.Context, header *flow.Header, inRangeBlockResponse bool) error {

span, ctx := e.tracer.StartSpanFromContext(ctx, trace.FollowerProcessPendingChildren)
defer span.Finish()
defer span.End()

blockID := header.ID()

Expand Down
31 changes: 16 additions & 15 deletions engine/consensus/compliance/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,8 @@ import (
"fmt"
"time"

"github.com/opentracing/opentracing-go/log"
"github.com/rs/zerolog"
"github.com/uber/jaeger-client-go"
"go.opentelemetry.io/otel/attribute"

"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/model"
Expand Down Expand Up @@ -99,16 +98,14 @@ func (c *Core) OnBlockProposal(originID flow.Identifier, proposal *messages.Bloc

span, _, isSampled := c.tracer.StartBlockSpan(context.Background(), proposal.Header.ID(), trace.CONCompOnBlockProposal)
if isSampled {
span.LogFields(log.Uint64("view", proposal.Header.View))
span.LogFields(log.String("origin_id", originID.String()))

// set proposer as a tag so we can filter based on proposer
span.SetTag("proposer", proposal.Header.ProposerID.String())
if sc, ok := span.Context().(jaeger.SpanContext); ok {
traceID = sc.TraceID().String()
}
span.SetAttributes(
attribute.Int64("view", int64(proposal.Header.View)),
attribute.String("origin_id", originID.String()),
attribute.String("proposer", proposal.Header.ProposerID.String()),
)
traceID = span.SpanContext().TraceID().String()
}
defer span.Finish()
defer span.End()

header := proposal.Header
log := c.log.With().
Expand Down Expand Up @@ -298,9 +295,11 @@ func (c *Core) processBlockProposal(proposal *messages.BlockProposal) error {

span, ctx, isSampled := c.tracer.StartBlockSpan(context.Background(), proposal.Header.ID(), trace.ConCompProcessBlockProposal)
if isSampled {
span.SetTag("proposer", proposal.Header.ProposerID.String())
span.SetAttributes(
attribute.String("proposer", proposal.Header.ProposerID.String()),
)
}
defer span.Finish()
defer span.End()

header := proposal.Header
log := c.log.With().
Expand Down Expand Up @@ -353,9 +352,11 @@ func (c *Core) OnBlockVote(originID flow.Identifier, vote *messages.BlockVote) e

span, _, isSampled := c.tracer.StartBlockSpan(context.Background(), vote.BlockID, trace.CONCompOnBlockVote)
if isSampled {
span.LogFields(log.String("origin_id", originID.String()))
span.SetAttributes(
attribute.String("origin_id", originID.String()),
)
}
defer span.Finish()
defer span.End()

v := &model.Vote{
View: vote.View,
Expand Down
7 changes: 5 additions & 2 deletions engine/consensus/ingestion/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"

"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/engine"
Expand Down Expand Up @@ -61,9 +62,11 @@ func (e *Core) OnGuarantee(originID flow.Identifier, guarantee *flow.CollectionG

span, _, isSampled := e.tracer.StartCollectionSpan(context.Background(), guarantee.CollectionID, trace.CONIngOnCollectionGuarantee)
if isSampled {
span.LogKV("originID", originID.String())
span.SetAttributes(
attribute.String("originID", originID.String()),
)
}
defer span.Finish()
defer span.End()

guaranteeID := guarantee.ID()

Expand Down
12 changes: 7 additions & 5 deletions engine/consensus/matching/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"math"
"time"

glog "github.com/opentracing/opentracing-go/log"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"

"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/model/flow"
Expand Down Expand Up @@ -163,10 +163,12 @@ func (c *Core) processReceipt(receipt *flow.ExecutionReceipt) (bool, error) {

receiptSpan, _, isSampled := c.tracer.StartBlockSpan(context.Background(), receipt.ExecutionResult.BlockID, trace.CONMatchProcessReceipt)
if isSampled {
receiptSpan.LogFields(glog.String("result_id", receipt.ExecutionResult.ID().String()))
receiptSpan.LogFields(glog.String("executor", receipt.ExecutorID.String()))
receiptSpan.SetAttributes(
attribute.String("result_id", receipt.ExecutionResult.ID().String()),
attribute.String("executor", receipt.ExecutorID.String()),
)
}
defer receiptSpan.Finish()
defer receiptSpan.End()

initialState, finalState, err := getStartAndEndStates(receipt)
if err != nil {
Expand Down Expand Up @@ -207,7 +209,7 @@ func (c *Core) processReceipt(receipt *flow.ExecutionReceipt) (bool, error) {

childSpan := c.tracer.StartSpanFromParent(receiptSpan, trace.CONMatchProcessReceiptVal)
err = c.receiptValidator.Validate(receipt)
childSpan.Finish()
childSpan.End()

if engine.IsUnverifiableInputError(err) {
// If previous result is missing, we can't validate this receipt.
Expand Down
24 changes: 13 additions & 11 deletions engine/consensus/sealing/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ import (
"time"

"github.com/gammazero/workerpool"
"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/rs/zerolog"
"go.opentelemetry.io/otel/attribute"
otelTrace "go.opentelemetry.io/otel/trace"

"github.com/onflow/flow-go/crypto/hash"
"github.com/onflow/flow-go/engine"
Expand Down Expand Up @@ -304,7 +304,7 @@ func (c *Core) processIncorporatedResult(incRes *flow.IncorporatedResult) error
func (c *Core) ProcessIncorporatedResult(result *flow.IncorporatedResult) error {

span, _, _ := c.tracer.StartBlockSpan(context.Background(), result.Result.BlockID, trace.CONSealingProcessIncorporatedResult)
defer span.Finish()
defer span.End()

err := c.processIncorporatedResult(result)
// We expect only engine.OutdatedInputError. If we encounter UnverifiableInputError or InvalidInputError, we
Expand Down Expand Up @@ -354,10 +354,12 @@ func (c *Core) ProcessApproval(approval *flow.ResultApproval) error {

span, _, isSampled := c.tracer.StartBlockSpan(context.Background(), approval.Body.BlockID, trace.CONSealingProcessApproval)
if isSampled {
span.LogFields(log.String("approverId", approval.Body.ApproverID.String()))
span.LogFields(log.Uint64("chunkIndex", approval.Body.ChunkIndex))
span.SetAttributes(
attribute.String("approverId", approval.Body.ApproverID.String()),
attribute.Int64("chunkIndex", int64(approval.Body.ChunkIndex)),
)
}
defer span.Finish()
defer span.End()

startTime := time.Now()
err := c.processApproval(approval)
Expand Down Expand Up @@ -503,7 +505,7 @@ func (c *Core) processPendingApprovals(collector approvals.AssignmentCollectorSt
func (c *Core) ProcessFinalizedBlock(finalizedBlockID flow.Identifier) error {

processFinalizedBlockSpan, _, _ := c.tracer.StartBlockSpan(context.Background(), finalizedBlockID, trace.CONSealingProcessFinalizedBlock)
defer processFinalizedBlockSpan.Finish()
defer processFinalizedBlockSpan.End()

// STEP 0: Collect auxiliary information
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -545,14 +547,14 @@ func (c *Core) ProcessFinalizedBlock(finalizedBlockID flow.Identifier) error {
checkEmergencySealingSpan := c.tracer.StartSpanFromParent(processFinalizedBlockSpan, trace.CONSealingCheckForEmergencySealableBlocks)
// check if there are stale results qualified for emergency sealing
err = c.checkEmergencySealing(sealingObservation, lastBlockWithFinalizedSeal.Height, finalized.Height)
checkEmergencySealingSpan.Finish()
checkEmergencySealingSpan.End()
if err != nil {
return fmt.Errorf("could not check emergency sealing at block %v", finalizedBlockID)
}

requestPendingApprovalsSpan := c.tracer.StartSpanFromParent(processFinalizedBlockSpan, trace.CONSealingRequestingPendingApproval)
err = c.requestPendingApprovals(sealingObservation, lastBlockWithFinalizedSeal.Height, finalized.Height)
requestPendingApprovalsSpan.Finish()
requestPendingApprovalsSpan.End()
if err != nil {
return fmt.Errorf("internal error while requesting pending approvals: %w", err)
}
Expand All @@ -574,9 +576,9 @@ func (c *Core) ProcessFinalizedBlock(finalizedBlockID flow.Identifier) error {
// Furthermore, it removes obsolete entries from AssignmentCollectorTree, RequestTracker
// and IncorporatedResultSeals mempool.
// We do _not_ expect any errors during normal operations.
func (c *Core) prune(parentSpan opentracing.Span, finalized, lastSealed *flow.Header) error {
func (c *Core) prune(parentSpan otelTrace.Span, finalized, lastSealed *flow.Header) error {
pruningSpan := c.tracer.StartSpanFromParent(parentSpan, trace.CONSealingPruning)
defer pruningSpan.Finish()
defer pruningSpan.End()

err := c.collectorTree.FinalizeForkAtLevel(finalized, lastSealed) // stop collecting approvals for orphan collectors
if err != nil {
Expand Down
59 changes: 29 additions & 30 deletions engine/execution/computation/computer/computer.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ import (
"sync"
"time"

"github.com/opentracing/opentracing-go"
"github.com/opentracing/opentracing-go/log"
"github.com/rs/zerolog"
"github.com/uber/jaeger-client-go"
"go.opentelemetry.io/otel/attribute"
otelTrace "go.opentelemetry.io/otel/trace"

"github.com/onflow/flow-go/engine/execution"
"github.com/onflow/flow-go/engine/execution/state/delta"
Expand Down Expand Up @@ -101,9 +100,9 @@ func (e *blockComputer) ExecuteBlock(

span, _, isSampled := e.tracer.StartBlockSpan(ctx, block.ID(), trace.EXEComputeBlock)
if isSampled {
span.LogFields(log.Int("collection_counts", len(block.CompleteCollections)))
span.SetAttributes(attribute.Int("collection_counts", len(block.CompleteCollections)))
}
defer span.Finish()
defer span.End()

results, err := e.executeBlock(span, block, stateView, program)
if err != nil {
Expand All @@ -116,7 +115,7 @@ func (e *blockComputer) ExecuteBlock(
}

func (e *blockComputer) executeBlock(
blockSpan opentracing.Span,
blockSpan otelTrace.Span,
block *entity.ExecutableBlock,
stateView state.View,
programs *programs.Programs,
Expand Down Expand Up @@ -223,7 +222,7 @@ func (e *blockComputer) executeBlock(
}

func (e *blockComputer) executeSystemCollection(
blockSpan opentracing.Span,
blockSpan otelTrace.Span,
collectionIndex int,
txIndex uint32,
systemChunkCtx fvm.Context,
Expand All @@ -233,7 +232,7 @@ func (e *blockComputer) executeSystemCollection(
) (uint32, error) {

colSpan := e.tracer.StartSpanFromParent(blockSpan, trace.EXEComputeSystemCollection)
defer colSpan.Finish()
defer colSpan.End()

tx, err := blueprints.SystemChunkTransaction(e.vmCtx.Chain)
if err != nil {
Expand Down Expand Up @@ -266,7 +265,7 @@ func (e *blockComputer) executeSystemCollection(
}

func (e *blockComputer) executeCollection(
blockSpan opentracing.Span,
blockSpan otelTrace.Span,
collectionIndex int,
txIndex uint32,
blockCtx fvm.Context,
Expand All @@ -286,11 +285,11 @@ func (e *blockComputer) executeCollection(
computationUsedUpToNow := res.ComputationUsed
colSpan := e.tracer.StartSpanFromParent(blockSpan, trace.EXEComputeCollection)
defer func() {
colSpan.SetTag("collection.txCount", len(collection.Transactions))
colSpan.LogFields(
log.String("collection.hash", collection.Guarantee.CollectionID.String()),
colSpan.SetAttributes(
attribute.Int("collection.txCount", len(collection.Transactions)),
attribute.String("collection.hash", collection.Guarantee.CollectionID.String()),
)
colSpan.Finish()
colSpan.End()
}()

txCtx := fvm.NewContextFromParent(blockCtx, fvm.WithMetricsReporter(e.metrics), fvm.WithTracer(e.tracer))
Expand All @@ -316,7 +315,7 @@ func (e *blockComputer) executeCollection(

func (e *blockComputer) executeTransaction(
txBody *flow.TransactionBody,
colSpan opentracing.Span,
colSpan otelTrace.Span,
collectionView state.View,
programs *programs.Programs,
ctx fvm.Context,
Expand All @@ -331,20 +330,20 @@ func (e *blockComputer) executeTransaction(

// we capture two spans one for tx-based view and one for the current context (block-based) view
txSpan := e.tracer.StartSpanFromParent(colSpan, trace.EXEComputeTransaction)
txSpan.LogFields(log.String("tx_id", txID.String()))
txSpan.LogFields(log.Uint32("tx_index", txIndex))
txSpan.LogFields(log.Int("col_index", collectionIndex))
defer txSpan.Finish()
txSpan.SetAttributes(
attribute.String("tx_id", txID.String()),
attribute.Int64("tx_index", int64(txIndex)),
attribute.Int("col_index", collectionIndex),
)
defer txSpan.End()

var traceID string
txInternalSpan, _, isSampled := e.tracer.StartTransactionSpan(context.Background(), txID, trace.EXERunTransaction)
if isSampled {
txInternalSpan.LogFields(log.String("tx_id", txID.String()))
if sc, ok := txInternalSpan.Context().(jaeger.SpanContext); ok {
traceID = sc.TraceID().String()
}
txInternalSpan.SetAttributes(attribute.String("tx_id", txID.String()))
traceID = txInternalSpan.SpanContext().TraceID().String()
}
defer txInternalSpan.Finish()
defer txInternalSpan.End()

e.log.Info().
Str("tx_id", txID.String()).
Expand Down Expand Up @@ -379,7 +378,7 @@ func (e *blockComputer) executeTransaction(
}

postProcessSpan := e.tracer.StartSpanFromParent(txSpan, trace.EXEPostProcessTransaction)
defer postProcessSpan.Finish()
defer postProcessSpan.End()

// always merge the view, fvm take cares of reverting changes
// of failed transaction invocation
Expand Down Expand Up @@ -429,11 +428,11 @@ func (e *blockComputer) executeTransaction(

func (e *blockComputer) mergeView(
parent, child state.View,
parentSpan opentracing.Span,
parentSpan otelTrace.Span,
mergeSpanName trace.SpanName) error {

mergeSpan := e.tracer.StartSpanFromParent(parentSpan, mergeSpanName)
defer mergeSpan.Finish()
defer mergeSpan.End()

return parent.MergeView(child)
}
Expand All @@ -444,7 +443,7 @@ type blockCommitter struct {
state flow.StateCommitment
views chan state.View
closeOnce sync.Once
blockSpan opentracing.Span
blockSpan otelTrace.Span

res *execution.ComputationResult
}
Expand All @@ -462,7 +461,7 @@ func (bc *blockCommitter) Run() {
bc.res.TrieUpdates = append(bc.res.TrieUpdates, trieUpdate)

bc.state = stateCommit
span.Finish()
span.End()
}
}

Expand All @@ -478,7 +477,7 @@ type eventHasher struct {
tracer module.Tracer
data chan flow.EventsList
closeOnce sync.Once
blockSpan opentracing.Span
blockSpan otelTrace.Span

res *execution.ComputationResult
}
Expand All @@ -493,7 +492,7 @@ func (eh *eventHasher) Run() {

eh.res.EventsHashes = append(eh.res.EventsHashes, rootHash)

span.Finish()
span.End()
}
}

Expand Down
Loading

0 comments on commit f9c16a5

Please sign in to comment.