Commit a2d41e5 (1 parent: 03d4b62)
Showing 2 changed files with 275 additions and 0 deletions.
@@ -0,0 +1,224 @@
package crawler

import (
	"context"
	"sync"
	"time"

	"github.com/libp2p/go-libp2p-core/host"
	"github.com/libp2p/go-libp2p-core/network"
	"github.com/libp2p/go-libp2p-core/peer"
	"github.com/libp2p/go-libp2p-core/protocol"

	logging "github.com/ipfs/go-log"
	"github.com/libp2p/go-msgio/protoio"

	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
	kbucket "github.com/libp2p/go-libp2p-kbucket"
)

var logger = logging.Logger("dht-crawler")

// Crawler connects to peers in the DHT and queries them for the contents of
// their routing tables.
type Crawler struct {
	parallelism int
	host        host.Host
	dhtRPC      *pb.ProtocolMessenger
}

// New creates a Crawler for the given host, applying any supplied options on
// top of the defaults.
func New(host host.Host, opts ...Option) (*Crawler, error) {
	o := new(options)
	if err := defaults(o); err != nil {
		return nil, err
	}
	for _, opt := range opts {
		if err := opt(o); err != nil {
			return nil, err
		}
	}

	pm, err := pb.NewProtocolMessenger(&messageSender{h: host, protocols: o.protocols, timeout: o.perMsgTimeout})
	if err != nil {
		return nil, err
	}

	return &Crawler{
		parallelism: o.parallelism,
		host:        host,
		dhtRPC:      pm,
	}, nil
}

// messageSender handles sending wire protocol messages to a given peer.
type messageSender struct {
	h         host.Host
	protocols []protocol.ID
	timeout   time.Duration
}

// SendRequest sends a peer a message and waits for its response.
func (ms *messageSender) SendRequest(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
	s, err := ms.h.NewStream(ctx, p, ms.protocols...)
	if err != nil {
		return nil, err
	}

	w := protoio.NewDelimitedWriter(s)
	if err := w.WriteMsg(pmes); err != nil {
		_ = s.Reset()
		return nil, err
	}

	r := protoio.NewDelimitedReader(s, network.MessageSizeMax)
	tctx, cancel := context.WithTimeout(ctx, ms.timeout)
	defer cancel()
	defer func() { _ = s.Close() }()

	msg := new(pb.Message)
	if err := ctxReadMsg(tctx, r, msg); err != nil {
		_ = s.Reset()
		return nil, err
	}

	return msg, nil
}

func ctxReadMsg(ctx context.Context, rc protoio.ReadCloser, mes *pb.Message) error {
	errc := make(chan error, 1)
	go func(r protoio.ReadCloser) {
		defer close(errc)
		err := r.ReadMsg(mes)
		errc <- err
	}(rc)

	select {
	case err := <-errc:
		return err
	case <-ctx.Done():
		return ctx.Err()
	}
}

// SendMessage sends a peer a message without waiting on a response.
func (ms *messageSender) SendMessage(ctx context.Context, p peer.ID, pmes *pb.Message) error {
	s, err := ms.h.NewStream(ctx, p, ms.protocols...)
	if err != nil {
		return err
	}
	defer func() { _ = s.Close() }()

	w := protoio.NewDelimitedWriter(s)
	return w.WriteMsg(pmes)
}

// HandleQueryResult is a callback invoked for each successfully queried peer,
// receiving that peer and the peers found in its routing table.
type HandleQueryResult func(p peer.ID, rtPeers []*peer.AddrInfo)

// HandleQueryFail is a callback invoked for each peer whose query failed,
// receiving that peer and the error.
type HandleQueryFail func(p peer.ID, err error)

// Run crawls the DHT outward from the starting peers, calling handleSuccess or
// handleFail for each peer queried. It returns once every discovered peer has
// been queried.
func (c *Crawler) Run(ctx context.Context, startingPeers []*peer.AddrInfo, handleSuccess HandleQueryResult, handleFail HandleQueryFail) {
	jobs := make(chan peer.ID, 1)
	results := make(chan *queryResult, 1)

	// Start worker goroutines
	var wg sync.WaitGroup
	wg.Add(c.parallelism)
	for i := 0; i < c.parallelism; i++ {
		go func() {
			defer wg.Done()
			for p := range jobs {
				res := queryPeer(ctx, c.host, c.dhtRPC, p)
				results <- res
			}
		}()
	}

	defer wg.Wait()
	defer close(jobs)

	toDial := make([]*peer.AddrInfo, 0, 1000)
	peersSeen := make(map[peer.ID]struct{})

	for _, ai := range startingPeers {
		toDial = append(toDial, ai)
		peersSeen[ai.ID] = struct{}{}
	}

	numQueried := 0
	outstanding := 0

	for len(toDial) > 0 || outstanding > 0 {
		// Leave jobCh nil when there is nothing left to dial so the send case
		// below blocks forever and the select only waits on results.
		var jobCh chan peer.ID
		var nextPeerID peer.ID
		if len(toDial) > 0 {
			jobCh = jobs
			nextPeerID = toDial[0].ID
		}

		select {
		case res := <-results:
			if len(res.data) > 0 {
				logger.Debugf("peer %v had %d peers", res.peer, len(res.data))
				rtPeers := make([]*peer.AddrInfo, 0, len(res.data))
				for p, ai := range res.data {
					c.host.Peerstore().AddAddrs(p, ai.Addrs, time.Hour)
					if _, ok := peersSeen[p]; !ok {
						peersSeen[p] = struct{}{}
						toDial = append(toDial, ai)
					}
					rtPeers = append(rtPeers, ai)
				}
				if handleSuccess != nil {
					handleSuccess(res.peer, rtPeers)
				}
			} else if handleFail != nil {
				handleFail(res.peer, res.err)
			}
			outstanding--
		case jobCh <- nextPeerID:
			outstanding++
			numQueried++
			toDial = toDial[1:]
			logger.Debugf("starting %d out of %d", numQueried, len(peersSeen))
		}
	}
}

type queryResult struct {
	peer peer.ID
	data map[peer.ID]*peer.AddrInfo
	err  error
}

func queryPeer(ctx context.Context, host host.Host, dhtRPC *pb.ProtocolMessenger, nextPeer peer.ID) *queryResult {
	tmpRT, err := kbucket.NewRoutingTable(20, kbucket.ConvertPeerID(nextPeer), time.Hour, host.Peerstore(), time.Hour, nil)
	if err != nil {
		logger.Errorf("error creating rt for peer %v : %v", nextPeer, err)
		return &queryResult{nextPeer, nil, err}
	}

	connCtx, cancel := context.WithTimeout(ctx, time.Second*5)
	defer cancel()
	err = host.Connect(connCtx, peer.AddrInfo{ID: nextPeer})
	if err != nil {
		logger.Errorf("could not connect to peer %v: %v", nextPeer, err)
		return &queryResult{nextPeer, nil, err}
	}

	// Ask the peer for its closest peers to a random key at each common prefix
	// length from 0 through 15, collecting every unique peer it returns.
	localPeers := make(map[peer.ID]*peer.AddrInfo)
	var retErr error
	for cpl := 0; cpl <= 15; cpl++ {
		generatePeer, err := tmpRT.GenRandPeerID(uint(cpl))
		if err != nil {
			panic(err)
		}
		peers, err := dhtRPC.GetClosestPeers(ctx, nextPeer, generatePeer)
		if err != nil {
			logger.Debugf("error finding data on peer %v with cpl %d : %v", nextPeer, cpl, err)
			retErr = err
			break
		}
		for _, ai := range peers {
			if _, ok := localPeers[ai.ID]; !ok {
				localPeers[ai.ID] = ai
			}
		}
	}
	return &queryResult{nextPeer, localPeers, retErr}
}
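
Taken together, New wires a messageSender into a pb.ProtocolMessenger and Run fans queries out across the worker pool, reporting each peer through the two callbacks. The sketch below is illustrative only and not part of this commit: it assumes it sits in this same crawler package, with h an already-constructed libp2p host and bootstrap a seed list of peers; the names crawlOnce, h, and bootstrap are made up for the example.

// crawlOnce is an illustrative sketch (not part of this commit) showing one
// way to drive the Crawler: build it for an existing host, then Run it with
// success and failure callbacks.
func crawlOnce(ctx context.Context, h host.Host, bootstrap []*peer.AddrInfo) error {
	c, err := New(h)
	if err != nil {
		return err
	}
	c.Run(ctx, bootstrap,
		func(p peer.ID, rtPeers []*peer.AddrInfo) {
			logger.Infof("peer %s returned %d routing table entries", p, len(rtPeers))
		},
		func(p peer.ID, err error) {
			logger.Debugf("failed to query peer %s: %v", p, err)
		})
	return nil
}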
@@ -0,0 +1,51 @@
package crawler

import (
	"time"

	"github.com/libp2p/go-libp2p-core/protocol"
)

// Option is a DHT crawler option.
type Option func(*options) error

type options struct {
	protocols     []protocol.ID
	parallelism   int
	crawlInterval time.Duration
	perMsgTimeout time.Duration
}

// defaults are the default crawler options. This option will be automatically
// prepended to any options you pass to the crawler constructor.
var defaults = func(o *options) error {
	o.protocols = []protocol.ID{"/ipfs/kad/1.0.0"}
	o.parallelism = 1000
	o.crawlInterval = time.Hour
	o.perMsgTimeout = time.Second * 5

	return nil
}

// WithProtocols defines the ordered set of protocols the crawler will use to talk to other nodes.
func WithProtocols(protocols []protocol.ID) Option {
	return func(o *options) error {
		o.protocols = append([]protocol.ID{}, protocols...)
		return nil
	}
}

// WithParallelism defines the number of queries that can be issued in parallel.
func WithParallelism(parallelism int) Option {
	return func(o *options) error {
		o.parallelism = parallelism
		return nil
	}
}

// WithMsgTimeout defines the amount of time a single DHT message is allowed to take before it's deemed failed.
func WithMsgTimeout(timeout time.Duration) Option {
	return func(o *options) error {
		o.perMsgTimeout = timeout
		return nil
	}
}
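
Options are applied on top of the defaults inside New, so callers can pass any combination, e.g. New(h, WithParallelism(100), WithMsgTimeout(10*time.Second)). The options struct also carries a crawlInterval field that defaults to an hour but has no exported setter in this diff; if one were wanted it would follow the same closure pattern, as in the illustrative sketch below (not part of this commit).

// WithCrawlInterval is an illustrative option (not part of this commit). It
// follows the same pattern as WithParallelism and WithMsgTimeout to override
// the crawlInterval default set by defaults above.
func WithCrawlInterval(interval time.Duration) Option {
	return func(o *options) error {
		o.crawlInterval = interval
		return nil
	}
}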