From a4428ab289b8855028449f54262d029ab0002ab5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Thu, 14 May 2020 20:01:16 +0100 Subject: [PATCH 1/2] decaying tags: support removal and closure. --- decay.go | 112 ++++++++++++++++++++++++++++++++++++++++++-------- decay_test.go | 100 +++++++++++++++++++++++++++++++++++++++++++- go.mod | 2 +- go.sum | 10 +---- 4 files changed, 197 insertions(+), 27 deletions(-) diff --git a/decay.go b/decay.go index fd523bf..62029a5 100644 --- a/decay.go +++ b/decay.go @@ -22,6 +22,12 @@ type bumpCmd struct { delta int } +// removeCmd represents a tag removal command. +type removeCmd struct { + peer peer.ID + tag *decayingTag +} + // decayer tracks and manages all decaying tags and their values. type decayer struct { cfg *DecayerCfg @@ -34,8 +40,10 @@ type decayer struct { // lastTick stores the last time the decayer ticked. Guarded by atomic. lastTick atomic.Value - // bumpCh queues bump commands to be processed by the loop. - bumpCh chan bumpCmd + // bumpTagCh queues bump commands to be processed by the loop. + bumpTagCh chan bumpCmd + removeTagCh chan removeCmd + closeTagCh chan *decayingTag // closure thingies. closeCh chan struct{} @@ -70,13 +78,15 @@ func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) { } d := &decayer{ - cfg: cfg, - mgr: mgr, - clock: cfg.Clock, - knownTags: make(map[string]*decayingTag), - bumpCh: make(chan bumpCmd, 128), - closeCh: make(chan struct{}), - doneCh: make(chan struct{}), + cfg: cfg, + mgr: mgr, + clock: cfg.Clock, + knownTags: make(map[string]*decayingTag), + bumpTagCh: make(chan bumpCmd, 128), + removeTagCh: make(chan removeCmd, 128), + closeTagCh: make(chan *decayingTag, 128), + closeCh: make(chan struct{}), + doneCh: make(chan struct{}), } d.lastTick.Store(d.clock.Now()) @@ -203,7 +213,7 @@ func (d *decayer) process() { delete(visit, tag) } - case bmp = <-d.bumpCh: + case bmp = <-d.bumpTagCh: var ( now = d.clock.Now() peer, tag = bmp.peer, bmp.tag @@ -231,9 +241,42 @@ func (d *decayer) process() { s.Unlock() + case rm := <-d.removeTagCh: + s := d.mgr.segments.get(rm.peer) + s.Lock() + + p := s.tagInfoFor(rm.peer) + v, ok := p.decaying[rm.tag] + if !ok { + s.Unlock() + continue + } + p.value -= v.Value + delete(p.decaying, rm.tag) + s.Unlock() + + case t := <-d.closeTagCh: + // Stop tracking the tag. + d.tagsMu.Lock() + delete(d.knownTags, t.name) + d.tagsMu.Unlock() + + // Remove the tag from all peers that had it in the connmgr. + for _, s := range d.mgr.segments { + // visit all segments, and attempt to remove the tag from all the peers it stores. + s.Lock() + for _, p := range s.peers { + if dt, ok := p.decaying[t]; ok { + // decrease the value of the tagInfo, and delete the tag. + p.value -= dt.Value + delete(p.decaying, t) + } + } + s.Unlock() + } + case <-d.closeCh: return - } } } @@ -247,6 +290,10 @@ type decayingTag struct { nextTick time.Time decayFn connmgr.DecayFn bumpFn connmgr.BumpFn + + // closed marks this tag as closed, so that if it's bumped after being + // closed, we can return an error. 0 = false; 1 = true; guarded by atomic. + closed int32 } var _ connmgr.DecayingTag = (*decayingTag)(nil) @@ -261,18 +308,49 @@ func (t *decayingTag) Interval() time.Duration { // Bump bumps a tag for this peer. func (t *decayingTag) Bump(p peer.ID, delta int) error { + if atomic.LoadInt32(&t.closed) == 1 { + return fmt.Errorf("decaying tag %s had been closed; no further bumps are accepted", t.name) + } + bmp := bumpCmd{peer: p, tag: t, delta: delta} select { - case t.trkr.bumpCh <- bmp: + case t.trkr.bumpTagCh <- bmp: return nil - default: return fmt.Errorf( "unable to bump decaying tag for peer %s, tag %s, delta %d; queue full (len=%d)", - p.Pretty(), - t.name, - delta, - len(t.trkr.bumpCh)) + p.Pretty(), t.name, delta, len(t.trkr.bumpTagCh)) + } +} + +func (t *decayingTag) Remove(p peer.ID) error { + if atomic.LoadInt32(&t.closed) == 1 { + return fmt.Errorf("decaying tag %s had been closed; no further removals are accepted", t.name) + } + + rm := removeCmd{peer: p, tag: t} + + select { + case t.trkr.removeTagCh <- rm: + return nil + default: + return fmt.Errorf( + "unable to remove decaying tag for peer %s, tag %s; queue full (len=%d)", + p.Pretty(), t.name, len(t.trkr.removeTagCh)) + } +} + +func (t *decayingTag) Close() error { + if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) { + log.Warnf("duplicate decaying tag closure: %s; skipping", t.name) + return nil + } + + select { + case t.trkr.closeTagCh <- t: + return nil + default: + return fmt.Errorf("unable to close decaying tag %s; queue full (len=%d)", t.name, len(t.trkr.closeTagCh)) } } diff --git a/decay_test.go b/decay_test.go index fad5e0d..b8a55be 100644 --- a/decay_test.go +++ b/decay_test.go @@ -283,7 +283,7 @@ func TestResolutionMisaligned(t *testing.T) { // allow the background goroutine to process bumps. <-time.After(500 * time.Millisecond) - // nothing has happened. + // first tick. mockClock.Add(TestResolution) require.Equal(1000, mgr.GetTagInfo(id).Tags["beep"]) require.Equal(1000, mgr.GetTagInfo(id).Tags["bop"]) @@ -301,6 +301,104 @@ func TestResolutionMisaligned(t *testing.T) { require.Equal(1997, mgr.GetTagInfo(id).Value) } +func TestTagRemoval(t *testing.T) { + var ( + id1, id2 = tu.RandPeerIDFatal(t), tu.RandPeerIDFatal(t) + mgr, decay, mockClock = testDecayTracker(t) + require = require.New(t) + ) + + tag1, err := decay.RegisterDecayingTag("beep", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite()) + require.NoError(err) + + tag2, err := decay.RegisterDecayingTag("bop", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite()) + require.NoError(err) + + // id1 has both tags; id1 only has the first tag. + _ = tag1.Bump(id1, 1000) + _ = tag2.Bump(id1, 1000) + _ = tag1.Bump(id2, 1000) + + // allow the background goroutine to process bumps. + <-time.After(500 * time.Millisecond) + + // first tick. + mockClock.Add(TestResolution) + require.Equal(999, mgr.GetTagInfo(id1).Tags["beep"]) + require.Equal(999, mgr.GetTagInfo(id1).Tags["bop"]) + require.Equal(999, mgr.GetTagInfo(id2).Tags["beep"]) + + require.Equal(999*2, mgr.GetTagInfo(id1).Value) + require.Equal(999, mgr.GetTagInfo(id2).Value) + + // remove tag1 from p1. + err = tag1.Remove(id1) + + // allow the background goroutine to process the removal. + <-time.After(500 * time.Millisecond) + require.NoError(err) + + // next tick. both peers only have 1 tag, both at 998 value. + mockClock.Add(TestResolution) + require.Zero(mgr.GetTagInfo(id1).Tags["beep"]) + require.Equal(998, mgr.GetTagInfo(id1).Tags["bop"]) + require.Equal(998, mgr.GetTagInfo(id2).Tags["beep"]) + + require.Equal(998, mgr.GetTagInfo(id1).Value) + require.Equal(998, mgr.GetTagInfo(id2).Value) + + // remove tag1 from p1 again; no error. + err = tag1.Remove(id1) + require.NoError(err) +} + +func TestTagClosure(t *testing.T) { + var ( + id = tu.RandPeerIDFatal(t) + mgr, decay, mockClock = testDecayTracker(t) + require = require.New(t) + ) + + tag1, err := decay.RegisterDecayingTag("beep", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite()) + require.NoError(err) + + tag2, err := decay.RegisterDecayingTag("bop", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite()) + require.NoError(err) + + _ = tag1.Bump(id, 1000) + _ = tag2.Bump(id, 1000) + // allow the background goroutine to process bumps. + <-time.After(500 * time.Millisecond) + + // nothing has happened. + mockClock.Add(TestResolution) + require.Equal(999, mgr.GetTagInfo(id).Tags["beep"]) + require.Equal(999, mgr.GetTagInfo(id).Tags["bop"]) + require.Equal(999*2, mgr.GetTagInfo(id).Value) + + // next tick; tag1 would've ticked. + mockClock.Add(TestResolution) + require.Equal(998, mgr.GetTagInfo(id).Tags["beep"]) + require.Equal(998, mgr.GetTagInfo(id).Tags["bop"]) + require.Equal(998*2, mgr.GetTagInfo(id).Value) + + // close the tag. + err = tag1.Close() + require.NoError(err) + + // allow the background goroutine to process the closure. + <-time.After(500 * time.Millisecond) + require.Equal(998, mgr.GetTagInfo(id).Value) + + // a second closure should not error. + err = tag1.Close() + require.NoError(err) + + // bumping a tag after it's been closed should error. + err = tag1.Bump(id, 5) + require.Error(err) +} + func testDecayTracker(tb testing.TB) (*BasicConnMgr, connmgr.Decayer, *clock.Mock) { mockClock := clock.NewMock() cfg := &DecayerCfg{ diff --git a/go.mod b/go.mod index d91bc58..518b3e1 100644 --- a/go.mod +++ b/go.mod @@ -6,7 +6,7 @@ require ( github.com/benbjohnson/clock v1.0.1 github.com/ipfs/go-detect-race v0.0.1 github.com/ipfs/go-log v1.0.4 - github.com/libp2p/go-libp2p-core v0.5.5 + github.com/libp2p/go-libp2p-core v0.5.6-0.20200514185821-3fd1d20845de github.com/multiformats/go-multiaddr v0.2.1 github.com/stretchr/testify v1.4.0 ) diff --git a/go.sum b/go.sum index 2cfb77b..209bc0f 100644 --- a/go.sum +++ b/go.sum @@ -55,15 +55,9 @@ github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs= github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM= github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs= -github.com/libp2p/go-libp2p-core v0.5.4-0.20200514121551-d3277047d6ca h1:9xF2NgTB7L0GFdRBnEE8sa7sZbeshEznH1pEuqH5A8o= -github.com/libp2p/go-libp2p-core v0.5.4-0.20200514121551-d3277047d6ca/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y= -github.com/libp2p/go-libp2p-core v0.5.5-0.20200514134608-fd0d4abfc174 h1:OID2AvF6Ax15EvUjAsJVbVhhY9CWPA3ub6hSRXk9vxU= -github.com/libp2p/go-libp2p-core v0.5.5-0.20200514134608-fd0d4abfc174/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y= -github.com/libp2p/go-libp2p-core v0.5.5 h1:/yiFUZDoBWqvpWeHHJ1iA8SOs5obT1/+UdNfckwD57M= -github.com/libp2p/go-libp2p-core v0.5.5/go.mod h1:vj3awlOr9+GMZJFH9s4mpt9RHHgGqeHCopzbYKZdRjM= +github.com/libp2p/go-libp2p-core v0.5.6-0.20200514185821-3fd1d20845de h1:zkQm/5xoY3kB4nGcYYaJU+Q1NY++qv7C0NjI4ym/kiM= +github.com/libp2p/go-libp2p-core v0.5.6-0.20200514185821-3fd1d20845de/go.mod h1:vj3awlOr9+GMZJFH9s4mpt9RHHgGqeHCopzbYKZdRjM= github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ= -github.com/libp2p/go-openssl v0.0.4 h1:d27YZvLoTyMhIN4njrkr8zMDOM4lfpHIp6A+TK9fovg= -github.com/libp2p/go-openssl v0.0.4/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc= github.com/libp2p/go-openssl v0.0.5 h1:pQkejVhF0xp08D4CQUcw8t+BFJeXowja6RVcb5p++EA= github.com/libp2p/go-openssl v0.0.5/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= From 0650f48dbccc27e6c4bb564daea1b752dbad350c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Fri, 15 May 2020 09:27:34 +0100 Subject: [PATCH 2/2] fix comment in test. Co-authored-by: Yusef Napora --- decay_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/decay_test.go b/decay_test.go index b8a55be..c75efce 100644 --- a/decay_test.go +++ b/decay_test.go @@ -314,7 +314,7 @@ func TestTagRemoval(t *testing.T) { tag2, err := decay.RegisterDecayingTag("bop", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite()) require.NoError(err) - // id1 has both tags; id1 only has the first tag. + // id1 has both tags; id2 only has the first tag. _ = tag1.Bump(id1, 1000) _ = tag2.Bump(id1, 1000) _ = tag1.Bump(id2, 1000)