Skip to content

Commit

Permalink
fix(dot/network): fix discovery between gossamer nodes (ChainSafe#1594)
Browse files Browse the repository at this point in the history
  • Loading branch information
noot authored May 20, 2021
1 parent af9c925 commit f4c79d3
Show file tree
Hide file tree
Showing 11 changed files with 509 additions and 299 deletions.
204 changes: 204 additions & 0 deletions dot/network/discovery.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright 2019 ChainSafe Systems (ON) Corp.
// This file is part of gossamer.
//
// The gossamer library is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// The gossamer library is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with the gossamer library. If not, see <http://www.gnu.org/licenses/>.

package network

import (
"context"
"fmt"
"time"

badger "github.com/ipfs/go-ds-badger2"
libp2phost "github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
libp2pdiscovery "github.com/libp2p/go-libp2p-discovery"
kaddht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p-kad-dht/dual"
)

var (
startDHTTimeout = time.Second * 10
initialAdvertisementTimeout = time.Millisecond
tryAdvertiseTimeout = time.Second * 30
connectToPeersTimeout = time.Minute
)

// discovery handles discovery of new peers via the kademlia DHT
type discovery struct {
ctx context.Context
dht *dual.DHT
h libp2phost.Host
bootnodes []peer.AddrInfo
ds *badger.Datastore
pid protocol.ID
minPeers, maxPeers int
}

func newDiscovery(ctx context.Context, h libp2phost.Host, bootnodes []peer.AddrInfo, ds *badger.Datastore, pid protocol.ID, min, max int) *discovery {
return &discovery{
ctx: ctx,
h: h,
bootnodes: bootnodes,
ds: ds,
pid: pid,
minPeers: min,
maxPeers: max,
}
}

// start creates the DHT.
func (d *discovery) start() error {
if len(d.bootnodes) == 0 {
// get all currently connected peers and use them to bootstrap the DHT
peers := d.h.Network().Peers()

for {
if len(peers) > 0 {
break
}

select {
case <-time.After(startDHTTimeout):
logger.Debug("no peers yet, waiting to start DHT...")
// wait for peers to connect before starting DHT, otherwise DHT bootstrap nodes
// will be empty and we will fail to fill the routing table
case <-d.ctx.Done():
return nil
}

peers = d.h.Network().Peers()
}

for _, p := range peers {
d.bootnodes = append(d.bootnodes, d.h.Peerstore().PeerInfo(p))
}
}

logger.Debug("starting DHT...", "bootnodes", d.bootnodes)

dhtOpts := []dual.Option{
dual.DHTOption(kaddht.Datastore(d.ds)),
dual.DHTOption(kaddht.BootstrapPeers(d.bootnodes...)),
dual.DHTOption(kaddht.V1ProtocolOverride(d.pid + "/kad")),
dual.DHTOption(kaddht.Mode(kaddht.ModeAutoServer)),
}

// create DHT service
dht, err := dual.New(d.ctx, d.h, dhtOpts...)
if err != nil {
return err
}

d.dht = dht
return d.discoverAndAdvertise()
}

func (d *discovery) stop() error {
if d.dht == nil {
return nil
}

return d.dht.Close()
}

func (d *discovery) discoverAndAdvertise() error {
rd := libp2pdiscovery.NewRoutingDiscovery(d.dht)

err := d.dht.Bootstrap(d.ctx)
if err != nil {
return fmt.Errorf("failed to bootstrap DHT: %w", err)
}

// wait to connect to bootstrap peers
time.Sleep(time.Second)

go func() {
ttl := initialAdvertisementTimeout

for {
select {
case <-time.After(ttl):
logger.Debug("advertising ourselves in the DHT...")
err := d.dht.Bootstrap(d.ctx)
if err != nil {
logger.Warn("failed to bootstrap DHT", "error", err)
continue
}

ttl, err = rd.Advertise(d.ctx, string(d.pid))
if err != nil {
logger.Debug("failed to advertise in the DHT", "error", err)
ttl = tryAdvertiseTimeout
}
case <-d.ctx.Done():
return
}
}
}()

go func() {
logger.Debug("attempting to find DHT peers...")
peerCh, err := rd.FindPeers(d.ctx, string(d.pid))
if err != nil {
logger.Warn("failed to begin finding peers via DHT", "err", err)
return
}

peersToTry := make(map[*peer.AddrInfo]struct{})

for {
select {
case <-d.ctx.Done():
return
case <-time.After(connectToPeersTimeout):
if len(d.h.Network().Peers()) > d.minPeers {
continue
}

// reconnect to peers if peer count is low
for p := range peersToTry {
err = d.h.Connect(d.ctx, *p)
if err != nil {
logger.Trace("failed to connect to discovered peer", "peer", p.ID, "err", err)
delete(peersToTry, p)
}
}
case peer := <-peerCh:
if peer.ID == d.h.ID() || peer.ID == "" {
continue
}

logger.Trace("found new peer via DHT", "peer", peer.ID)

// found a peer, try to connect if we need more peers
if len(d.h.Network().Peers()) < d.maxPeers {
err = d.h.Connect(d.ctx, peer)
if err != nil {
logger.Trace("failed to connect to discovered peer", "peer", peer.ID, "err", err)
}
} else {
d.h.Peerstore().AddAddrs(peer.ID, peer.Addrs, peerstore.PermanentAddrTTL)
peersToTry[&peer] = struct{}{}
}
}
}
}()

logger.Debug("DHT discovery started!")
return nil
}
Loading

0 comments on commit f4c79d3

Please sign in to comment.