Skip to content

Commit

Permalink
Crawler based DHT client (#709)
Browse files Browse the repository at this point in the history
* fullrt: add experimental FullRT DHT client

The FullRT client periodically crawls the network to fill its local routing table and uses that to perform queries. It supports performing many provides or puts together taking advantage of efficiencies of grouping the DHT operations by where they will occur in the Kademlia space. Additionally, it has a more tunable function for determining when a query is complete that does not require waiting on any individual peer (which could be offline) to respond. This client is experimental and its exposed interfaces should be expected to change and break over time.

* crawler: starting peers with addresses and peers found during a crawl have their addresses extended. logging improved

* dht: move IpfsDHT options to the internal package. Make a breaking change to the filter interfaces to support more DHT implementations

* dht: GetClosestPeers now returns a slice of peers instead of a channel of peers since we have to wait for the query to complete to return the closest peers anyway

* dht: the subscriberNotifiee has been refactored to work more independently of the underlying message sender implementation
  • Loading branch information
aschmahmann authored May 14, 2021
1 parent 5932122 commit eac1b5e
Show file tree
Hide file tree
Showing 22 changed files with 1,855 additions and 398 deletions.
26 changes: 20 additions & 6 deletions crawler/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,8 @@ type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)
// HandleQueryFail is a callback on failed peer query
type HandleQueryFail func(p peer.ID, err error)

const dialAddressExtendDur time.Duration = time.Minute * 30

// Run crawls dht peers from an initial seed of `startingPeers`
func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
jobs := make(chan peer.ID, 1)
Expand All @@ -140,15 +142,27 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl
defer wg.Wait()
defer close(jobs)

toDial := make([]*peer.AddrInfo, 0, len(startingPeers))
var toDial []*peer.AddrInfo
peersSeen := make(map[peer.ID]struct{})

numSkipped := 0
for _, ai := range startingPeers {
extendAddrs := c.host.Peerstore().Addrs(ai.ID)
if len(ai.Addrs) > 0 {
extendAddrs = append(extendAddrs, ai.Addrs...)
c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, dialAddressExtendDur)
}
if len(extendAddrs) == 0 {
numSkipped++
continue
}

toDial = append(toDial, ai)
peersSeen[ai.ID] = struct{}{}
extendAddrs := c.host.Peerstore().Addrs(ai.ID)
extendAddrs = append(extendAddrs, ai.Addrs...)
c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, time.Hour)
}

if numSkipped > 0 {
logger.Infof("%d starting peers were skipped due to lack of addresses. Starting crawl with %d peers", numSkipped, len(toDial))
}

numQueried := 0
Expand All @@ -168,7 +182,7 @@ func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handl
logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
for p, ai := range res.data {
c.host.Peerstore().AddAddrs(p, ai.Addrs, time.Hour)
c.host.Peerstore().AddAddrs(p, ai.Addrs, dialAddressExtendDur)
if _, ok := peersSeen[p]; !ok {
peersSeen[p] = struct{}{}
toDial = append(toDial, ai)
Expand Down Expand Up @@ -208,7 +222,7 @@ func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult
defer cancel()
err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
if err != nil {
logger.Infof("could not connect to peer %v: %v", nextPeer, err)
logger.Debugf("could not connect to peer %v: %v", nextPeer, err)
return &queryResult{nextPeer, nil, err}
}

Expand Down
28 changes: 0 additions & 28 deletions ctx_mutex.go

This file was deleted.

86 changes: 42 additions & 44 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ import (
"github.com/libp2p/go-libp2p-core/routing"

"github.com/libp2p/go-libp2p-kad-dht/internal"
dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
"github.com/libp2p/go-libp2p-kad-dht/internal/net"
"github.com/libp2p/go-libp2p-kad-dht/metrics"
pb "github.com/libp2p/go-libp2p-kad-dht/pb"
"github.com/libp2p/go-libp2p-kad-dht/providers"
Expand Down Expand Up @@ -96,7 +98,7 @@ type IpfsDHT struct {
proc goprocess.Process

protoMessenger *pb.ProtocolMessenger
msgSender *messageSenderImpl
msgSender pb.MessageSender

plk sync.Mutex

Expand Down Expand Up @@ -163,15 +165,15 @@ var (
// If the Routing Table has more than "minRTRefreshThreshold" peers, we consider a peer as a Routing Table candidate ONLY when
// we successfully get a query response from it OR if it send us a query.
func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error) {
var cfg config
if err := cfg.apply(append([]Option{defaults}, options...)...); err != nil {
var cfg dhtcfg.Config
if err := cfg.Apply(append([]Option{dhtcfg.Defaults}, options...)...); err != nil {
return nil, err
}
if err := cfg.applyFallbacks(h); err != nil {
if err := cfg.ApplyFallbacks(h); err != nil {
return nil, err
}

if err := cfg.validate(); err != nil {
if err := cfg.Validate(); err != nil {
return nil, err
}

Expand All @@ -180,34 +182,30 @@ func New(ctx context.Context, h host.Host, options ...Option) (*IpfsDHT, error)
return nil, fmt.Errorf("failed to create DHT, err=%s", err)
}

dht.autoRefresh = cfg.routingTable.autoRefresh
dht.autoRefresh = cfg.RoutingTable.AutoRefresh

dht.maxRecordAge = cfg.maxRecordAge
dht.enableProviders = cfg.enableProviders
dht.enableValues = cfg.enableValues
dht.disableFixLowPeers = cfg.disableFixLowPeers
dht.maxRecordAge = cfg.MaxRecordAge
dht.enableProviders = cfg.EnableProviders
dht.enableValues = cfg.EnableValues
dht.disableFixLowPeers = cfg.DisableFixLowPeers

dht.Validator = cfg.validator
dht.msgSender = &messageSenderImpl{
host: h,
strmap: make(map[peer.ID]*peerMessageSender),
protocols: dht.protocols,
}
dht.Validator = cfg.Validator
dht.msgSender = net.NewMessageSenderImpl(h, dht.protocols)
dht.protoMessenger, err = pb.NewProtocolMessenger(dht.msgSender, pb.WithValidator(dht.Validator))
if err != nil {
return nil, err
}

dht.testAddressUpdateProcessing = cfg.testAddressUpdateProcessing
dht.testAddressUpdateProcessing = cfg.TestAddressUpdateProcessing

dht.auto = cfg.mode
switch cfg.mode {
dht.auto = cfg.Mode
switch cfg.Mode {
case ModeAuto, ModeClient:
dht.mode = modeClient
case ModeAutoServer, ModeServer:
dht.mode = modeServer
default:
return nil, fmt.Errorf("invalid dht mode %d", cfg.mode)
return nil, fmt.Errorf("invalid dht mode %d", cfg.Mode)
}

if dht.mode == modeServer {
Expand Down Expand Up @@ -265,20 +263,20 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT
return dht
}

func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
func makeDHT(ctx context.Context, h host.Host, cfg dhtcfg.Config) (*IpfsDHT, error) {
var protocols, serverProtocols []protocol.ID

v1proto := cfg.protocolPrefix + kad1
v1proto := cfg.ProtocolPrefix + kad1

if cfg.v1ProtocolOverride != "" {
v1proto = cfg.v1ProtocolOverride
if cfg.V1ProtocolOverride != "" {
v1proto = cfg.V1ProtocolOverride
}

protocols = []protocol.ID{v1proto}
serverProtocols = []protocol.ID{v1proto}

dht := &IpfsDHT{
datastore: cfg.datastore,
datastore: cfg.Datastore,
self: h.ID(),
selfKey: kb.ConvertPeerID(h.ID()),
peerstore: h.Peerstore(),
Expand All @@ -287,12 +285,12 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
protocols: protocols,
protocolsStrs: protocol.ConvertToStrings(protocols),
serverProtocols: serverProtocols,
bucketSize: cfg.bucketSize,
alpha: cfg.concurrency,
beta: cfg.resiliency,
queryPeerFilter: cfg.queryPeerFilter,
routingTablePeerFilter: cfg.routingTable.peerFilter,
rtPeerDiversityFilter: cfg.routingTable.diversityFilter,
bucketSize: cfg.BucketSize,
alpha: cfg.Concurrency,
beta: cfg.Resiliency,
queryPeerFilter: cfg.QueryPeerFilter,
routingTablePeerFilter: cfg.RoutingTable.PeerFilter,
rtPeerDiversityFilter: cfg.RoutingTable.DiversityFilter,

fixLowPeersChan: make(chan struct{}, 1),

Expand All @@ -306,12 +304,12 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
// query a peer as part of our refresh cycle.
// To grok the Math Wizardy that produced these exact equations, please be patient as a document explaining it will
// be published soon.
if cfg.concurrency < cfg.bucketSize { // (alpha < K)
l1 := math.Log(float64(1) / float64(cfg.bucketSize)) //(Log(1/K))
l2 := math.Log(float64(1) - (float64(cfg.concurrency) / float64(cfg.bucketSize))) // Log(1 - (alpha / K))
maxLastSuccessfulOutboundThreshold = time.Duration(l1 / l2 * float64(cfg.routingTable.refreshInterval))
if cfg.Concurrency < cfg.BucketSize { // (alpha < K)
l1 := math.Log(float64(1) / float64(cfg.BucketSize)) //(Log(1/K))
l2 := math.Log(float64(1) - (float64(cfg.Concurrency) / float64(cfg.BucketSize))) // Log(1 - (alpha / K))
maxLastSuccessfulOutboundThreshold = time.Duration(l1 / l2 * float64(cfg.RoutingTable.RefreshInterval))
} else {
maxLastSuccessfulOutboundThreshold = cfg.routingTable.refreshInterval
maxLastSuccessfulOutboundThreshold = cfg.RoutingTable.RefreshInterval
}

// construct routing table
Expand All @@ -321,7 +319,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return nil, fmt.Errorf("failed to construct routing table,err=%s", err)
}
dht.routingTable = rt
dht.bootstrapPeers = cfg.bootstrapPeers
dht.bootstrapPeers = cfg.BootstrapPeers

// rt refresh manager
rtRefresh, err := makeRtRefreshManager(dht, cfg, maxLastSuccessfulOutboundThreshold)
Expand All @@ -340,7 +338,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
// the DHT context should be done when the process is closed
dht.ctx = goprocessctx.WithProcessClosing(ctxTags, dht.proc)

pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.datastore, cfg.providersOptions...)
pm, err := providers.NewProviderManager(dht.ctx, h.ID(), cfg.Datastore, cfg.ProvidersOptions...)
if err != nil {
return nil, err
}
Expand All @@ -351,7 +349,7 @@ func makeDHT(ctx context.Context, h host.Host, cfg config) (*IpfsDHT, error) {
return dht, nil
}

func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
func makeRtRefreshManager(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutboundThreshold time.Duration) (*rtrefresh.RtRefreshManager, error) {
keyGenFnc := func(cpl uint) (string, error) {
p, err := dht.routingTable.GenRandPeerID(cpl)
return string(p), err
Expand All @@ -363,18 +361,18 @@ func makeRtRefreshManager(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThr
}

r, err := rtrefresh.NewRtRefreshManager(
dht.host, dht.routingTable, cfg.routingTable.autoRefresh,
dht.host, dht.routingTable, cfg.RoutingTable.AutoRefresh,
keyGenFnc,
queryFnc,
cfg.routingTable.refreshQueryTimeout,
cfg.routingTable.refreshInterval,
cfg.RoutingTable.RefreshQueryTimeout,
cfg.RoutingTable.RefreshInterval,
maxLastSuccessfulOutboundThreshold,
dht.refreshFinishedCh)

return r, err
}

func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
func makeRoutingTable(dht *IpfsDHT, cfg dhtcfg.Config, maxLastSuccessfulOutboundThreshold time.Duration) (*kb.RoutingTable, error) {
// make a Routing Table Diversity Filter
var filter *peerdiversity.Filter
if dht.rtPeerDiversityFilter != nil {
Expand All @@ -389,7 +387,7 @@ func makeRoutingTable(dht *IpfsDHT, cfg config, maxLastSuccessfulOutboundThresho
filter = df
}

rt, err := kb.NewRoutingTable(cfg.bucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter)
rt, err := kb.NewRoutingTable(cfg.BucketSize, dht.selfKey, time.Minute, dht.host.Peerstore(), maxLastSuccessfulOutboundThreshold, filter)
if err != nil {
return nil, err
}
Expand Down
35 changes: 27 additions & 8 deletions dht_filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"

Expand All @@ -14,14 +15,16 @@ import (

ma "github.com/multiformats/go-multiaddr"
manet "github.com/multiformats/go-multiaddr/net"

dhtcfg "github.com/libp2p/go-libp2p-kad-dht/internal/config"
)

// QueryFilterFunc is a filter applied when considering peers to dial when querying
type QueryFilterFunc func(dht *IpfsDHT, ai peer.AddrInfo) bool
type QueryFilterFunc = dhtcfg.QueryFilterFunc

// RouteTableFilterFunc is a filter applied when considering connections to keep in
// the local route table.
type RouteTableFilterFunc func(dht *IpfsDHT, conns []network.Conn) bool
type RouteTableFilterFunc = dhtcfg.RouteTableFilterFunc

var publicCIDR6 = "2000::/3"
var public6 *net.IPNet
Expand Down Expand Up @@ -59,7 +62,7 @@ func isPrivateAddr(a ma.Multiaddr) bool {
}

// PublicQueryFilter returns true if the peer is suspected of being publicly accessible
func PublicQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool {
func PublicQueryFilter(_ interface{}, ai peer.AddrInfo) bool {
if len(ai.Addrs) == 0 {
return false
}
Expand All @@ -73,18 +76,25 @@ func PublicQueryFilter(_ *IpfsDHT, ai peer.AddrInfo) bool {
return hasPublicAddr
}

type hasHost interface {
Host() host.Host
}

var _ QueryFilterFunc = PublicQueryFilter

// PublicRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a public network
func PublicRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
func PublicRoutingTableFilter(dht interface{}, p peer.ID) bool {
d := dht.(hasHost)

conns := d.Host().Network().ConnsToPeer(p)
if len(conns) == 0 {
return false
}

// Do we have a public address for this peer?
id := conns[0].RemotePeer()
known := dht.peerstore.PeerInfo(id)
known := d.Host().Peerstore().PeerInfo(id)
for _, a := range known.Addrs {
if !isRelayAddr(a) && isPublicAddr(a) {
return true
Expand All @@ -97,7 +107,7 @@ func PublicRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
var _ RouteTableFilterFunc = PublicRoutingTableFilter

// PrivateQueryFilter doens't currently restrict which peers we are willing to query from the local DHT.
func PrivateQueryFilter(dht *IpfsDHT, ai peer.AddrInfo) bool {
func PrivateQueryFilter(_ interface{}, ai peer.AddrInfo) bool {
return len(ai.Addrs) > 0
}

Expand Down Expand Up @@ -137,10 +147,19 @@ func getCachedRouter() routing.Router {

// PrivateRoutingTableFilter allows a peer to be added to the routing table if the connections to that peer indicate
// that it is on a private network
func PrivateRoutingTableFilter(dht *IpfsDHT, conns []network.Conn) bool {
func PrivateRoutingTableFilter(dht interface{}, p peer.ID) bool {
d := dht.(hasHost)
conns := d.Host().Network().ConnsToPeer(p)
return privRTFilter(d, conns)
}

func privRTFilter(dht interface{}, conns []network.Conn) bool {
d := dht.(hasHost)
h := d.Host()

router := getCachedRouter()
myAdvertisedIPs := make([]net.IP, 0)
for _, a := range dht.Host().Addrs() {
for _, a := range h.Addrs() {
if isPublicAddr(a) && !isRelayAddr(a) {
ip, err := manet.ToIP(a)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion dht_filters_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestFilterCaching(t *testing.T) {
d := setupDHT(ctx, t, true)

remote, _ := manet.FromIP(net.IPv4(8, 8, 8, 8))
if PrivateRoutingTableFilter(d, []network.Conn{&mockConn{
if privRTFilter(d, []network.Conn{&mockConn{
local: d.Host().Peerstore().PeerInfo(d.Host().ID()),
remote: peer.AddrInfo{ID: "", Addrs: []ma.Multiaddr{remote}},
}}) {
Expand Down
Loading

0 comments on commit eac1b5e

Please sign in to comment.