Feature/correct bootstrapping #384
Thanks!
dht.go
Outdated
// reset the timer for the k-bucket we just searched in ONLY if there was no error
// so that we can retry during the next bootstrap
bucket := dht.routingTable.BucketForPeer(id)
bucket.ResetLastQueriedAt(time.Now())
Ah, I'm sorry, I think I got this wrong. I think we have to update buckets based on the key we're querying (and we can do it whether or not the query succeeds, really), not based on whether or not we've contacted a peer in that bucket. Otherwise, we won't fill that bucket, we'll just add a single node.
We'll probably need to do this in several places.
@Stebalien Ah... I see exactly what you mean. Any lookup for a key in a bucket will result in attempts to add peers closer to that key to the bucket, and that is essentially a lookup/query on the bucket.
Now, a key for a bucket can be either a peer or content, since we combine them into the same keyspace for k-buckets. So, we should refresh a bucket when we look up either of them.
I've updated the following functions in routing.go to refresh a bucket when we look up a peer/content in it (a sketch of the refresh step appears below):
- PutValue
- SearchValue (This covers GetValue as well)
- GetValues
- Provide
- FindProvidersAsync (This covers FindProviders as well)
- FindPeer
- FindPeersConnectedToPeer
Let me know what you think.
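For reference, a minimal sketch of the refresh step those functions now perform, assuming the kbucket methods shown in the diffs below (BucketForID, ConvertKey, ResetRefreshedAt) and the package's existing kb/time imports; refreshBucketForKey is a hypothetical helper name, not the PR's actual code:

```go
// refreshBucketForKey marks the k-bucket that owns `key` as freshly queried.
// kb.ConvertKey maps both peer IDs and content keys into the shared XOR
// keyspace used by the routing table's buckets.
func (dht *IpfsDHT) refreshBucketForKey(key string) {
	dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
}
```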
dht_bootstrap.go
Outdated
// Note there is a tradeoff between the bootstrap period and the
// number of queries. We could support a higher period with less
// queries.
// BootstrapConfig specifies parameters used for bootstrapping the DHT.
type BootstrapConfig struct {
cc @hsanjuan, @whyrusleeping, @frrist:
This will be an API-breaking change, modifying the BootstrapConfig structure. This change is necessary to bring the DHT's bootstrapping logic in line with the Kademlia paper (properly refreshing each bucket) instead of just making random queries in the dark.
Thoughts? Objections? Questions?
Thank you for the ping, no objection from me.
Sounds good to me.
@Stebalien Have made the changes as per your review. Please take a look. Thanks a lot!
Almost there. Thanks for sticking with this.
routing.go
Outdated
@@ -74,6 +74,11 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key string, value []byte, opts
	return err
}

// refresh the k-bucket containing this key
defer func() {
	dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
Let's just move this into GetClosestPeers. Also, we should probably check if the query succeeded.
(or, if it didn't succeed, that it lasted more than... 30 seconds?)
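A minimal sketch of that suggestion (the 30-second figure is the one floated above; maybeRefreshBucket and its parameters are hypothetical, and the final diff in this PR ended up checking the query result instead):

```go
// maybeRefreshBucket refreshes the k-bucket that owns `key` when the query
// succeeded, or when it ran long enough that the bucket was likely exercised
// anyway.
func (dht *IpfsDHT) maybeRefreshBucket(key string, queryErr error, elapsed time.Duration) {
	if queryErr == nil || elapsed > 30*time.Second {
		dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
	}
}
```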
routing.go
Outdated
@@ -157,6 +163,11 @@ func (dht *IpfsDHT) SearchValue(ctx context.Context, key string, opts ...routing
	responsesNeeded = getQuorum(&cfg, -1)
}

// refresh the k-bucket containing this key
defer func() {
	dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
Ditto on making sure we don't cancel the request.
routing.go
Outdated
// refresh the k-bucket containing this key
defer func() {
	dht.routingTable.BucketForID(kb.ConvertKey(key.KeyString())).ResetRefreshedAt(time.Now())
}()
Push into GetClosestPeers.
I've moved the bucket refresh to the 'peripheries' for all the calls. Also, we now refresh ONLY when the call is 'successful'. Please take a look & let me know what you think!
This looks correct. I'm now going to run this on a machine for a few days to see if it works as expected.
Thanks a lot. Do let me know if I can help in some way.
Take a look at https://github.com/libp2p/go-libp2p-kad-dht/tree/feature/correct-bootstrapping-debug. A couple of things I've noticed so far:
That should be enough to not introduce regressions. Try playing around with that branch.
I think the next step here is #387 (hopefully that issue makes sense).
Based on @Stebalien's comments above & #387, here's what we need to accomplish now:
TODO
Questions
I'm not sure what you mean here. Our bootstrap query in
Do we need this? The Kademlia paper states that a bucket should usually get queried during the normal flow. In the adverse scenario that it doesn't, we need to step in, hence the period of 1 hour to wait & watch. Even in the current implementation, though we do query every 5 minutes, we query randomly & aren't really targeting a specific empty bucket. Though, I do see the value in doing it 12 times an hour.
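For context, the periodic refresh being discussed is roughly the following loop, assuming the BootstrapConfig fields shown later in this thread (BucketPeriod, Timeout, RoutingTableScanInterval); bucketsOlderThan and refreshBucket are hypothetical helpers, not the PR's API:

```go
// scanAndRefresh periodically scans the routing table and random-walks only
// the buckets that haven't been queried within cfg.BucketPeriod.
func (dht *IpfsDHT) scanAndRefresh(ctx context.Context, cfg BootstrapConfig) {
	ticker := time.NewTicker(cfg.RoutingTableScanInterval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			for _, bucketID := range dht.bucketsOlderThan(cfg.BucketPeriod) {
				qctx, cancel := context.WithTimeout(ctx, cfg.Timeout)
				dht.refreshBucket(qctx, bucketID) // random walk targeting this bucket
				cancel()
			}
		case <-ctx.Done():
			return
		}
	}
}
```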
(force-pushed from e1b5f75 to 75ece93)
@Stebalien Have finished the TODO tasks that do not depend on #383. Will push @raulk to review #383 & get us there sooner. Please take a look & let me know what you think. Also, please take a look at the questions. Will add the corresponding tasks to the TODO list if required.
Ah. Ok:
You're probably right. However, we do need to handle the case where we completely disconnect from the network. Maybe some kind of threshold? To back this up: I've been running this patch on a stable peer and it has a pretty good routing table.
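One hedged sketch of such a threshold, along the lines of what a later commit in this PR describes ("trigger self & bucket bootstrap if RT size goes below threshold"); the constant and triggerBootstrap are illustrative assumptions, not the PR's actual code:

```go
// minRTBootstrapThreshold is an assumed minimum routing-table size below which
// we proactively bootstrap again.
const minRTBootstrapThreshold = 4

// onNewConnection would be called when we connect to a new peer.
func (dht *IpfsDHT) onNewConnection() {
	if dht.routingTable.Size() <= minRTBootstrapThreshold {
		// run asynchronously so we don't block the connection notification path
		go dht.triggerBootstrap() // hypothetical: self walk + per-bucket refresh
	}
}
```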
I've added the code to recover from an empty RT in the latest commit. Please take a look. The only thing left there is to call the seeder in #383 to add the default bootstrap peers to the RT.
the bootstrapBucket function and related material looks great. i'm generally quite confused by the "recovery" channel you've built. what purpose does it serve? could it be simplified? it seems like the real work is being done by the two goroutines that occasionally bootstrap each bucket and "self walk".
dht.go
Outdated
case <-ctx.Done():
	return
case <-req.errorChan:
	// TODO Do we need to do anything here ?
let's log the error here
for {
	select {
	case req := <-dht.rtRecoveryChan:
		if dht.routingTable.Size() == 0 {
this check seems redundant, as both processes that send recovery requests down the channel check before sending
edit: i'm actually a bit confused by this code generally. i get the idea of serializing recovery requests, but this will work its way through them in a pretty tight loop, so i could see multiple recovery requests being dispatched without error in a row.
in fact, it seems like all this process does is serialize checks of the routing table, but does nothing to actually bootstrap it. it's up to the func that actually creates a rtRecoveryReq to do any work, and i see a few instances where this is essentially just an info log.
> this check seems redundant, as both processes that send recovery requests down the channel check before sending
> edit: i'm actually a bit confused by this code generally. i get the idea of serializing recovery requests, but this will work its way through them in a pretty tight loop, so i could see multiple recovery requests being dispatched without error in a row.
The check is to ensure that if multiple callers 'simultaneously' observe that the RT has become empty & send a request to the channel, only one request results in the RT being seeded & the remaining become no-ops.
> in fact, it seems like all this process does is serialize checks of the routing table, but does nothing to actually bootstrap it. it's up to the func that actually creates a rtRecoveryReq to do any work, and i see a few instances where this is essentially just an info log.
Apologies if this wasn't clear. Yes, we need to add a call to the default seeder implementation in #383 to seed the RT with the 'default bootstrap peers'/'known peers'. There is a TODO for this in the code (line 214) & in the TODO list on this PR.
@aarshkshah1992 totally! it really felt like something was missing! definitely missed the comment. with that in mind, the rest are just small comments/docs improvements. generally looking good :)
if dht.routingTable.Size() == 0 {
	logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty. initiating recovery", req.id)
	// TODO Call Seeder with default bootstrap peers here once #383 is merged
	if dht.routingTable.Size() > 0 {
unreachable?
Answered above
dht_bootstrap.go
Outdated
// checks on the config so that callers aren't oblivious.
if cfg.Queries <= 0 {
	return fmt.Errorf("invalid number of queries: %d", cfg.Queries)
seedRTIfEmpty := func(tag string) {
is this not essentially a no-op? just logging whether or not the table is full
The recovery proc will seed the RT later, as explained here.
The main purpose of the "rtRecovery" channel/proc is to recover from a state where the Routing Table becomes empty. We cannot even 'bootstrap' the buckets or 'self walk' from such a state, as they wouldn't know where to start their search/walk from. The only way to recover the RT from this state is to use the known 'bootstrap'/default peers to seed the RT (#295). The PR in #383 has a 'default seeder' implementation that does this, and the purpose of the recovery proc is to call that seeder. However, I am waiting for #383 to be merged before I can insert that call (hence the item in the TODO list & in the code here). There are two places where we pass a request to the recovery proc:
Let me know what you think.
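Pieced together from the fragments above, the recovery proc looks roughly like this (a sketch, not the PR's exact code; rtRecoveryProc is a hypothetical name, and the seeder call is still a TODO pending #383):

```go
func (dht *IpfsDHT) rtRecoveryProc(ctx context.Context) {
	for {
		select {
		case req := <-dht.rtRecoveryChan:
			if dht.routingTable.Size() == 0 {
				logger.Infof("rt recovery proc: received request with reqID=%s, RT is empty, initiating recovery", req.id)
				// TODO: call the default seeder from #383 with the default bootstrap peers here
			} else {
				// another request has already repopulated the RT; this one is a no-op
				logger.Infof("rt recovery proc: received request with reqID=%s, RT is not empty, no-op", req.id)
			}
			close(req.errorChan) // signal completion to the sender (it may also just fire and forget)
		case <-ctx.Done():
			return
		}
	}
}
```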
Thank you so much for your time. I've answered your questions on the RT recovery mechanism. Please take a look and let me know what you think. The other changes you've suggested look good to me.
looking good! once the dependent PR(s) land, i think we're good to go. thanks so much for helping out!
dht.go
Outdated
rt.PeerRemoved = func(p peer.ID) {
	cmgr.UntagPeer(p, "kbucket")
	go func(rtRecoveryChan chan *rtRecoveryReq) {
Let's avoid launching a goroutine every time we drop a peer.
If we don't, we deadlock on the Routing Table lock, because rt.RemovePeer is waiting for this callback to return and rt.Size() needs the same lock.
dht.go
Outdated
rt.PeerRemoved = func(p peer.ID) {
	cmgr.UntagPeer(p, "kbucket")
	go func(rtRecoveryChan chan *rtRecoveryReq) {
		if rt.Size() == 0 {
			req := mkRtRecoveryReq()
Why do we need to send a request object with a UUID?
dht.go
Outdated
select {
case <-ctx.Done():
	return
case <-req.errorChan:
There's no need to wait here (might as well fire and forget).
dht.go
Outdated
rt.PeerRemoved = func(p peer.ID) {
	cmgr.UntagPeer(p, "kbucket")
	go func(rtRecoveryChan chan *rtRecoveryReq) {
There's a simpler solution (a rough Go sketch follows these lists):
First, add a channel (protected with a lock) to signal when bootstrapping is done. Tasks that need to wait for peers in the routing table can wait for this channel to be closed.
On peer add:
- If the recovery channel is closed, return.
- Otherwise, take the lock.
- Check 1 again.
- Close the channel (we've recovered).
- Trigger a bootstrap run.
On peer remove:
- Check if the channel is closed. If not, return as we're currently bootstrapping.
- Check to see if the table is empty. If not, return as we have peers.
- Take the lock, defer a release of the lock.
- Re-run steps 1-2 with the lock held.
- Replace the channel with a new open channel.
- Launch a recovery goroutine.
In the recovery goroutine, keep recovering, sleeping, recovering, etc. until the recovery channel is closed.
Note: this is just one approach. The key parts are:
- We launch fewer goroutines.
- We stop the recovery process when we've actually recovered (i.e., the routing table has entries).
- We do very little work unless we actually have work to do.
My concerns with the current approach are:
- We launch a goroutine every time we drop a peer.
- We consider the recovery process "done" when it finishes instead of when the routing table gains a peer.
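A rough Go transliteration of those steps, for concreteness. Every name here (rtRecoveryState, onPeerAdded, onPeerRemoved, recoverOnce, triggerBootstrap) is hypothetical, and a real integration would still need to respect the kbucket routing-table lock mentioned in the deadlock note earlier in this thread:

```go
package dht

import (
	"sync"
	"time"
)

// rtRecoveryState: the done channel is closed while the routing table has
// peers, and open while a recovery goroutine is running.
type rtRecoveryState struct {
	mu   sync.Mutex
	done chan struct{}
}

func newRTRecoveryState() *rtRecoveryState {
	s := &rtRecoveryState{done: make(chan struct{})}
	close(s.done) // start in the "recovered" state
	return s
}

// onPeerAdded: if we were recovering, mark recovery as done and trigger a
// bootstrap run.
func (s *rtRecoveryState) onPeerAdded(triggerBootstrap func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	select {
	case <-s.done:
		return // already recovered, nothing to do
	default:
	}
	close(s.done)         // we've recovered
	go triggerBootstrap() // kick off a bootstrap run outside the lock
}

// onPeerRemoved: if the table just became empty and no recovery is running,
// replace the channel with a new open one and launch a single recovery goroutine.
func (s *rtRecoveryState) onPeerRemoved(rtSize func() int, recoverOnce func()) {
	s.mu.Lock()
	defer s.mu.Unlock()
	select {
	case <-s.done:
		// closed: no recovery in progress, fall through to the checks below
	default:
		return // a recovery goroutine is already running
	}
	if rtSize() > 0 {
		return // we still have peers
	}
	s.done = make(chan struct{})
	done := s.done
	go func() {
		// keep recovering and sleeping until onPeerAdded closes the channel
		for {
			select {
			case <-done:
				return
			default:
			}
			recoverOnce() // e.g. seed the RT from the default bootstrap peers
			time.Sleep(time.Minute)
		}
	}()
}
```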
@Stebalien Have linked this comment in #387. This particular task will now be done as part of that issue.
(force-pushed from e20d579 to f49447d)
Have updated the PR.
Please let me know if we need any more changes to this before merging. Thanks! :)
@Stebalien Why was the routing table able to cross the flooded road? Please take a look at this PR when you can :)
This is beautiful and correct. Thank you so much for your persistence and patience!
2) seed RT if empty before starting bootstrap in case 1 hasn't fired 3) pass bootstrap config as option while creating the DHT 4) replace all bootstrap functions with one function
1) on connecting to a new peer -> trigger self & bucket bootstrap if RT size goes below threshold 2) accept formatting & doc suggestions in the review 3) remove RT recovery code for now -> will address in a separate PR once libp2p#383 goes in; changes as per review
(force-pushed from f49447d to 00fffba)
@Stebalien Thank you so much for all your help & time :)
var wg sync.WaitGroup
errChan := make(chan error)

for bucketID, bucket := range buckets {
We should stagger these lookups, instead of launching them all at once. This can explode at the dialer.
@raulk Please can you explain this in the context of #383 (comment)?
I might be missing something, because in my head this logic here is not related to #383. I'll explain what I mean anyway. If we have 10 buckets to refresh, this logic performs 10 "find peer" queries, which multiplies into as many as 200 simultaneous dials (worst case scenario), as permitted by the dial queue. I don't think that will scale well.
Pinging @Stebalien ^^
I wouldn't expect 10 concurrent requests to be an issue. The dialer overload issues came from the fact that the gateways are making many concurrent DHT requests per user request. That means 1000s of DHT requests.
We could stagger them when running them in the background but:
- I'm not sure if it's really worth it.
- We definitely want to fire them all off as fast as possible on start.
- We only need to bootstrap when we're not under heavy DHT load. When we're under load, the buckets will all be "fresh" and this code will be a no-op.
10 concurrent requests x 20 max. pending dials in dial queue = potentially 200 outstanding dials, which is larger than the default FD limit we ship with (160 dials). In the worst case scenario, the DHT could max out the dialer.
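One possible shape for the staggering raulk suggests (a sketch only; refreshBucket, the gap value, and the logging are illustrative assumptions, and per the discussion above it may only be worth doing for the background runs rather than the initial one):

```go
package dht

import (
	"context"
	"log"
	"time"
)

// refreshBucketsStaggered runs the per-bucket refresh queries one after another
// with a gap between them, instead of firing them all off concurrently, so the
// dialer never sees more than one bucket's worth of dials at a time.
func refreshBucketsStaggered(ctx context.Context, bucketIDs []int, gap time.Duration,
	refreshBucket func(context.Context, int) error) {
	for _, id := range bucketIDs {
		if err := refreshBucket(ctx, id); err != nil {
			log.Printf("failed to refresh bucket %d: %s", id, err)
		}
		select {
		case <-time.After(gap): // spread the dial load over time
		case <-ctx.Done():
			return
		}
	}
}
```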
}

func (dht *IpfsDHT) BootstrapSelf(ctx context.Context) error {
Are we OK simply dropping methods from the public interface, @Stebalien? We usually frown upon this. Couldn't these methods be turned into shims, and marked Deprecated?
We try to avoid silently breaking APIs or breaking APIs on a whim (e.g., preference, style, etc.). In this case:
- This function was recently added.
- We completely revamped the bootstrap system anyways and had to make breaking changes to the bootstrap config (Feature/correct bootstrapping #384 (comment)).
On the other hand, you're right. This change wasn't completely necessary and probably should have gone through a deprecation period.
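A sketch of the deprecation-shim route raulk mentions (hypothetical only; the PR removed the method instead of shimming it):

```go
// BootstrapSelf previously walked towards our own peer ID to refresh nearby
// buckets. Kept here as a thin shim over the new bootstrap logic.
//
// Deprecated: use Bootstrap instead.
func (dht *IpfsDHT) BootstrapSelf(ctx context.Context) error {
	return dht.Bootstrap(ctx)
}
```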
@@ -103,6 +103,9 @@ func (dht *IpfsDHT) GetClosestPeers(ctx context.Context, key string) (<-chan pee
}

if res != nil && res.queriedSet != nil {
	// refresh the k-bucket containing this key as the query was successful
	dht.routingTable.BucketForID(kb.ConvertKey(key)).ResetRefreshedAt(time.Now())
Does this behave well with the "catch all" bucket?
type BootstrapConfig struct {
	BucketPeriod             time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
	Timeout                  time.Duration // how long to wait for a bootstrap query to run
	RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period
Why don't we refresh every BucketPeriod?
Great question. I cannot think of why. @Stebalien, would you happen to remember why we agreed on a RoutingTableScanInterval of 30 minutes here?
I can't remember. I believe what we really want is for the scan interval to be some fraction of the bucket period. I'd be fine getting rid of this.
Really, we technically don't need a scan interval. We could just take the minimum of all bucket update times and add the bucket period to that. That would have the added benefit of spreading bucket polls out over time instead of batching them together.
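For concreteness, a sketch of that idea; the inputs are assumptions rather than the PR's actual fields:

```go
// nextRefreshAt schedules the next refresh pass from the oldest bucket-refresh
// time plus the bucket period, instead of using a fixed scan interval. Buckets
// naturally spread out over time because each one carries its own timestamp.
func nextRefreshAt(lastRefreshedAt []time.Time, bucketPeriod time.Duration) time.Time {
	oldest := time.Now()
	for _, t := range lastRefreshedAt {
		if t.Before(oldest) {
			oldest = t
		}
	}
	return oldest.Add(bucketPeriod)
}
```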
Hello, I just saw that I already fought against the removal of this method last time. It is very difficult to have peers join a network in full swing without a way to trigger a DHT bootstrap right away when that happens. Has an alternative way to do this been added? How can I make a peer start discovering other peers at a certain point without waiting for a bootstrap round to trigger? cc @Stebalien
@hsanjuan A call to
@aarshkshah1992 it seems calling Bootstrap() several times will launch additional "periodic bootstrap" goroutines? In this case my peers have already called Bootstrap(), and they need to trigger a single bootstrap round later.
@Stebalien Please can you also take a look at some of the concerns/questions by @raulk on this PR when you get time?
Thanks @aarshkshah1992, I appreciate it!
For issues #375 & #387
K-bucket work at libp2p/go-libp2p-kbucket#38