Skip to content

Commit

Permalink
net: address PR comments regarding metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
MichaelMure committed Mar 2, 2020
1 parent 979ed6a commit faa5638
Showing 1 changed file with 40 additions and 37 deletions.
77 changes: 40 additions & 37 deletions dht_net.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,7 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
for {
var req pb.Message
msgbytes, err := r.ReadMsg()
msgLen := len(msgbytes)
if err != nil {
r.ReleaseMsg(msgbytes)
if err == io.EOF {
Expand All @@ -92,41 +93,39 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
if err.Error() != "stream reset" {
logger.Debugf("error reading message: %#v", err)
}
stats.RecordWithTags(
ctx,
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
metrics.ReceivedMessageErrors.M(1),
metrics.ReceivedMessages.M(1),
metrics.ReceivedBytes.M(int64(req.Size())),
)
if msgLen > 0 {
stats.RecordWithTags(ctx,
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
metrics.ReceivedMessages.M(1),
metrics.ReceivedMessageErrors.M(1),
metrics.ReceivedBytes.M(int64(msgLen)),
)
}
return false
}
err = req.Unmarshal(msgbytes)
r.ReleaseMsg(msgbytes)
if err != nil {
logger.Debugf("error unmarshalling message: %#v", err)
stats.RecordWithTags(
ctx,
stats.RecordWithTags(ctx,
[]tag.Mutator{tag.Upsert(metrics.KeyMessageType, "UNKNOWN")},
metrics.ReceivedMessageErrors.M(1),
metrics.ReceivedMessages.M(1),
metrics.ReceivedBytes.M(int64(req.Size())),
metrics.ReceivedMessageErrors.M(1),
metrics.ReceivedBytes.M(int64(msgLen)),
)
return false
}

timer.Reset(dhtStreamIdleTimeout)

startTime := time.Now()
ctx, _ := tag.New(
ctx,
ctx, _ := tag.New(ctx,
tag.Upsert(metrics.KeyMessageType, req.GetType().String()),
)

stats.Record(
ctx,
stats.Record(ctx,
metrics.ReceivedMessages.M(1),
metrics.ReceivedBytes.M(int64(req.Size())),
metrics.ReceivedBytes.M(int64(msgLen)),
)

handler := dht.handlerForMsgType(req.GetType())
Expand Down Expand Up @@ -168,34 +167,33 @@ func (dht *IpfsDHT) handleNewMessage(s network.Stream) bool {
func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))

stats.Record(
ctx,
metrics.SentRequests.M(1),
metrics.SentBytes.M(int64(pmes.Size())),
)

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
stats.Record(ctx, metrics.SentRequestErrors.M(1))
stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentRequestErrors.M(1),
)
return nil, err
}

start := time.Now()

rpmes, err := ms.SendRequest(ctx, pmes)
if err != nil {
stats.Record(ctx, metrics.SentRequestErrors.M(1))
stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentRequestErrors.M(1),
)
return nil, err
}

// update the peer (on valid msgs only)
dht.updateFromMessage(ctx, p, rpmes)

stats.Record(
ctx,
metrics.OutboundRequestLatency.M(
float64(time.Since(start))/float64(time.Millisecond),
),
stats.Record(ctx,
metrics.SentRequests.M(1),
metrics.SentBytes.M(int64(pmes.Size())),
metrics.OutboundRequestLatency.M(float64(time.Since(start))/float64(time.Millisecond)),
)
dht.peerstore.RecordLatency(p, time.Since(start))
logger.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes)
Expand All @@ -206,23 +204,28 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
ctx, _ = tag.New(ctx, metrics.UpsertMessageType(pmes))

stats.Record(
ctx,
metrics.SentMessages.M(1),
metrics.SentBytes.M(int64(pmes.Size())),
)

ms, err := dht.messageSenderForPeer(ctx, p)
if err != nil {
stats.Record(ctx, metrics.SentMessageErrors.M(1))
stats.Record(ctx,
metrics.SentMessages.M(1),
metrics.SentMessageErrors.M(1),
)
return err
}

if err := ms.SendMessage(ctx, pmes); err != nil {
stats.Record(ctx, metrics.SentMessageErrors.M(1))
stats.Record(ctx,
metrics.SentMessages.M(1),
metrics.SentMessageErrors.M(1),
)
return err
}

stats.Record(ctx,
metrics.SentMessages.M(1),
metrics.SentBytes.M(int64(pmes.Size())),
)

logger.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
return nil
}
Expand Down

0 comments on commit faa5638

Please sign in to comment.