Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
configurable target message size
Browse files Browse the repository at this point in the history
  • Loading branch information
whyrusleeping committed Dec 14, 2021
1 parent 616d0c2 commit ada55fc
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 4 deletions.
11 changes: 11 additions & 0 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,12 @@ func SetSimulateDontHavesOnTimeout(send bool) Option {
}
}

func WithTargetMessageSize(tms int) Option {
return func(bs *Bitswap) {
bs.engineTargetMessageSize = tms
}
}

type TaskInfo = decision.TaskInfo
type TaskComparator = decision.TaskComparator

Expand Down Expand Up @@ -259,6 +265,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
engineTaskWorkerCount: defaults.BitswapEngineTaskWorkerCount,
taskWorkerCount: defaults.BitswapTaskWorkerCount,
engineMaxOutstandingBytesPerPeer: defaults.BitswapMaxOutstandingBytesPerPeer,
engineTargetMessageSize: defaults.BitswapEngineTargetMessageSize,
engineSetSendDontHaves: true,
simulateDontHavesOnTimeout: true,
}
Expand All @@ -283,6 +290,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
pendingBlocksGauge,
activeBlocksGauge,
decision.WithTaskComparator(bs.taskComparator),
decision.WithTargetMessageSize(bs.engineTargetMessageSize),
)
bs.engine.SetSendDontHaves(bs.engineSetSendDontHaves)

Expand Down Expand Up @@ -379,6 +387,9 @@ type Bitswap struct {
// the score ledger used by the decision engine
engineScoreLedger deciface.ScoreLedger

// target message size setting for engines peer task queue
engineTargetMessageSize int

// indicates what to do when the engine receives a want-block for a block that
// is not in the blockstore. Either send DONT_HAVE or do nothing.
// This is used to simulate older versions of bitswap that did nothing instead of sending back a DONT_HAVE.
Expand Down
17 changes: 13 additions & 4 deletions internal/decision/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ const (
// targetMessageSize is the ideal size of the batched payload. We try to
// pop this much data off the request queue, but it may be a little more
// or less depending on what's in the queue.
targetMessageSize = 16 * 1024
defaultTargetMessageSize = 16 * 1024
// tagFormat is the tag given to peers associated an engine
tagFormat = "bs-engine-%s-%s"

Expand Down Expand Up @@ -159,6 +159,8 @@ type Engine struct {
taskWorkerLock sync.Mutex
taskWorkerCount int

targetMessageSize int

// maxBlockSizeReplaceHasWithBlock is the maximum size of the block in
// bytes up to which we will replace a want-have with a want-block
maxBlockSizeReplaceHasWithBlock int
Expand Down Expand Up @@ -207,6 +209,12 @@ func WithTaskComparator(comparator TaskComparator) Option {
}
}

func WithTargetMessageSize(size int) Option {
return func(e *Engine) {
e.targetMessageSize = size
}
}

// wrapTaskComparator wraps a TaskComparator so it can be used as a QueueTaskComparator
func wrapTaskComparator(tc TaskComparator) peertask.QueueTaskComparator {
return func(a, b *peertask.QueueTask) bool {
Expand Down Expand Up @@ -302,6 +310,7 @@ func newEngine(
peerLedger: newPeerLedger(),
pendingGauge: pendingEngineGauge,
activeGauge: activeEngineGauge,
targetMessageSize: defaultTargetMessageSize,
}
e.tagQueued = fmt.Sprintf(tagFormat, "queued", uuid.New().String())
e.tagUseful = fmt.Sprintf(tagFormat, "useful", uuid.New().String())
Expand Down Expand Up @@ -450,21 +459,21 @@ func (e *Engine) taskWorkerExit() {
func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
for {
// Pop some tasks off the request queue
p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(targetMessageSize)
p, nextTasks, pendingBytes := e.peerRequestQueue.PopTasks(e.targetMessageSize)
e.updateMetrics()
for len(nextTasks) == 0 {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-e.workSignal:
p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(e.targetMessageSize)
e.updateMetrics()
case <-e.ticker.C:
// When a task is cancelled, the queue may be "frozen" for a
// period of time. We periodically "thaw" the queue to make
// sure it doesn't get stuck in a frozen state.
e.peerRequestQueue.ThawRound()
p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(targetMessageSize)
p, nextTasks, pendingBytes = e.peerRequestQueue.PopTasks(e.targetMessageSize)
e.updateMetrics()
}
}
Expand Down
2 changes: 2 additions & 0 deletions internal/defaults/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ const (
BitswapEngineTaskWorkerCount = 8
// the total amount of bytes that a peer should have outstanding, it is utilized by the decision engine
BitswapMaxOutstandingBytesPerPeer = 1 << 20
// the number of bytes we attempt to make each outgoing bitswap message
BitswapEngineTargetMessageSize = 16 * 1024
)

0 comments on commit ada55fc

Please sign in to comment.