Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refresh Cpl's, not buckets #46

Merged
merged 3 commits into from
Dec 16, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 0 additions & 19 deletions bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ package kbucket
import (
"container/list"
"sync"
"time"

"github.com/libp2p/go-libp2p-core/peer"
)
Expand All @@ -14,32 +13,14 @@ import (
type Bucket struct {
lk sync.RWMutex
list *list.List

lastRefreshedAtLk sync.RWMutex
lastRefreshedAt time.Time // the last time we looked up a key in the bucket
}

func newBucket() *Bucket {
b := new(Bucket)
b.list = list.New()
b.lastRefreshedAt = time.Now()
return b
}

func (b *Bucket) RefreshedAt() time.Time {
b.lastRefreshedAtLk.RLock()
defer b.lastRefreshedAtLk.RUnlock()

return b.lastRefreshedAt
}

func (b *Bucket) ResetRefreshedAt(newTime time.Time) {
b.lastRefreshedAtLk.Lock()
defer b.lastRefreshedAtLk.Unlock()

b.lastRefreshedAt = newTime
}

func (b *Bucket) Peers() []peer.ID {
b.lk.RLock()
defer b.lk.RUnlock()
Expand Down
113 changes: 57 additions & 56 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,17 @@ var log = logging.Logger("table")
var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high")
var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity")

// maxCplForRefresh is the maximum cpl we support for refresh.
// This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now.
const maxCplForRefresh uint = 15

// CplRefresh contains a CPL(common prefix length) with the host & the last time
// we refreshed that cpl/searched for an ID which has that cpl with the host.
type CplRefresh struct {
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
Cpl uint
LastRefreshAt time.Time
}

// RoutingTable defines the routing table.
type RoutingTable struct {
// ID of the local peer
Expand All @@ -39,6 +50,9 @@ type RoutingTable struct {
Buckets []*Bucket
bucketsize int

cplRefreshLk sync.RWMutex
cplRefreshedAt map[uint]time.Time

// notification functions
PeerRemoved func(peer.ID)
PeerAdded func(peer.ID)
Expand All @@ -47,84 +61,71 @@ type RoutingTable struct {
// NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance.
func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics) *RoutingTable {
rt := &RoutingTable{
Buckets: []*Bucket{newBucket()},
bucketsize: bucketsize,
local: localID,
maxLatency: latency,
metrics: m,
PeerRemoved: func(peer.ID) {},
PeerAdded: func(peer.ID) {},
Buckets: []*Bucket{newBucket()},
bucketsize: bucketsize,
local: localID,
maxLatency: latency,
metrics: m,
cplRefreshedAt: make(map[uint]time.Time),
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
PeerRemoved: func(peer.ID) {},
PeerAdded: func(peer.ID) {},
}

return rt
}

// GetAllBuckets is safe to call as rt.Buckets is append-only
// caller SHOULD NOT modify the returned slice
func (rt *RoutingTable) GetAllBuckets() []*Bucket {
rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
return rt.Buckets
}
// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh.
// Caller is free to modify the returned slice as it is a defensive copy.
func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh {
rt.cplRefreshLk.RLock()
defer rt.cplRefreshLk.RUnlock()

// GenRandPeerID generates a random peerID in bucket=bucketID
func (rt *RoutingTable) GenRandPeerID(bucketID int) peer.ID {
if bucketID < 0 {
panic(fmt.Sprintf("bucketID %d is not non-negative", bucketID))
}
rt.tabLock.RLock()
bucketLen := len(rt.Buckets)
rt.tabLock.RUnlock()
cpls := make([]CplRefresh, 0, len(rt.cplRefreshedAt))

var targetCpl uint
if bucketID > (bucketLen - 1) {
targetCpl = uint(bucketLen) - 1
} else {
targetCpl = uint(bucketID)
for c, t := range rt.cplRefreshedAt {
cpls = append(cpls, CplRefresh{c, t})
}

// We can only handle upto 16 bit prefixes
if targetCpl > 16 {
targetCpl = 16
return cpls
}

// GenRandPeerID generates a random peerID for a given Cpl
func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) {
if targetCpl > maxCplForRefresh {
return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", maxCplForRefresh)
}

var targetPrefix uint16
localPrefix := binary.BigEndian.Uint16(rt.local)
if targetCpl < 16 {
// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
// to our randomly generated prefix.
toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
randPrefix := uint16(rand.Uint32())

// Combine the toggled local prefix and the random bits at the correct offset
// such that ONLY the first `targetCpl` bits match the local ID.
mask := (^uint16(0)) << (16 - (targetCpl + 1))
targetPrefix = (toggledLocalPrefix & mask) | (randPrefix & ^mask)
} else {
targetPrefix = localPrefix
}

// For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B.
// Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L
// to our randomly generated prefix.
toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl)
randPrefix := uint16(rand.Uint32())

// Combine the toggled local prefix and the random bits at the correct offset
// such that ONLY the first `targetCpl` bits match the local ID.
mask := (^uint16(0)) << (16 - (targetCpl + 1))
targetPrefix := (toggledLocalPrefix & mask) | (randPrefix & ^mask)

// Convert to a known peer ID.
key := keyPrefixMap[targetPrefix]
id := [34]byte{mh.SHA2_256, 32}
binary.BigEndian.PutUint32(id[2:], key)
return peer.ID(id[:])
return peer.ID(id[:]), nil
}

// Returns the bucket for a given ID
// should NOT modify the peer list on the returned bucket
func (rt *RoutingTable) BucketForID(id ID) *Bucket {
// ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID.
func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) {
cpl := CommonPrefixLen(id, rt.local)

rt.tabLock.RLock()
defer rt.tabLock.RUnlock()
bucketID := cpl
if bucketID >= len(rt.Buckets) {
bucketID = len(rt.Buckets) - 1
if uint(cpl) > maxCplForRefresh {
return
}

return rt.Buckets[bucketID]
rt.cplRefreshLk.Lock()
defer rt.cplRefreshLk.Unlock()

rt.cplRefreshedAt[uint(cpl)] = newTime
}

// Update adds or moves the given peer to the front of its respective bucket
Expand Down
68 changes: 35 additions & 33 deletions table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/test"
pstore "github.com/libp2p/go-libp2p-peerstore"
"github.com/stretchr/testify/require"
)

// Test basic features of the bucket struct
Expand Down Expand Up @@ -53,48 +54,49 @@ func TestBucket(t *testing.T) {
func TestGenRandPeerID(t *testing.T) {
t.Parallel()

nBuckets := 21
local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m)

// create nBuckets
for i := 0; i < nBuckets; i++ {
for {
if p := test.RandPeerIDFatal(t); CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == i {
rt.Update(p)
break
}
}
// generate above maxCplForRefresh fails
p, err := rt.GenRandPeerID(maxCplForRefresh + 1)
require.Error(t, err)
require.Empty(t, p)

// test generate rand peer ID
for cpl := uint(0); cpl <= maxCplForRefresh; cpl++ {
peerID, err := rt.GenRandPeerID(cpl)
require.NoError(t, err)

require.True(t, uint(CommonPrefixLen(ConvertPeerID(peerID), rt.local)) == cpl, "failed for cpl=%d", cpl)
}
}

// test bucket for peer
peers := rt.ListPeers()
for _, p := range peers {
b := rt.BucketForID(ConvertPeerID(p))
if !b.Has(p) {
t.Fatalf("bucket should have peers %s", p.String())
}
func TestRefreshAndGetTrackedCpls(t *testing.T) {
t.Parallel()

local := test.RandPeerIDFatal(t)
m := pstore.NewMetrics()
rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m)

// add cpl's for tracking
for cpl := uint(0); cpl < maxCplForRefresh; cpl++ {
peerID, err := rt.GenRandPeerID(cpl)
require.NoError(t, err)
rt.ResetCplRefreshedAtForID(ConvertPeerID(peerID), time.Now())
}

// test generate rand peer ID
for bucketID := 0; bucketID < nBuckets; bucketID++ {
peerID := rt.GenRandPeerID(bucketID)

// for bucketID upto maxPrefixLen of 16, CPL should be Exactly bucketID
if bucketID < 16 {
if CommonPrefixLen(ConvertPeerID(peerID), rt.local) != bucketID {
t.Fatalf("cpl should be %d for bucket %d but got %d, generated peerID is %s", bucketID, bucketID,
CommonPrefixLen(ConvertPeerID(peerID), rt.local), peerID)
}
} else {
// from bucketID 16 onwards, CPL should be ATLEAST 16
if CommonPrefixLen(ConvertPeerID(peerID), rt.local) < 16 {
t.Fatalf("cpl should be ATLEAST 16 for bucket %d but got %d, generated peerID is %s", bucketID,
CommonPrefixLen(ConvertPeerID(peerID), rt.local), peerID)
}
}
// fetch cpl's
trackedCpls := rt.GetTrackedCplsForRefresh()
require.Len(t, trackedCpls, int(maxCplForRefresh))
actualCpls := make(map[uint]struct{})
for i := 0; i < len(trackedCpls); i++ {
actualCpls[trackedCpls[i].Cpl] = struct{}{}
}

for i := uint(0); i < maxCplForRefresh; i++ {
_, ok := actualCpls[i]
require.True(t, ok, "tracked cpl's should have cpl %d", i)
}
}

Expand Down