diff --git a/p2p/net/connmgr/connmgr.go b/p2p/net/connmgr/connmgr.go index e61ad15ca9..681649662c 100644 --- a/p2p/net/connmgr/connmgr.go +++ b/p2p/net/connmgr/connmgr.go @@ -53,12 +53,12 @@ type segment struct { type segments [256]*segment -func (s *segments) get(p peer.ID) *segment { - return s[byte(p[len(p)-1])] +func (ss *segments) get(p peer.ID) *segment { + return ss[byte(p[len(p)-1])] } -func (s *segments) countPeers() (count int) { - for _, seg := range s { +func (ss *segments) countPeers() (count int) { + for _, seg := range ss { seg.Lock() count += len(seg.peers) seg.Unlock() @@ -66,6 +66,23 @@ func (s *segments) countPeers() (count int) { return count } +func (s *segment) tagInfoFor(p peer.ID) *peerInfo { + pi, ok := s.peers[p] + if ok { + return pi + } + // create a temporary peer to buffer early tags before the Connected notification arrives. + pi = &peerInfo{ + id: p, + firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives. + temp: true, + tags: make(map[string]int), + conns: make(map[inet.Conn]time.Time), + } + s.peers[p] = pi + return pi +} + // NewConnManager creates a new BasicConnMgr with the provided params: // * lo and hi are watermarks governing the number of connections that'll be maintained. // When the peer count exceeds the 'high watermark', as many peers will be pruned (and @@ -134,6 +151,7 @@ type peerInfo struct { id peer.ID tags map[string]int // value for each tag value int // cached sum of all tag values + temp bool // this is a temporary entry holding early tags, and awaiting connections conns map[inet.Conn]time.Time // start time of each connection @@ -218,7 +236,13 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn { // Sort peers according to their value. sort.Slice(candidates, func(i, j int) bool { - return candidates[i].value < candidates[j].value + left, right := candidates[i], candidates[j] + // temporary peers are preferred for pruning. + if left.temp != right.temp { + return left.temp + } + // otherwise, compare by value. + return left.value < right.value }) target := nconns - cm.lowWater @@ -227,6 +251,9 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn { selected := make([]inet.Conn, 0, target+10) for _, inf := range candidates { + if target <= 0 { + break + } // TODO: should we be using firstSeen or the time associated with the connection itself? if inf.firstSeen.Add(cm.gracePeriod).After(now) { continue @@ -235,15 +262,18 @@ func (cm *BasicConnMgr) getConnsToClose(ctx context.Context) []inet.Conn { // lock this to protect from concurrent modifications from connect/disconnect events s := cm.segments.get(inf.id) s.Lock() - for c := range inf.conns { - selected = append(selected, c) + + if len(inf.conns) == 0 && inf.temp { + // handle temporary entries for early tags -- this entry has gone past the grace period + // and still holds no connections, so prune it. + delete(s.peers, inf.id) + } else { + for c := range inf.conns { + selected = append(selected, c) + } } target -= len(inf.conns) s.Unlock() - - if target <= 0 { - break - } } return selected @@ -284,14 +314,10 @@ func (cm *BasicConnMgr) TagPeer(p peer.ID, tag string, val int) { s.Lock() defer s.Unlock() - pi, ok := s.peers[p] - if !ok { - log.Info("tried to tag conn from untracked peer: ", p) - return - } + pi := s.tagInfoFor(p) // Update the total value of the peer. - pi.value += (val - pi.tags[tag]) + pi.value += val - pi.tags[tag] pi.tags[tag] = val } @@ -318,15 +344,11 @@ func (cm *BasicConnMgr) UpsertTag(p peer.ID, tag string, upsert func(int) int) { s.Lock() defer s.Unlock() - pi, ok := s.peers[p] - if !ok { - log.Info("tried to upsert tag from untracked peer: ", p) - return - } + pi := s.tagInfoFor(p) oldval := pi.tags[tag] newval := upsert(oldval) - pi.value += (newval - oldval) + pi.value += newval - oldval pi.tags[tag] = newval } @@ -383,15 +405,22 @@ func (nn *cmNotifee) Connected(n inet.Network, c inet.Conn) { s.Lock() defer s.Unlock() - pinfo, ok := s.peers[p] + id := c.RemotePeer() + pinfo, ok := s.peers[id] if !ok { pinfo = &peerInfo{ - id: p, + id: id, firstSeen: time.Now(), tags: make(map[string]int), conns: make(map[inet.Conn]time.Time), } - s.peers[p] = pinfo + s.peers[id] = pinfo + } else if pinfo.temp { + // we had created a temporary entry for this peer to buffer early tags before the + // Connected notification arrived: flip the temporary flag, and update the firstSeen + // timestamp to the real one. + pinfo.temp = false + pinfo.firstSeen = time.Now() } _, ok = pinfo.conns[c] diff --git a/p2p/net/connmgr/connmgr_test.go b/p2p/net/connmgr/connmgr_test.go index 7f4ed10436..d7933ae893 100644 --- a/p2p/net/connmgr/connmgr_test.go +++ b/p2p/net/connmgr/connmgr_test.go @@ -190,8 +190,8 @@ func TestTagPeerNonExistant(t *testing.T) { id := tu.RandPeerIDFatal(t) cm.TagPeer(id, "test", 1) - if cm.segments.countPeers() != 0 { - t.Fatal("expected zero peers") + if !cm.segments.get(id).peers[id].temp { + t.Fatal("expected 1 temporary entry") } } @@ -525,9 +525,9 @@ func TestUpsertTag(t *testing.T) { cm := NewConnManager(1, 1, time.Duration(10*time.Minute)) not := cm.Notifee() conn := randConn(t, nil) - not.Connected(nil, conn) rp := conn.RemotePeer() + // this is an early tag, before the Connected notification arrived. cm.UpsertTag(rp, "tag", func(v int) int { return v + 1 }) if len(cm.segments.get(rp).peers[rp].tags) != 1 { t.Fatal("expected a tag") @@ -536,6 +536,9 @@ func TestUpsertTag(t *testing.T) { t.Fatal("expected a tag value of 1") } + // now let's notify the connection. + not.Connected(nil, conn) + cm.UpsertTag(rp, "tag", func(v int) int { return v + 1 }) if len(cm.segments.get(rp).peers[rp].tags) != 1 { t.Fatal("expected a tag") @@ -552,3 +555,44 @@ func TestUpsertTag(t *testing.T) { t.Fatal("expected a tag value of 1") } } + +func TestTemporaryEntriesClearedFirst(t *testing.T) { + cm := NewConnManager(1, 1, 0) + + id := tu.RandPeerIDFatal(t) + cm.TagPeer(id, "test", 20) + + if cm.GetTagInfo(id).Value != 20 { + t.Fatal("expected an early tag with value 20") + } + + not := cm.Notifee() + conn1, conn2 := randConn(t, nil), randConn(t, nil) + not.Connected(nil, conn1) + not.Connected(nil, conn2) + + cm.TrimOpenConns(context.Background()) + if cm.GetTagInfo(id) != nil { + t.Fatal("expected no temporary tags after trimming") + } +} + +func TestTemporaryEntryConvertedOnConnection(t *testing.T) { + cm := NewConnManager(1, 1, 0) + + conn := randConn(t, nil) + cm.TagPeer(conn.RemotePeer(), "test", 20) + + ti := cm.segments.get(conn.RemotePeer()).peers[conn.RemotePeer()] + + if ti.value != 20 || !ti.temp { + t.Fatal("expected a temporary tag with value 20") + } + + not := cm.Notifee() + not.Connected(nil, conn) + + if ti.value != 20 || ti.temp { + t.Fatal("expected a non-temporary tag with value 20") + } +}