From e6b4efdb1b311808d067ded3343c54db17f45d81 Mon Sep 17 00:00:00 2001 From: noot Date: Tue, 6 Apr 2021 18:20:25 -0400 Subject: [PATCH 1/8] don't start justification requesting if syncing --- dot/network/sync_justification.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/dot/network/sync_justification.go b/dot/network/sync_justification.go index c5310cc144..dd08917ee4 100644 --- a/dot/network/sync_justification.go +++ b/dot/network/sync_justification.go @@ -36,6 +36,15 @@ func (q *syncQueue) finalizeAtHead() { return } + head, err := q.s.blockState.BestBlockHeader() + if err != nil { + continue + } + + if head.Int64() < q.goal { + continue + } + curr, err := q.s.blockState.GetFinalizedHeader(0, 0) if err != nil { continue From 60246a2e53d9cd18e9eedc0b3cddb53613fa3761 Mon Sep 17 00:00:00 2001 From: noot Date: Tue, 6 Apr 2021 18:23:32 -0400 Subject: [PATCH 2/8] fix --- dot/network/sync_justification.go | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/dot/network/sync_justification.go b/dot/network/sync_justification.go index dd08917ee4..e0897d0d10 100644 --- a/dot/network/sync_justification.go +++ b/dot/network/sync_justification.go @@ -36,7 +36,7 @@ func (q *syncQueue) finalizeAtHead() { return } - head, err := q.s.blockState.BestBlockHeader() + head, err := q.s.blockState.BestBlockNumber() if err != nil { continue } @@ -59,12 +59,6 @@ func (q *syncQueue) finalizeAtHead() { prev = curr - // no new blocks have been finalized, request block justifications from peers - head, err := q.s.blockState.BestBlockNumber() - if err != nil { - continue - } - start := head.Uint64() - uint64(blockRequestSize) if curr.Number.Uint64() > start { start = curr.Number.Uint64() + 1 From 048cf4cedb878677f75fe72a690f83157a18caba Mon Sep 17 00:00:00 2001 From: noot Date: Tue, 6 Apr 2021 18:42:56 -0400 Subject: [PATCH 3/8] add persistent-peers to toml --- cmd/gossamer/config.go | 6 ++++++ dot/config.go | 15 ++++++++------- dot/config/toml/config.go | 15 ++++++++------- dot/network/config.go | 3 +++ dot/services.go | 25 +++++++++++++------------ 5 files changed, 38 insertions(+), 26 deletions(-) diff --git a/cmd/gossamer/config.go b/cmd/gossamer/config.go index 4f08f54e83..dfe218cdaf 100644 --- a/cmd/gossamer/config.go +++ b/cmd/gossamer/config.go @@ -550,6 +550,7 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot cfg.NoMDNS = tomlCfg.NoMDNS cfg.MinPeers = tomlCfg.MinPeers cfg.MaxPeers = tomlCfg.MaxPeers + cfg.PersistentPeers = tomlCfg.PersistentPeers // check --port flag and update node configuration if port := ctx.GlobalUint(PortFlag.Name); port != 0 { @@ -581,6 +582,10 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot cfg.NoMDNS = true } + if len(cfg.PersistentPeers) == 0 { + cfg.PersistentPeers = []string(nil) + } + logger.Debug( "network configuration", "port", cfg.Port, @@ -590,6 +595,7 @@ func setDotNetworkConfig(ctx *cli.Context, tomlCfg ctoml.NetworkConfig, cfg *dot "nomdns", cfg.NoMDNS, "minpeers", cfg.MinPeers, "maxpeers", cfg.MaxPeers, + "persistent-peers", cfg.PersistentPeers, ) } diff --git a/dot/config.go b/dot/config.go index 73d17e0880..83ff461c0a 100644 --- a/dot/config.go +++ b/dot/config.go @@ -77,13 +77,14 @@ type AccountConfig struct { // NetworkConfig is to marshal/unmarshal toml network config vars type NetworkConfig struct { - Port uint32 - Bootnodes []string - ProtocolID string - NoBootstrap bool - NoMDNS bool - MinPeers int - MaxPeers int + Port uint32 + Bootnodes []string + ProtocolID string + NoBootstrap bool + NoMDNS bool + MinPeers int + MaxPeers int + PersistentPeers []string } // CoreConfig is to marshal/unmarshal toml core config vars diff --git a/dot/config/toml/config.go b/dot/config/toml/config.go index 6948bbcfe2..98a53611a3 100644 --- a/dot/config/toml/config.go +++ b/dot/config/toml/config.go @@ -61,13 +61,14 @@ type AccountConfig struct { // NetworkConfig is to marshal/unmarshal toml network config vars type NetworkConfig struct { - Port uint32 `toml:"port,omitempty"` - Bootnodes []string `toml:"bootnodes,omitempty"` - ProtocolID string `toml:"protocol,omitempty"` - NoBootstrap bool `toml:"nobootstrap,omitempty"` - NoMDNS bool `toml:"nomdns,omitempty"` - MinPeers int `toml:"min-peers,omitempty"` - MaxPeers int `toml:"max-peers,omitempty"` + Port uint32 `toml:"port,omitempty"` + Bootnodes []string `toml:"bootnodes,omitempty"` + ProtocolID string `toml:"protocol,omitempty"` + NoBootstrap bool `toml:"nobootstrap,omitempty"` + NoMDNS bool `toml:"nomdns,omitempty"` + MinPeers int `toml:"min-peers,omitempty"` + MaxPeers int `toml:"max-peers,omitempty"` + PersistentPeers []string `toml:"persistent-peers,omitempty"` } // CoreConfig is to marshal/unmarshal toml core config vars diff --git a/dot/network/config.go b/dot/network/config.go index d2420f8a7a..a3cf8e2a02 100644 --- a/dot/network/config.go +++ b/dot/network/config.go @@ -85,6 +85,9 @@ type Config struct { MinPeers int MaxPeers int + // PersistentPeers is a list of multiaddrs which the node should remain connected to + PersistentPeers []string + // privateKey the private key for the network p2p identity privateKey crypto.PrivKey diff --git a/dot/services.go b/dot/services.go index a49239cb52..0f5d329dbc 100644 --- a/dot/services.go +++ b/dot/services.go @@ -263,18 +263,19 @@ func createNetworkService(cfg *Config, stateSrvc *state.Service) (*network.Servi // network service configuation networkConfig := network.Config{ - LogLvl: cfg.Log.NetworkLvl, - BlockState: stateSrvc.Block, - BasePath: cfg.Global.BasePath, - Roles: cfg.Core.Roles, - Port: cfg.Network.Port, - Bootnodes: cfg.Network.Bootnodes, - ProtocolID: cfg.Network.ProtocolID, - NoBootstrap: cfg.Network.NoBootstrap, - NoMDNS: cfg.Network.NoMDNS, - MinPeers: cfg.Network.MinPeers, - MaxPeers: cfg.Network.MaxPeers, - PublishMetrics: cfg.Global.PublishMetrics, + LogLvl: cfg.Log.NetworkLvl, + BlockState: stateSrvc.Block, + BasePath: cfg.Global.BasePath, + Roles: cfg.Core.Roles, + Port: cfg.Network.Port, + Bootnodes: cfg.Network.Bootnodes, + ProtocolID: cfg.Network.ProtocolID, + NoBootstrap: cfg.Network.NoBootstrap, + NoMDNS: cfg.Network.NoMDNS, + MinPeers: cfg.Network.MinPeers, + MaxPeers: cfg.Network.MaxPeers, + PublishMetrics: cfg.Global.PublishMetrics, + PersistentPeers: cfg.Network.PersistentPeers, } networkSrvc, err := network.NewService(&networkConfig) From 81a9f4648cec183d7d27253083603df011dc6e9f Mon Sep 17 00:00:00 2001 From: noot Date: Tue, 6 Apr 2021 19:31:08 -0400 Subject: [PATCH 4/8] add persistent peer logic to network --- dot/network/connmgr.go | 63 +++++++++++++++++++++++++----------------- dot/network/host.go | 47 +++++++++++++++++++------------ 2 files changed, 67 insertions(+), 43 deletions(-) diff --git a/dot/network/connmgr.go b/dot/network/connmgr.go index f118ae1c38..34bfcf1b29 100644 --- a/dot/network/connmgr.go +++ b/dot/network/connmgr.go @@ -31,25 +31,29 @@ import ( // ConnManager implements connmgr.ConnManager type ConnManager struct { + sync.Mutex + host *host min, max int disconnectHandler func(peer.ID) // closeHandlerMap contains close handler corresponding to a protocol. closeHandlerMap map[protocol.ID]func(peerID peer.ID) - protectedPeerMapMu sync.RWMutex - // protectedPeerMap contains a list of peers that are protected from pruning + // protectedPeers contains a list of peers that are protected from pruning // when we reach the maximum numbers of peers. - protectedPeerMap map[peer.ID]struct{} - sync.Mutex + protectedPeers *sync.Map // map[peer.ID]struct{} + + // persistentPeers contains peers we should remain connected to. + persistentPeers *sync.Map // map[peer.ID]struct{} } func newConnManager(min, max int) *ConnManager { return &ConnManager{ - min: min, - max: max, - closeHandlerMap: make(map[protocol.ID]func(peerID peer.ID)), - protectedPeerMap: make(map[peer.ID]struct{}), + min: min, + max: max, + closeHandlerMap: make(map[protocol.ID]func(peerID peer.ID)), + protectedPeers: new(sync.Map), + persistentPeers: new(sync.Map), } } @@ -85,25 +89,15 @@ func (*ConnManager) TrimOpenConns(ctx context.Context) {} // Protect peer will add the given peer to the protectedPeerMap which will // protect the peer from pruning. func (cm *ConnManager) Protect(id peer.ID, tag string) { - cm.protectedPeerMapMu.Lock() - defer cm.protectedPeerMapMu.Unlock() - - cm.protectedPeerMap[id] = struct{}{} + cm.protectedPeers.Store(id, struct{}{}) } // Unprotect peer will remove the given peer from prune protection. // returns true if we have successfully removed the peer from the // protectedPeerMap. False otherwise. func (cm *ConnManager) Unprotect(id peer.ID, tag string) bool { - cm.protectedPeerMapMu.Lock() - defer cm.protectedPeerMapMu.Unlock() - - _, ok := cm.protectedPeerMap[id] - if ok { - delete(cm.protectedPeerMap, id) - return true - } - return false + _, wasDeleted := cm.protectedPeers.LoadAndDelete(id) + return wasDeleted } // Close peer @@ -111,10 +105,7 @@ func (*ConnManager) Close() error { return nil } // IsProtected returns whether the given peer is protected from pruning or not. func (cm *ConnManager) IsProtected(id peer.ID, tag string) (protected bool) { - cm.protectedPeerMapMu.RLock() - defer cm.protectedPeerMapMu.RUnlock() - - _, ok := cm.protectedPeerMap[id] + _, ok := cm.protectedPeers.Load(id) return ok } @@ -159,6 +150,7 @@ func (cm *ConnManager) Connected(n network.Network, c network.Conn) { cm.Lock() defer cm.Unlock() + // TODO: this should be updated to disconnect from (total_peers - maximum) peers, instead of just one peer if len(n.Peers()) > cm.max { unprotPeers := cm.unprotectedPeers(n.Peers()) if len(unprotPeers) == 0 { @@ -188,6 +180,22 @@ func (cm *ConnManager) Disconnected(n network.Network, c network.Conn) { if cm.disconnectHandler != nil { cm.disconnectHandler(c.RemotePeer()) } + + if !cm.isPersistent(c.RemotePeer()) { + return + } + + addrs := cm.host.h.Peerstore().Addrs(c.RemotePeer()) + info := peer.AddrInfo{ + ID: c.RemotePeer(), + Addrs: addrs, + } + + err := cm.host.connect(info) + if err != nil { + logger.Warn("failed to reconnect to persistent peer", "peer", c.RemotePeer(), "error", err) + } + // TODO: if number of peers falls below the min desired peer count, we should try to connect to previously discovered peers } @@ -224,3 +232,8 @@ func (cm *ConnManager) ClosedStream(n network.Network, s network.Stream) { closeCB(s.Conn().RemotePeer()) } } + +func (cm *ConnManager) isPersistent(p peer.ID) bool { + _, ok := cm.persistentPeers.Load(p) + return ok +} diff --git a/dot/network/host.go b/dot/network/host.go index 79594760e4..853b9320ec 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -49,13 +49,14 @@ var privateCIDRs = []string{ // host wraps libp2p host with network host configuration and services type host struct { - ctx context.Context - h libp2phost.Host - dht *dual.DHT - bootnodes []peer.AddrInfo - protocolID protocol.ID - cm *ConnManager - ds *badger.Datastore + ctx context.Context + h libp2phost.Host + dht *dual.DHT + bootnodes []peer.AddrInfo + persistentPeers []peer.AddrInfo + protocolID protocol.ID + cm *ConnManager + ds *badger.Datastore } // newHost creates a host wrapper with a new libp2p host instance @@ -78,6 +79,12 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { return nil, err } + // format persistent peers + pps, err := stringsToAddrInfos(cfg.PersistentPeers) + if err != nil { + return nil, err + } + // format protocol id pid := protocol.ID(cfg.ProtocolID) @@ -143,16 +150,19 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { // wrap host and DHT service with routed host h = rhost.Wrap(h, dht) - return &host{ - ctx: ctx, - h: h, - dht: dht, - bootnodes: bns, - protocolID: pid, - cm: cm, - ds: ds, - }, nil + host := &host{ + ctx: ctx, + h: h, + dht: dht, + bootnodes: bns, + protocolID: pid, + cm: cm, + ds: ds, + persistentPeers: pps, + } + cm.host = host + return host, nil } // close closes host services and the libp2p host (host services first) @@ -221,14 +231,15 @@ func (h *host) addToPeerstore(p peer.AddrInfo) { // bootstrap connects the host to the configured bootnodes func (h *host) bootstrap() { failed := 0 - for _, addrInfo := range h.bootnodes { + all := append(h.bootnodes, h.persistentPeers...) + for _, addrInfo := range all { err := h.connect(addrInfo) if err != nil { logger.Debug("failed to bootstrap to peer", "error", err) failed++ } } - if failed == len(h.bootnodes) { + if failed == len(all) { logger.Error("failed to bootstrap to any bootnode") } } From 38d1d21265b38fe1cce40310f014c12809fe290b Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 7 Apr 2021 11:43:19 -0400 Subject: [PATCH 5/8] fix test --- dot/network/connmgr_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/dot/network/connmgr_test.go b/dot/network/connmgr_test.go index e6112735fb..3bd51f73af 100644 --- a/dot/network/connmgr_test.go +++ b/dot/network/connmgr_test.go @@ -64,7 +64,6 @@ func TestMaxPeers(t *testing.T) { func TestProtectUnprotectPeer(t *testing.T) { cm := newConnManager(1, 4) - require.Zero(t, len(cm.protectedPeerMap)) p1 := peer.ID("a") p2 := peer.ID("b") From f6712759ddfd5d7c1123efd8f3b43e969b09b764 Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 7 Apr 2021 12:49:05 -0400 Subject: [PATCH 6/8] add test --- dot/network/connmgr.go | 2 +- dot/network/connmgr_test.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 33 insertions(+), 1 deletion(-) diff --git a/dot/network/connmgr.go b/dot/network/connmgr.go index 34bfcf1b29..f10b2fc585 100644 --- a/dot/network/connmgr.go +++ b/dot/network/connmgr.go @@ -131,7 +131,7 @@ func (cm *ConnManager) ListenClose(n network.Network, addr ma.Multiaddr) { func (cm *ConnManager) unprotectedPeers(peers []peer.ID) []peer.ID { unprot := []peer.ID{} for _, id := range peers { - if !cm.IsProtected(id, "") { + if !cm.IsProtected(id, "") && !cm.isPersistent(id) { unprot = append(unprot, id) } } diff --git a/dot/network/connmgr_test.go b/dot/network/connmgr_test.go index 3bd51f73af..e760704013 100644 --- a/dot/network/connmgr_test.go +++ b/dot/network/connmgr_test.go @@ -19,6 +19,7 @@ package network import ( "fmt" "testing" + "time" "github.com/ChainSafe/gossamer/lib/utils" "github.com/libp2p/go-libp2p-core/peer" @@ -85,3 +86,34 @@ func TestProtectUnprotectPeer(t *testing.T) { unprot = cm.unprotectedPeers([]peer.ID{p1, p2, p3, p4}) require.Equal(t, unprot, []peer.ID{p1, p2, p3, p4}) } + +func TestPersistentPeers(t *testing.T) { + configA := &Config{ + BasePath: utils.NewTestBasePath(t, "node-a"), + Port: 7000, + RandSeed: 1, + NoBootstrap: true, + NoMDNS: true, + } + nodeA := createTestService(t, configA) + + addrs := nodeA.host.multiaddrs() + configB := &Config{ + BasePath: utils.NewTestBasePath(t, "node-b"), + Port: 7001, + RandSeed: 2, + NoMDNS: true, + PersistentPeers: []string{addrs[0].String()}, + } + nodeB := createTestService(t, configB) + + // B should have connected to A during bootstrap + conns := nodeB.host.h.Network().ConnsToPeer(nodeA.host.id()) + require.NotEqual(t, 0, len(conns)) + + // if A disconnects from B, B should reconnect + nodeA.host.h.Network().ClosePeer(nodeA.host.id()) + time.Sleep(time.Millisecond * 500) + conns = nodeB.host.h.Network().ConnsToPeer(nodeA.host.id()) + require.NotEqual(t, 0, len(conns)) +} From 1c41a7b6de684d271d889be6def12c7aa775c47f Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 7 Apr 2021 12:50:13 -0400 Subject: [PATCH 7/8] add go.mod --- go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index a38499f774..e9ee6e8434 100644 --- a/go.mod +++ b/go.mod @@ -57,7 +57,7 @@ require ( golang.org/x/net v0.0.0-20200822124328-c89045814202 // indirect golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9 // indirect golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c // indirect - golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd + golang.org/x/tools v0.0.0-20200221224223-e1da425f72fd // indirect google.golang.org/appengine v1.6.5 // indirect google.golang.org/protobuf v1.25.0 ) From ad89fb66beab731415f2233141d3eeffa66f8b08 Mon Sep 17 00:00:00 2001 From: noot Date: Wed, 7 Apr 2021 18:03:51 -0400 Subject: [PATCH 8/8] load persistent peers map properly --- dot/network/host.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/dot/network/host.go b/dot/network/host.go index 853b9320ec..8b1e4d25da 100644 --- a/dot/network/host.go +++ b/dot/network/host.go @@ -85,6 +85,10 @@ func newHost(ctx context.Context, cfg *Config) (*host, error) { return nil, err } + for _, pp := range pps { + cm.persistentPeers.Store(pp.ID, struct{}{}) + } + // format protocol id pid := protocol.ID(cfg.ProtocolID)