Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

feat: add basic tracing #562

Merged
merged 1 commit into from
Jun 14, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import (
"time"

delay "github.com/ipfs/go-ipfs-delay"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

deciface "github.com/ipfs/go-bitswap/decision"
"github.com/ipfs/go-bitswap/internal"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
"github.com/ipfs/go-bitswap/internal/decision"
"github.com/ipfs/go-bitswap/internal/defaults"
Expand Down Expand Up @@ -425,8 +428,10 @@ type counters struct {

// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *Bitswap) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
return bsgetter.SyncGetBlock(parent, k, bs.GetBlocks)
func (bs *Bitswap) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "GetBlock", trace.WithAttributes(attribute.String("Key", k.String())))
defer span.End()
return bsgetter.SyncGetBlock(ctx, k, bs.GetBlocks)
}

// WantlistForPeer returns the currently understood list of blocks requested by a
Expand All @@ -453,13 +458,17 @@ func (bs *Bitswap) LedgerForPeer(p peer.ID) *decision.Receipt {
// resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.Int("NumKeys", len(keys))))
defer span.End()
session := bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
return session.GetBlocks(ctx, keys)
}

// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
ctx, span := internal.StartSpan(ctx, "GetBlocks", trace.WithAttributes(attribute.String("Block", blk.Cid().String())))
defer span.End()
return bs.receiveBlocksFrom(ctx, "", []blocks.Block{blk}, nil, nil)
}

Expand Down Expand Up @@ -696,5 +705,7 @@ func (bs *Bitswap) IsOnline() bool {
// be more efficient in its requests to peers. If you are using a session
// from go-blockservice, it will create a bitswap session automatically.
func (bs *Bitswap) NewSession(ctx context.Context) exchange.Fetcher {
ctx, span := internal.StartSpan(ctx, "NewSession")
defer span.End()
return bs.sm.NewSession(ctx, bs.provSearchDelay, bs.rebroadcastDelay)
}
6 changes: 5 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,17 @@ require (
github.com/libp2p/go-msgio v0.0.6
github.com/multiformats/go-multiaddr v0.3.3
github.com/multiformats/go-multistream v0.2.2
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
go.opentelemetry.io/otel v1.7.0
go.opentelemetry.io/otel/trace v1.7.0
go.uber.org/zap v1.16.0
)

require (
github.com/btcsuite/btcd v0.21.0-beta // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/huin/goupnp v1.0.0 // indirect
Expand Down
14 changes: 13 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,11 @@ github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgO
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.3 h1:2DntVwHkVopvECVRSlL5PSo9eG+cAkDCuckLubN+rq0=
github.com/go-logr/logr v1.2.3/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s=
Expand Down Expand Up @@ -179,6 +184,8 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ=
github.com/google/go-querystring v1.0.0/go.mod h1:odCYkC5MyYFN7vkCjXpyrEuKhc/BUO6wN/zVPAxq5ck=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
Expand Down Expand Up @@ -817,8 +824,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/syndtr/goleveldb v1.0.0/go.mod h1:ZVVdQEZoIme9iO1Ch2Jdy24qqXrMMOU6lpPAyBWyWuQ=
github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA=
github.com/tmc/grpc-websocket-proxy v0.0.0-20170815181823-89b8d40f7ca8/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U=
Expand Down Expand Up @@ -851,6 +859,10 @@ go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw=
go.opencensus.io v0.23.0 h1:gqCw0LfLxScz8irSi8exQc7fyQ0fKQU/qnC/X8+V/1M=
go.opencensus.io v0.23.0/go.mod h1:XItmlyltB5F7CS4xOC1DcqMoFqwtC6OG2xF7mCv7P7E=
go.opentelemetry.io/otel v1.7.0 h1:Z2lA3Tdch0iDcrhJXDIlC94XE+bxok1F9B+4Lz/lGsM=
go.opentelemetry.io/otel v1.7.0/go.mod h1:5BdUoMIz5WEs0vt0CUEMtSSaTSHBBVwrhnz7+nrD5xk=
go.opentelemetry.io/otel/trace v1.7.0 h1:O37Iogk1lEkMRXewVtZ1BBTVn5JEp8GrJvP92bJqC6o=
go.opentelemetry.io/otel/trace v1.7.0/go.mod h1:fzLSB9nqR2eXzxPXb2JW9IKE+ScyXA48yyE4TNvoHqU=
go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE=
go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ=
Expand Down
3 changes: 1 addition & 2 deletions internal/blockpresencemanager/blockpresencemanager_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package blockpresencemanager

import (
"fmt"
"testing"

"github.com/ipfs/go-bitswap/internal/testutil"
Expand Down Expand Up @@ -233,7 +232,7 @@ func TestAllPeersDoNotHaveBlock(t *testing.T) {
bpm.AllPeersDoNotHaveBlock(tc.peers, tc.ks),
tc.exp,
) {
t.Fatal(fmt.Sprintf("test case %d failed: expected matching keys", i))
t.Fatalf("test case %d failed: expected matching keys", i)
}
}
}
6 changes: 6 additions & 0 deletions internal/getter/getter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"

"github.com/ipfs/go-bitswap/internal"
notifications "github.com/ipfs/go-bitswap/internal/notifications"
logging "github.com/ipfs/go-log"

Expand All @@ -22,6 +23,9 @@ type GetBlocksFunc func(context.Context, []cid.Cid) (<-chan blocks.Block, error)
// blocks that returns a channel, and uses that function to return the
// block syncronously.
func SyncGetBlock(p context.Context, k cid.Cid, gb GetBlocksFunc) (blocks.Block, error) {
p, span := internal.StartSpan(p, "Getter.SyncGetBlock")
defer span.End()

if !k.Defined() {
log.Error("undefined cid in GetBlock")
return nil, ipld.ErrNotFound{Cid: k}
Expand Down Expand Up @@ -65,6 +69,8 @@ type WantFunc func(context.Context, []cid.Cid)
// incoming blocks.
func AsyncGetBlocks(ctx context.Context, sessctx context.Context, keys []cid.Cid, notif notifications.PubSub,
want WantFunc, cwants func([]cid.Cid)) (<-chan blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Getter.AsyncGetBlocks")
defer span.End()

// If there are no keys supplied, just return a closed channel
if len(keys) == 0 {
Expand Down
19 changes: 13 additions & 6 deletions internal/providerquerymanager/providerquerymanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,18 @@ type providerQueryMessage interface {
}

type receivedProviderMessage struct {
k cid.Cid
p peer.ID
ctx context.Context
k cid.Cid
p peer.ID
}

type finishedProviderQueryMessage struct {
k cid.Cid
ctx context.Context
k cid.Cid
}

type newProvideQueryMessage struct {
ctx context.Context
k cid.Cid
inProgressRequestChan chan<- inProgressRequest
}
Expand Down Expand Up @@ -120,6 +123,7 @@ func (pqm *ProviderQueryManager) FindProvidersAsync(sessionCtx context.Context,

select {
case pqm.providerQueryMessages <- &newProvideQueryMessage{
ctx: sessionCtx,
k: k,
inProgressRequestChan: inProgressRequestChan,
}:
Expand Down Expand Up @@ -244,8 +248,9 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
}
select {
case pqm.providerQueryMessages <- &receivedProviderMessage{
k: k,
p: p,
ctx: findProviderCtx,
k: k,
p: p,
}:
case <-pqm.ctx.Done():
return
Expand All @@ -256,7 +261,8 @@ func (pqm *ProviderQueryManager) findProviderWorker() {
cancel()
select {
case pqm.providerQueryMessages <- &finishedProviderQueryMessage{
k: k,
ctx: findProviderCtx,
k: k,
}:
case <-pqm.ctx.Done():
}
Expand Down Expand Up @@ -372,6 +378,7 @@ func (npqm *newProvideQueryMessage) debugMessage() string {
func (npqm *newProvideQueryMessage) handle(pqm *ProviderQueryManager) {
requestStatus, ok := pqm.inProgressRequestStatuses[npqm.k]
if !ok {

ctx, cancelFn := context.WithCancel(pqm.ctx)
requestStatus = &inProgressRequestStatus{
listeners: make(map[chan peer.ID]struct{}),
Expand Down
10 changes: 8 additions & 2 deletions internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"time"

"github.com/ipfs/go-bitswap/internal"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
bsgetter "github.com/ipfs/go-bitswap/internal/getter"
notifications "github.com/ipfs/go-bitswap/internal/notifications"
Expand Down Expand Up @@ -228,14 +229,19 @@ func (s *Session) logReceiveFrom(from peer.ID, interestedKs []cid.Cid, haves []c
}

// GetBlock fetches a single block.
func (s *Session) GetBlock(parent context.Context, k cid.Cid) (blocks.Block, error) {
return bsgetter.SyncGetBlock(parent, k, s.GetBlocks)
func (s *Session) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlock")
defer span.End()
return bsgetter.SyncGetBlock(ctx, k, s.GetBlocks)
}

// GetBlocks fetches a set of blocks within the context of this session and
// returns a channel that found blocks will be returned on. No order is
// guaranteed on the returned blocks.
func (s *Session) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.Block, error) {
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
defer span.End()

ctx = logging.ContextWithLoggable(ctx, s.uuid)

return bsgetter.AsyncGetBlocks(ctx, s.ctx, keys, s.notif,
Expand Down
7 changes: 7 additions & 0 deletions internal/sessionmanager/sessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package sessionmanager

import (
"context"
"strconv"
"sync"
"time"

cid "github.com/ipfs/go-cid"
delay "github.com/ipfs/go-ipfs-delay"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/ipfs/go-bitswap/internal"
bsbpm "github.com/ipfs/go-bitswap/internal/blockpresencemanager"
notifications "github.com/ipfs/go-bitswap/internal/notifications"
bssession "github.com/ipfs/go-bitswap/internal/session"
Expand Down Expand Up @@ -87,6 +91,9 @@ func (sm *SessionManager) NewSession(ctx context.Context,
rebroadcastDelay delay.D) exchange.Fetcher {
id := sm.GetNextSessionID()

ctx, span := internal.StartSpan(ctx, "SessionManager.NewSession", trace.WithAttributes(attribute.String("ID", strconv.FormatUint(id, 10))))
defer span.End()

pm := sm.peerManagerFactory(ctx, id)
session := sm.sessionFactory(ctx, sm, id, pm, sm.sessionInterestManager, sm.peerManager, sm.blockPresenceManager, sm.notif, provSearchDelay, rebroadcastDelay, sm.self)

Expand Down
13 changes: 13 additions & 0 deletions internal/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package internal

import (
"context"
"fmt"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/trace"
)

func StartSpan(ctx context.Context, name string, opts ...trace.SpanStartOption) (context.Context, trace.Span) {
return otel.Tracer("go-bitswap").Start(ctx, fmt.Sprintf("Bitswap.%s", name), opts...)
}