Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(dot/network): move low reputation peer removal from network ConnManager to peer scoring logic (dot/peerstate) #2068

Merged
merged 9 commits into from
Nov 30, 2021
34 changes: 1 addition & 33 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ package network

import (
"context"
"crypto/rand"
"math/big"
"sync"

"github.com/libp2p/go-libp2p-core/connmgr"
Expand Down Expand Up @@ -130,40 +128,10 @@ func (cm *ConnManager) unprotectedPeers(peers []peer.ID) []peer.ID {
func (cm *ConnManager) Connected(n network.Network, c network.Conn) {
logger.Tracef(
"Host %s connected to peer %s", n.LocalPeer(), c.RemotePeer())

if cm.connectHandler != nil {
cm.connectHandler(c.RemotePeer())
}

cm.Lock()
defer cm.Unlock()

over := len(n.Peers()) - cm.max
if over <= 0 {
return
}

// TODO: peer scoring doesn't seem to prevent us from going over the max.
// if over the max peer count, disconnect from (total_peers - maximum) peers
// (#2039)
for i := 0; i < over; i++ {
unprotPeers := cm.unprotectedPeers(n.Peers())
if len(unprotPeers) == 0 {
return
}

i, err := rand.Int(rand.Reader, big.NewInt(int64(len(unprotPeers))))
if err != nil {
logger.Errorf("error generating random number: %s", err)
return
}

up := unprotPeers[i.Int64()]
logger.Tracef("Over max peer count, disconnecting from random unprotected peer %s", up)
err = n.ClosePeer(up)
if err != nil {
logger.Tracef("failed to close connection to peer %s", up)
}
}
}

// Disconnected is called when a connection closed
Expand Down
11 changes: 4 additions & 7 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ package network
import (
"context"
"errors"
"fmt"
"math/big"
"strings"
"sync"
Expand Down Expand Up @@ -670,6 +669,9 @@ func (s *Service) startPeerSetHandler() {

func (s *Service) processMessage(msg peerset.Message) {
peerID := msg.PeerID
if peerID == "" {
logger.Errorf("found empty peer id in peerset message")
kishansagathiya marked this conversation as resolved.
Show resolved Hide resolved
}
switch msg.Status {
case peerset.Connect:
addrInfo := s.host.h.Peerstore().PeerInfo(peerID)
Expand Down Expand Up @@ -704,12 +706,7 @@ func (s *Service) startProcessingMsg() {
select {
case <-s.ctx.Done():
return
case m := <-msgCh:
msg, ok := m.(peerset.Message)
if !ok {
logger.Error(fmt.Sprintf("failed to get message from peerSet: type is %T instead of peerset.Message", m))
continue
}
case msg := <-msgCh:
s.processMessage(msg)
}
}
Expand Down
2 changes: 1 addition & 1 deletion dot/network/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,5 +79,5 @@ type PeerRemove interface {
type Peer interface {
PeerReputation(peer.ID) (peerset.Reputation, error)
SortedPeers(idx int) chan peer.IDSlice
Messages() chan interface{}
Messages() chan peerset.Message
}
6 changes: 4 additions & 2 deletions dot/peerset/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@

package peerset

import "github.com/libp2p/go-libp2p-core/peer"
import (
"github.com/libp2p/go-libp2p-core/peer"
)

// Handler manages peerSet.
type Handler struct {
Expand Down Expand Up @@ -88,7 +90,7 @@ func (h *Handler) Incoming(setID int, peers ...peer.ID) {
}

// Messages return result message chan.
func (h *Handler) Messages() chan interface{} {
func (h *Handler) Messages() chan Message {
return h.peerSet.resultMsgCh
}

Expand Down
45 changes: 33 additions & 12 deletions dot/peerset/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ type PeerSet struct {
// TODO: this will be useful for reserved only mode
// this is for future purpose if reserved-only flag is enabled (#1888).
isReservedOnly bool
resultMsgCh chan interface{}
resultMsgCh chan Message
// time when the PeerSet was created.
created time.Time
// last time when we updated the reputations of connected nodes.
Expand Down Expand Up @@ -183,6 +183,8 @@ func NewConfigSet(in, out uint32, reservedOnly bool, allocTime time.Duration) *C
}

return &ConfigSet{
// Why are we using an array of config in the set, when we are
// using just one config
Comment on lines +186 to +187
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#1886 @arijitAD could you give more context on this?

if I remember correctly it was so we could score the different stream protocols differently (eg. sync, block-announce, etc)

Set: []*config{set},
}
}
Expand Down Expand Up @@ -228,8 +230,8 @@ func reputationTick(reput Reputation) Reputation {
return reput.sub(diff)
}

// updateTime updates the value of latestTimeUpdate and performs all the updates that happen
// over time, such as Reputation increases for staying connected.
// updateTime updates the value of latestTimeUpdate and performs all the updates that
// happen over time, such as Reputation increases for staying connected.
func (ps *PeerSet) updateTime() error {
currTime := time.Now()
// identify the time difference between current time and last update time for peer reputation in seconds.
Expand Down Expand Up @@ -282,8 +284,8 @@ func (ps *PeerSet) updateTime() error {
}

// reportPeer on report ReputationChange of the peer based on its behaviour,
// if the updated Reputation is below BannedThresholdValue then, this node need to be disconnected
// and a drop message for the peer is sent in order to disconnect.
// if the updated Reputation is below BannedThresholdValue then, this node need to
// be disconnected and a drop message for the peer is sent in order to disconnect.
func (ps *PeerSet) reportPeer(change ReputationChange, peers ...peer.ID) error {
// we want reputations to be up-to-date before adjusting them.
if err := ps.updateTime(); err != nil {
Expand Down Expand Up @@ -516,8 +518,9 @@ func (ps *PeerSet) removePeer(setID int, peers ...peer.ID) error {
return nil
}

// incoming indicates that we have received an incoming connection. Must be answered either with
// a corresponding `Accept` or `Reject`, except if we were already connected to this peer.
// incoming indicates that we have received an incoming connection. Must be answered
// either with a corresponding `Accept` or `Reject`, except if we were already
// connected to this peer.
func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error {
if err := ps.updateTime(); err != nil {
return err
Expand All @@ -527,7 +530,11 @@ func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error {
for _, pid := range peers {
if ps.isReservedOnly {
if _, ok := ps.reservedNode[pid]; !ok {
ps.resultMsgCh <- Message{Status: Reject}
ps.resultMsgCh <- Message{
Status: Reject,
setID: uint64(setID),
PeerID: pid,
}
continue
}
}
Expand All @@ -546,11 +553,24 @@ func (ps *PeerSet) incoming(setID int, peers ...peer.ID) error {
p := state.nodes[pid]
switch {
case p.getReputation() < BannedThresholdValue:
ps.resultMsgCh <- Message{Status: Reject}
ps.resultMsgCh <- Message{
Status: Reject,
setID: uint64(setID),
PeerID: pid,
}
case state.tryAcceptIncoming(setID, pid) != nil:
ps.resultMsgCh <- Message{Status: Reject}
ps.resultMsgCh <- Message{
Status: Reject,
setID: uint64(setID),
PeerID: pid,
}
default:
ps.resultMsgCh <- Message{Status: Accept}
logger.Debugf("incoming connection accepted from peer %s", pid)
ps.resultMsgCh <- Message{
Status: Accept,
setID: uint64(setID),
PeerID: pid,
}
}
}

Expand Down Expand Up @@ -593,6 +613,7 @@ func (ps *PeerSet) disconnect(setIdx int, reason DropReason, peers ...peer.ID) e
}
ps.resultMsgCh <- Message{
Status: Drop,
setID: uint64(setIdx),
PeerID: pid,
}

Expand All @@ -610,7 +631,7 @@ func (ps *PeerSet) disconnect(setIdx int, reason DropReason, peers ...peer.ID) e
// start handles all the action for the peerSet.
func (ps *PeerSet) start(aq chan action) {
ps.actionQueue = aq
ps.resultMsgCh = make(chan interface{}, msgChanSize)
ps.resultMsgCh = make(chan Message, msgChanSize)
go ps.doWork()
}

Expand Down
4 changes: 1 addition & 3 deletions dot/peerset/peerset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,7 @@ func TestAddReservedPeers(t *testing.T) {
if len(ps.resultMsgCh) == 0 {
break
}
m := <-ps.resultMsgCh
msg, ok := m.(Message)
require.True(t, ok)
msg := <-ps.resultMsgCh
require.Equal(t, expectedMsgs[i], msg)
}
}
Expand Down