
merge Upstream v0.2.4 #22

Merged · 9 commits · Jul 30, 2020

connmgr.go: 109 changes (82 additions, 27 deletions)
@@ -27,27 +27,31 @@ var SilencePeriod = 10 * time.Second
 //
 // See configuration parameters in NewConnManager.
 type BasicConnMgr struct {
-	highWater   int
-	lowWater    int
-	connCount   int32
-	gracePeriod time.Duration
-	segments    segments
+	*decayer
+
+	cfg      *BasicConnManagerConfig
+	segments segments
 
 	plk       sync.RWMutex
 	protected map[peer.ID]map[string]struct{}
 
-	trimTrigger chan chan<- struct{}
+	// channel-based semaphore that enforces only a single trim is in progress
+	trimRunningCh chan struct{}
+	trimTrigger   chan chan<- struct{}
+	connCount     int32
 
 	lastTrimMu sync.RWMutex
 	lastTrim   time.Time
 
-	silencePeriod time.Duration
 
 	logger *zap.Logger
 	ctx    context.Context
 	cancel context.CancelFunc
 }
 
-var _ connmgr.ConnManager = (*BasicConnMgr)(nil)
+var (
+	_ connmgr.ConnManager = (*BasicConnMgr)(nil)
+	_ connmgr.Decayer     = (*BasicConnMgr)(nil)
+)
 
 type segment struct {
 	sync.Mutex
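The widened var block is Go's standard compile-time assertion idiom: assigning a typed nil pointer to a blank interface-typed variable makes the build fail if BasicConnMgr ever stops satisfying ConnManager or the new Decayer interface. A minimal self-contained sketch of the idiom, with illustrative stand-in types:

package main

// speaker stands in for an interface such as connmgr.Decayer.
type speaker interface{ speak() string }

// loudSpeaker stands in for a concrete type such as BasicConnMgr.
type loudSpeaker struct{}

func (l *loudSpeaker) speak() string { return "hi" }

// The typed nil conversion costs nothing at runtime; it only forces the
// compiler to verify that *loudSpeaker implements speaker.
var _ speaker = (*loudSpeaker)(nil)

func main() {}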
@@ -80,6 +84,7 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
 		firstSeen: time.Now(), // this timestamp will be updated when the first Connected notification arrives.
 		temp:      true,
 		tags:      make(map[string]int),
+		decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
 		conns:     make(map[network.Conn]time.Time),
 	}
 	s.peers[p] = pi
@@ -92,15 +97,34 @@ func (s *segment) tagInfoFor(p peer.ID) *peerInfo {
 // their connections terminated) until 'low watermark' peers remain.
 // * grace is the amount of time a newly opened connection is given before it becomes
 // subject to pruning.
-func NewConnManager(ctx context.Context, logger *zap.Logger, low, hi int, grace time.Duration) *BasicConnMgr {
-	cm := &BasicConnMgr{
+func NewConnManager(ctx context.Context, logger *zap.Logger, low, hi int, grace time.Duration, opts ...Option) *BasicConnMgr {
+	ctx, cancel := context.WithCancel(ctx)
+	cfg := &BasicConnManagerConfig{
 		highWater:     hi,
 		lowWater:      low,
 		gracePeriod:   grace,
+		silencePeriod: SilencePeriod,
+	}
+
+	for _, o := range opts {
+		// TODO we're ignoring errors from options because we have no way to
+		// return them, or otherwise act on them.
+		_ = o(cfg)
+	}
+
+	if cfg.decayer == nil {
+		// Set the default decayer config.
+		cfg.decayer = (&DecayerCfg{}).WithDefaults()
+	}
+
+	cm := &BasicConnMgr{
+		cfg:           cfg,
+		trimRunningCh: make(chan struct{}, 1),
 		trimTrigger:   make(chan chan<- struct{}),
 		protected:     make(map[peer.ID]map[string]struct{}, 16),
-		silencePeriod: SilencePeriod,
 		ctx:           ctx,
+		cancel:        cancel,
 		logger:        logger.Named("connmgr"),
 		segments: func() (ret segments) {
 			for i := range ret {
@@ -111,13 +135,21 @@ func NewConnManager(ctx context.Context, logger *zap.Logger, low, hi int, grace
 			return ret
 		}(),
 	}
+
+	decay, _ := NewDecayer(cfg.decayer, cm)
+	cm.decayer = decay
+
 	go cm.background()
 	return cm
 }
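NewConnManager now gathers its tunables into a BasicConnManagerConfig and applies variadic functional options over it; per the TODO, option errors are deliberately swallowed. An in-package sketch of the shape this implies, assuming Option matches the upstream go-libp2p-connmgr definition; withSilencePeriod is a hypothetical example option, not part of this PR (imports: errors, time):

// Assumed to match the upstream definition of Option.
type Option func(*BasicConnManagerConfig) error

// withSilencePeriod is hypothetical, shown only to illustrate the pattern.
func withSilencePeriod(d time.Duration) Option {
	return func(cfg *BasicConnManagerConfig) error {
		if d <= 0 {
			return errors.New("silence period must be positive")
		}
		cfg.silencePeriod = d
		return nil
	}
}

A caller would then write, for example, NewConnManager(ctx, logger, 100, 400, time.Minute, withSilencePeriod(30*time.Second)).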

 // Close is here to satisfy the interface of ConnectionManager
 // previously in the libp2p version this called a cancel func
 func (cm *BasicConnMgr) Close() error {
+	if err := cm.decayer.Close(); err != nil {
+		return err
+	}
 	cm.cancel()
 	return nil
 }
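Close now stops the decay loop before cancelling the manager's context, so no decay tick can run against torn-down state. The ordering generalizes: stop dependent workers first, then cancel the shared context. A generic sketch of that pattern, not this file's API (imports: context, io):

// shutdown closes workers in order before cancelling the shared context;
// any worker error aborts the sequence, mirroring Close above.
func shutdown(cancel context.CancelFunc, workers ...io.Closer) error {
	for _, w := range workers {
		if err := w.Close(); err != nil {
			return err
		}
	}
	cancel()
	return nil
}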

@@ -151,12 +183,31 @@ func (cm *BasicConnMgr) Unprotect(id peer.ID, tag string) (protected bool) {
 	return true
 }
 
+func (cm *BasicConnMgr) IsProtected(id peer.ID, tag string) (protected bool) {
+	cm.plk.Lock()
+	defer cm.plk.Unlock()
+
+	tags, ok := cm.protected[id]
+	if !ok {
+		return false
+	}
+
+	if tag == "" {
+		return true
+	}
+
+	_, protected = tags[tag]
+	return protected
+}
+
 // peerInfo stores metadata for a given peer.
 type peerInfo struct {
-	id    peer.ID
-	tags  map[string]int // value for each tag
-	value int            // cached sum of all tag values
-	temp  bool           // this is a temporary entry holding early tags, and awaiting connections
+	id       peer.ID
+	tags     map[string]int                          // value for each tag
+	decaying map[*decayingTag]*connmgr.DecayingValue // decaying tags
+
+	value int // cached sum of all tag values
+	temp  bool // this is a temporary entry holding early tags, and awaiting connections
 
 	conns map[network.Conn]time.Time // start time of each connection
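The new IsProtected completes the protection API: a non-empty tag asks about that specific protection, while an empty tag asks whether the peer is protected under any tag at all. A usage sketch, where pid is any peer.ID:

func protectionExample(cm *BasicConnMgr, pid peer.ID) {
	cm.Protect(pid, "relay")

	_ = cm.IsProtected(pid, "relay") // true: protected under this tag
	_ = cm.IsProtected(pid, "")      // true: protected under at least one tag

	cm.Unprotect(pid, "relay")
	_ = cm.IsProtected(pid, "") // false: no protection tags remain
}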

@@ -200,7 +251,7 @@ func (cm *BasicConnMgr) background() {
 		var waiting chan<- struct{}
 		select {
 		case <-ticker.C:
-			if atomic.LoadInt32(&cm.connCount) < int32(cm.highWater) {
+			if atomic.LoadInt32(&cm.connCount) < int32(cm.cfg.highWater) {
 				// Below high water, skip.
 				continue
 			}
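The background loop gates trims on the atomic connection count before doing any work; trim exclusivity itself comes from trimRunningCh, the one-slot channel the struct comment calls a channel-based semaphore. A self-contained sketch of that try-lock idiom:

// A buffered channel of capacity 1 acts as a try-lock: the send succeeds
// only if the slot is free, and the default branch skips instead of blocking.
var trimSem = make(chan struct{}, 1)

func tryTrim(trim func()) bool {
	select {
	case trimSem <- struct{}{}:
		defer func() { <-trimSem }()
		trim()
		return true
	default:
		return false // another trim is already in progress; skip
	}
}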
@@ -236,7 +287,7 @@ func (cm *BasicConnMgr) trim() {
 	cm.lastTrimMu.RUnlock()
 
 	// skip this attempt to trim if the last one just took place.
-	if time.Since(lastTrim) < cm.silencePeriod {
+	if time.Since(lastTrim) < cm.cfg.silencePeriod {
 		return
 	}
@@ -256,21 +307,21 @@
 // getConnsToClose runs the heuristics described in TrimOpenConns and returns the
 // connections to close.
 func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
-	if cm.lowWater == 0 || cm.highWater == 0 {
+	if cm.cfg.lowWater == 0 || cm.cfg.highWater == 0 {
 		// disabled
 		return nil
 	}
 
 	nconns := int(atomic.LoadInt32(&cm.connCount))
-	if nconns <= cm.lowWater {
+	if nconns <= cm.cfg.lowWater {
 		cm.logger.Debug("open connection count below limit")
 		return nil
 	}
 
 	npeers := cm.segments.countPeers()
 	candidates := make([]*peerInfo, 0, npeers)
 	ncandidates := 0
-	gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)
+	gracePeriodStart := time.Now().Add(-cm.cfg.gracePeriod)
 
 	cm.plk.RLock()
 	for _, s := range cm.segments {
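gracePeriodStart is computed once, so the per-peer test reduces to a single time comparison: a peer whose first connection is newer than now minus gracePeriod is still in its grace period and never becomes a trim candidate. As a tiny predicate (import: time):

// inGracePeriod mirrors the candidate filter above: firstSeen after the
// cutoff means the peer is too young to trim.
func inGracePeriod(firstSeen time.Time, grace time.Duration) bool {
	return firstSeen.After(time.Now().Add(-grace))
}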
@@ -291,7 +342,7 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
 	}
 	cm.plk.RUnlock()
 
-	if ncandidates < cm.lowWater {
+	if ncandidates < cm.cfg.lowWater {
 		cm.logger.Info("open connection count above limit but too many are in the grace period")
 		// We have too many connections but fewer than lowWater
 		// connections out of the grace period.
@@ -311,9 +362,9 @@ func (cm *BasicConnMgr) getConnsToClose() []network.Conn {
 		return left.value < right.value
 	})
 
-	target := ncandidates - cm.lowWater
+	target := ncandidates - cm.cfg.lowWater
 
-	// slightly overallocate because we may have more than one conns per peer
+	// slightly over allocate because we may have more than one conns per peer
 	selected := make([]network.Conn, 0, target+10)
 
 	for _, inf := range candidates {
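Candidates are sorted ascending by aggregate tag value, so the cheapest peers are pruned first. A worked example of the target arithmetic with illustrative numbers: with lowWater of 100 and 150 candidates out of grace, target is 50 peers, and the slice gets a little headroom because one peer may contribute several connections:

func trimTargetExample() []network.Conn {
	lowWater := 100    // illustrative watermark
	ncandidates := 150 // candidates that survived the grace-period filter

	target := ncandidates - lowWater // 50: peers whose conns we intend to close
	// Headroom of 10 because each pruned peer may hold more than one connection.
	return make([]network.Conn, 0, target+10)
}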
@@ -363,6 +414,9 @@ func (cm *BasicConnMgr) GetTagInfo(p peer.ID) *connmgr.TagInfo {
 	for t, v := range pi.tags {
 		out.Tags[t] = v
 	}
+	for t, v := range pi.decaying {
+		out.Tags[t.name] = v.Value
+	}
 	for c, t := range pi.conns {
 		out.Conns[c.RemoteMultiaddr().String()] = t
 	}
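GetTagInfo now folds each decaying tag's current value into the same Tags map as the static tags, keyed by tag name. Decaying tags are registered through the connmgr.Decayer interface that BasicConnMgr now asserts; a hedged sketch, assuming the go-libp2p-core decay API of this era (RegisterDecayingTag, DecayFixed, and BumpSumUnbounded; check the exact signatures in your go-libp2p-core version), with pid any peer.ID:

// Register a tag that loses 1 point every 10 minutes and grows by simple
// addition on each bump.
tag, err := cm.RegisterDecayingTag(
	"usefulness",
	10*time.Minute,
	connmgr.DecayFixed(1),
	connmgr.BumpSumUnbounded(),
)
if err != nil {
	panic(err) // illustrative only
}

// Later, credit a peer; the decayed value then shows up in
// GetTagInfo(pid).Tags["usefulness"].
_ = tag.Bump(pid, 5)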
@@ -439,10 +493,10 @@ func (cm *BasicConnMgr) GetInfo() CMInfo {
 	cm.lastTrimMu.RUnlock()
 
 	return CMInfo{
-		HighWater:   cm.highWater,
-		LowWater:    cm.lowWater,
+		HighWater:   cm.cfg.highWater,
+		LowWater:    cm.cfg.lowWater,
 		LastTrim:    lastTrim,
-		GracePeriod: cm.gracePeriod,
+		GracePeriod: cm.cfg.gracePeriod,
 		ConnCount:   int(atomic.LoadInt32(&cm.connCount)),
 	}
 }
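GetInfo reads the watermarks and grace period through cfg but is otherwise unchanged, which makes it a cheap snapshot for logging or metrics. A usage sketch, assuming a zap logger in scope:

info := cm.GetInfo()
logger.Info("connmgr snapshot",
	zap.Int("conns", info.ConnCount),
	zap.Int("low_water", info.LowWater),
	zap.Int("high_water", info.HighWater),
	zap.Duration("grace_period", info.GracePeriod),
	zap.Time("last_trim", info.LastTrim),
)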
@@ -478,6 +532,7 @@ func (nn *cmNotifee) Connected(n network.Network, c network.Conn) {
 			id:        id,
 			firstSeen: time.Now(),
 			tags:      make(map[string]int),
+			decaying:  make(map[*decayingTag]*connmgr.DecayingValue),
 			conns:     make(map[network.Conn]time.Time),
 		}
 		s.peers[id] = pinfo