Skip to content

Commit

Permalink
maintainence: fix network tests on CI (ChainSafe#1627)
Browse files Browse the repository at this point in the history
  • Loading branch information
noot authored Jun 8, 2021
1 parent 29413d4 commit 00a8df1
Show file tree
Hide file tree
Showing 18 changed files with 166 additions and 234 deletions.
1 change: 0 additions & 1 deletion dot/core/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ func NewTestService(t *testing.T, cfg *Config) *Service {
config := &network.Config{
BasePath: testDatadirPath,
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
BlockState: stateSrvc.Block,
Expand Down
2 changes: 0 additions & 2 deletions dot/network/block_announce_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func TestHandleBlockAnnounceMessage(t *testing.T) {
config := &Config{
BasePath: basePath,
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -110,7 +109,6 @@ func TestValidateBlockAnnounceHandshake(t *testing.T) {
configA := &Config{
BasePath: utils.NewTestBasePath(t, "nodeA"),
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
Expand Down
1 change: 0 additions & 1 deletion dot/network/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,6 @@ type Config struct {
// build checks the configuration, sets up the private key for the network service,
// and applies default values where appropriate
func (c *Config) build() error {

// check state configuration
err := c.checkState()
if err != nil {
Expand Down
37 changes: 29 additions & 8 deletions dot/network/connmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,17 +196,38 @@ func (cm *ConnManager) Disconnected(n network.Network, c network.Conn) {
Addrs: addrs,
}

go func() {
for i := 0; i < maxRetries; i++ {
err := cm.host.connect(info)
if err != nil {
logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err)
time.Sleep(time.Minute)
continue
}
count := 0
retry := func() bool {
err := cm.host.connect(info)
if err != nil {
logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err)
return false
}

count++
if count > maxRetries {
return true
}
return true
}

go func() {
if retry() {
return
}

retryTimer := time.NewTicker(time.Minute)
defer retryTimer.Stop()
for {
select {
case <-cm.host.ctx.Done():
return
case <-retryTimer.C:
if retry() {
return
}
}
}
}()

// TODO: if number of peers falls below the min desired peer count, we should try to connect to previously discovered peers
Expand Down
3 changes: 0 additions & 3 deletions dot/network/connmgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ func TestMaxPeers(t *testing.T) {
config := &Config{
BasePath: utils.NewTestBasePath(t, fmt.Sprintf("node%d", i)),
Port: 7000 + uint32(i),
RandSeed: 1 + int64(i),
NoBootstrap: true,
NoMDNS: true,
MaxPeers: max,
Expand Down Expand Up @@ -91,7 +90,6 @@ func TestPersistentPeers(t *testing.T) {
configA := &Config{
BasePath: utils.NewTestBasePath(t, "node-a"),
Port: 7000,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -101,7 +99,6 @@ func TestPersistentPeers(t *testing.T) {
configB := &Config{
BasePath: utils.NewTestBasePath(t, "node-b"),
Port: 7001,
RandSeed: 2,
NoMDNS: true,
PersistentPeers: []string{addrs[0].String()},
}
Expand Down
30 changes: 9 additions & 21 deletions dot/network/discovery_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func newTestDiscovery(t *testing.T, num int) []*discovery {
config := &Config{
BasePath: utils.NewTestBasePath(t, fmt.Sprintf("node%d", i)),
Port: uint32(7001 + i),
RandSeed: int64(1 + i),
NoBootstrap: true,
NoMDNS: true,
}
Expand Down Expand Up @@ -119,7 +118,6 @@ func TestBeginDiscovery(t *testing.T) {
configA := &Config{
BasePath: utils.NewTestBasePath(t, "nodeA"),
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -130,21 +128,18 @@ func TestBeginDiscovery(t *testing.T) {
configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Port: 7002,
RandSeed: 2,
NoBootstrap: true,
NoMDNS: true,
}

nodeB := createTestService(t, configB)
nodeB.noGossip = true

addrInfosB, err := nodeB.host.addrInfos()
require.NoError(t, err)

err = nodeA.host.connect(*addrInfosB[0])
addrInfoB := nodeB.host.addrInfo()
err := nodeA.host.connect(addrInfoB)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeA.host.connect(*addrInfosB[0])
err = nodeA.host.connect(addrInfoB)
}
require.NoError(t, err)

Expand All @@ -159,7 +154,6 @@ func TestBeginDiscovery_ThreeNodes(t *testing.T) {
configA := &Config{
BasePath: utils.NewTestBasePath(t, "nodeA"),
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -170,7 +164,6 @@ func TestBeginDiscovery_ThreeNodes(t *testing.T) {
configB := &Config{
BasePath: utils.NewTestBasePath(t, "nodeB"),
Port: 7002,
RandSeed: 2,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -181,7 +174,6 @@ func TestBeginDiscovery_ThreeNodes(t *testing.T) {
configC := &Config{
BasePath: utils.NewTestBasePath(t, "nodeC"),
Port: 7003,
RandSeed: 3,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -190,24 +182,20 @@ func TestBeginDiscovery_ThreeNodes(t *testing.T) {
nodeC.noGossip = true

// connect A and B
addrInfosB, err := nodeB.host.addrInfos()
require.NoError(t, err)

err = nodeA.host.connect(*addrInfosB[0])
addrInfoB := nodeB.host.addrInfo()
err := nodeA.host.connect(addrInfoB)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeA.host.connect(*addrInfosB[0])
err = nodeA.host.connect(addrInfoB)
}
require.NoError(t, err)

// connect A and C
addrInfosC, err := nodeC.host.addrInfos()
require.NoError(t, err)

err = nodeA.host.connect(*addrInfosC[0])
addrInfoC := nodeC.host.addrInfo()
err = nodeA.host.connect(addrInfoC)
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeA.host.connect(*addrInfosC[0])
err = nodeA.host.connect(addrInfoC)
}
require.NoError(t, err)

Expand Down
25 changes: 9 additions & 16 deletions dot/network/gossip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ func TestGossip(t *testing.T) {
configA := &Config{
BasePath: basePathA,
Port: 7001,
RandSeed: 1,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -49,7 +48,6 @@ func TestGossip(t *testing.T) {
configB := &Config{
BasePath: basePathB,
Port: 7002,
RandSeed: 2,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -58,22 +56,19 @@ func TestGossip(t *testing.T) {
handlerB := newTestStreamHandler(testBlockAnnounceMessageDecoder)
nodeB.host.registerStreamHandler("", handlerB.handleStream)

addrInfosA, err := nodeA.host.addrInfos()
require.NoError(t, err)

err = nodeB.host.connect(*addrInfosA[0])
addrInfoA := nodeA.host.addrInfo()
err := nodeB.host.connect(addrInfoA)
// retry connect if "failed to dial" error
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeB.host.connect(*addrInfosA[0])
err = nodeB.host.connect(addrInfoA)
}
require.NoError(t, err)

basePathC := utils.NewTestBasePath(t, "nodeC")
configC := &Config{
BasePath: basePathC,
Port: 7003,
RandSeed: 3,
NoBootstrap: true,
NoMDNS: true,
}
Expand All @@ -82,26 +77,24 @@ func TestGossip(t *testing.T) {
handlerC := newTestStreamHandler(testBlockAnnounceMessageDecoder)
nodeC.host.registerStreamHandler("", handlerC.handleStream)

err = nodeC.host.connect(*addrInfosA[0])
err = nodeC.host.connect(addrInfoA)
// retry connect if "failed to dial" error
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeC.host.connect(*addrInfosA[0])
err = nodeC.host.connect(addrInfoA)
}
require.NoError(t, err)

addrInfosB, err := nodeB.host.addrInfos()
require.NoError(t, err)

err = nodeC.host.connect(*addrInfosB[0])
addrInfoB := nodeB.host.addrInfo()
err = nodeC.host.connect(addrInfoB)
// retry connect if "failed to dial" error
if failedToDial(err) {
time.Sleep(TestBackoffTimeout)
err = nodeC.host.connect(*addrInfosB[0])
err = nodeC.host.connect(addrInfoB)
}
require.NoError(t, err)

_, err = nodeA.host.send(addrInfosB[0].ID, "", testBlockAnnounceMessage)
_, err = nodeA.host.send(addrInfoB.ID, "", testBlockAnnounceMessage)
require.NoError(t, err)

time.Sleep(TestMessageTimeout)
Expand Down
42 changes: 22 additions & 20 deletions dot/network/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"fmt"
"net"
"path"
"sync"
"time"

"github.com/dgraph-io/ristretto"
Expand Down Expand Up @@ -48,6 +49,8 @@ var privateCIDRs = []string{
"169.254.0.0/16",
}

var connectTimeout = time.Second * 5

// host wraps libp2p host with network host configuration and services
type host struct {
ctx context.Context
Expand All @@ -60,6 +63,7 @@ type host struct {
ds *badger.Datastore
messageCache *messageCache
bwc *metrics.BandwidthCounter
closeSync sync.Once
}

// newHost creates a host wrapper with a new libp2p host instance
Expand Down Expand Up @@ -204,17 +208,19 @@ func (h *host) close() error {
return err
}

err = h.h.Peerstore().Close()
if err != nil {
logger.Error("Failed to close libp2p peerstore", "error", err)
return err
}
h.closeSync.Do(func() {
err = h.h.Peerstore().Close()
if err != nil {
logger.Error("Failed to close libp2p peerstore", "error", err)
return
}

err = h.ds.Close()
if err != nil {
logger.Error("Failed to close libp2p host datastore", "error", err)
return err
}
err = h.ds.Close()
if err != nil {
logger.Error("Failed to close libp2p host datastore", "error", err)
return
}
})
return nil
}

Expand All @@ -241,7 +247,7 @@ func (h *host) registerStreamHandlerWithOverwrite(pid protocol.ID, overwrite boo
// connect connects the host to a specific peer address
func (h *host) connect(p peer.AddrInfo) (err error) {
h.h.Peerstore().AddAddrs(p.ID, p.Addrs, peerstore.PermanentAddrTTL)
ctx, cancel := context.WithTimeout(h.ctx, time.Second*2)
ctx, cancel := context.WithTimeout(h.ctx, connectTimeout)
defer cancel()
err = h.h.Connect(ctx, p)
return err
Expand Down Expand Up @@ -379,16 +385,12 @@ func (h *host) peerCount() int {
return len(peers)
}

// addrInfos returns the libp2p AddrInfos of the host
func (h *host) addrInfos() (addrInfos []*peer.AddrInfo, err error) {
for _, multiaddr := range h.multiaddrs() {
addrInfo, err := peer.AddrInfoFromP2pAddr(multiaddr)
if err != nil {
return nil, err
}
addrInfos = append(addrInfos, addrInfo)
// addrInfo returns the libp2p peer.AddrInfo of the host
func (h *host) addrInfo() peer.AddrInfo {
return peer.AddrInfo{
ID: h.h.ID(),
Addrs: h.h.Addrs(),
}
return addrInfos, nil
}

// multiaddrs returns the multiaddresses of the host
Expand Down
Loading

0 comments on commit 00a8df1

Please sign in to comment.