Skip to content

Commit

Permalink
address review changes
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt committed Feb 23, 2023
1 parent 557aac3 commit 07219a7
Show file tree
Hide file tree
Showing 2 changed files with 69 additions and 87 deletions.
72 changes: 26 additions & 46 deletions p2p/protocol/identify/id.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,8 @@ func (ids *idService) sendPushes(ctx context.Context) {
if err != nil { // connection might have been closed recently
return
}
if ids.metricsTracer != nil {
ids.metricsTracer.IdentifyPush(network.DirOutbound)
}
// TODO: find out if the peer supports push if we didn't have any information about push support
if err := ids.sendIdentifyResp(str); err != nil {
if err := ids.sendIdentifyResp(str, true); err != nil {
log.Debugw("failed to send identify push", "peer", c.RemotePeer(), "error", err)
return
}
Expand Down Expand Up @@ -401,28 +398,19 @@ func (ids *idService) identifyConn(c network.Conn) error {
return err
}

if ids.metricsTracer != nil {
ids.metricsTracer.Identify(network.DirInbound)
}
return ids.handleIdentifyResponse(s, false)
}

// handlePush handles incoming identify push streams
func (ids *idService) handlePush(s network.Stream) {
ids.handleIdentifyResponse(s, true)
if ids.metricsTracer != nil {
ids.metricsTracer.IdentifyPush(network.DirInbound)
}
}

func (ids *idService) handleIdentifyRequest(s network.Stream) {
if ids.metricsTracer != nil {
ids.metricsTracer.Identify(network.DirOutbound)
}
_ = ids.sendIdentifyResp(s)
_ = ids.sendIdentifyResp(s, false)
}

func (ids *idService) sendIdentifyResp(s network.Stream) error {
func (ids *idService) sendIdentifyResp(s network.Stream, isPush bool) error {
if err := s.Scope().SetService(ServiceName); err != nil {
s.Reset()
return fmt.Errorf("failed to attaching stream to identify service: %w", err)
Expand All @@ -432,11 +420,21 @@ func (ids *idService) sendIdentifyResp(s network.Stream) error {
ids.currentSnapshot.Lock()
snapshot := ids.currentSnapshot.snapshot
ids.currentSnapshot.Unlock()

log.Debugw("sending snapshot", "seq", snapshot.seq, "protocols", snapshot.protocols, "addrs", snapshot.addrs)

mes := ids.createBaseIdentifyResponse(s.Conn(), &snapshot)
mes.SignedPeerRecord = ids.getSignedRecord(&snapshot)

log.Debugf("%s sending message to %s %s", ID, s.Conn().RemotePeer(), s.Conn().RemoteMultiaddr())
if err := ids.writeChunkedIdentifyMsg(s, &snapshot); err != nil {
if err := ids.writeChunkedIdentifyMsg(s, mes); err != nil {
return err
}

if ids.metricsTracer != nil {
ids.metricsTracer.IdentifySent(isPush, len(mes.Protocols), len(mes.ListenAddrs))
}

ids.connsMu.Lock()
defer ids.connsMu.Unlock()
e, ok := ids.conns[s.Conn()]
Expand Down Expand Up @@ -485,24 +483,28 @@ func (ids *idService) handleIdentifyResponse(s network.Stream, isPush bool) erro

ids.consumeMessage(mes, c, isPush)

if ids.metricsTracer != nil {
ids.metricsTracer.IdentifyReceived(isPush, len(mes.Protocols), len(mes.ListenAddrs))
}

ids.connsMu.Lock()
defer ids.connsMu.Unlock()
e, ok := ids.conns[c]
if !ok { // might already have disconnected
return nil
}
prevPushSupport := e.PushSupport
sup, err := ids.Host.Peerstore().SupportsProtocols(c.RemotePeer(), IDPush)
if supportsIdentifyPush := err == nil && len(sup) > 0; supportsIdentifyPush {
e.PushSupport = identifyPushSupported
} else {
e.PushSupport = identifyPushUnsupported
}
ids.conns[c] = e
if ids.metricsTracer != nil && e.PushSupport != prevPushSupport {
ids.metricsTracer.IncrementPushSupport(e.PushSupport)
ids.metricsTracer.DecrementPushSupport(prevPushSupport)

if ids.metricsTracer != nil {
ids.metricsTracer.PeerPushSupport(e.PushSupport)
}

ids.conns[c] = e
return nil
}

Expand Down Expand Up @@ -541,19 +543,14 @@ func (ids *idService) updateSnapshot() {
log.Debugw("updating snapshot", "seq", snapshot.seq, "addrs", snapshot.addrs)
}

func (ids *idService) writeChunkedIdentifyMsg(s network.Stream, snapshot *identifySnapshot) error {
c := s.Conn()
log.Debugw("sending snapshot", "seq", snapshot.seq, "protocols", snapshot.protocols, "addrs", snapshot.addrs)

mes := ids.createBaseIdentifyResponse(c, snapshot)
sr := ids.getSignedRecord(snapshot)
mes.SignedPeerRecord = sr
func (ids *idService) writeChunkedIdentifyMsg(s network.Stream, mes *pb.Identify) error {
writer := pbio.NewDelimitedWriter(s)

if sr == nil || proto.Size(mes) <= legacyIDSize {
if mes.SignedPeerRecord == nil || proto.Size(mes) <= legacyIDSize {
return writer.WriteMsg(mes)
}

sr := mes.SignedPeerRecord
mes.SignedPeerRecord = nil
if err := writer.WriteMsg(mes); err != nil {
return err
Expand Down Expand Up @@ -586,10 +583,6 @@ func (ids *idService) createBaseIdentifyResponse(conn network.Conn, snapshot *id
}
mes.ListenAddrs = append(mes.ListenAddrs, addr.Bytes())
}
if ids.metricsTracer != nil {
ids.metricsTracer.NumProtocols(len(mes.Protocols))
ids.metricsTracer.NumAddrs(len(mes.ListenAddrs))
}

// set our public key
ownKey := ids.Host.Peerstore().PubKey(ids.Host.ID())
Expand Down Expand Up @@ -694,11 +687,6 @@ func (ids *idService) consumeMessage(mes *pb.Identify, c network.Conn, isPush bo
lmaddrs = append(lmaddrs, maddr)
}

if ids.metricsTracer != nil {
ids.metricsTracer.NumProtocolsReceived(len(mesProtocols))
ids.metricsTracer.NumAddrsReceived(len(lmaddrs))
}

// NOTE: Do not add `c.RemoteMultiaddr()` to the peerstore if the remote
// peer doesn't tell us to do so. Otherwise, we'll advertise it.
//
Expand Down Expand Up @@ -901,10 +889,6 @@ func (nn *netNotifiee) Connected(_ network.Network, c network.Conn) {
ids.conns[c] = entry{}
ids.connsMu.Unlock()

if ids.metricsTracer != nil {
ids.metricsTracer.IncrementPushSupport(identifyPushSupportUnknown)
}

nn.IDService().IdentifyWait(c)
}

Expand All @@ -913,12 +897,8 @@ func (nn *netNotifiee) Disconnected(_ network.Network, c network.Conn) {

// Stop tracking the connection.
ids.connsMu.Lock()
e, ok := ids.conns[c]
delete(ids.conns, c)
ids.connsMu.Unlock()
if ok && ids.metricsTracer != nil {
ids.metricsTracer.DecrementPushSupport(e.PushSupport)
}

if ids.Host.Network().Connectedness(c.RemotePeer()) != network.Connected {
// Last disconnect.
Expand Down
84 changes: 43 additions & 41 deletions p2p/protocol/identify/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ var (
},
[]string{"dir"},
)
connectionPushSupport = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
peerPushSupport = prometheus.NewCounterVec(
prometheus.CounterOpts{
Namespace: metricNamespace,
Name: "conn_push_support",
Help: "Identify Connection Push Support",
Name: "peer_push_support_total",
Help: "Identify Peer Push Support",
},
[]string{"support"},
)
Expand Down Expand Up @@ -77,7 +77,7 @@ var (
pushesTriggered,
identify,
identifyPush,
connectionPushSupport,
peerPushSupport,
protocolsCount,
addrsCount,
numProtocolsReceived,
Expand All @@ -91,15 +91,17 @@ var (
)

type MetricsTracer interface {
// TriggeredPushes counts IdentifyPushes triggered by event
TriggeredPushes(event any)
Identify(network.Direction)
IdentifyPush(network.Direction)
IncrementPushSupport(identifyPushSupport)
DecrementPushSupport(identifyPushSupport)
NumProtocols(int)
NumAddrs(int)
NumProtocolsReceived(int)
NumAddrsReceived(int)

// PeerPushSupport counts peers by Push Support
PeerPushSupport(identifyPushSupport)

// IdentifyReceived tracks metrics on receiving an identify response
IdentifyReceived(isPush bool, numProtocols int, numAddrs int)

// IdentifySent tracks metrics on sending an identify response
IdentifySent(isPush bool, numProtocols int, numAddrs int)
}

type metricsTracer struct{}
Expand Down Expand Up @@ -144,52 +146,52 @@ func (t *metricsTracer) TriggeredPushes(ev any) {
pushesTriggered.WithLabelValues(*tags...).Inc()
}

func (t *metricsTracer) Identify(dir network.Direction) {
func (t *metricsTracer) IncrementPushSupport(s identifyPushSupport) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, metricshelper.GetDirection(dir))
identify.WithLabelValues(*tags...).Inc()
*tags = append(*tags, getPushSupport(s))
peerPushSupport.WithLabelValues(*tags...).Inc()
}

func (t *metricsTracer) IdentifyPush(dir network.Direction) {
func (t *metricsTracer) IdentifySent(isPush bool, numProtocols int, numAddrs int) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, metricshelper.GetDirection(dir))
identifyPush.WithLabelValues(*tags...).Inc()
}

func (t *metricsTracer) IncrementPushSupport(s identifyPushSupport) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)
if isPush {
*tags = append(*tags, metricshelper.GetDirection(network.DirOutbound))
identifyPush.WithLabelValues(*tags...).Inc()
} else {
*tags = append(*tags, metricshelper.GetDirection(network.DirInbound))
identify.WithLabelValues(*tags...).Inc()
}

*tags = append(*tags, getPushSupport(s))
connectionPushSupport.WithLabelValues(*tags...).Inc()
protocolsCount.Set(float64(numProtocols))
addrsCount.Set(float64(numAddrs))
}

func (t *metricsTracer) DecrementPushSupport(s identifyPushSupport) {
func (t *metricsTracer) IdentifyReceived(isPush bool, numProtocols int, numAddrs int) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

*tags = append(*tags, getPushSupport(s))
connectionPushSupport.WithLabelValues(*tags...).Dec()
}

func (t *metricsTracer) NumProtocols(n int) {
protocolsCount.Set(float64(n))
}
if isPush {
*tags = append(*tags, metricshelper.GetDirection(network.DirInbound))
identifyPush.WithLabelValues(*tags...).Inc()
} else {
*tags = append(*tags, metricshelper.GetDirection(network.DirOutbound))
identify.WithLabelValues(*tags...).Inc()
}

func (t *metricsTracer) NumAddrs(n int) {
addrsCount.Set(float64(n))
numProtocolsReceived.Observe(float64(numProtocols))
numAddrsReceived.Observe(float64(numAddrs))
}

func (t *metricsTracer) NumProtocolsReceived(n int) {
numProtocolsReceived.Observe(float64(n))
}
func (t *metricsTracer) PeerPushSupport(support identifyPushSupport) {
tags := metricshelper.GetStringSlice()
defer metricshelper.PutStringSlice(tags)

func (t *metricsTracer) NumAddrsReceived(n int) {
numAddrsReceived.Observe(float64(n))
*tags = append(*tags, getPushSupport(support))
peerPushSupport.WithLabelValues(*tags...).Inc()
}

func getPushSupport(s identifyPushSupport) string {
Expand Down

0 comments on commit 07219a7

Please sign in to comment.