From 07219a775f60ad84c553d1828728cd1dc238a083 Mon Sep 17 00:00:00 2001 From: sukun Date: Thu, 23 Feb 2023 09:17:13 +0530 Subject: [PATCH] address review changes --- p2p/protocol/identify/id.go | 72 ++++++++++----------------- p2p/protocol/identify/metrics.go | 84 ++++++++++++++++---------------- 2 files changed, 69 insertions(+), 87 deletions(-) diff --git a/p2p/protocol/identify/id.go b/p2p/protocol/identify/id.go index 601f23b312..8184809c3a 100644 --- a/p2p/protocol/identify/id.go +++ b/p2p/protocol/identify/id.go @@ -307,11 +307,8 @@ func (ids *idService) sendPushes(ctx context.Context) { if err != nil { // connection might have been closed recently return } - if ids.metricsTracer != nil { - ids.metricsTracer.IdentifyPush(network.DirOutbound) - } // TODO: find out if the peer supports push if we didn't have any information about push support - if err := ids.sendIdentifyResp(str); err != nil { + if err := ids.sendIdentifyResp(str, true); err != nil { log.Debugw("failed to send identify push", "peer", c.RemotePeer(), "error", err) return } @@ -401,28 +398,19 @@ func (ids *idService) identifyConn(c network.Conn) error { return err } - if ids.metricsTracer != nil { - ids.metricsTracer.Identify(network.DirInbound) - } return ids.handleIdentifyResponse(s, false) } // handlePush handles incoming identify push streams func (ids *idService) handlePush(s network.Stream) { ids.handleIdentifyResponse(s, true) - if ids.metricsTracer != nil { - ids.metricsTracer.IdentifyPush(network.DirInbound) - } } func (ids *idService) handleIdentifyRequest(s network.Stream) { - if ids.metricsTracer != nil { - ids.metricsTracer.Identify(network.DirOutbound) - } - _ = ids.sendIdentifyResp(s) + _ = ids.sendIdentifyResp(s, false) } -func (ids *idService) sendIdentifyResp(s network.Stream) error { +func (ids *idService) sendIdentifyResp(s network.Stream, isPush bool) error { if err := s.Scope().SetService(ServiceName); err != nil { s.Reset() return fmt.Errorf("failed to attaching stream to identify service: %w", err) @@ -432,11 +420,21 @@ func (ids *idService) sendIdentifyResp(s network.Stream) error { ids.currentSnapshot.Lock() snapshot := ids.currentSnapshot.snapshot ids.currentSnapshot.Unlock() + + log.Debugw("sending snapshot", "seq", snapshot.seq, "protocols", snapshot.protocols, "addrs", snapshot.addrs) + + mes := ids.createBaseIdentifyResponse(s.Conn(), &snapshot) + mes.SignedPeerRecord = ids.getSignedRecord(&snapshot) + log.Debugf("%s sending message to %s %s", ID, s.Conn().RemotePeer(), s.Conn().RemoteMultiaddr()) - if err := ids.writeChunkedIdentifyMsg(s, &snapshot); err != nil { + if err := ids.writeChunkedIdentifyMsg(s, mes); err != nil { return err } + if ids.metricsTracer != nil { + ids.metricsTracer.IdentifySent(isPush, len(mes.Protocols), len(mes.ListenAddrs)) + } + ids.connsMu.Lock() defer ids.connsMu.Unlock() e, ok := ids.conns[s.Conn()] @@ -485,24 +483,28 @@ func (ids *idService) handleIdentifyResponse(s network.Stream, isPush bool) erro ids.consumeMessage(mes, c, isPush) + if ids.metricsTracer != nil { + ids.metricsTracer.IdentifyReceived(isPush, len(mes.Protocols), len(mes.ListenAddrs)) + } + ids.connsMu.Lock() defer ids.connsMu.Unlock() e, ok := ids.conns[c] if !ok { // might already have disconnected return nil } - prevPushSupport := e.PushSupport sup, err := ids.Host.Peerstore().SupportsProtocols(c.RemotePeer(), IDPush) if supportsIdentifyPush := err == nil && len(sup) > 0; supportsIdentifyPush { e.PushSupport = identifyPushSupported } else { e.PushSupport = identifyPushUnsupported } - ids.conns[c] = e - if ids.metricsTracer != nil && e.PushSupport != prevPushSupport { - ids.metricsTracer.IncrementPushSupport(e.PushSupport) - ids.metricsTracer.DecrementPushSupport(prevPushSupport) + + if ids.metricsTracer != nil { + ids.metricsTracer.PeerPushSupport(e.PushSupport) } + + ids.conns[c] = e return nil } @@ -541,19 +543,14 @@ func (ids *idService) updateSnapshot() { log.Debugw("updating snapshot", "seq", snapshot.seq, "addrs", snapshot.addrs) } -func (ids *idService) writeChunkedIdentifyMsg(s network.Stream, snapshot *identifySnapshot) error { - c := s.Conn() - log.Debugw("sending snapshot", "seq", snapshot.seq, "protocols", snapshot.protocols, "addrs", snapshot.addrs) - - mes := ids.createBaseIdentifyResponse(c, snapshot) - sr := ids.getSignedRecord(snapshot) - mes.SignedPeerRecord = sr +func (ids *idService) writeChunkedIdentifyMsg(s network.Stream, mes *pb.Identify) error { writer := pbio.NewDelimitedWriter(s) - if sr == nil || proto.Size(mes) <= legacyIDSize { + if mes.SignedPeerRecord == nil || proto.Size(mes) <= legacyIDSize { return writer.WriteMsg(mes) } + sr := mes.SignedPeerRecord mes.SignedPeerRecord = nil if err := writer.WriteMsg(mes); err != nil { return err @@ -586,10 +583,6 @@ func (ids *idService) createBaseIdentifyResponse(conn network.Conn, snapshot *id } mes.ListenAddrs = append(mes.ListenAddrs, addr.Bytes()) } - if ids.metricsTracer != nil { - ids.metricsTracer.NumProtocols(len(mes.Protocols)) - ids.metricsTracer.NumAddrs(len(mes.ListenAddrs)) - } // set our public key ownKey := ids.Host.Peerstore().PubKey(ids.Host.ID()) @@ -694,11 +687,6 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo lmaddrs = append(lmaddrs, maddr) } - if ids.metricsTracer != nil { - ids.metricsTracer.NumProtocolsReceived(len(mesProtocols)) - ids.metricsTracer.NumAddrsReceived(len(lmaddrs)) - } - // NOTE: Do not add `c.RemoteMultiaddr()` to the peerstore if the remote // peer doesn't tell us to do so. Otherwise, we'll advertise it. // @@ -901,10 +889,6 @@ func (nn *netNotifiee) Connected(_ network.Network, c network.Conn) { ids.conns[c] = entry{} ids.connsMu.Unlock() - if ids.metricsTracer != nil { - ids.metricsTracer.IncrementPushSupport(identifyPushSupportUnknown) - } - nn.IDService().IdentifyWait(c) } @@ -913,12 +897,8 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) { // Stop tracking the connection. ids.connsMu.Lock() - e, ok := ids.conns[c] delete(ids.conns, c) ids.connsMu.Unlock() - if ok && ids.metricsTracer != nil { - ids.metricsTracer.DecrementPushSupport(e.PushSupport) - } if ids.Host.Network().Connectedness(c.RemotePeer()) != network.Connected { // Last disconnect. diff --git a/p2p/protocol/identify/metrics.go b/p2p/protocol/identify/metrics.go index 109667b551..3536bb53f5 100644 --- a/p2p/protocol/identify/metrics.go +++ b/p2p/protocol/identify/metrics.go @@ -35,11 +35,11 @@ var ( }, []string{"dir"}, ) - connectionPushSupport = prometheus.NewGaugeVec( - prometheus.GaugeOpts{ + peerPushSupport = prometheus.NewCounterVec( + prometheus.CounterOpts{ Namespace: metricNamespace, - Name: "conn_push_support", - Help: "Identify Connection Push Support", + Name: "peer_push_support_total", + Help: "Identify Peer Push Support", }, []string{"support"}, ) @@ -77,7 +77,7 @@ var ( pushesTriggered, identify, identifyPush, - connectionPushSupport, + peerPushSupport, protocolsCount, addrsCount, numProtocolsReceived, @@ -91,15 +91,17 @@ var ( ) type MetricsTracer interface { + // TriggeredPushes counts IdentifyPushes triggered by event TriggeredPushes(event any) - Identify(network.Direction) - IdentifyPush(network.Direction) - IncrementPushSupport(identifyPushSupport) - DecrementPushSupport(identifyPushSupport) - NumProtocols(int) - NumAddrs(int) - NumProtocolsReceived(int) - NumAddrsReceived(int) + + // PeerPushSupport counts peers by Push Support + PeerPushSupport(identifyPushSupport) + + // IdentifyReceived tracks metrics on receiving an identify response + IdentifyReceived(isPush bool, numProtocols int, numAddrs int) + + // IdentifySent tracks metrics on sending an identify response + IdentifySent(isPush bool, numProtocols int, numAddrs int) } type metricsTracer struct{} @@ -144,52 +146,52 @@ func (t *metricsTracer) TriggeredPushes(ev any) { pushesTriggered.WithLabelValues(*tags...).Inc() } -func (t *metricsTracer) Identify(dir network.Direction) { +func (t *metricsTracer) IncrementPushSupport(s identifyPushSupport) { tags := metricshelper.GetStringSlice() defer metricshelper.PutStringSlice(tags) - *tags = append(*tags, metricshelper.GetDirection(dir)) - identify.WithLabelValues(*tags...).Inc() + *tags = append(*tags, getPushSupport(s)) + peerPushSupport.WithLabelValues(*tags...).Inc() } -func (t *metricsTracer) IdentifyPush(dir network.Direction) { +func (t *metricsTracer) IdentifySent(isPush bool, numProtocols int, numAddrs int) { tags := metricshelper.GetStringSlice() defer metricshelper.PutStringSlice(tags) - *tags = append(*tags, metricshelper.GetDirection(dir)) - identifyPush.WithLabelValues(*tags...).Inc() -} - -func (t *metricsTracer) IncrementPushSupport(s identifyPushSupport) { - tags := metricshelper.GetStringSlice() - defer metricshelper.PutStringSlice(tags) + if isPush { + *tags = append(*tags, metricshelper.GetDirection(network.DirOutbound)) + identifyPush.WithLabelValues(*tags...).Inc() + } else { + *tags = append(*tags, metricshelper.GetDirection(network.DirInbound)) + identify.WithLabelValues(*tags...).Inc() + } - *tags = append(*tags, getPushSupport(s)) - connectionPushSupport.WithLabelValues(*tags...).Inc() + protocolsCount.Set(float64(numProtocols)) + addrsCount.Set(float64(numAddrs)) } -func (t *metricsTracer) DecrementPushSupport(s identifyPushSupport) { +func (t *metricsTracer) IdentifyReceived(isPush bool, numProtocols int, numAddrs int) { tags := metricshelper.GetStringSlice() defer metricshelper.PutStringSlice(tags) - *tags = append(*tags, getPushSupport(s)) - connectionPushSupport.WithLabelValues(*tags...).Dec() -} - -func (t *metricsTracer) NumProtocols(n int) { - protocolsCount.Set(float64(n)) -} + if isPush { + *tags = append(*tags, metricshelper.GetDirection(network.DirInbound)) + identifyPush.WithLabelValues(*tags...).Inc() + } else { + *tags = append(*tags, metricshelper.GetDirection(network.DirOutbound)) + identify.WithLabelValues(*tags...).Inc() + } -func (t *metricsTracer) NumAddrs(n int) { - addrsCount.Set(float64(n)) + numProtocolsReceived.Observe(float64(numProtocols)) + numAddrsReceived.Observe(float64(numAddrs)) } -func (t *metricsTracer) NumProtocolsReceived(n int) { - numProtocolsReceived.Observe(float64(n)) -} +func (t *metricsTracer) PeerPushSupport(support identifyPushSupport) { + tags := metricshelper.GetStringSlice() + defer metricshelper.PutStringSlice(tags) -func (t *metricsTracer) NumAddrsReceived(n int) { - numAddrsReceived.Observe(float64(n)) + *tags = append(*tags, getPushSupport(support)) + peerPushSupport.WithLabelValues(*tags...).Inc() } func getPushSupport(s identifyPushSupport) string {