Skip to content
This repository has been archived by the owner on Apr 21, 2022. It is now read-only.

decaying tags: support removal and closure. #72

Merged
merged 2 commits into from
May 15, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 95 additions & 17 deletions decay.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,12 @@ type bumpCmd struct {
delta int
}

// removeCmd represents a tag removal command.
type removeCmd struct {
peer peer.ID
tag *decayingTag
}

// decayer tracks and manages all decaying tags and their values.
type decayer struct {
cfg *DecayerCfg
Expand All @@ -34,8 +40,10 @@ type decayer struct {
// lastTick stores the last time the decayer ticked. Guarded by atomic.
lastTick atomic.Value

// bumpCh queues bump commands to be processed by the loop.
bumpCh chan bumpCmd
// bumpTagCh queues bump commands to be processed by the loop.
bumpTagCh chan bumpCmd
removeTagCh chan removeCmd
closeTagCh chan *decayingTag

// closure thingies.
closeCh chan struct{}
Expand Down Expand Up @@ -70,13 +78,15 @@ func NewDecayer(cfg *DecayerCfg, mgr *BasicConnMgr) (*decayer, error) {
}

d := &decayer{
cfg: cfg,
mgr: mgr,
clock: cfg.Clock,
knownTags: make(map[string]*decayingTag),
bumpCh: make(chan bumpCmd, 128),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
cfg: cfg,
mgr: mgr,
clock: cfg.Clock,
knownTags: make(map[string]*decayingTag),
bumpTagCh: make(chan bumpCmd, 128),
removeTagCh: make(chan removeCmd, 128),
closeTagCh: make(chan *decayingTag, 128),
closeCh: make(chan struct{}),
doneCh: make(chan struct{}),
}

d.lastTick.Store(d.clock.Now())
Expand Down Expand Up @@ -203,7 +213,7 @@ func (d *decayer) process() {
delete(visit, tag)
}

case bmp = <-d.bumpCh:
case bmp = <-d.bumpTagCh:
var (
now = d.clock.Now()
peer, tag = bmp.peer, bmp.tag
Expand Down Expand Up @@ -231,9 +241,42 @@ func (d *decayer) process() {

s.Unlock()

case rm := <-d.removeTagCh:
s := d.mgr.segments.get(rm.peer)
s.Lock()

p := s.tagInfoFor(rm.peer)
v, ok := p.decaying[rm.tag]
if !ok {
s.Unlock()
continue
}
p.value -= v.Value
delete(p.decaying, rm.tag)
s.Unlock()

case t := <-d.closeTagCh:
// Stop tracking the tag.
d.tagsMu.Lock()
delete(d.knownTags, t.name)
d.tagsMu.Unlock()

// Remove the tag from all peers that had it in the connmgr.
for _, s := range d.mgr.segments {
// visit all segments, and attempt to remove the tag from all the peers it stores.
s.Lock()
for _, p := range s.peers {
if dt, ok := p.decaying[t]; ok {
// decrease the value of the tagInfo, and delete the tag.
p.value -= dt.Value
delete(p.decaying, t)
}
}
s.Unlock()
}

case <-d.closeCh:
return

}
}
}
Expand All @@ -247,6 +290,10 @@ type decayingTag struct {
nextTick time.Time
decayFn connmgr.DecayFn
bumpFn connmgr.BumpFn

// closed marks this tag as closed, so that if it's bumped after being
// closed, we can return an error. 0 = false; 1 = true; guarded by atomic.
closed int32
}

var _ connmgr.DecayingTag = (*decayingTag)(nil)
Expand All @@ -261,18 +308,49 @@ func (t *decayingTag) Interval() time.Duration {

// Bump bumps a tag for this peer.
func (t *decayingTag) Bump(p peer.ID, delta int) error {
if atomic.LoadInt32(&t.closed) == 1 {
return fmt.Errorf("decaying tag %s had been closed; no further bumps are accepted", t.name)
}

bmp := bumpCmd{peer: p, tag: t, delta: delta}

select {
case t.trkr.bumpCh <- bmp:
case t.trkr.bumpTagCh <- bmp:
return nil

default:
return fmt.Errorf(
"unable to bump decaying tag for peer %s, tag %s, delta %d; queue full (len=%d)",
p.Pretty(),
t.name,
delta,
len(t.trkr.bumpCh))
p.Pretty(), t.name, delta, len(t.trkr.bumpTagCh))
}
}

func (t *decayingTag) Remove(p peer.ID) error {
if atomic.LoadInt32(&t.closed) == 1 {
return fmt.Errorf("decaying tag %s had been closed; no further removals are accepted", t.name)
}

rm := removeCmd{peer: p, tag: t}

select {
case t.trkr.removeTagCh <- rm:
return nil
default:
return fmt.Errorf(
"unable to remove decaying tag for peer %s, tag %s; queue full (len=%d)",
p.Pretty(), t.name, len(t.trkr.removeTagCh))
}
}

func (t *decayingTag) Close() error {
if !atomic.CompareAndSwapInt32(&t.closed, 0, 1) {
log.Warnf("duplicate decaying tag closure: %s; skipping", t.name)
return nil
}

select {
case t.trkr.closeTagCh <- t:
return nil
default:
return fmt.Errorf("unable to close decaying tag %s; queue full (len=%d)", t.name, len(t.trkr.closeTagCh))
}
}
100 changes: 99 additions & 1 deletion decay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ func TestResolutionMisaligned(t *testing.T) {
// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)

// nothing has happened.
// first tick.
mockClock.Add(TestResolution)
require.Equal(1000, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(1000, mgr.GetTagInfo(id).Tags["bop"])
Expand All @@ -301,6 +301,104 @@ func TestResolutionMisaligned(t *testing.T) {
require.Equal(1997, mgr.GetTagInfo(id).Value)
}

func TestTagRemoval(t *testing.T) {
var (
id1, id2 = tu.RandPeerIDFatal(t), tu.RandPeerIDFatal(t)
mgr, decay, mockClock = testDecayTracker(t)
require = require.New(t)
)

tag1, err := decay.RegisterDecayingTag("beep", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)

tag2, err := decay.RegisterDecayingTag("bop", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)

// id1 has both tags; id2 only has the first tag.
_ = tag1.Bump(id1, 1000)
_ = tag2.Bump(id1, 1000)
_ = tag1.Bump(id2, 1000)

// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)

// first tick.
mockClock.Add(TestResolution)
require.Equal(999, mgr.GetTagInfo(id1).Tags["beep"])
require.Equal(999, mgr.GetTagInfo(id1).Tags["bop"])
require.Equal(999, mgr.GetTagInfo(id2).Tags["beep"])

require.Equal(999*2, mgr.GetTagInfo(id1).Value)
require.Equal(999, mgr.GetTagInfo(id2).Value)

// remove tag1 from p1.
err = tag1.Remove(id1)

// allow the background goroutine to process the removal.
<-time.After(500 * time.Millisecond)
require.NoError(err)

// next tick. both peers only have 1 tag, both at 998 value.
mockClock.Add(TestResolution)
require.Zero(mgr.GetTagInfo(id1).Tags["beep"])
require.Equal(998, mgr.GetTagInfo(id1).Tags["bop"])
require.Equal(998, mgr.GetTagInfo(id2).Tags["beep"])

require.Equal(998, mgr.GetTagInfo(id1).Value)
require.Equal(998, mgr.GetTagInfo(id2).Value)

// remove tag1 from p1 again; no error.
err = tag1.Remove(id1)
require.NoError(err)
}

func TestTagClosure(t *testing.T) {
var (
id = tu.RandPeerIDFatal(t)
mgr, decay, mockClock = testDecayTracker(t)
require = require.New(t)
)

tag1, err := decay.RegisterDecayingTag("beep", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)

tag2, err := decay.RegisterDecayingTag("bop", TestResolution, connmgr.DecayFixed(1), connmgr.BumpOverwrite())
require.NoError(err)

_ = tag1.Bump(id, 1000)
_ = tag2.Bump(id, 1000)
// allow the background goroutine to process bumps.
<-time.After(500 * time.Millisecond)

// nothing has happened.
mockClock.Add(TestResolution)
require.Equal(999, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(999, mgr.GetTagInfo(id).Tags["bop"])
require.Equal(999*2, mgr.GetTagInfo(id).Value)

// next tick; tag1 would've ticked.
mockClock.Add(TestResolution)
require.Equal(998, mgr.GetTagInfo(id).Tags["beep"])
require.Equal(998, mgr.GetTagInfo(id).Tags["bop"])
require.Equal(998*2, mgr.GetTagInfo(id).Value)

// close the tag.
err = tag1.Close()
require.NoError(err)

// allow the background goroutine to process the closure.
<-time.After(500 * time.Millisecond)
require.Equal(998, mgr.GetTagInfo(id).Value)

// a second closure should not error.
err = tag1.Close()
require.NoError(err)

// bumping a tag after it's been closed should error.
err = tag1.Bump(id, 5)
require.Error(err)
}

func testDecayTracker(tb testing.TB) (*BasicConnMgr, connmgr.Decayer, *clock.Mock) {
mockClock := clock.NewMock()
cfg := &DecayerCfg{
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ require (
github.com/benbjohnson/clock v1.0.1
github.com/ipfs/go-detect-race v0.0.1
github.com/ipfs/go-log v1.0.4
github.com/libp2p/go-libp2p-core v0.5.5
github.com/libp2p/go-libp2p-core v0.5.6-0.20200514185821-3fd1d20845de
github.com/multiformats/go-multiaddr v0.2.1
github.com/stretchr/testify v1.4.0
)
10 changes: 2 additions & 8 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,15 +55,9 @@ github.com/libp2p/go-buffer-pool v0.0.1/go.mod h1:xtyIz9PMobb13WaxR6Zo1Pd1zXJKYg
github.com/libp2p/go-buffer-pool v0.0.2 h1:QNK2iAFa8gjAe1SPz6mHSMuCcjs+X1wlHzeOSqcmlfs=
github.com/libp2p/go-buffer-pool v0.0.2/go.mod h1:MvaB6xw5vOrDl8rYZGLFdKAuk/hRoRZd1Vi32+RXyFM=
github.com/libp2p/go-flow-metrics v0.0.3/go.mod h1:HeoSNUrOJVK1jEpDqVEiUOIXqhbnS27omG0uWU5slZs=
github.com/libp2p/go-libp2p-core v0.5.4-0.20200514121551-d3277047d6ca h1:9xF2NgTB7L0GFdRBnEE8sa7sZbeshEznH1pEuqH5A8o=
github.com/libp2p/go-libp2p-core v0.5.4-0.20200514121551-d3277047d6ca/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.5.5-0.20200514134608-fd0d4abfc174 h1:OID2AvF6Ax15EvUjAsJVbVhhY9CWPA3ub6hSRXk9vxU=
github.com/libp2p/go-libp2p-core v0.5.5-0.20200514134608-fd0d4abfc174/go.mod h1:uN7L2D4EvPCvzSH5SrhR72UWbnSGpt5/a35Sm4upn4Y=
github.com/libp2p/go-libp2p-core v0.5.5 h1:/yiFUZDoBWqvpWeHHJ1iA8SOs5obT1/+UdNfckwD57M=
github.com/libp2p/go-libp2p-core v0.5.5/go.mod h1:vj3awlOr9+GMZJFH9s4mpt9RHHgGqeHCopzbYKZdRjM=
github.com/libp2p/go-libp2p-core v0.5.6-0.20200514185821-3fd1d20845de h1:zkQm/5xoY3kB4nGcYYaJU+Q1NY++qv7C0NjI4ym/kiM=
github.com/libp2p/go-libp2p-core v0.5.6-0.20200514185821-3fd1d20845de/go.mod h1:vj3awlOr9+GMZJFH9s4mpt9RHHgGqeHCopzbYKZdRjM=
github.com/libp2p/go-msgio v0.0.4/go.mod h1:63lBBgOTDKQL6EWazRMCwXsEeEeK9O2Cd+0+6OOuipQ=
github.com/libp2p/go-openssl v0.0.4 h1:d27YZvLoTyMhIN4njrkr8zMDOM4lfpHIp6A+TK9fovg=
github.com/libp2p/go-openssl v0.0.4/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc=
github.com/libp2p/go-openssl v0.0.5 h1:pQkejVhF0xp08D4CQUcw8t+BFJeXowja6RVcb5p++EA=
github.com/libp2p/go-openssl v0.0.5/go.mod h1:unDrJpgy3oFr+rqXsarWifmJuNnJR4chtO1HmaZjggc=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
Expand Down