From a58d596863ddea2f2cd443206f5f9a03f6a153b0 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Mon, 18 Nov 2019 18:21:37 +0800 Subject: [PATCH 1/3] Refresh Cpl's, not buckets --- bucket.go | 19 --------- table.go | 111 +++++++++++++++++++++++++------------------------- table_test.go | 68 ++++++++++++++++--------------- 3 files changed, 90 insertions(+), 108 deletions(-) diff --git a/bucket.go b/bucket.go index 268e432..6a26f7b 100644 --- a/bucket.go +++ b/bucket.go @@ -5,7 +5,6 @@ package kbucket import ( "container/list" "sync" - "time" "github.com/libp2p/go-libp2p-core/peer" ) @@ -14,32 +13,14 @@ import ( type Bucket struct { lk sync.RWMutex list *list.List - - lastRefreshedAtLk sync.RWMutex - lastRefreshedAt time.Time // the last time we looked up a key in the bucket } func newBucket() *Bucket { b := new(Bucket) b.list = list.New() - b.lastRefreshedAt = time.Now() return b } -func (b *Bucket) RefreshedAt() time.Time { - b.lastRefreshedAtLk.RLock() - defer b.lastRefreshedAtLk.RUnlock() - - return b.lastRefreshedAt -} - -func (b *Bucket) ResetRefreshedAt(newTime time.Time) { - b.lastRefreshedAtLk.Lock() - defer b.lastRefreshedAtLk.Unlock() - - b.lastRefreshedAt = newTime -} - func (b *Bucket) Peers() []peer.ID { b.lk.RLock() defer b.lk.RUnlock() diff --git a/table.go b/table.go index c4c7be7..708f758 100644 --- a/table.go +++ b/table.go @@ -21,6 +21,15 @@ var log = logging.Logger("table") var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high") var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity") +// MaxCplForRefresh is the maximum cpl we support for refresh. +// This limit exists because we can only generate 'MaxCplForRefresh' bit prefixes for now. +var MaxCplForRefresh uint = 15 + +type CplRefresh struct { + Cpl uint + LastRefreshAt time.Time +} + // RoutingTable defines the routing table. type RoutingTable struct { // ID of the local peer @@ -39,6 +48,9 @@ type RoutingTable struct { Buckets []*Bucket bucketsize int + cplRefreshLk sync.RWMutex + cplRefreshedAt map[uint]time.Time + // notification functions PeerRemoved func(peer.ID) PeerAdded func(peer.ID) @@ -47,84 +59,71 @@ type RoutingTable struct { // NewRoutingTable creates a new routing table with a given bucketsize, local ID, and latency tolerance. func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerstore.Metrics) *RoutingTable { rt := &RoutingTable{ - Buckets: []*Bucket{newBucket()}, - bucketsize: bucketsize, - local: localID, - maxLatency: latency, - metrics: m, - PeerRemoved: func(peer.ID) {}, - PeerAdded: func(peer.ID) {}, + Buckets: []*Bucket{newBucket()}, + bucketsize: bucketsize, + local: localID, + maxLatency: latency, + metrics: m, + cplRefreshedAt: make(map[uint]time.Time), + PeerRemoved: func(peer.ID) {}, + PeerAdded: func(peer.ID) {}, } return rt } -// GetAllBuckets is safe to call as rt.Buckets is append-only -// caller SHOULD NOT modify the returned slice -func (rt *RoutingTable) GetAllBuckets() []*Bucket { - rt.tabLock.RLock() - defer rt.tabLock.RUnlock() - return rt.Buckets -} +// GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh. +// Caller is free to modify the returned slice as it is a defensive copy. +func (rt *RoutingTable) GetTrackedCplsForRefresh() []*CplRefresh { + rt.cplRefreshLk.RLock() + defer rt.cplRefreshLk.RUnlock() -// GenRandPeerID generates a random peerID in bucket=bucketID -func (rt *RoutingTable) GenRandPeerID(bucketID int) peer.ID { - if bucketID < 0 { - panic(fmt.Sprintf("bucketID %d is not non-negative", bucketID)) - } - rt.tabLock.RLock() - bucketLen := len(rt.Buckets) - rt.tabLock.RUnlock() + var cpls []*CplRefresh - var targetCpl uint - if bucketID > (bucketLen - 1) { - targetCpl = uint(bucketLen) - 1 - } else { - targetCpl = uint(bucketID) + for c, t := range rt.cplRefreshedAt { + cpls = append(cpls, &CplRefresh{c, t}) } - // We can only handle upto 16 bit prefixes - if targetCpl > 16 { - targetCpl = 16 + return cpls +} + +// GenRandPeerID generates a random peerID for a given Cpl +func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) { + if targetCpl > MaxCplForRefresh { + return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", MaxCplForRefresh) } - var targetPrefix uint16 localPrefix := binary.BigEndian.Uint16(rt.local) - if targetCpl < 16 { - // For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B. - // Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L - // to our randomly generated prefix. - toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl) - randPrefix := uint16(rand.Uint32()) - - // Combine the toggled local prefix and the random bits at the correct offset - // such that ONLY the first `targetCpl` bits match the local ID. - mask := (^uint16(0)) << (16 - (targetCpl + 1)) - targetPrefix = (toggledLocalPrefix & mask) | (randPrefix & ^mask) - } else { - targetPrefix = localPrefix - } + + // For host with ID `L`, an ID `K` belongs to a bucket with ID `B` ONLY IF CommonPrefixLen(L,K) is EXACTLY B. + // Hence, to achieve a targetPrefix `T`, we must toggle the (T+1)th bit in L & then copy (T+1) bits from L + // to our randomly generated prefix. + toggledLocalPrefix := localPrefix ^ (uint16(0x8000) >> targetCpl) + randPrefix := uint16(rand.Uint32()) + + // Combine the toggled local prefix and the random bits at the correct offset + // such that ONLY the first `targetCpl` bits match the local ID. + mask := (^uint16(0)) << (16 - (targetCpl + 1)) + targetPrefix := (toggledLocalPrefix & mask) | (randPrefix & ^mask) // Convert to a known peer ID. key := keyPrefixMap[targetPrefix] id := [34]byte{mh.SHA2_256, 32} binary.BigEndian.PutUint32(id[2:], key) - return peer.ID(id[:]) + return peer.ID(id[:]), nil } -// Returns the bucket for a given ID -// should NOT modify the peer list on the returned bucket -func (rt *RoutingTable) BucketForID(id ID) *Bucket { +// ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID. +func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) { cpl := CommonPrefixLen(id, rt.local) - - rt.tabLock.RLock() - defer rt.tabLock.RUnlock() - bucketID := cpl - if bucketID >= len(rt.Buckets) { - bucketID = len(rt.Buckets) - 1 + if uint(cpl) > MaxCplForRefresh { + return } - return rt.Buckets[bucketID] + rt.cplRefreshLk.Lock() + defer rt.cplRefreshLk.Unlock() + + rt.cplRefreshedAt[uint(cpl)] = newTime } // Update adds or moves the given peer to the front of its respective bucket diff --git a/table_test.go b/table_test.go index 4067ea8..97d2f1c 100644 --- a/table_test.go +++ b/table_test.go @@ -8,6 +8,7 @@ import ( "github.com/libp2p/go-libp2p-core/peer" "github.com/libp2p/go-libp2p-core/test" pstore "github.com/libp2p/go-libp2p-peerstore" + "github.com/stretchr/testify/require" ) // Test basic features of the bucket struct @@ -53,48 +54,49 @@ func TestBucket(t *testing.T) { func TestGenRandPeerID(t *testing.T) { t.Parallel() - nBuckets := 21 local := test.RandPeerIDFatal(t) m := pstore.NewMetrics() rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m) - // create nBuckets - for i := 0; i < nBuckets; i++ { - for { - if p := test.RandPeerIDFatal(t); CommonPrefixLen(ConvertPeerID(local), ConvertPeerID(p)) == i { - rt.Update(p) - break - } - } + // generate above MaxCplForRefresh fails + p, err := rt.GenRandPeerID(MaxCplForRefresh + 1) + require.Error(t, err) + require.Empty(t, p) + + // test generate rand peer ID + for cpl := uint(0); cpl <= MaxCplForRefresh; cpl++ { + peerID, err := rt.GenRandPeerID(cpl) + require.NoError(t, err) + + require.True(t, uint(CommonPrefixLen(ConvertPeerID(peerID), rt.local)) == cpl, "failed for cpl=%d", cpl) } +} - // test bucket for peer - peers := rt.ListPeers() - for _, p := range peers { - b := rt.BucketForID(ConvertPeerID(p)) - if !b.Has(p) { - t.Fatalf("bucket should have peers %s", p.String()) - } +func TestRefreshAndGetTrackedCpls(t *testing.T) { + t.Parallel() + + local := test.RandPeerIDFatal(t) + m := pstore.NewMetrics() + rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m) + + // add cpl's for tracking + for cpl := uint(0); cpl < MaxCplForRefresh; cpl++ { + peerID, err := rt.GenRandPeerID(cpl) + require.NoError(t, err) + rt.ResetCplRefreshedAtForID(ConvertPeerID(peerID), time.Now()) } - // test generate rand peer ID - for bucketID := 0; bucketID < nBuckets; bucketID++ { - peerID := rt.GenRandPeerID(bucketID) - - // for bucketID upto maxPrefixLen of 16, CPL should be Exactly bucketID - if bucketID < 16 { - if CommonPrefixLen(ConvertPeerID(peerID), rt.local) != bucketID { - t.Fatalf("cpl should be %d for bucket %d but got %d, generated peerID is %s", bucketID, bucketID, - CommonPrefixLen(ConvertPeerID(peerID), rt.local), peerID) - } - } else { - // from bucketID 16 onwards, CPL should be ATLEAST 16 - if CommonPrefixLen(ConvertPeerID(peerID), rt.local) < 16 { - t.Fatalf("cpl should be ATLEAST 16 for bucket %d but got %d, generated peerID is %s", bucketID, - CommonPrefixLen(ConvertPeerID(peerID), rt.local), peerID) - } - } + // fetch cpl's + trackedCpls := rt.GetTrackedCplsForRefresh() + require.Len(t, trackedCpls, int(MaxCplForRefresh)) + actualCpls := make(map[uint]struct{}) + for i := 0; i < len(trackedCpls); i++ { + actualCpls[trackedCpls[i].Cpl] = struct{}{} + } + for i := uint(0); i < MaxCplForRefresh; i++ { + _, ok := actualCpls[i] + require.True(t, ok, "tracked cpl's should have cpl %d", i) } } From 6e226d95407946a849e1ac656f106f7cf760c980 Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Mon, 16 Dec 2019 23:59:57 +0800 Subject: [PATCH 2/3] changes as per review --- table.go | 22 +++++++++++++--------- table_test.go | 12 ++++++------ 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/table.go b/table.go index 708f758..3a69aad 100644 --- a/table.go +++ b/table.go @@ -21,10 +21,12 @@ var log = logging.Logger("table") var ErrPeerRejectedHighLatency = errors.New("peer rejected; latency too high") var ErrPeerRejectedNoCapacity = errors.New("peer rejected; insufficient capacity") -// MaxCplForRefresh is the maximum cpl we support for refresh. -// This limit exists because we can only generate 'MaxCplForRefresh' bit prefixes for now. -var MaxCplForRefresh uint = 15 +// maxCplForRefresh is the maximum cpl we support for refresh. +// This limit exists because we can only generate 'maxCplForRefresh' bit prefixes for now. +const maxCplForRefresh uint = 15 +// CplRefresh contains a CPL(common prefix length) with the host & the last time +// we refreshed that cpl/searched for an ID which has that cpl with the host. type CplRefresh struct { Cpl uint LastRefreshAt time.Time @@ -74,14 +76,16 @@ func NewRoutingTable(bucketsize int, localID ID, latency time.Duration, m peerst // GetTrackedCplsForRefresh returns the Cpl's we are tracking for refresh. // Caller is free to modify the returned slice as it is a defensive copy. -func (rt *RoutingTable) GetTrackedCplsForRefresh() []*CplRefresh { +func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh { rt.cplRefreshLk.RLock() defer rt.cplRefreshLk.RUnlock() - var cpls []*CplRefresh + cpls := make([]CplRefresh, len(rt.cplRefreshedAt)) + i := 0 for c, t := range rt.cplRefreshedAt { - cpls = append(cpls, &CplRefresh{c, t}) + cpls[i] = CplRefresh{c, t} + i++ } return cpls @@ -89,8 +93,8 @@ func (rt *RoutingTable) GetTrackedCplsForRefresh() []*CplRefresh { // GenRandPeerID generates a random peerID for a given Cpl func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) { - if targetCpl > MaxCplForRefresh { - return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", MaxCplForRefresh) + if targetCpl > maxCplForRefresh { + return "", fmt.Errorf("cannot generate peer ID for Cpl greater than %d", maxCplForRefresh) } localPrefix := binary.BigEndian.Uint16(rt.local) @@ -116,7 +120,7 @@ func (rt *RoutingTable) GenRandPeerID(targetCpl uint) (peer.ID, error) { // ResetCplRefreshedAtForID resets the refresh time for the Cpl of the given ID. func (rt *RoutingTable) ResetCplRefreshedAtForID(id ID, newTime time.Time) { cpl := CommonPrefixLen(id, rt.local) - if uint(cpl) > MaxCplForRefresh { + if uint(cpl) > maxCplForRefresh { return } diff --git a/table_test.go b/table_test.go index 97d2f1c..ce83aff 100644 --- a/table_test.go +++ b/table_test.go @@ -58,13 +58,13 @@ func TestGenRandPeerID(t *testing.T) { m := pstore.NewMetrics() rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m) - // generate above MaxCplForRefresh fails - p, err := rt.GenRandPeerID(MaxCplForRefresh + 1) + // generate above maxCplForRefresh fails + p, err := rt.GenRandPeerID(maxCplForRefresh + 1) require.Error(t, err) require.Empty(t, p) // test generate rand peer ID - for cpl := uint(0); cpl <= MaxCplForRefresh; cpl++ { + for cpl := uint(0); cpl <= maxCplForRefresh; cpl++ { peerID, err := rt.GenRandPeerID(cpl) require.NoError(t, err) @@ -80,7 +80,7 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) { rt := NewRoutingTable(1, ConvertPeerID(local), time.Hour, m) // add cpl's for tracking - for cpl := uint(0); cpl < MaxCplForRefresh; cpl++ { + for cpl := uint(0); cpl < maxCplForRefresh; cpl++ { peerID, err := rt.GenRandPeerID(cpl) require.NoError(t, err) rt.ResetCplRefreshedAtForID(ConvertPeerID(peerID), time.Now()) @@ -88,13 +88,13 @@ func TestRefreshAndGetTrackedCpls(t *testing.T) { // fetch cpl's trackedCpls := rt.GetTrackedCplsForRefresh() - require.Len(t, trackedCpls, int(MaxCplForRefresh)) + require.Len(t, trackedCpls, int(maxCplForRefresh)) actualCpls := make(map[uint]struct{}) for i := 0; i < len(trackedCpls); i++ { actualCpls[trackedCpls[i].Cpl] = struct{}{} } - for i := uint(0); i < MaxCplForRefresh; i++ { + for i := uint(0); i < maxCplForRefresh; i++ { _, ok := actualCpls[i] require.True(t, ok, "tracked cpl's should have cpl %d", i) } From 028e6c64ebea71e4d7da7ca34428ade3a32d1e8f Mon Sep 17 00:00:00 2001 From: Aarsh Shah Date: Tue, 17 Dec 2019 00:59:02 +0800 Subject: [PATCH 3/3] use make correctly --- table.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/table.go b/table.go index 3a69aad..c24361e 100644 --- a/table.go +++ b/table.go @@ -80,12 +80,10 @@ func (rt *RoutingTable) GetTrackedCplsForRefresh() []CplRefresh { rt.cplRefreshLk.RLock() defer rt.cplRefreshLk.RUnlock() - cpls := make([]CplRefresh, len(rt.cplRefreshedAt)) + cpls := make([]CplRefresh, 0, len(rt.cplRefreshedAt)) - i := 0 for c, t := range rt.cplRefreshedAt { - cpls[i] = CplRefresh{c, t} - i++ + cpls = append(cpls, CplRefresh{c, t}) } return cpls