Feature/correct bootstrapping #384

Merged: Stebalien merged 9 commits into libp2p:master from aarshkshah1992:feature/correct-bootstrapping on Oct 11, 2019.

Commits (9)
d53dfd6  aarshkshah1992  changed bootstrapping logic
f7353aa  aarshkshah1992  reset timer on bucket
5329454  aarshkshah1992  removed cfg.Queries param from bootstrap
585d672  aarshkshah1992  refresh a bucket whenever we lookup a key in it
1454529  Stebalien       dep: update go-libp2p-kbucket
232357e  Stebalien       chore: smaller diff
5da1634  aarshkshah1992  refresh bucket only when query is successful
f4630f6  aarshkshah1992  1) seed RT whenever it becomes empty
00fffba  aarshkshah1992  Update dht_bootstrap.go
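Note: several of the commits above ("reset timer on bucket", "refresh a bucket whenever we lookup a key in it", "refresh bucket only when query is successful") share one idea: remember when each routing-table bucket was last refreshed, and count a successful lookup that lands in a bucket as a refresh of that bucket, so the periodic bootstrap pass only queries buckets that have actually gone stale. The sketch below illustrates that bookkeeping only; the bucketRefreshTracker type and its method names are hypothetical and are not the go-libp2p-kbucket API.

package main

import (
    "fmt"
    "sync"
    "time"
)

// bucketRefreshTracker is a hypothetical helper illustrating the idea behind
// the "reset timer on bucket" / "refresh bucket only when query is successful"
// commits: remember when each bucket was last refreshed and skip buckets that
// a successful lookup has already touched recently.
type bucketRefreshTracker struct {
    mu          sync.Mutex
    lastRefresh map[int]time.Time // bucket index -> last refresh time
    interval    time.Duration     // how stale a bucket may get before a refresh
}

func newBucketRefreshTracker(interval time.Duration) *bucketRefreshTracker {
    return &bucketRefreshTracker{
        lastRefresh: make(map[int]time.Time),
        interval:    interval,
    }
}

// markRefreshed is called after a *successful* query that landed in bucket i,
// or after an explicit refresh of that bucket.
func (t *bucketRefreshTracker) markRefreshed(i int) {
    t.mu.Lock()
    defer t.mu.Unlock()
    t.lastRefresh[i] = time.Now()
}

// staleBuckets returns the bucket indices whose refresh timer has expired;
// only these need a random-key lookup during the periodic refresh pass.
func (t *bucketRefreshTracker) staleBuckets(numBuckets int) []int {
    t.mu.Lock()
    defer t.mu.Unlock()
    var stale []int
    for i := 0; i < numBuckets; i++ {
        if last, ok := t.lastRefresh[i]; !ok || time.Since(last) > t.interval {
            stale = append(stale, i)
        }
    }
    return stale
}

func main() {
    tr := newBucketRefreshTracker(10 * time.Minute)
    tr.markRefreshed(0)             // a successful lookup just exercised bucket 0
    fmt.Println(tr.staleBuckets(4)) // bucket 0 is skipped; 1, 2, 3 are stale
}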
Changes from all commits:

@@ -21,26 +21,22 @@ import (
 	"github.com/libp2p/go-libp2p-kad-dht/metrics"
 	opts "github.com/libp2p/go-libp2p-kad-dht/opts"
 	pb "github.com/libp2p/go-libp2p-kad-dht/pb"
-	providers "github.com/libp2p/go-libp2p-kad-dht/providers"
+	"github.com/libp2p/go-libp2p-kad-dht/providers"

-	proto "github.com/gogo/protobuf/proto"
-	cid "github.com/ipfs/go-cid"
+	"github.com/gogo/protobuf/proto"
+	"github.com/ipfs/go-cid"
 	ds "github.com/ipfs/go-datastore"
 	logging "github.com/ipfs/go-log"
-	goprocess "github.com/jbenet/goprocess"
-	goprocessctx "github.com/jbenet/goprocess/context"
+	"github.com/jbenet/goprocess"
+	"github.com/jbenet/goprocess/context"
 	kb "github.com/libp2p/go-libp2p-kbucket"
-	record "github.com/libp2p/go-libp2p-record"
+	"github.com/libp2p/go-libp2p-record"
 	recpb "github.com/libp2p/go-libp2p-record/pb"
-	base32 "github.com/whyrusleeping/base32"
+	"github.com/whyrusleeping/base32"
 )

 var logger = logging.Logger("dht")

-// NumBootstrapQueries defines the number of random dht queries to do to
-// collect members of the routing table.
-const NumBootstrapQueries = 5
-
 // IpfsDHT is an implementation of Kademlia with S/Kademlia modifications.
 // It is used to implement the base Routing module.
 type IpfsDHT struct {
@@ -70,6 +66,10 @@ type IpfsDHT struct {
 	protocols []protocol.ID // DHT protocols

 	bucketSize int
+
+	bootstrapCfg opts.BootstrapConfig
+
+	triggerBootstrap chan struct{}
 }

 // Assert that IPFS assumptions about interfaces aren't broken. These aren't a
@@ -90,6 +90,7 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er
 		return nil, err
 	}
 	dht := makeDHT(ctx, h, cfg.Datastore, cfg.Protocols, cfg.BucketSize)
+	dht.bootstrapCfg = cfg.BootstrapConfig

 	// register for network notifs.
 	dht.host.Network().Notify((*netNotifiee)(dht))
@@ -136,34 +137,71 @@ func NewDHTClient(ctx context.Context, h host.Host, dstore ds.Batching) *IpfsDHT

 func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []protocol.ID, bucketSize int) *IpfsDHT {
 	rt := kb.NewRoutingTable(bucketSize, kb.ConvertPeerID(h.ID()), time.Minute, h.Peerstore())

 	cmgr := h.ConnManager()

 	rt.PeerAdded = func(p peer.ID) {
 		cmgr.TagPeer(p, "kbucket", 5)
 	}

 	rt.PeerRemoved = func(p peer.ID) {
 		cmgr.UntagPeer(p, "kbucket")
 	}

 	dht := &IpfsDHT{
-		datastore:    dstore,
-		self:         h.ID(),
-		peerstore:    h.Peerstore(),
-		host:         h,
-		strmap:       make(map[peer.ID]*messageSender),
-		ctx:          ctx,
-		providers:    providers.NewProviderManager(ctx, h.ID(), dstore),
-		birth:        time.Now(),
-		routingTable: rt,
-		protocols:    protocols,
-		bucketSize:   bucketSize,
+		datastore:        dstore,
+		self:             h.ID(),
+		peerstore:        h.Peerstore(),
+		host:             h,
+		strmap:           make(map[peer.ID]*messageSender),
+		ctx:              ctx,
+		providers:        providers.NewProviderManager(ctx, h.ID(), dstore),
+		birth:            time.Now(),
+		routingTable:     rt,
+		protocols:        protocols,
+		bucketSize:       bucketSize,
+		triggerBootstrap: make(chan struct{}),
 	}

 	dht.ctx = dht.newContextWithLocalTags(ctx)

 	return dht
 }
+// TODO Implement RT seeding as described in https://github.com/libp2p/go-libp2p-kad-dht/pull/384#discussion_r320994340 OR
+// come up with an alternative solution.
+// issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387
+/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
+	writeResp := func(errorChan chan error, err error) {
+		select {
+		case <-proc.Closing():
+		case errorChan <- err:
+		}
+		close(errorChan)
+	}
+
+	for {
+		select {
+		case req := <-dht.rtRecoveryChan:
+			if dht.routingTable.Size() == 0 {
+				logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. initiating recovery", req.id)
+				// TODO Call Seeder with default bootstrap peers here once #383 is merged
+				if dht.routingTable.Size() > 0 {

[Review thread on the line above]
Reviewer: unreachable?
Author: Answered above

+					logger.Infof("rt recovery proc: successfully recovered RT for reqID=%s, RT size is now %d", req.id, dht.routingTable.Size())
+					go writeResp(req.errorChan, nil)
+				} else {
+					logger.Errorf("rt recovery proc: failed to recover RT for reqID=%s, RT is still empty", req.id)
+					go writeResp(req.errorChan, errors.New("RT empty after seed attempt"))
+				}
+			} else {
+				logger.Infof("rt recovery proc: RT is not empty, no need to act on request with reqID=%s", req.id)
+				go writeResp(req.errorChan, nil)
+			}
+		case <-proc.Closing():
+			return
+		}
+	}
+}*/

 // putValueToPeer stores the given key/value pair at the peer 'p'
 func (dht *IpfsDHT) putValueToPeer(ctx context.Context, p peer.ID, rec *recpb.Record) error {
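Note: the hunks above add the bootstrapCfg and triggerBootstrap fields but do not show the consumer of the trigger channel, which lives in dht_bootstrap.go (touched by the "Update dht_bootstrap.go" commit and not included in this excerpt). Below is a minimal, self-contained sketch of how such a channel is typically consumed alongside a periodic timer; the bootstrapRound and bootstrapLoop names and the period parameter are assumptions for illustration, not this PR's actual code. The point of the extra select case is that an empty routing table can force a round immediately instead of waiting out the timer.

package main

import (
    "context"
    "fmt"
    "time"
)

// dht stands in for IpfsDHT here; only the field needed for the sketch is kept.
type dht struct {
    triggerBootstrap chan struct{} // added by this PR; lets callers force a round
}

// bootstrapRound is a placeholder for the real per-round logic
// (random-key lookups / bucket refreshes) implemented in dht_bootstrap.go.
func (d *dht) bootstrapRound(ctx context.Context) {
    fmt.Println("running bootstrap round")
}

// bootstrapLoop runs a round periodically and also whenever something writes
// to triggerBootstrap (e.g. the routing table was observed to be empty).
func (d *dht) bootstrapLoop(ctx context.Context, period time.Duration) {
    ticker := time.NewTicker(period)
    defer ticker.Stop()
    for {
        select {
        case <-ticker.C:
            d.bootstrapRound(ctx)
        case <-d.triggerBootstrap:
            d.bootstrapRound(ctx)
        case <-ctx.Done():
            return
        }
    }
}

func main() {
    d := &dht{triggerBootstrap: make(chan struct{})}
    ctx, cancel := context.WithTimeout(context.Background(), 250*time.Millisecond)
    defer cancel()
    go d.bootstrapLoop(ctx, time.Hour)
    select {
    case d.triggerBootstrap <- struct{}{}: // force an immediate round
    case <-ctx.Done():
    }
    <-ctx.Done()
}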
Conversations

Reviewer comment:
This check seems redundant, as both processes that send recovery requests down the channel check before sending.
Edit: I'm actually a bit confused by this code generally. I get the idea of serializing recovery requests, but this will work its way through them in a pretty tight loop, so I could see multiple recovery requests being dispatched without error in a row.
Reviewer comment:
In fact, it seems like all this process does is serialize checks of the routing table, but does nothing to actually bootstrap it. It's up to the func that actually creates a rtRecoveryReq to do any work, and I see a few instances where this is essentially just an info log.
Author (aarshkshah1992) reply:
The check is to ensure that if multiple callers 'simultaneously' observe that the RT has become empty and send a request to the channel, only one request results in the RT being seeded and the remaining become no-ops.
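To make that concrete, here is a trimmed-down, runnable sketch of the pattern used by the commented-out rtRecovery proc above: multiple callers observe an empty routing table and send a request down one channel, a single goroutine handles requests one at a time, and the size re-check inside the loop turns every request after the first successful seed into a no-op. The rtSize counter and the in-place "seeding" are stand-ins for dht.routingTable.Size() and the real seeder, purely for illustration.

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

// rtRecoveryReq mirrors the shape used in the commented-out rtRecovery proc:
// an id for logging plus a channel the proc answers on.
type rtRecoveryReq struct {
    id        string
    errorChan chan error
}

func main() {
    var rtSize int64 // stand-in for dht.routingTable.Size()
    recoveryChan := make(chan *rtRecoveryReq)
    seeded := 0

    // Single recovery proc: requests are handled one at a time, so the
    // size re-check below is what collapses simultaneous requests into
    // a single seeding attempt.
    go func() {
        for req := range recoveryChan {
            if atomic.LoadInt64(&rtSize) == 0 {
                // stand-in for "call the seeder with the default bootstrap peers"
                atomic.StoreInt64(&rtSize, 10)
                seeded++
                fmt.Printf("req %s: RT was empty, seeded it (seed attempts so far: %d)\n", req.id, seeded)
            } else {
                fmt.Printf("req %s: RT already non-empty, no-op\n", req.id)
            }
            req.errorChan <- nil
            close(req.errorChan)
        }
    }()

    // Three callers observe an empty RT "simultaneously" and all ask for recovery.
    var wg sync.WaitGroup
    for i := 0; i < 3; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            req := &rtRecoveryReq{id: fmt.Sprintf("caller-%d", i), errorChan: make(chan error, 1)}
            recoveryChan <- req
            <-req.errorChan // wait until the proc has handled us
        }(i)
    }
    wg.Wait() // only the first request handled actually seeds the table
}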
Author (aarshkshah1992) reply:
Apologies if this wasn't clear. Yes, we need to add a call to the default seeder implementation in #383 to seed the RT with the 'default bootstrap peers'/'known peers'. There is a TODO for this in the code (line 214) and in the TODO list on this PR.
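For completeness, once the seeder from #383 lands, the TODO inside the commented-out proc would roughly become something like the snippet below. The seedRoutingTable function, the defaultBootstrapPeers variable, and the stand-in routingTable type are assumed names for illustration only; the real API is whatever #383 exposes.

package main

import (
    "errors"
    "fmt"
)

// routingTable and peerInfo are minimal stand-ins so the sketch compiles on its own.
type peerInfo struct{ id string }
type routingTable struct{ peers []peerInfo }

func (rt *routingTable) Size() int { return len(rt.peers) }

// defaultBootstrapPeers and seedRoutingTable are *assumed* names for what #383
// provides; they are not this repository's actual API.
var defaultBootstrapPeers = []peerInfo{{id: "bootstrap-1"}, {id: "bootstrap-2"}}

func seedRoutingTable(rt *routingTable, candidates []peerInfo) error {
    if len(candidates) == 0 {
        return errors.New("no candidates to seed from")
    }
    rt.peers = append(rt.peers, candidates...)
    return nil
}

// recoverIfEmpty is roughly what the TODO inside rtRecovery would become:
// seed only if the table is still empty, then report whether it worked.
func recoverIfEmpty(rt *routingTable) error {
    if rt.Size() > 0 {
        return nil // another request already repopulated the table
    }
    if err := seedRoutingTable(rt, defaultBootstrapPeers); err != nil {
        return err
    }
    if rt.Size() == 0 {
        return errors.New("RT empty after seed attempt")
    }
    return nil
}

func main() {
    rt := &routingTable{}
    fmt.Println(recoverIfEmpty(rt), rt.Size()) // <nil> 2
}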
Reviewer reply:
@aarshkshah1992 Totally! It really felt like something was missing; I definitely missed the comment. With that in mind, the rest are just small comments/docs improvements. Generally looking good :)