Skip to content

Commit

Permalink
have a configurable maximum active work per peer
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann committed Jul 1, 2021
1 parent 0d56745 commit 2b501cb
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 27 deletions.
13 changes: 0 additions & 13 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -15,18 +15,10 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE=
github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU=
github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU=
github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc=
github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM=
github.com/ipfs/go-ipfs-pq v0.0.0-20191101181110-8122fa6a9529 h1:izQqDLe/uSPKe6NYr3FjwnvU0AAg0im/4DLVXplLFUQ=
github.com/ipfs/go-ipfs-pq v0.0.0-20191101181110-8122fa6a9529/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU=
github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY=
github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY=
github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY=
Expand All @@ -40,20 +32,17 @@ github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8
github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g=
github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ=
github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5 h1:l16XLUUJ34wIz+RIvLhSwGvLvKyy+W598b135bJN6mg=
github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U=
github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs=
github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8=
github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78=
github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc=
github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL8NSQj0zQ5Nz/AA=
github.com/multiformats/go-multiaddr v0.0.2 h1:RBysRCv5rv3FWlhKWKoXv8tnsCUpEpIZpCmqAGZos2s=
github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44=
github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs=
github.com/multiformats/go-multihash v0.0.1 h1:HHwN1K12I+XllBCrqKnhX949Orn4oawPkegHMu2vDqQ=
github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U=
github.com/multiformats/go-multihash v0.0.5 h1:1wxmCvTXAifAepIMyF39vZinRw5sbqjPs/UIi93+uik=
github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po=
Expand All @@ -68,7 +57,6 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b
github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX1gxn7irUTF1fLpQovfQ5M=
golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734 h1:p/H982KKEjUnLJkM3tt/LemDnOc1GiZL5FCVlORJ5zo=
Expand All @@ -78,7 +66,6 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d h1:Z0Ahzd7HltpJtjAHHxX8QFP3j1yYgiuvjbjRzDj/KH0=
golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
27 changes: 19 additions & 8 deletions peertaskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,14 @@ type hookFunc func(p peer.ID, event peerTaskQueueEvent)
// to execute the block with the highest priority, or otherwise the one added
// first if priorities are equal.
type PeerTaskQueue struct {
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
hooks []hookFunc
ignoreFreezing bool
taskMerger peertracker.TaskMerger
lock sync.Mutex
pQueue pq.PQ
peerTrackers map[peer.ID]*peertracker.PeerTracker
frozenPeers map[peer.ID]struct{}
hooks []hookFunc
ignoreFreezing bool
taskMerger peertracker.TaskMerger
maxOutstandingWorkPerPeer int
}

// Option is a function that configures the peer task queue
Expand Down Expand Up @@ -62,6 +63,16 @@ func TaskMerger(tmfp peertracker.TaskMerger) Option {
}
}

// MaxOutstandingWorkPerPeer is an option that specifies how many tasks a peer can have outstanding
// with the same Topic as an existing Topic.
func MaxOutstandingWorkPerPeer(count int) Option {
return func(ptq *PeerTaskQueue) Option {
previous := ptq.maxOutstandingWorkPerPeer
ptq.maxOutstandingWorkPerPeer = count
return MaxOutstandingWorkPerPeer(previous)
}
}

func removeHook(hook hookFunc) Option {
return func(ptq *PeerTaskQueue) Option {
for i, testHook := range ptq.hooks {
Expand Down Expand Up @@ -139,7 +150,7 @@ func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) {

peerTracker, ok := ptq.peerTrackers[to]
if !ok {
peerTracker = peertracker.New(to, ptq.taskMerger)
peerTracker = peertracker.New(to, ptq.taskMerger, ptq.maxOutstandingWorkPerPeer)
ptq.pQueue.Push(peerTracker)
ptq.peerTrackers[to] = peerTracker
ptq.callHooks(to, peerAdded)
Expand Down
25 changes: 19 additions & 6 deletions peertracker/peertracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type PeerTracker struct {
activelk sync.Mutex
activeWork int

maxActiveWorkPerPeer int

// for the PQ interface
index int

Expand All @@ -57,13 +59,14 @@ type PeerTracker struct {
}

// New creates a new PeerTracker
func New(target peer.ID, taskMerger TaskMerger) *PeerTracker {
func New(target peer.ID, taskMerger TaskMerger, maxActiveWorkPerPeer int) *PeerTracker {
return &PeerTracker{
target: target,
taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
pendingTasks: make(map[peertask.Topic]*peertask.QueueTask),
activeTasks: make(map[*peertask.Task]struct{}),
taskMerger: taskMerger,
target: target,
taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)),
pendingTasks: make(map[peertask.Topic]*peertask.QueueTask),
activeTasks: make(map[*peertask.Task]struct{}),
taskMerger: taskMerger,
maxActiveWorkPerPeer: maxActiveWorkPerPeer,
}
}

Expand Down Expand Up @@ -172,6 +175,16 @@ func (p *PeerTracker) PopTasks(targetMinWork int) ([]*peertask.Task, int) {
var out []*peertask.Task
work := 0
for p.taskQueue.Len() > 0 && p.freezeVal == 0 && work < targetMinWork {
if p.maxActiveWorkPerPeer > 0 {
// Do not add work to a peer that is already maxed out
p.activelk.Lock()
activeWork := p.activeWork
p.activelk.Unlock()
if activeWork >= p.maxActiveWorkPerPeer {
break
}
}

// Pop the next task off the queue
t := p.taskQueue.Pop().(*peertask.QueueTask)

Expand Down

0 comments on commit 2b501cb

Please sign in to comment.