Skip to content

Commit

Permalink
Refresh Cpl's, not buckets
Browse files Browse the repository at this point in the history
  • Loading branch information
aarshkshah1992 committed Dec 12, 2019
1 parent d5af829 commit a58d596
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 108 deletions.
19 changes: 0 additions & 19 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package kbucket
import (
"container/list"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/peer"
)
Expand All @@ -14,32 +13,14 @@ import (
type Bucket struct {
lk sync.RWMutex
list *list.List

lastRefreshedAtLk sync.RWMutex
lastRefreshedAt time.Time // the last time we looked up a key in the bucket
}

func newBucket() *Bucket {
b := new(Bucket)
b.list = list.New()
b.lastRefreshedAt = time.Now()
return b
}

func (b *Bucket) RefreshedAt() time.Time {
b.lastRefreshedAtLk.RLock()
defer b.lastRefreshedAtLk.RUnlock()

return b.lastRefreshedAt
}

func (b *Bucket) ResetRefreshedAt(newTime time.Time) {
b.lastRefreshedAtLk.Lock()
defer b.lastRefreshedAtLk.Unlock()

b.lastRefreshedAt = newTime
}

func (b *Bucket) Peers() []peer.ID {
b.lk.RLock()
defer b.lk.RUnlock()
Expand Down
111 changes: 55 additions & 56 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ var log = logging.Logger("table")
var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")
var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")

// MaxCplForRefresh is the maximum cpl we support for refresh.
// This limit exists because we can only generate 'MaxCplForRefresh' bit prefixes for now.
var MaxCplForRefresh uint = 15

type CplRefresh struct {
Cpl uint
LastRefreshAt time.Time
}

// RoutingTable defines the routing table.
type RoutingTable struct {
// ID of the local peer
Expand All @@ -39,6 +48,9 @@ type RoutingTable struct {
Buckets []*Bucket
bucketsize int

cplRefreshLk sync.RWMutex
cplRefreshedAt map[uint]time.Time

// notification functions
PeerRemoved func(peer.ID)
PeerAdded func(peer.ID)
Expand All @@ -47,84 +59,71 @@ type RoutingTable struct {
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics) *RoutingTable {
rt := &RoutingTable{
Buckets: []*Bucket{newBucket()},
bucketsize: bucketsize,
local: localID,
maxLatency: latency,
metrics: m,
PeerRemoved: func(peer.ID) {},
PeerAdded: func(peer.ID) {},
Buckets: []*Bucket{newBucket()},
bucketsize: bucketsize,
local: localID,
maxLatency: latency,
metrics: m,
cplRefreshedAt: make(map[uint]time.Time),
PeerRemoved: func(peer.ID) {},
PeerAdded: func(peer.ID) {},
}

return rt
}

// GetAllBuckets is safe to call as rt.Buckets is append-only
// caller SHOULD NOT modify the returned slice
func (rt *RoutingTable) GetAllBuckets() []*Bucket {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
return rt.Buckets
}
// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
// Caller is free to modify the returned slice as it is a defensive copy.
func (rt *RoutingTable) GetTrackedCplsForRefresh() []*CplRefresh {
rt.cplRefreshLk.RLock()
defer rt.cplRefreshLk.RUnlock()

// GenRandPeerID generates a random peerID in bucket=bucketID
func (rt *RoutingTable) GenRandPeerID(bucketID int) peer.ID {
if bucketID < 0 {
panic(fmt.Sprintf("bucketID %d is not non-negative", bucketID))
}
rt.tabLock.RLock()
bucketLen := len(rt.Buckets)
rt.tabLock.RUnlock()
var cpls []*CplRefresh

var targetCpl uint
if bucketID > (bucketLen - 1) {
targetCpl = uint(bucketLen) - 1
} else {
targetCpl = uint(bucketID)
for c, t := range rt.cplRefreshedAt {
cpls = append(cpls, &CplRefresh{c, t})
}

// We can only handle upto 16 bit prefixes
if targetCpl > 16 {
targetCpl = 16
return cpls
}

// GenRandPeerID generates a random peerID for a given Cpl
func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) {
if targetCpl > MaxCplForRefresh {
return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", MaxCplForRefresh)
}

var targetPrefix uint16
localPrefix := binary.BigEndian.Uint16(rt.local)
if targetCpl < 16 {
// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
// to our randomly generated prefix.
toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
randPrefix := uint16(rand.Uint32())

// Combine the toggled local prefix and the random bits at the correct offset
// such that ONLY the first `targetCpl` bits match the local ID.
mask := (^uint16(0)) << (16 - (targetCpl + 1))
targetPrefix = (toggledLocalPrefix & mask) | (randPrefix & ^mask)
} else {
targetPrefix = localPrefix
}

// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
// to our randomly generated prefix.
toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
randPrefix := uint16(rand.Uint32())

// Combine the toggled local prefix and the random bits at the correct offset
// such that ONLY the first `targetCpl` bits match the local ID.
mask := (^uint16(0)) << (16 - (targetCpl + 1))
targetPrefix := (toggledLocalPrefix & mask) | (randPrefix & ^mask)

// Convert to a known peer ID.
key := keyPrefixMap[targetPrefix]
id := [34]byte{mh.SHA2_256, 32}
binary.BigEndian.PutUint32(id[2:], key)
return peer.ID(id[:])
return peer.ID(id[:]), nil
}

// Returns the bucket for a given ID
// should NOT modify the peer list on the returned bucket
func (rt *RoutingTable) BucketForID(id ID) *Bucket {
// ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID.
func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) {
cpl := CommonPrefixLen(id, rt.local)

rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
bucketID := cpl
if bucketID >= len(rt.Buckets) {
bucketID = len(rt.Buckets) - 1
if uint(cpl) > MaxCplForRefresh {
return
}

return rt.Buckets[bucketID]
rt.cplRefreshLk.Lock()
defer rt.cplRefreshLk.Unlock()

rt.cplRefreshedAt[uint(cpl)] = newTime
}

// Update adds or moves the given peer to the front of its respective bucket
Expand Down
68 changes: 35 additions & 33 deletions table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"
pstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/stretchr/testify/require"
)

// Test basic features of the bucket struct
Expand Down Expand Up @@ -53,48 +54,49 @@ func TestBucket(t *testing.T) {
func TestGenRandPeerID(t *testing.T) {
t.Parallel()

nBuckets := 21
local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m)

// create nBuckets
for i := 0; i < nBuckets; i++ {
for {
if p := test.RandPeerIDFatal(t); CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == i {
rt.Update(p)
break
}
}
// generate above MaxCplForRefresh fails
p, err := rt.GenRandPeerID(MaxCplForRefresh + 1)
require.Error(t, err)
require.Empty(t, p)

// test generate rand peer ID
for cpl := uint(0); cpl <= MaxCplForRefresh; cpl++ {
peerID, err := rt.GenRandPeerID(cpl)
require.NoError(t, err)

require.True(t, uint(CommonPrefixLen(ConvertPeerID(peerID), rt.local)) == cpl, "failed for cpl=%d", cpl)
}
}

// test bucket for peer
peers := rt.ListPeers()
for _, p := range peers {
b := rt.BucketForID(ConvertPeerID(p))
if !b.Has(p) {
t.Fatalf("bucket should have peers %s", p.String())
}
func TestRefreshAndGetTrackedCpls(t *testing.T) {
t.Parallel()

local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m)

// add cpl's for tracking
for cpl := uint(0); cpl < MaxCplForRefresh; cpl++ {
peerID, err := rt.GenRandPeerID(cpl)
require.NoError(t, err)
rt.ResetCplRefreshedAtForID(ConvertPeerID(peerID), time.Now())
}

// test generate rand peer ID
for bucketID := 0; bucketID < nBuckets; bucketID++ {
peerID := rt.GenRandPeerID(bucketID)

// for bucketID upto maxPrefixLen of 16, CPL should be Exactly bucketID
if bucketID < 16 {
if CommonPrefixLen(ConvertPeerID(peerID), rt.local) != bucketID {
t.Fatalf("cpl should be %d for bucket %d but got %d, generated peerID is %s", bucketID, bucketID,
CommonPrefixLen(ConvertPeerID(peerID), rt.local), peerID)
}
} else {
// from bucketID 16 onwards, CPL should be ATLEAST 16
if CommonPrefixLen(ConvertPeerID(peerID), rt.local) < 16 {
t.Fatalf("cpl should be ATLEAST 16 for bucket %d but got %d, generated peerID is %s", bucketID,
CommonPrefixLen(ConvertPeerID(peerID), rt.local), peerID)
}
}
// fetch cpl's
trackedCpls := rt.GetTrackedCplsForRefresh()
require.Len(t, trackedCpls, int(MaxCplForRefresh))
actualCpls := make(map[uint]struct{})
for i := 0; i < len(trackedCpls); i++ {
actualCpls[trackedCpls[i].Cpl] = struct{}{}
}

for i := uint(0); i < MaxCplForRefresh; i++ {
_, ok := actualCpls[i]
require.True(t, ok, "tracked cpl's should have cpl %d", i)
}
}

Expand Down

0 comments on commit a58d596

Please sign in to comment.