Skip to content

Commit

Permalink
fix(dot/network): fix dht connection on discovery on devnet (#2059)
Browse files Browse the repository at this point in the history
  • Loading branch information
kishansagathiya authored Dec 3, 2021
1 parent a7d4be0 commit da065b8
Show file tree
Hide file tree
Showing 14 changed files with 121 additions and 63 deletions.
1 change: 1 addition & 0 deletions chain/gssmr/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ nobootstrap = false
nomdns = false
discovery-interval = 10
min-peers = 1
max-peers = 50

[rpc]
enabled = false
Expand Down
2 changes: 2 additions & 0 deletions chain/gssmr/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,8 @@ var (
DefaultNoMDNS = false
// DefaultMinPeers is the default minimum desired peer count
DefaultMinPeers = 1
// DefaultMaxPeers is the default maximum desired peer count
DefaultMaxPeers = 50

// DefaultDiscoveryInterval is the default interval for searching for DHT peers
DefaultDiscoveryInterval = time.Second * 10
Expand Down
4 changes: 3 additions & 1 deletion cmd/gossamer/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,9 @@ func createDotConfig(ctx *cli.Context) (*dot.Config, error) {
return nil, err
}

logger.Infof("loaded package log configuration: %v", cfg.Log)
// TODO: log this better.
// See https://github.com/ChainSafe/gossamer/issues/1945
logger.Infof("loaded package log configuration: %#v", cfg.Log)

// set global configuration values
if err := setDotGlobalConfig(ctx, tomlCfg, &cfg.Global); err != nil {
Expand Down
7 changes: 7 additions & 0 deletions cmd/gossamer/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,7 @@ func TestNetworkConfigFromFlags(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: time.Second * 10,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
},
{
Expand All @@ -472,6 +473,7 @@ func TestNetworkConfigFromFlags(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: time.Second * 10,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
},
{
Expand All @@ -486,6 +488,7 @@ func TestNetworkConfigFromFlags(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: time.Second * 10,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
},
{
Expand All @@ -500,6 +503,7 @@ func TestNetworkConfigFromFlags(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: time.Second * 10,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
},
{
Expand All @@ -514,6 +518,7 @@ func TestNetworkConfigFromFlags(t *testing.T) {
NoMDNS: true,
DiscoveryInterval: time.Second * 10,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
},
{
Expand All @@ -528,6 +533,7 @@ func TestNetworkConfigFromFlags(t *testing.T) {
NoMDNS: false,
DiscoveryInterval: time.Second * 10,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
PublicIP: "10.0.5.2",
},
},
Expand Down Expand Up @@ -909,6 +915,7 @@ func TestUpdateConfigFromGenesisData(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: testCfg.Network.DiscoveryInterval,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
RPC: testCfg.RPC,
System: testCfg.System,
Expand Down
3 changes: 3 additions & 0 deletions cmd/gossamer/export_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func TestExportCommand(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: testCfg.Network.DiscoveryInterval,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
RPC: testCfg.RPC,
Pprof: testCfg.Pprof,
Expand Down Expand Up @@ -112,6 +113,7 @@ func TestExportCommand(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: testCfg.Network.DiscoveryInterval,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
RPC: testCfg.RPC,
Pprof: testCfg.Pprof,
Expand Down Expand Up @@ -149,6 +151,7 @@ func TestExportCommand(t *testing.T) {
NoMDNS: testCfg.Network.NoMDNS,
DiscoveryInterval: testCfg.Network.DiscoveryInterval,
MinPeers: testCfg.Network.MinPeers,
MaxPeers: testCfg.Network.MaxPeers,
},
RPC: testCfg.RPC,
Pprof: testCfg.Pprof,
Expand Down
1 change: 1 addition & 0 deletions dot/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@ func GssmrConfig() *Config {
NoMDNS: gssmr.DefaultNoMDNS,
DiscoveryInterval: gssmr.DefaultDiscoveryInterval,
MinPeers: gssmr.DefaultMinPeers,
MaxPeers: gssmr.DefaultMaxPeers,
},
RPC: RPCConfig{
Port: gssmr.DefaultRPCHTTPPort,
Expand Down
11 changes: 8 additions & 3 deletions dot/network/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,15 @@ func TestMinPeers(t *testing.T) {
}

nodeB := createTestService(t, configB)
require.Equal(t, min, nodeB.host.peerCount())
require.GreaterOrEqual(t, nodeB.host.peerCount(), len(nodes))

nodeB.host.cm.peerSetHandler.DisconnectPeer(0, nodes[0].host.id())
require.GreaterOrEqual(t, min, nodeB.host.peerCount())
// check that peer count is at least greater than minimum number of peers,
// even after trying to disconnect from all peers
for _, node := range nodes {
nodeB.host.cm.peerSetHandler.DisconnectPeer(0, node.host.id())
}

require.GreaterOrEqual(t, nodeB.host.peerCount(), min)
}

func TestMaxPeers(t *testing.T) {
Expand Down
14 changes: 1 addition & 13 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func (d *discovery) advertise() {

ttl, err = d.rd.Advertise(d.ctx, string(d.pid))
if err != nil {
logger.Debugf("failed to advertise in the DHT: %s", err)
logger.Warnf("failed to advertise in the DHT: %s", err)
ttl = tryAdvertiseTimeout
}
case <-d.ctx.Done():
Expand Down Expand Up @@ -199,21 +199,9 @@ func (d *discovery) findPeers(ctx context.Context) {

logger.Tracef("found new peer %s via DHT", peer.ID)

// TODO: this isn't working on the devnet (#2026)
// can remove the code block below which directly connects
// once that's fixed
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
d.handler.AddPeer(0, peer.ID)

// found a peer, try to connect if we need more peers
if len(d.h.Network().Peers()) >= d.maxPeers {
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
return
}

if err = d.h.Connect(d.ctx, peer); err != nil {
logger.Tracef("failed to connect to discovered peer %s: %s", peer.ID, err)
}
}
}
}
Expand Down
8 changes: 6 additions & 2 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,12 +98,16 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) {
return nil, err
}

// We have tried to set maxInPeers and maxOutPeers such that number of peer
// connections remain between min peers and max peers
const reservedOnly = false
peerCfgSet := peerset.NewConfigSet(
uint32(cfg.MaxPeers-cfg.MinPeers),
uint32(cfg.MinPeers),
uint32(cfg.MaxPeers/2),
reservedOnly,
peerSetSlotAllocTime)
peerSetSlotAllocTime,
)

// create connection manager
cm, err := newConnManager(cfg.MinPeers, cfg.MaxPeers, peerCfgSet)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions dot/network/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -675,21 +675,21 @@ func (s *Service) processMessage(msg peerset.Message) {
var err error
addrInfo, err = s.host.discovery.findPeer(peerID)
if err != nil {
logger.Debugf("failed to find peer id %s: %s", peerID, err)
logger.Warnf("failed to find peer id %s: %s", peerID, err)
return
}
}

err := s.host.connect(addrInfo)
if err != nil {
logger.Debugf("failed to open connection for peer %s: %s", peerID, err)
logger.Warnf("failed to open connection for peer %s: %s", peerID, err)
return
}
logger.Debugf("connection successful with peer %s", peerID)
case peerset.Drop, peerset.Reject:
err := s.host.closePeer(peerID)
if err != nil {
logger.Debugf("failed to close connection with peer %s: %s", peerID, err)
logger.Warnf("failed to close connection with peer %s: %s", peerID, err)
return
}
logger.Debugf("connection dropped successfully for peer %s", peerID)
Expand Down
59 changes: 48 additions & 11 deletions dot/peerset/peerset.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,33 @@ const (
disconnect
)

func (a ActionReceiver) String() string {
switch a {
case addReservedPeer:
return "addReservedPeer"
case removeReservedPeer:
return "removeReservedPeer"
case setReservedPeers:
return "setReservedPeers"
case setReservedOnly:
return "setReservedOnly"
case reportPeer:
return "reportPeer"
case addToPeerSet:
return "addToPeerSet"
case removeFromPeerSet:
return "removeFromPeerSet"
case incoming:
return "incoming"
case sortedPeers:
return "sortedPeers"
case disconnect:
return "disconnect"
default:
return "invalid action"
}
}

// action struct stores the action type and required parameters to perform action
type action struct {
actionCall ActionReceiver
Expand All @@ -67,8 +94,8 @@ func (a action) String() string {
for i := range a.peers {
peersStrings[i] = a.peers[i].String()
}
return fmt.Sprintf("{call=%d, set-id=%d, reputation change %v, peers=[%s]",
a.actionCall, a.setID, a.reputation, strings.Join(peersStrings, ", "))
return fmt.Sprintf("{call=%s, set-id=%d, reputation change %v, peers=[%s]",
a.actionCall.String(), a.setID, a.reputation, strings.Join(peersStrings, ", "))
}

// Status represents the enum value for Message
Expand Down Expand Up @@ -156,9 +183,9 @@ type PeerSet struct {
// config is configuration of a single set.
type config struct {
// maximum number of slot occupying nodes for incoming connections.
inPeers uint32
maxInPeers uint32
// maximum number of slot occupying nodes for outgoing connections.
outPeers uint32
maxOutPeers uint32

// TODO Use in future for reserved only peers
// if true, we only accept reservedNodes (#1888).
Expand All @@ -174,10 +201,10 @@ type ConfigSet struct {
}

// NewConfigSet creates a new config set for the peerSet
func NewConfigSet(in, out uint32, reservedOnly bool, allocTime time.Duration) *ConfigSet {
func NewConfigSet(maxInPeers, maxOutPeers uint32, reservedOnly bool, allocTime time.Duration) *ConfigSet {
set := &config{
inPeers: in,
outPeers: out,
maxInPeers: maxInPeers,
maxOutPeers: maxOutPeers,
reservedOnly: reservedOnly,
periodicAllocTime: allocTime,
}
Expand Down Expand Up @@ -351,6 +378,8 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
}

if n.getReputation() < BannedThresholdValue {
logger.Warnf("reputation is lower than banned threshold value, reputation: %d, banned threshold value: %d",
n.getReputation(), BannedThresholdValue)
break
}

Expand All @@ -364,6 +393,7 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
PeerID: reservePeer,
}
}

// nothing more to do if we're in reserved mode.
if ps.isReservedOnly {
return nil
Expand All @@ -382,6 +412,7 @@ func (ps *PeerSet) allocSlots(setIdx int) error {
}

if err = peerState.tryOutgoing(setIdx, peerID); err != nil {
logger.Errorf("could not set peer %s as outgoing connection: %s", peerID.Pretty(), err)
break
}

Expand All @@ -403,10 +434,14 @@ func (ps *PeerSet) addReservedPeers(setID int, peers ...peer.ID) error {
return nil
}

ps.peerState.discover(setID, peerID)

ps.reservedNode[peerID] = struct{}{}
ps.peerState.addNoSlotNode(setID, peerID)
if err := ps.peerState.addNoSlotNode(setID, peerID); err != nil {
return fmt.Errorf("could not add to list of no-slot nodes: %w", err)
}
if err := ps.allocSlots(setID); err != nil {
return err
return fmt.Errorf("could not allocate slots: %w", err)
}
}
return nil
Expand All @@ -420,7 +455,9 @@ func (ps *PeerSet) removeReservedPeers(setID int, peers ...peer.ID) error {
}

delete(ps.reservedNode, peerID)
ps.peerState.removeNoSlotNode(setID, peerID)
if err := ps.peerState.removeNoSlotNode(setID, peerID); err != nil {
return fmt.Errorf("could not remove from the list of no-slot nodes: %w", err)
}

// nothing more to do if not in reservedOnly mode.
if !ps.isReservedOnly {
Expand Down Expand Up @@ -645,7 +682,7 @@ func (ps *PeerSet) doWork() {
l := ps.peerState.getSetLength()
for i := 0; i < l; i++ {
if err := ps.allocSlots(i); err != nil {
logger.Debugf("failed to do action on peerSet: %s", err)
logger.Warnf("failed to do action on peerSet: %s", err)
}
}
case act, ok := <-ps.actionQueue:
Expand Down
Loading

0 comments on commit da065b8

Please sign in to comment.