Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(bootstrap): take autobootstrap to completion #403

Merged
merged 6 commits into from
Nov 5, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions dht.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,8 @@ type IpfsDHT struct {

bootstrapCfg opts.BootstrapConfig

triggerBootstrap chan struct{}
triggerAutoBootstrap bool
triggerBootstrap chan *bootstrapReq
}

// Assert that IPFS assumptions about interfaces aren't broken. These aren't a
Expand Down Expand Up @@ -103,12 +104,14 @@ func New(ctx context.Context, h host.Host, options ...opts.Option) (*IpfsDHT, er

dht.proc.AddChild(dht.providers.Process())
dht.Validator = cfg.Validator
dht.triggerAutoBootstrap = cfg.TriggerAutoBootstrap

if !cfg.Client {
for _, p := range cfg.Protocols {
h.SetStreamHandler(p, dht.handleNewStream)
}
}
dht.startBootstrapping()
return dht, nil
}

Expand Down Expand Up @@ -159,7 +162,7 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
routingTable: rt,
protocols: protocols,
bucketSize: bucketSize,
triggerBootstrap: make(chan struct{}),
triggerBootstrap: make(chan *bootstrapReq),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We no longer need the bootstrapReq, as far as I can tell.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien Indeed. Don't know how I missed it. Thanks.

}

dht.ctx = dht.newContextWithLocalTags(ctx)
Expand All @@ -171,10 +174,10 @@ func makeDHT(ctx context.Context, h host.Host, dstore ds.Batching, protocols []p
// come up with an alternative solution.
// issue is being tracked at https://github.com/libp2p/go-libp2p-kad-dht/issues/387
/*func (dht *IpfsDHT) rtRecovery(proc goprocess.Process) {
writeResp := func(errorChan chan error, err error) {
writeResp := func(errorChan chan error, errChan error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sed?

select {
case <-proc.Closing():
case errorChan <- err:
case errorChan <- errChan:
}
close(errorChan)
}
Expand Down
105 changes: 65 additions & 40 deletions dht_bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"sync"
"time"

process "github.com/jbenet/goprocess"
processctx "github.com/jbenet/goprocess/context"
"github.com/libp2p/go-libp2p-core/routing"
"github.com/multiformats/go-multiaddr"
_ "github.com/multiformats/go-multiaddr-dns"
Expand Down Expand Up @@ -41,52 +43,71 @@ func init() {
}
}

// BootstrapConfig runs cfg.Queries bootstrap queries every cfg.BucketPeriod.
func (dht *IpfsDHT) Bootstrap(ctx context.Context) error {
triggerBootstrapFnc := func() {
logger.Infof("triggerBootstrapFnc: RT only has %d peers which is less than the min threshold of %d, triggering self & bucket bootstrap",
dht.routingTable.Size(), minRTBootstrapThreshold)
type bootstrapReq struct {
errChan chan error
}

if err := dht.selfWalk(ctx); err != nil {
logger.Warningf("triggerBootstrapFnc: self walk: error: %s", err)
}
func makeBootstrapReq() *bootstrapReq {
errChan := make(chan error, 1)
return &bootstrapReq{errChan}
}

if err := dht.bootstrapBuckets(ctx); err != nil {
logger.Warningf("triggerBootstrapFnc: bootstrap buckets: error bootstrapping: %s", err)
}
}
// Bootstrap i
func (dht *IpfsDHT) startBootstrapping() error {
// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
dht.proc.Go(func(proc process.Process) {
ctx := processctx.OnClosingContext(proc)

// we should query for self periodically so we can discover closer peers
go func() {
for {
err := dht.selfWalk(ctx)
if err != nil {
logger.Warningf("self walk: error: %s", err)
}
select {
case <-time.After(dht.bootstrapCfg.SelfQueryInterval):
case <-ctx.Done():
return
scanInterval := time.NewTicker(dht.bootstrapCfg.BucketPeriod)
defer scanInterval.Stop()

var (
lastSelfWalk time.Time
)

// run bootstrap if option is set
if dht.triggerAutoBootstrap {
if err := dht.doBootstrap(ctx, true, &lastSelfWalk); err != nil {
logger.Warningf("bootstrap error: %s", err)
}
}
}()

// scan the RT table periodically & do a random walk on k-buckets that haven't been queried since the given bucket period
go func() {
for {
err := dht.bootstrapBuckets(ctx)
if err != nil {
logger.Warningf("bootstrap buckets: error bootstrapping: %s", err)
}
select {
case <-time.After(dht.bootstrapCfg.RoutingTableScanInterval):
case <-dht.triggerBootstrap:
triggerBootstrapFnc()
case now := <-scanInterval.C:
walkSelf := now.After(lastSelfWalk.Add(dht.bootstrapCfg.SelfQueryInterval))
if err := dht.doBootstrap(ctx, walkSelf, &lastSelfWalk); err != nil {
logger.Warning("bootstrap error: %s", err)
}
case req := <-dht.triggerBootstrap:
logger.Infof("triggering a bootstrap: RT has %d peers", dht.routingTable.Size())
err := dht.doBootstrap(ctx, true, &lastSelfWalk)
select {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the err channel always has 1 slot, we shouldn't need to do this. We should be able to just run:

req.errChan <- err
close(req.errChan)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien: Oh ofcourse, but we can't rely on clients always sending a buffered channel.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this internal?

Copy link
Contributor Author

@aarshkshah1992 aarshkshah1992 Nov 2, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Stebalien

It really is just a precautionary measure. If in the future, a go-routine within Dht ("client") decides to a trigger a bootstrap & passes an unbuffered channel because of whatever reason(programmer oversight for example), the whole bootstrapping loop comes to a halt unless the "client" reads from it. We don't want to rely on that.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Meh, either way. IMO, the idiom of passing respone channels with a size of 1 is common enough that users should always check how a channel is being used before passing an unbuffered response channel.

But either way works.

case req.errChan <- err:
close(req.errChan)
default:
}
case <-ctx.Done():
return
}
}
}()
})

return nil
}

func (dht *IpfsDHT) doBootstrap(ctx context.Context, walkSelf bool, latestSelfWalk *time.Time) error {
if walkSelf {
if err := dht.selfWalk(ctx); err != nil {
return fmt.Errorf("self walk: error: %s", err)
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: unnecessary else.

*latestSelfWalk = time.Now()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might as well just make this a field in the dht struct.

}
}

if err := dht.bootstrapBuckets(ctx); err != nil {
return fmt.Errorf("bootstrap buckets: error bootstrapping: %s", err)
}

return nil
}
Expand Down Expand Up @@ -166,11 +187,15 @@ func (dht *IpfsDHT) selfWalk(ctx context.Context) error {
return err
}

// synchronous bootstrap.
func (dht *IpfsDHT) bootstrapOnce(ctx context.Context) error {
if err := dht.selfWalk(ctx); err != nil {
return errors.Wrap(err, "failed bootstrap while searching for self")
} else {
return dht.bootstrapBuckets(ctx)
// Bootstrap tells the DHT to get into a bootstrapped state.
//
// Note: the context is ignored.
func (dht *IpfsDHT) Bootstrap(_ context.Context) error {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bootstrap is actually supposed to be async (at the moment). We can change this, but we'll need to propagate the change up the interfaces.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed below, this will now be async.

req := makeBootstrapReq()
select {
case dht.triggerBootstrap <- req:
return <-req.errChan
default:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is going to behave inconsistently. If we're currently bootstrapping, we'll return immediately.

If we're going to block, a new bootstrap call should probably try to "join" an existing one. However, that will add some complexity.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@hsanjuan what is cluster's specific need:

  1. Do you need a way to tell the DHT to bootstrap?
  2. Do you need to be able to wait for the DHT too finish bootstrapping?

Note: queries should still work even if the DHT isn't bootstrapped, as long as you have some peers.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need a peer to get start getting "well-connected" after the first contact to other peer(s) [before that the peer would have had no connections). I don't need to wait until the end of the bootstrap round (it can just happen in background)

You may say "well, call Bootstrap()" then and not before, but things are a bit more tricky (the dht may have already been bootstrapped externally and what not). Being able to trigger a bootstrap round on demand (independent from the recurring configured one) is handy here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need a peer to get start getting "well-connected" after the first contact to other peer(s) [before that the peer would have had no connections).

This should now be automatic.

Being able to trigger a bootstrap round on demand (independent from the recurring configured one) is handy here.

SGTM. I agree bootstrap should actually trigger a bootstrap.


@aarshkshah1992 given this, I wouldn't wait. I'd just trigger the bootstrap and move on.

}
return nil
}
21 changes: 18 additions & 3 deletions dht_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ func setupDHT(ctx context.Context, t *testing.T, client bool) *IpfsDHT {
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
opts.Client(client),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -200,7 +201,7 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) {
start := rand.Intn(len(dhts)) // randomize to decrease bias.
for i := range dhts {
dht := dhts[(start+i)%len(dhts)]
dht.bootstrapOnce(ctx)
dht.Bootstrap(ctx)
}
}

Expand Down Expand Up @@ -690,7 +691,18 @@ func TestBootstrap(t *testing.T) {

func TestBootstrapBelowMinRTThreshold(t *testing.T) {
ctx := context.Background()
dhtA := setupDHT(ctx, t, false)

// enable auto bootstrap on A
dhtA, err := New(
ctx,
bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
)
if err != nil {
t.Fatal(err)
}

dhtB := setupDHT(ctx, t, false)
dhtC := setupDHT(ctx, t, false)

Expand Down Expand Up @@ -783,7 +795,7 @@ func TestPeriodicBootstrap(t *testing.T) {

t.Logf("bootstrapping them so they find each other. %d", nDHTs)
for _, dht := range dhts {
go dht.bootstrapOnce(ctx)
go dht.Bootstrap(ctx)
}

// this is async, and we dont know when it's finished with one cycle, so keep checking
Expand Down Expand Up @@ -1416,6 +1428,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/esh/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
}

dhtA, err := New(ctx, bhost.New(swarmt.GenSwarm(t, ctx, swarmt.OptDisableReuseport)), os...)
Expand Down Expand Up @@ -1454,6 +1467,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/esh/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
}...)
if err != nil {
t.Fatal(err)
Expand All @@ -1463,6 +1477,7 @@ func TestGetSetPluggedProtocol(t *testing.T) {
opts.Protocols("/lsr/dht"),
opts.Client(false),
opts.NamespacedValidator("v", blankValidator{}),
opts.DisableAutoBootstrap(),
}...)
if err != nil {
t.Fatal(err)
Expand Down
14 changes: 10 additions & 4 deletions ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/routing"
opts "github.com/libp2p/go-libp2p-kad-dht/opts"

ggio "github.com/gogo/protobuf/io"
u "github.com/ipfs/go-ipfs-util"
Expand All @@ -29,7 +30,8 @@ func TestGetFailures(t *testing.T) {
}
hosts := mn.Hosts()

d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -149,7 +151,9 @@ func TestNotFound(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
d, err := New(ctx, hosts[0])

os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -228,7 +232,8 @@ func TestLessThanKResponses(t *testing.T) {
}
hosts := mn.Hosts()

d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -297,7 +302,8 @@ func TestMultipleQueries(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
d, err := New(ctx, hosts[0])
os := []opts.Option{opts.DisableAutoBootstrap()}
d, err := New(ctx, hosts[0], os...)
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 4 additions & 4 deletions notif.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ func (nn *netNotifiee) Connected(n network.Network, v network.Conn) {
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap {
if bootstrap && dht.triggerAutoBootstrap {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some tests rely on a specific number of peers in the RT/ specific number of results in a query after a peer has finished connecting to other peers. However, this code causes the peer to connect to more peers/adds more peers to the RT than it intended to in the test & that causes the test to fail. So, we need to disable this feature for tests.

select {
case dht.triggerBootstrap <- struct{}{}:
case dht.triggerBootstrap <- makeBootstrapReq():
default:
}
}
Expand Down Expand Up @@ -80,9 +80,9 @@ func (nn *netNotifiee) testConnection(v network.Conn) {
if dht.host.Network().Connectedness(p) == network.Connected {
bootstrap := dht.routingTable.Size() <= minRTBootstrapThreshold
dht.Update(dht.Context(), p)
if bootstrap {
if bootstrap && dht.triggerAutoBootstrap {
select {
case dht.triggerBootstrap <- struct{}{}:
case dht.triggerBootstrap <- makeBootstrapReq():
default:
}
}
Expand Down
33 changes: 20 additions & 13 deletions opts/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,20 @@ var (

// BootstrapConfig specifies parameters used for bootstrapping the DHT.
type BootstrapConfig struct {
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Timeout time.Duration // how long to wait for a bootstrap query to run
RoutingTableScanInterval time.Duration // how often to scan the RT for k-buckets that haven't been queried since the given period
SelfQueryInterval time.Duration // how often to query for self
BucketPeriod time.Duration // how long to wait for a k-bucket to be queried before doing a random walk on it
Timeout time.Duration // how long to wait for a bootstrap query to run
SelfQueryInterval time.Duration // how often to query for self
}

// Options is a structure containing all the options that can be used when constructing a DHT.
type Options struct {
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int
BootstrapConfig BootstrapConfig
Datastore ds.Batching
Validator record.Validator
Client bool
Protocols []protocol.ID
BucketSize int
BootstrapConfig BootstrapConfig
TriggerAutoBootstrap bool
}

// Apply applies the given options to this Option
Expand Down Expand Up @@ -63,14 +63,13 @@ var Defaults = func(o *Options) error {
// same as that mentioned in the kad dht paper
BucketPeriod: 1 * time.Hour,

// since the default bucket period is 1 hour, a scan interval of 30 minutes sounds reasonable
RoutingTableScanInterval: 30 * time.Minute,

Timeout: 10 * time.Second,

SelfQueryInterval: 1 * time.Hour,
}

o.TriggerAutoBootstrap = true

return nil
}

Expand Down Expand Up @@ -149,3 +148,11 @@ func BucketSize(bucketSize int) Option {
return nil
}
}

// DisableAutoBootstrap disables auto bootstrap on the dht
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So, all this option actually does is skip the initial bootstrap. We'll still auto-bootstrap.

Given that, really, we only care about this in tests (as far as I know), we could:

  1. Have some kind of hidden option that prevents us from starting the bootstrap loop.
  2. Manually call startBootstrap.

(We could also not start bootstrapping till the user calls Bootstrap but one of my goals here was to remove that requirement so the DHT "just works" out of the box)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This option does two things actually:

  1. Skip the initial bootstrap
  2. Disables the "bootstrap if RT size is below minimum threshold" feature as that causes some tests to fail as I've explained in the comment here

I agree that we only need to do this for tests for now. But, what's the harm in having a visible option to disable this should someone wish to run the Dht without this side effect ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I see. Yeah, that makes sense (although it's still a bit confusing as it doesn't completely disable automatic bootstrap). But some good documentation should be enough.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm. Actually, I think this is still too confusing and too weird of an option to expose in a public API. I can't think of a single (real) use-case for it and I can think of plenty of ways in which it can be used incorrectly.

If we're going to have a public option to disable automatic bootstrapping, it should completely disable automatic bootstrapping. We can probably just have this option modify the bootstrap config to set the bootstrap intervals to 0 (and then special-case 0 to disable the ticker). That way, we'll only bootstrap on request.

func DisableAutoBootstrap() Option {
return func(o *Options) error {
o.TriggerAutoBootstrap = false
return nil
}
}