From 2b501cbd3313394e04b3e124193c2f99c0dad574 Mon Sep 17 00:00:00 2001 From: Adin Schmahmann Date: Thu, 1 Jul 2021 15:49:29 -0400 Subject: [PATCH] have a configurable maximum active work per peer --- go.sum | 13 ------------- peertaskqueue.go | 27 +++++++++++++++++++-------- peertracker/peertracker.go | 25 +++++++++++++++++++------ 3 files changed, 38 insertions(+), 27 deletions(-) diff --git a/go.sum b/go.sum index fdabadf..4f17757 100644 --- a/go.sum +++ b/go.sum @@ -15,18 +15,10 @@ github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMo github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= -github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/gxed/hashland/keccakpg v0.0.1 h1:wrk3uMNaMxbXiHibbPO4S0ymqJMm41WiudyFSs7UnsU= github.com/gxed/hashland/keccakpg v0.0.1/go.mod h1:kRzw3HkwxFU1mpmPP8v1WyQzwdGfmKFJ6tItnhQ67kU= -github.com/gxed/hashland/murmur3 v0.0.1 h1:SheiaIt0sda5K+8FLz952/1iWS9zrnKsEJaOJu4ZbSc= github.com/gxed/hashland/murmur3 v0.0.1/go.mod h1:KjXop02n4/ckmZSnY2+HKcLud/tcmvhST0bie/0lS48= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/ipfs/go-cid v0.0.1/go.mod h1:GHWU/WuQdMPmIosc4Yn1bcCT7dSeX4lBafM7iqUPQvM= -github.com/ipfs/go-ipfs-pq v0.0.0-20191101181110-8122fa6a9529 h1:izQqDLe/uSPKe6NYr3FjwnvU0AAg0im/4DLVXplLFUQ= -github.com/ipfs/go-ipfs-pq v0.0.0-20191101181110-8122fa6a9529/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= -github.com/ipfs/go-ipfs-pq v0.0.1 h1:zgUotX8dcAB/w/HidJh1zzc1yFq6Vm8J7T2F4itj/RU= -github.com/ipfs/go-ipfs-pq v0.0.1/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= github.com/ipfs/go-ipfs-pq v0.0.2 h1:e1vOOW6MuOwG2lqxcLA+wEn93i/9laCY8sXAw76jFOY= github.com/ipfs/go-ipfs-pq v0.0.2/go.mod h1:LWIqQpqfRG3fNc5XsnIhz/wQ2XXGyugQwls7BgUmUfY= github.com/jbenet/goprocess v0.0.0-20160826012719-b497e2f366b8/go.mod h1:Ly/wlsjFq/qrU3Rar62tu1gASgGw6chQbSh/XgIIXCY= @@ -40,12 +32,10 @@ github.com/libp2p/go-libp2p-core v0.0.1 h1:HSTZtFIq/W5Ue43Zw+uWZyy2Vl5WtF0zDjKN8 github.com/libp2p/go-libp2p-core v0.0.1/go.mod h1:g/VxnTZ/1ygHxH3dKok7Vno1VfpvGcGip57wjTU4fco= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= -github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16 h1:5W7KhL8HVF3XCFOweFD3BNESdnO8ewyYTFT2R+/b8FQ= github.com/minio/sha256-simd v0.0.0-20190131020904-2d45a736cd16/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5 h1:l16XLUUJ34wIz+RIvLhSwGvLvKyy+W598b135bJN6mg= github.com/minio/sha256-simd v0.0.0-20190328051042-05b4dd3047e5/go.mod h1:2FMWW+8GMoPweT6+pI63m9YE3Lmw4J71hV56Chs1E/U= github.com/mr-tron/base58 v1.1.0/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= -github.com/mr-tron/base58 v1.1.1 h1:OJIdWOWYe2l5PQNgimGtuwHY8nDskvJ5vvs//YnzRLs= github.com/mr-tron/base58 v1.1.1/go.mod h1:xcD2VGqlgYjBdcBLw+TuYLr8afG+Hj8g2eTVqeSzSU8= github.com/mr-tron/base58 v1.1.2 h1:ZEw4I2EgPKDJ2iEw0cNmLB3ROrEmkOtXIkaG7wZg+78= github.com/mr-tron/base58 v1.1.2/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= @@ -53,7 +43,6 @@ github.com/multiformats/go-base32 v0.0.3/go.mod h1:pLiuGC8y0QR3Ue4Zug5UzK9LjgbkL github.com/multiformats/go-multiaddr v0.0.2 h1:RBysRCv5rv3FWlhKWKoXv8tnsCUpEpIZpCmqAGZos2s= github.com/multiformats/go-multiaddr v0.0.2/go.mod h1:xKVEak1K9cS1VdmPZW3LSIb6lgmoS58qz/pzqmAxV44= github.com/multiformats/go-multibase v0.0.1/go.mod h1:bja2MqRZ3ggyXtZSEDKpl0uO/gviWFaSteVbWT51qgs= -github.com/multiformats/go-multihash v0.0.1 h1:HHwN1K12I+XllBCrqKnhX949Orn4oawPkegHMu2vDqQ= github.com/multiformats/go-multihash v0.0.1/go.mod h1:w/5tugSrLEbWqlcgJabL3oHFKTwfvkofsjW2Qa1ct4U= github.com/multiformats/go-multihash v0.0.5 h1:1wxmCvTXAifAepIMyF39vZinRw5sbqjPs/UIi93+uik= github.com/multiformats/go-multihash v0.0.5/go.mod h1:lt/HCbqlQwlPBz7lv0sQCdtfcMtlJvakRUn/0Ual8po= @@ -68,7 +57,6 @@ github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0b github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= golang.org/x/crypto v0.0.0-20170930174604-9419663f5a44/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190211182817-74369b46fc67/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= -golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b h1:+/WWzjwW6gidDJnMKWLKLX1gxn7irUTF1fLpQovfQ5M= golang.org/x/crypto v0.0.0-20190225124518-7f87c0fbb88b/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190426145343-a29dc8fdc734 h1:p/H982KKEjUnLJkM3tt/LemDnOc1GiZL5FCVlORJ5zo= @@ -78,7 +66,6 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190219092855-153ac476189d h1:Z0Ahzd7HltpJtjAHHxX8QFP3j1yYgiuvjbjRzDj/KH0= golang.org/x/sys v0.0.0-20190219092855-153ac476189d/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d h1:+R4KGOnez64A81RvjARKc4UT5/tI9ujCIVX+P5KiHuI= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/peertaskqueue.go b/peertaskqueue.go index 7780ea1..f4468b8 100644 --- a/peertaskqueue.go +++ b/peertaskqueue.go @@ -23,13 +23,14 @@ type hookFunc func(p peer.ID, event peerTaskQueueEvent) // to execute the block with the highest priority, or otherwise the one added // first if priorities are equal. type PeerTaskQueue struct { - lock sync.Mutex - pQueue pq.PQ - peerTrackers map[peer.ID]*peertracker.PeerTracker - frozenPeers map[peer.ID]struct{} - hooks []hookFunc - ignoreFreezing bool - taskMerger peertracker.TaskMerger + lock sync.Mutex + pQueue pq.PQ + peerTrackers map[peer.ID]*peertracker.PeerTracker + frozenPeers map[peer.ID]struct{} + hooks []hookFunc + ignoreFreezing bool + taskMerger peertracker.TaskMerger + maxOutstandingWorkPerPeer int } // Option is a function that configures the peer task queue @@ -62,6 +63,16 @@ func TaskMerger(tmfp peertracker.TaskMerger) Option { } } +// MaxOutstandingWorkPerPeer is an option that specifies how many tasks a peer can have outstanding +// with the same Topic as an existing Topic. +func MaxOutstandingWorkPerPeer(count int) Option { + return func(ptq *PeerTaskQueue) Option { + previous := ptq.maxOutstandingWorkPerPeer + ptq.maxOutstandingWorkPerPeer = count + return MaxOutstandingWorkPerPeer(previous) + } +} + func removeHook(hook hookFunc) Option { return func(ptq *PeerTaskQueue) Option { for i, testHook := range ptq.hooks { @@ -139,7 +150,7 @@ func (ptq *PeerTaskQueue) PushTasks(to peer.ID, tasks ...peertask.Task) { peerTracker, ok := ptq.peerTrackers[to] if !ok { - peerTracker = peertracker.New(to, ptq.taskMerger) + peerTracker = peertracker.New(to, ptq.taskMerger, ptq.maxOutstandingWorkPerPeer) ptq.pQueue.Push(peerTracker) ptq.peerTrackers[to] = peerTracker ptq.callHooks(to, peerAdded) diff --git a/peertracker/peertracker.go b/peertracker/peertracker.go index 7d73e35..19f2406 100644 --- a/peertracker/peertracker.go +++ b/peertracker/peertracker.go @@ -45,6 +45,8 @@ type PeerTracker struct { activelk sync.Mutex activeWork int + maxActiveWorkPerPeer int + // for the PQ interface index int @@ -57,13 +59,14 @@ type PeerTracker struct { } // New creates a new PeerTracker -func New(target peer.ID, taskMerger TaskMerger) *PeerTracker { +func New(target peer.ID, taskMerger TaskMerger, maxActiveWorkPerPeer int) *PeerTracker { return &PeerTracker{ - target: target, - taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)), - pendingTasks: make(map[peertask.Topic]*peertask.QueueTask), - activeTasks: make(map[*peertask.Task]struct{}), - taskMerger: taskMerger, + target: target, + taskQueue: pq.New(peertask.WrapCompare(peertask.PriorityCompare)), + pendingTasks: make(map[peertask.Topic]*peertask.QueueTask), + activeTasks: make(map[*peertask.Task]struct{}), + taskMerger: taskMerger, + maxActiveWorkPerPeer: maxActiveWorkPerPeer, } } @@ -172,6 +175,16 @@ func (p *PeerTracker) PopTasks(targetMinWork int) ([]*peertask.Task, int) { var out []*peertask.Task work := 0 for p.taskQueue.Len() > 0 && p.freezeVal == 0 && work < targetMinWork { + if p.maxActiveWorkPerPeer > 0 { + // Do not add work to a peer that is already maxed out + p.activelk.Lock() + activeWork := p.activeWork + p.activelk.Unlock() + if activeWork >= p.maxActiveWorkPerPeer { + break + } + } + // Pop the next task off the queue t := p.taskQueue.Pop().(*peertask.QueueTask)