Skip to content

Commit

Permalink
tracing: add protocol messages client tracing
Browse files Browse the repository at this point in the history
  • Loading branch information
Jorropo committed Aug 18, 2023
1 parent 62a416e commit c4a4739
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 47 deletions.
9 changes: 1 addition & 8 deletions fullrt/dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
kadkey "github.com/libp2p/go-libp2p-xor/key"
"github.com/libp2p/go-libp2p-xor/trie"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -1303,16 +1302,10 @@ func (dht *FullRT) findProvidersAsyncRoutine(ctx context.Context, key multihash.
ID: p,
})

mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetProviders", trace.WithAttributes(attribute.Stringer("peer", p)))
provs, closest, err := dht.protoMessenger.GetProviders(mctx, p, key)
provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key)
if err != nil {
if mspan.IsRecording() {
mspan.SetStatus(codes.Error, err.Error())
}
mspan.End()
return err
}
mspan.End()

logger.Debugf("%d provider entries", len(provs))

Expand Down
10 changes: 1 addition & 9 deletions lookup.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
kb "github.com/libp2p/go-libp2p-kbucket"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

Expand Down Expand Up @@ -64,13 +62,8 @@ func (dht *IpfsDHT) pmGetClosestPeers(key string) queryFn {
ID: p,
})

mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetClosestPeers", trace.WithAttributes(attribute.Stringer("peer", p)))
peers, err := dht.protoMessenger.GetClosestPeers(mctx, p, peer.ID(key))
peers, err := dht.protoMessenger.GetClosestPeers(ctx, p, peer.ID(key))
if err != nil {
if mspan.IsRecording() {
mspan.SetStatus(codes.Error, err.Error())
}
mspan.End()
logger.Debugf("error getting closer peers: %s", err)
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Type: routing.QueryError,
Expand All @@ -79,7 +72,6 @@ func (dht *IpfsDHT) pmGetClosestPeers(key string) queryFn {
})
return nil, err
}
mspan.End()

// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Expand Down
109 changes: 101 additions & 8 deletions pb/protocol_messenger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multihash"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"

"github.com/libp2p/go-libp2p-kad-dht/internal"
)
Expand Down Expand Up @@ -60,7 +62,18 @@ type MessageSender interface {
}

// PutValue asks a peer to store the given key/value pair.
func (pm *ProtocolMessenger) PutValue(ctx context.Context, p peer.ID, rec *recpb.Record) error {
func (pm *ProtocolMessenger) PutValue(ctx context.Context, p peer.ID, rec *recpb.Record) (err error) {
ctx, span := internal.StartSpan(ctx, "ProtocolMessenger.PutValue")
defer span.End()
if span.IsRecording() {
span.SetAttributes(attribute.Stringer("to", p), attribute.Stringer("record", rec))
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
}()
}

pmes := NewMessage(Message_PUT_VALUE, rec.Key, 0)
pmes.Record = rec
rpmes, err := pm.m.SendRequest(ctx, p, pmes)
Expand All @@ -80,7 +93,27 @@ func (pm *ProtocolMessenger) PutValue(ctx context.Context, p peer.ID, rec *recpb

// GetValue asks a peer for the value corresponding to the given key. Also returns the K closest peers to the key
// as described in GetClosestPeers.
func (pm *ProtocolMessenger) GetValue(ctx context.Context, p peer.ID, key string) (*recpb.Record, []*peer.AddrInfo, error) {
func (pm *ProtocolMessenger) GetValue(ctx context.Context, p peer.ID, key string) (record *recpb.Record, closerPeers []*peer.AddrInfo, err error) {
ctx, span := internal.StartSpan(ctx, "ProtocolMessenger.GetValue")
defer span.End()
if span.IsRecording() {
span.SetAttributes(attribute.Stringer("to", p), internal.KeyAsAttribute("key", key))
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
} else {
peers := make([]string, len(closerPeers))
for i, v := range closerPeers {
peers[i] = v.String()
}
span.SetAttributes(
attribute.Stringer("record", record),
attribute.StringSlice("closestPeers", peers),
)
}
}()
}

pmes := NewMessage(Message_GET_VALUE, []byte(key), 0)
respMsg, err := pm.m.SendRequest(ctx, p, pmes)
if err != nil {
Expand Down Expand Up @@ -109,7 +142,24 @@ func (pm *ProtocolMessenger) GetValue(ctx context.Context, p peer.ID, key string
// GetClosestPeers asks a peer to return the K (a DHT-wide parameter) DHT server peers closest in XOR space to the id
// Note: If the peer happens to know another peer whose peerID exactly matches the given id it will return that peer
// even if that peer is not a DHT server node.
func (pm *ProtocolMessenger) GetClosestPeers(ctx context.Context, p peer.ID, id peer.ID) ([]*peer.AddrInfo, error) {
func (pm *ProtocolMessenger) GetClosestPeers(ctx context.Context, p peer.ID, id peer.ID) (closerPeers []*peer.AddrInfo, err error) {
ctx, span := internal.StartSpan(ctx, "ProtocolMessenger.GetClosestPeers")
defer span.End()
if span.IsRecording() {
span.SetAttributes(attribute.Stringer("to", p), attribute.Stringer("key", id))
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
} else {
peers := make([]string, len(closerPeers))
for i, v := range closerPeers {
peers[i] = v.String()
}
span.SetAttributes(attribute.StringSlice("peers", peers))
}
}()
}

pmes := NewMessage(Message_FIND_NODE, []byte(id), 0)
respMsg, err := pm.m.SendRequest(ctx, p, pmes)
if err != nil {
Expand All @@ -120,7 +170,18 @@ func (pm *ProtocolMessenger) GetClosestPeers(ctx context.Context, p peer.ID, id
}

// PutProvider asks a peer to store that we are a provider for the given key.
func (pm *ProtocolMessenger) PutProvider(ctx context.Context, p peer.ID, key multihash.Multihash, host host.Host) error {
func (pm *ProtocolMessenger) PutProvider(ctx context.Context, p peer.ID, key multihash.Multihash, host host.Host) (err error) {
ctx, span := internal.StartSpan(ctx, "ProtocolMessenger.PutProvider")
defer span.End()
if span.IsRecording() {
span.SetAttributes(attribute.Stringer("to", p), attribute.Stringer("key", key))
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
}()
}

pi := peer.AddrInfo{
ID: host.ID(),
Addrs: host.Addrs(),
Expand All @@ -140,19 +201,51 @@ func (pm *ProtocolMessenger) PutProvider(ctx context.Context, p peer.ID, key mul

// GetProviders asks a peer for the providers it knows of for a given key. Also returns the K closest peers to the key
// as described in GetClosestPeers.
func (pm *ProtocolMessenger) GetProviders(ctx context.Context, p peer.ID, key multihash.Multihash) ([]*peer.AddrInfo, []*peer.AddrInfo, error) {
func (pm *ProtocolMessenger) GetProviders(ctx context.Context, p peer.ID, key multihash.Multihash) (provs []*peer.AddrInfo, closerPeers []*peer.AddrInfo, err error) {
ctx, span := internal.StartSpan(ctx, "ProtocolMessenger.GetProviders")
defer span.End()
if span.IsRecording() {
span.SetAttributes(attribute.Stringer("to", p), attribute.Stringer("key", key))
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
} else {
provsStr := make([]string, len(provs))
for i, v := range provs {
provsStr[i] = v.String()
}
closerPeersStr := make([]string, len(provs))
for i, v := range closerPeers {
closerPeersStr[i] = v.String()
}
span.SetAttributes(attribute.StringSlice("provs", provsStr), attribute.StringSlice("closestPeers", closerPeersStr))
}
}()
}

pmes := NewMessage(Message_GET_PROVIDERS, key, 0)
respMsg, err := pm.m.SendRequest(ctx, p, pmes)
if err != nil {
return nil, nil, err
}
provs := PBPeersToPeerInfos(respMsg.GetProviderPeers())
closerPeers := PBPeersToPeerInfos(respMsg.GetCloserPeers())
provs = PBPeersToPeerInfos(respMsg.GetProviderPeers())
closerPeers = PBPeersToPeerInfos(respMsg.GetCloserPeers())
return provs, closerPeers, nil
}

// Ping sends a ping message to the passed peer and waits for a response.
func (pm *ProtocolMessenger) Ping(ctx context.Context, p peer.ID) error {
func (pm *ProtocolMessenger) Ping(ctx context.Context, p peer.ID) (err error) {
ctx, span := internal.StartSpan(ctx, "ProtocolMessenger.Ping")
defer span.End()
if span.IsRecording() {
span.SetAttributes(attribute.Stringer("to", p))
defer func() {
if err != nil {
span.SetStatus(codes.Error, err.Error())
}
}()
}

req := NewMessage(Message_PING, nil, 0)
resp, err := pm.m.SendRequest(ctx, p, req)
if err != nil {
Expand Down
25 changes: 3 additions & 22 deletions routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/libp2p/go-libp2p/core/peerstore"
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

u "github.com/ipfs/boxo/util"
Expand Down Expand Up @@ -317,17 +316,11 @@ func (dht *IpfsDHT) getValues(ctx context.Context, key string, stopQuery chan st
ID: p,
})

mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetValue", trace.WithAttributes(attribute.Stringer("peer", p)))
rec, peers, err := dht.protoMessenger.GetValue(mctx, p, key)
rec, peers, err := dht.protoMessenger.GetValue(ctx, p, key)
if err != nil {
if mspan.IsRecording() {
mspan.SetStatus(codes.Error, err.Error())
}
mspan.End()
logger.Debugf("error getting closer peers: %s", err)
return nil, err
}
mspan.End()

// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Expand Down Expand Up @@ -584,16 +577,10 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key multihash
ID: p,
})

mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetProviders", trace.WithAttributes(attribute.Stringer("peer", p)))
provs, closest, err := dht.protoMessenger.GetProviders(mctx, p, key)
provs, closest, err := dht.protoMessenger.GetProviders(ctx, p, key)
if err != nil {
if mspan.IsRecording() {
mspan.SetStatus(codes.Error, err.Error())
}
mspan.End()
return nil, err
}
mspan.End()

logger.Debugf("%d provider entries", len(provs))

Expand Down Expand Up @@ -665,17 +652,11 @@ func (dht *IpfsDHT) FindPeer(ctx context.Context, id peer.ID) (_ peer.AddrInfo,
ID: p,
})

mctx, mspan := internal.StartSpan(ctx, "protoMessenger.GetClosestPeers", trace.WithAttributes(attribute.Stringer("peer", p)))
peers, err := dht.protoMessenger.GetClosestPeers(mctx, p, id)
peers, err := dht.protoMessenger.GetClosestPeers(ctx, p, id)
if err != nil {
if mspan.IsRecording() {
mspan.SetStatus(codes.Error, err.Error())
}
mspan.End()
logger.Debugf("error getting closer peers: %s", err)
return nil, err
}
mspan.End()

// For DHT query command
routing.PublishQueryEvent(ctx, &routing.QueryEvent{
Expand Down

0 comments on commit c4a4739

Please sign in to comment.