Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add basic crawler #663

Merged
merged 4 commits into from
Apr 22, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
232 changes: 232 additions & 0 deletions crawler/crawler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
package crawler

import (
"context"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"

logging "github.com/ipfs/go-log"
"github.com/libp2p/go-msgio/protoio"

pb "github.com/libp2p/go-libp2p-kad-dht/pb"
kbucket "github.com/libp2p/go-libp2p-kbucket"
)

var logger = logging.Logger("dht-crawler")

// Crawler connects to hosts in the DHT to track routing tables of peers.
type Crawler struct {
parallelism int
connectTimeout time.Duration
host host.Host
dhtRPC *pb.ProtocolMessenger
}

// New creates a new Crawler
func New(host host.Host, opts ...Option) (*Crawler, error) {
o := new(options)
if err := defaults(o); err != nil {
return nil, err
}
for _, opt := range opts {
if err := opt(o); err != nil {
return nil, err
}
}

pm, err := pb.NewProtocolMessenger(&messageSender{h: host, protocols: o.protocols, timeout: o.perMsgTimeout})
if err != nil {
return nil, err
}

return &Crawler{
parallelism: o.parallelism,
connectTimeout: o.connectTimeout,
host: host,
dhtRPC: pm,
}, nil
}

// MessageSender handles sending wire protocol messages to a given peer
type messageSender struct {
h host.Host
protocols []protocol.ID
timeout time.Duration
}

// SendRequest sends a peer a message and waits for its response
func (ms *messageSender) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
s, err := ms.h.NewStream(ctx, p, ms.protocols...)
if err != nil {
return nil, err
}

w := protoio.NewDelimitedWriter(s)
if err := w.WriteMsg(pmes); err != nil {
return nil, err
}

r := protoio.NewDelimitedReader(s, network.MessageSizeMax)
tctx, cancel := context.WithTimeout(ctx, ms.timeout)
defer cancel()
defer func() { _ = s.Close() }()

msg := new(pb.Message)
if err := ctxReadMsg(tctx, r, msg); err != nil {
_ = s.Reset()
return nil, err
}

return msg, nil
}

func ctxReadMsg(ctx context.Context, rc protoio.ReadCloser, mes *pb.Message) error {
errc := make(chan error, 1)
go func(r protoio.ReadCloser) {
defer close(errc)
err := r.ReadMsg(mes)
errc <- err
}(rc)

select {
case err := <-errc:
return err
case <-ctx.Done():
return ctx.Err()
}
}

// SendMessage sends a peer a message without waiting on a response
func (ms *messageSender) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
s, err := ms.h.NewStream(ctx, p, ms.protocols...)
if err != nil {
return err
}
defer func() { _ = s.Close() }()

w := protoio.NewDelimitedWriter(s)
return w.WriteMsg(pmes)
}

// HandleQueryResult is a callback on successful peer query
type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)

// HandleQueryFail is a callback on failed peer query
type HandleQueryFail func(p peer.ID, err error)

// Run crawls dht peers from an initial seed of `startingPeers`
func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
jobs := make(chan peer.ID, 1)
results := make(chan *queryResult, 1)

// Start worker goroutines
var wg sync.WaitGroup
wg.Add(c.parallelism)
for i := 0; i < c.parallelism; i++ {
go func() {
defer wg.Done()
for p := range jobs {
res := c.queryPeer(ctx, p)
results <- res
}
}()
}

defer wg.Wait()
defer close(jobs)

toDial := make([]*peer.AddrInfo, 0, 1000)
willscott marked this conversation as resolved.
Show resolved Hide resolved
peersSeen := make(map[peer.ID]struct{})

for _, ai := range startingPeers {
toDial = append(toDial, ai)
peersSeen[ai.ID] = struct{}{}
}

numQueried := 0
outstanding := 0

for len(toDial) > 0 || outstanding > 0 {
var jobCh chan peer.ID
var nextPeerID peer.ID
if len(toDial) > 0 {
jobCh = jobs
nextPeerID = toDial[0].ID
}

select {
case res := <-results:
if len(res.data) > 0 {
logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
for p, ai := range res.data {
c.host.Peerstore().AddAddrs(p, ai.Addrs, time.Hour)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hour seems potentially racy. could imagine this lasting longer than that.

if _, ok := peersSeen[p]; !ok {
peersSeen[p] = struct{}{}
toDial = append(toDial, ai)
}
rtPeers = append(rtPeers, ai)
}
if handleSuccess != nil {
handleSuccess(res.peer, rtPeers)
}
} else if handleFail != nil {
handleFail(res.peer, res.err)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit more scaffolding categorizing errors would be useful. how many attempted connections are timing out? how many are failing to connect?

}
outstanding--
case jobCh <- nextPeerID:
outstanding++
numQueried++
toDial = toDial[1:]
logger.Debugf("starting %d out of %d", numQueried, len(peersSeen))
}
}
}

type queryResult struct {
peer peer.ID
data map[peer.ID]*peer.AddrInfo
err error
}

func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, c.host.Peerstore(), time.Hour, nil)
if err != nil {
logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
return &queryResult{nextPeer, nil, err}
}

connCtx, cancel := context.WithTimeout(ctx, c.connectTimeout)
defer cancel()
err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
if err != nil {
logger.Errorf("could not connect to peer %v: %v", nextPeer, err)
return &queryResult{nextPeer, nil, err}
}

localPeers := make(map[peer.ID]*peer.AddrInfo)
var retErr error
for cpl := 0; cpl <= 15; cpl++ {
generatePeer, err := tmpRT.GenRandPeerID(uint(cpl))
if err != nil {
panic(err)
}
peers, err := c.dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer)
if err != nil {
logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer, cpl, err)
retErr = err
break
}
for _, ai := range peers {
if _, ok := localPeers[ai.ID]; !ok {
localPeers[ai.ID] = ai
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how many peers have a valid dial-able address in their address info?

}
}
}
return &queryResult{nextPeer, localPeers, retErr}
}
62 changes: 62 additions & 0 deletions crawler/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package crawler

import (
"time"

"github.com/libp2p/go-libp2p-core/protocol"
)

// Option DHT Crawler option type.
type Option func(*options) error

type options struct {
protocols []protocol.ID
parallelism int
crawlInterval time.Duration
connectTimeout time.Duration
perMsgTimeout time.Duration
}

// defaults are the default crawler options. This option will be automatically
// prepended to any options you pass to the crawler constructor.
var defaults = func(o *options) error {
o.protocols = []protocol.ID{"/ipfs/kad/1.0.0"}
o.parallelism = 1000
o.crawlInterval = time.Hour
willscott marked this conversation as resolved.
Show resolved Hide resolved
o.connectTimeout = time.Second * 5
o.perMsgTimeout = time.Second * 5

return nil
}

// WithProtocols defines the ordered set of protocols the crawler will use to talk to other nodes
func WithProtocols(protocols []protocol.ID) Option {
return func(o *options) error {
o.protocols = append([]protocol.ID{}, protocols...)
return nil
}
}

// WithParallelism defines the number of queries that can be issued in parallel
func WithParallelism(parallelism int) Option {
return func(o *options) error {
o.parallelism = parallelism
return nil
}
}

// WithMsgTimeout defines the amount of time a single DHT message is allowed to take before it's deemed failed
func WithMsgTimeout(timeout time.Duration) Option {
return func(o *options) error {
o.perMsgTimeout = timeout
return nil
}
}

// WithConnectTimeout defines the time for peer connection before timing out
func WithConnectTimeout(timeout time.Duration) Option {
return func(o *options) error {
o.connectTimeout = timeout
return nil
}
}