Add basic crawler #663
@@ -0,0 +1,240 @@
package crawler

import (
    "context"
    "sync"
    "time"

    "github.com/libp2p/go-libp2p-core/host"
    "github.com/libp2p/go-libp2p-core/network"
    "github.com/libp2p/go-libp2p-core/peer"
    "github.com/libp2p/go-libp2p-core/protocol"

    logging "github.com/ipfs/go-log"
    "github.com/libp2p/go-msgio/protoio"

    pb "github.com/libp2p/go-libp2p-kad-dht/pb"
    kbucket "github.com/libp2p/go-libp2p-kbucket"
)

var logger = logging.Logger("dht-crawler")

// Crawler connects to hosts in the DHT to track the routing tables of peers.
type Crawler struct {
    parallelism    int
    connectTimeout time.Duration
    host           host.Host
    dhtRPC         *pb.ProtocolMessenger
}

// New creates a new Crawler.
func New(host host.Host, opts ...Option) (*Crawler, error) {
    o := new(options)
    if err := defaults(o); err != nil {
        return nil, err
    }
    for _, opt := range opts {
        if err := opt(o); err != nil {
            return nil, err
        }
    }

    pm, err := pb.NewProtocolMessenger(&messageSender{h: host, protocols: o.protocols, timeout: o.perMsgTimeout})
    if err != nil {
        return nil, err
    }

    return &Crawler{
        parallelism:    o.parallelism,
        connectTimeout: o.connectTimeout,
        host:           host,
        dhtRPC:         pm,
    }, nil
}

// messageSender handles sending wire protocol messages to a given peer.
type messageSender struct {
    h         host.Host
    protocols []protocol.ID
    timeout   time.Duration
}

// SendRequest sends a message to a peer and waits for its response.
func (ms *messageSender) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
    s, err := ms.h.NewStream(ctx, p, ms.protocols...)
    if err != nil {
        return nil, err
    }
    defer func() { _ = s.Close() }()

    w := protoio.NewDelimitedWriter(s)
    if err := w.WriteMsg(pmes); err != nil {
        _ = s.Reset()
        return nil, err
    }

    r := protoio.NewDelimitedReader(s, network.MessageSizeMax)
    tctx, cancel := context.WithTimeout(ctx, ms.timeout)
    defer cancel()

    msg := new(pb.Message)
    if err := ctxReadMsg(tctx, r, msg); err != nil {
        _ = s.Reset()
        return nil, err
    }

    return msg, nil
}

// ctxReadMsg performs the blocking read in a separate goroutine so that the
// caller can honor context cancellation and timeouts.
func ctxReadMsg(ctx context.Context, rc protoio.ReadCloser, mes *pb.Message) error {
    errc := make(chan error, 1)
    go func(r protoio.ReadCloser) {
        defer close(errc)
        err := r.ReadMsg(mes)
        errc <- err
    }(rc)

    select {
    case err := <-errc:
        return err
    case <-ctx.Done():
        return ctx.Err()
    }
}

// SendMessage sends a message to a peer without waiting for a response.
func (ms *messageSender) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
    s, err := ms.h.NewStream(ctx, p, ms.protocols...)
    if err != nil {
        return err
    }
    defer func() { _ = s.Close() }()

    w := protoio.NewDelimitedWriter(s)
    return w.WriteMsg(pmes)
}

// HandleQueryResult is a callback invoked on each successful peer query.
type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)

// HandleQueryFail is a callback invoked on each failed peer query.
type HandleQueryFail func(p peer.ID, err error)

// Run crawls DHT peers from an initial seed of `startingPeers`.
func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
    jobs := make(chan peer.ID, 1)
    results := make(chan *queryResult, 1)

    // Start worker goroutines.
    var wg sync.WaitGroup
    wg.Add(c.parallelism)
    for i := 0; i < c.parallelism; i++ {
        go func() {
            defer wg.Done()
            for p := range jobs {
                res := c.queryPeer(ctx, p)
                results <- res
            }
        }()
    }

    defer wg.Wait()
    defer close(jobs)

    toDial := make([]*peer.AddrInfo, 0, len(startingPeers))
    peersSeen := make(map[peer.ID]struct{})

    for _, ai := range startingPeers {
        toDial = append(toDial, ai)
        peersSeen[ai.ID] = struct{}{}
        extendAddrs := c.host.Peerstore().Addrs(ai.ID)
        extendAddrs = append(extendAddrs, ai.Addrs...)
        c.host.Peerstore().AddAddrs(ai.ID, extendAddrs, time.Hour)
    }

    numQueried := 0
    outstanding := 0

    for len(toDial) > 0 || outstanding > 0 {
        // While toDial is empty, jobCh stays nil, so the send case below
        // blocks forever and only results are processed.
        var jobCh chan peer.ID
        var nextPeerID peer.ID
        if len(toDial) > 0 {
            jobCh = jobs
            nextPeerID = toDial[0].ID
        }

        select {
        case res := <-results:
            if len(res.data) > 0 {
                logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
                rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
                for p, ai := range res.data {
                    c.host.Peerstore().AddAddrs(p, ai.Addrs, time.Hour)
                    if _, ok := peersSeen[p]; !ok {
                        peersSeen[p] = struct{}{}
                        toDial = append(toDial, ai)
                    }
                    rtPeers = append(rtPeers, ai)
                }
                if handleSuccess != nil {
                    handleSuccess(res.peer, rtPeers)
                }
            } else if handleFail != nil {
                handleFail(res.peer, res.err)

[Review comment] A bit more scaffolding categorizing errors would be useful: how many attempted connections are timing out? How many are failing to connect? (See the stats sketch after this file.)

            }
            outstanding--
        case jobCh <- nextPeerID:
            outstanding++
            numQueried++
            toDial = toDial[1:]
            logger.Debugf("starting %d out of %d", numQueried, len(peersSeen))
        }
    }
}

type queryResult struct {
    peer peer.ID
    data map[peer.ID]*peer.AddrInfo
    err  error
}

func (c *Crawler) queryPeer(ctx context.Context, nextPeer peer.ID) *queryResult {
    tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, c.host.Peerstore(), time.Hour, nil)
    if err != nil {
        logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
        return &queryResult{nextPeer, nil, err}
    }

    connCtx, cancel := context.WithTimeout(ctx, c.connectTimeout)
    defer cancel()
    err = c.host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
    if err != nil {
        logger.Infof("could not connect to peer %v: %v", nextPeer, err)
        return &queryResult{nextPeer, nil, err}
    }

    localPeers := make(map[peer.ID]*peer.AddrInfo)
    var retErr error
    for cpl := 0; cpl <= 15; cpl++ {
        generatePeer, err := tmpRT.GenRandPeerID(uint(cpl))
        if err != nil {
            panic(err)
        }
        peers, err := c.dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer)
        if err != nil {
            logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer, cpl, err)
            retErr = err
            break
        }
        for _, ai := range peers {
            if _, ok := localPeers[ai.ID]; !ok {
                localPeers[ai.ID] = ai

[Review comment] How many peers have a valid dial-able address in their address info? (See the stats sketch after this file.)

            }
        }
    }

    if retErr != nil {
        return &queryResult{nextPeer, nil, retErr}
    }

    return &queryResult{nextPeer, localPeers, retErr}
}
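
Both inline review questions above could be answered with a little instrumentation layered on the existing callbacks. A minimal sketch, not part of this PR: the type and counter names are illustrative, and it assumes that connect/query errors caused by the crawler's timeouts wrap context.DeadlineExceeded. Because Run invokes both callbacks from its own loop goroutine, plain counters are race-free here.

package crawler

import (
    "context"
    "errors"

    "github.com/libp2p/go-libp2p-core/peer"
)

// crawlStats tallies outcomes so a caller can see how many queries timed
// out, how many failed for other reasons, and how many discovered peers
// actually come with a dialable address.
type crawlStats struct {
    timeouts      int // queries that exceeded a timeout
    otherFailures int // dial or query failures for any other reason
    dialable      int // routing-table entries carrying at least one address
    total         int // all routing-table entries seen
}

// onFail satisfies HandleQueryFail and splits failures by cause.
func (st *crawlStats) onFail(p peer.ID, err error) {
    if errors.Is(err, context.DeadlineExceeded) {
        st.timeouts++
    } else {
        st.otherFailures++
    }
}

// onSuccess satisfies HandleQueryResult and counts dialable peers.
func (st *crawlStats) onSuccess(p peer.ID, rtPeers []*peer.AddrInfo) {
    for _, ai := range rtPeers {
        st.total++
        if len(ai.Addrs) > 0 {
            st.dialable++
        }
    }
}

A caller would then pass st.onSuccess and st.onFail into Run.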
@@ -0,0 +1,60 @@
package crawler

import (
    "time"

    "github.com/libp2p/go-libp2p-core/protocol"
)

// Option is a DHT crawler option type.
type Option func(*options) error

type options struct {
    protocols      []protocol.ID
    parallelism    int
    connectTimeout time.Duration
    perMsgTimeout  time.Duration
}

// defaults are the default crawler options. This option will be automatically
// prepended to any options you pass to the crawler constructor.
var defaults = func(o *options) error {
    o.protocols = []protocol.ID{"/ipfs/kad/1.0.0"}
    o.parallelism = 1000
    o.connectTimeout = time.Second * 5
    o.perMsgTimeout = time.Second * 5

    return nil
}

// WithProtocols defines the ordered set of protocols the crawler will use to talk to other nodes.
func WithProtocols(protocols []protocol.ID) Option {
    return func(o *options) error {
        o.protocols = append([]protocol.ID{}, protocols...)
        return nil
    }
}

// WithParallelism defines the number of queries that can be issued in parallel.
func WithParallelism(parallelism int) Option {
    return func(o *options) error {
        o.parallelism = parallelism
        return nil
    }
}

// WithMsgTimeout defines the amount of time a single DHT message is allowed to take before it's deemed failed.
func WithMsgTimeout(timeout time.Duration) Option {
    return func(o *options) error {
        o.perMsgTimeout = timeout
        return nil
    }
}

// WithConnectTimeout defines how long to wait for a peer connection before timing out.
func WithConnectTimeout(timeout time.Duration) Option {
    return func(o *options) error {
        o.connectTimeout = timeout
        return nil
    }
}
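
Putting the two files together, a minimal end-to-end usage sketch. It assumes the package lands at github.com/libp2p/go-libp2p-kad-dht/crawler and uses the context-taking libp2p.New of this era; the option values and the empty seed list are illustrative placeholders.

package main

import (
    "context"
    "fmt"
    "time"

    "github.com/libp2p/go-libp2p"
    "github.com/libp2p/go-libp2p-core/peer"

    "github.com/libp2p/go-libp2p-kad-dht/crawler"
)

func main() {
    ctx := context.Background()

    // Any libp2p host will do; the crawler only needs it to dial peers
    // and to store discovered addresses in its peerstore.
    h, err := libp2p.New(ctx)
    if err != nil {
        panic(err)
    }

    c, err := crawler.New(h,
        crawler.WithParallelism(200),
        crawler.WithConnectTimeout(10*time.Second),
    )
    if err != nil {
        panic(err)
    }

    // Seed with peers already known to be on the DHT, e.g. bootstrap nodes.
    var seeds []*peer.AddrInfo // populate from a bootstrap list

    c.Run(ctx, seeds,
        func(p peer.ID, rtPeers []*peer.AddrInfo) {
            fmt.Printf("%s returned %d routing-table peers\n", p, len(rtPeers))
        },
        func(p peer.ID, err error) {
            fmt.Printf("query to %s failed: %v\n", p, err)
        })
}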

[Review comment] On the one-hour TTL the crawler passes to Peerstore().AddAddrs: hour seems potentially racy; could imagine this lasting longer than that.
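
One way to address this, a hypothetical extension that is not in this PR, is to make the address TTL configurable in the same style as the existing options; the field and function names below are illustrative.

// Hypothetical sketch, following the existing option pattern. crawler.go
// would pass o.addrTTL to Peerstore().AddAddrs instead of the hard-coded
// time.Hour, and defaults would set o.addrTTL = time.Hour.

type options struct {
    // ... existing fields ...
    addrTTL time.Duration
}

// WithAddrTTL defines how long addresses discovered during the crawl are
// kept in the host's peerstore.
func WithAddrTTL(ttl time.Duration) Option {
    return func(o *options) error {
        o.addrTTL = ttl
        return nil
    }
}

For crawls expected to outlive any fixed interval, go-libp2p-core's peerstore.PermanentAddrTTL would be a natural value to pass.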