Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Enable custom task prioritization logic #535

Merged
merged 8 commits into from
Oct 28, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,16 @@ func SetSimulateDontHavesOnTimeout(send bool) Option {
}
}

type TaskInfo = decision.TaskInfo
type TaskComparator = decision.TaskComparator

// WithTaskComparator configures custom task prioritization logic.
func WithTaskComparator(comparator TaskComparator) Option {
return func(bs *Bitswap) {
bs.taskComparator = comparator
}
}

// New initializes a BitSwap instance that communicates over the provided
// BitSwapNetwork. This function registers the returned instance as the network
// delegate. Runs until context is cancelled or bitswap.Close is called.
Expand Down Expand Up @@ -272,6 +282,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
activeEngineGauge,
pendingBlocksGauge,
activeBlocksGauge,
decision.WithTaskComparator(bs.taskComparator),
)
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)

Expand Down Expand Up @@ -375,6 +386,8 @@ type Bitswap struct {

// whether we should actually simulate dont haves on request timeout
simulateDontHavesOnTimeout bool

taskComparator TaskComparator
}

type counters struct {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ require (
github.com/ipfs/go-ipfs-util v0.0.2
github.com/ipfs/go-log v1.0.5
github.com/ipfs/go-metrics-interface v0.0.1
github.com/ipfs/go-peertaskqueue v0.4.0
github.com/ipfs/go-peertaskqueue v0.6.0
github.com/jbenet/goprocess v0.1.4
github.com/libp2p/go-buffer-pool v0.0.2
github.com/libp2p/go-libp2p v0.14.3
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -302,8 +302,8 @@ github.com/ipfs/go-log/v2 v2.1.3 h1:1iS3IU7aXRlbgUpN8yTTpJ53NXYjAe37vcI5+5nYrzk=
github.com/ipfs/go-log/v2 v2.1.3/go.mod h1:/8d0SH3Su5Ooc31QlL1WysJhvyOTDCjcCZ9Axpmri6g=
github.com/ipfs/go-metrics-interface v0.0.1 h1:j+cpbjYvu4R8zbleSs36gvB7jR+wsL2fGD6n0jO4kdg=
github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j/b/tL7HTWtJ4VPgWY=
github.com/ipfs/go-peertaskqueue v0.4.0 h1:x1hFgA4JOUJ3ntPfqLRu6v4k6kKL0p07r3RSg9JNyHI=
github.com/ipfs/go-peertaskqueue v0.4.0/go.mod h1:KL9F49hXJMoXCad8e5anivjN+kWdr+CyGcyh4K6doLc=
github.com/ipfs/go-peertaskqueue v0.6.0 h1:BT1/PuNViVomiz1PnnP5+WmKsTNHrxIDvkZrkj4JhOg=
github.com/ipfs/go-peertaskqueue v0.6.0/go.mod h1:M/akTIE/z1jGNXMU7kFB4TeSEFvj68ow0Rrb04donIU=
github.com/jackpal/gateway v1.0.5/go.mod h1:lTpwd4ACLXmpyiCTRtfiNyVnUmqT9RivzCDQetPfnjA=
github.com/jackpal/go-nat-pmp v1.0.1/go.mod h1:QPH045xvCAeXUZOxsnwmrtiCoxIr9eob+4orBN1SBKc=
github.com/jackpal/go-nat-pmp v1.0.2 h1:KzKSgb7qkJvOUTqYl9/Hg/me3pWgBmERKrTGD7BdWus=
Expand Down
78 changes: 76 additions & 2 deletions internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/ipfs/go-metrics-interface"
"github.com/ipfs/go-peertaskqueue"
"github.com/ipfs/go-peertaskqueue/peertask"
"github.com/ipfs/go-peertaskqueue/peertracker"
process "github.com/jbenet/goprocess"
"github.com/libp2p/go-libp2p-core/peer"
)
Expand Down Expand Up @@ -175,6 +176,60 @@ type Engine struct {
// used to ensure metrics are reported each fixed number of operation
metricsLock sync.Mutex
metricUpdateCounter int

taskComparator TaskComparator
}

// TaskInfo represents the details of a request from a peer.
type TaskInfo struct {
synzhu marked this conversation as resolved.
Show resolved Hide resolved
Peer peer.ID
// The CID of the block
Cid cid.Cid
// Tasks can be want-have or want-block
IsWantBlock bool
// Whether to immediately send a response if the block is not found
SendDontHave bool
// The size of the block corresponding to the task
BlockSize int
// Whether the block was found
HaveBlock bool
}

// TaskComparator is used for task prioritization.
// It should return true if task 'ta' has higher priority than task 'tb'
type TaskComparator func(ta, tb *TaskInfo) bool

type Option func(*Engine)

func WithTaskComparator(comparator TaskComparator) Option {
return func(e *Engine) {
e.taskComparator = comparator
}
}

// wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator
func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
return func(a, b *peertask.QueueTask) bool {
taskDataA := a.Task.Data.(*taskData)
taskInfoA := &TaskInfo{
Peer: a.Target,
Cid: a.Task.Topic.(cid.Cid),
IsWantBlock: taskDataA.IsWantBlock,
SendDontHave: taskDataA.SendDontHave,
BlockSize: taskDataA.BlockSize,
HaveBlock: taskDataA.HaveBlock,
}
taskDataB := b.Task.Data.(*taskData)
taskInfoB := &TaskInfo{
Peer: b.Target,
Cid: b.Task.Topic.(cid.Cid),
IsWantBlock: taskDataB.IsWantBlock,
SendDontHave: taskDataB.SendDontHave,
BlockSize: taskDataB.BlockSize,
HaveBlock: taskDataB.HaveBlock,
}
return tc(taskInfoA, taskInfoB)
}
}

// NewEngine creates a new block sending engine for the given block store.
Expand All @@ -192,6 +247,7 @@ func NewEngine(
activeEngineGauge metrics.Gauge,
pendingBlocksGauge metrics.Gauge,
activeBlocksGauge metrics.Gauge,
opts ...Option,
) *Engine {
return newEngine(
ctx,
Expand All @@ -207,6 +263,7 @@ func NewEngine(
activeEngineGauge,
pendingBlocksGauge,
activeBlocksGauge,
opts...,
)
}

Expand All @@ -223,6 +280,7 @@ func newEngine(
activeEngineGauge metrics.Gauge,
pendingBlocksGauge metrics.Gauge,
activeBlocksGauge metrics.Gauge,
opts ...Option,
) *Engine {

if scoreLedger == nil {
Expand All @@ -247,12 +305,28 @@ func newEngine(
}
e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
e.peerRequestQueue = peertaskqueue.New(

for _, opt := range opts {
opt(e)
}

// default peer task queue options
peerTaskQueueOpts := []peertaskqueue.Option{
peertaskqueue.OnPeerAddedHook(e.onPeerAdded),
peertaskqueue.OnPeerRemovedHook(e.onPeerRemoved),
peertaskqueue.TaskMerger(newTaskMerger()),
peertaskqueue.IgnoreFreezing(true),
peertaskqueue.MaxOutstandingWorkPerPeer(maxOutstandingBytesPerPeer))
peertaskqueue.MaxOutstandingWorkPerPeer(maxOutstandingBytesPerPeer),
}

if e.taskComparator != nil {
queueTaskComparator := wrapTaskComparator(e.taskComparator)
peerTaskQueueOpts = append(peerTaskQueueOpts, peertaskqueue.PeerComparator(peertracker.TaskPriorityPeerComparator(queueTaskComparator)))
peerTaskQueueOpts = append(peerTaskQueueOpts, peertaskqueue.TaskComparator(queueTaskComparator))
}

e.peerRequestQueue = peertaskqueue.New(peerTaskQueueOpts...)

return e
}

Expand Down
66 changes: 62 additions & 4 deletions internal/decision/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/ipfs/go-metrics-interface"

blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ds "github.com/ipfs/go-datastore"
dssync "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
Expand Down Expand Up @@ -92,14 +93,14 @@ type engineSet struct {
Blockstore blockstore.Blockstore
}

func newTestEngine(ctx context.Context, idStr string) engineSet {
return newTestEngineWithSampling(ctx, idStr, shortTerm, nil, clock.New())
func newTestEngine(ctx context.Context, idStr string, opts ...Option) engineSet {
return newTestEngineWithSampling(ctx, idStr, shortTerm, nil, clock.New(), opts...)
}

func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}, clock clock.Clock) engineSet {
func newTestEngineWithSampling(ctx context.Context, idStr string, peerSampleInterval time.Duration, sampleCh chan struct{}, clock clock.Clock, opts ...Option) engineSet {
fpt := &fakePeerTagger{}
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh, clock))
e := newEngineForTesting(ctx, bs, 4, defaults.BitswapEngineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, NewTestScoreLedger(peerSampleInterval, sampleCh, clock), opts...)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))
return engineSet{
Peer: peer.ID(idStr),
Expand Down Expand Up @@ -193,6 +194,7 @@ func newEngineForTesting(
self peer.ID,
maxReplaceSize int,
scoreLedger ScoreLedger,
opts ...Option,
) *Engine {
testPendingEngineGauge := metrics.NewCtx(ctx, "pending_tasks", "Total number of pending tasks").Gauge()
testActiveEngineGauge := metrics.NewCtx(ctx, "active_tasks", "Total number of active tasks").Gauge()
Expand All @@ -212,6 +214,7 @@ func newEngineForTesting(
testActiveEngineGauge,
testPendingBlocksGauge,
testActiveBlocksGauge,
opts...,
)
}

Expand Down Expand Up @@ -1054,6 +1057,61 @@ func TestWantlistForPeer(t *testing.T) {

}

func TestTaskComparator(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

keys := []string{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}
cids := make(map[cid.Cid]int)
blks := make([]blocks.Block, 0, len(keys))
for i, letter := range keys {
block := blocks.NewBlock([]byte(letter))
blks = append(blks, block)
cids[block.Cid()] = i
}

fpt := &fakePeerTagger{}
sl := NewTestScoreLedger(shortTerm, nil, clock.New())
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
if err := bs.PutMany(blks); err != nil {
t.Fatal(err)
}

// use a single task worker so that the order of outgoing messages is deterministic
engineTaskWorkerCount := 1
e := newEngineForTesting(ctx, bs, 4, engineTaskWorkerCount, defaults.BitswapMaxOutstandingBytesPerPeer, fpt, "localhost", 0, sl,
// if this Option is omitted, the test fails
WithTaskComparator(func(ta, tb *TaskInfo) bool {
// prioritize based on lexicographic ordering of block content
return cids[ta.Cid] < cids[tb.Cid]
}),
)
e.StartWorkers(ctx, process.WithTeardown(func() error { return nil }))

// rely on randomness of Go map's iteration order to add Want entries in random order
peerIDs := make([]peer.ID, len(keys))
for _, i := range cids {
peerID := libp2ptest.RandPeerIDFatal(t)
peerIDs[i] = peerID
partnerWantBlocks(e, keys[i:i+1], peerID)
}

// check that outgoing messages are sent in the correct order
for i, peerID := range peerIDs {
next := <-e.Outbox()
envelope := <-next
if peerID != envelope.Peer {
t.Errorf("expected message for peer ID %#v but instead got message for peer ID %#v", peerID, envelope.Peer)
}
responseBlocks := envelope.Message.Blocks()
if len(responseBlocks) != 1 {
t.Errorf("expected 1 block in response but instead got %v", len(blks))
} else if responseBlocks[0].Cid() != blks[i].Cid() {
t.Errorf("expected block with CID %#v but instead got block with CID %#v", blks[i].Cid(), responseBlocks[0].Cid())
}
}
}

func TestTaggingPeers(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
Expand Down