Skip to content

Commit

Permalink
Merge pull request #69 from iand/issue-67
Browse files Browse the repository at this point in the history
Fix hang in BackoffDiscovery.FindPeers when requesting limit lower than number of peers available
  • Loading branch information
marten-seemann authored Jun 14, 2021
2 parents 39a7c78 + dd7188b commit 90403a4
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 13 deletions.
21 changes: 13 additions & 8 deletions p2p/discovery/backoff/backoffcache.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,15 @@ func WithBackoffDiscoveryReturnedChannelSize(size int) BackoffDiscoveryOption {
}

type backoffCache struct {
// strat is assigned on creation and not written to
strat BackoffStrategy

mux sync.Mutex // guards writes to all following fields
nextDiscover time.Time
prevPeers map[peer.ID]peer.AddrInfo

peers map[peer.ID]peer.AddrInfo
sendingChs map[chan peer.AddrInfo]int

ongoing bool
strat BackoffStrategy
mux sync.Mutex
peers map[peer.ID]peer.AddrInfo
sendingChs map[chan peer.AddrInfo]int
ongoing bool
}

func (d *BackoffDiscovery) Advertise(ctx context.Context, ns string, opts ...discovery.Option) (time.Duration, error) {
Expand Down Expand Up @@ -112,6 +112,7 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis
sendingChs: make(map[chan peer.AddrInfo]int),
strat: d.stratFactory(),
}

d.peerCacheMux.Lock()
c, ok = d.peerCache[ns]

Expand Down Expand Up @@ -139,7 +140,11 @@ func (d *BackoffDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis
}
pch := make(chan peer.AddrInfo, chLen)
for _, ai := range c.prevPeers {
pch <- ai
select {
case pch <- ai:
default:
// skip if we have asked for a lower limit than the number of peers known
}
}
close(pch)
return pch, nil
Expand Down
69 changes: 64 additions & 5 deletions p2p/discovery/backoff/backoffcache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,12 @@ func (d *delayedDiscovery) FindPeers(ctx context.Context, ns string, opts ...dis

func assertNumPeers(t *testing.T, ctx context.Context, d discovery.Discovery, ns string, count int) {
t.Helper()
peerCh, err := d.FindPeers(ctx, ns, discovery.Limit(10))
assertNumPeersWithLimit(t, ctx, d, ns, 10, count)
}

func assertNumPeersWithLimit(t *testing.T, ctx context.Context, d discovery.Discovery, ns string, limit int, count int) {
t.Helper()
peerCh, err := d.FindPeers(ctx, ns, discovery.Limit(limit))
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -115,7 +120,7 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) {
assertNumPeers(t, ctx, dCache, ns, 1)

// wait a little to make sure the extra request doesn't modify the backoff
time.Sleep(time.Millisecond * 50) //50 < 100
time.Sleep(time.Millisecond * 50) // 50 < 100
assertNumPeers(t, ctx, dCache, ns, 1)

// wait for backoff to expire and check if we increase it
Expand All @@ -124,15 +129,15 @@ func TestBackoffDiscoveryMultipleBackoff(t *testing.T) {

d2.Advertise(ctx, ns, discovery.TTL(time.Millisecond*400))

time.Sleep(time.Millisecond * 150) //150 < 250
time.Sleep(time.Millisecond * 150) // 150 < 250
assertNumPeers(t, ctx, dCache, ns, 1)

time.Sleep(time.Millisecond * 150) //150 + 150 > 250
time.Sleep(time.Millisecond * 150) // 150 + 150 > 250
assertNumPeers(t, ctx, dCache, ns, 2)

// check that the backoff has been reset
// also checks that we can decrease our peer count (i.e. not just growing a set)
time.Sleep(time.Millisecond * 110) //110 > 100, also 150+150+110>400
time.Sleep(time.Millisecond * 110) // 110 > 100, also 150+150+110>400
assertNumPeers(t, ctx, dCache, ns, 1)
}

Expand Down Expand Up @@ -193,3 +198,57 @@ func TestBackoffDiscoverySimultaneousQuery(t *testing.T) {
t.Fatalf("Channels returned %d, %d elements instead of %d", szCh1, szCh2, n)
}
}

func TestBackoffDiscoveryCacheCapacity(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

discServer := newDiscoveryServer()

// Testing with n larger than most internal buffer sizes (32)
n := 40
advertisers := make([]discovery.Discovery, n)

for i := 0; i < n; i++ {
h := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
advertisers[i] = &mockDiscoveryClient{h, discServer}
}

h1 := bhost.NewBlankHost(swarmt.GenSwarm(t, ctx))
d1 := &mockDiscoveryClient{h1, discServer}

const discoveryInterval = time.Millisecond * 100

bkf := NewFixedBackoff(discoveryInterval)
dCache, err := NewBackoffDiscovery(d1, bkf)
if err != nil {
t.Fatal(err)
}

const ns = "test"

// add speers
for i := 0; i < n; i++ {
advertisers[i].Advertise(ctx, ns, discovery.TTL(time.Hour))
}

// Request all peers, all will be present
assertNumPeersWithLimit(t, ctx, dCache, ns, n, n)

// Request peers with a lower limit
assertNumPeersWithLimit(t, ctx, dCache, ns, n-1, n-1)

// Wait a little time but don't allow cache to expire
time.Sleep(discoveryInterval / 10)

// Request peers with a lower limit this time using cache
// Here we are testing that the cache logic does not block when there are more peers known than the limit requested
// See https://github.com/libp2p/go-libp2p-discovery/issues/67
assertNumPeersWithLimit(t, ctx, dCache, ns, n-1, n-1)

// Wait for next discovery so next request will bypass cache
time.Sleep(time.Millisecond * 100)

// Ask for all peers again
assertNumPeersWithLimit(t, ctx, dCache, ns, n, n)
}

0 comments on commit 90403a4

Please sign in to comment.