This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Don't hang outstanding jobs when blockstoreManager shuts down #239

Closed · wants to merge 2 commits
2 changes: 1 addition & 1 deletion bitswap.go
@@ -374,7 +374,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg

// This call records changes to wantlists, blocks received,
// and number of bytes transfered.
bs.engine.MessageReceived(ctx, p, incoming)
bs.engine.MessageReceived(p, incoming)
// TODO: this is bad, and could be easily abused.
// Should only track *useful* messages in ledger

46 changes: 32 additions & 14 deletions decision/blockstoremanager.go
@@ -1,7 +1,7 @@
package decision

import (
"context"
"fmt"
"sync"

blocks "github.com/ipfs/go-block-format"
@@ -14,17 +14,19 @@ import (
type blockstoreManager struct {
bs bstore.Blockstore
workerCount int
jobs chan func()
jobs chan func(error)
px process.Process
}

var scheduleErr = fmt.Errorf("Could not schedule worker - blockstore manager shut down")

// newBlockstoreManager creates a new blockstoreManager with the given context
// and number of workers
func newBlockstoreManager(ctx context.Context, bs bstore.Blockstore, workerCount int) *blockstoreManager {
func newBlockstoreManager(bs bstore.Blockstore, workerCount int) *blockstoreManager {
return &blockstoreManager{
bs: bs,
workerCount: workerCount,
jobs: make(chan func()),
jobs: make(chan func(error)),
}
}

@@ -43,29 +45,42 @@ func (bsm *blockstoreManager) worker() {
for {
select {
case <-bsm.px.Closing():
// If the process is closing, shut down the worker
bsm.shutdownWorker()
return
case job := <-bsm.jobs:
job()
job(nil)
}
}
}

func (bsm *blockstoreManager) shutdownWorker() {
// Drain the queue of outstanding jobs
for len(bsm.jobs) > 0 {
select {
case job := <-bsm.jobs:
job(scheduleErr)
default:
}
}
}

func (bsm *blockstoreManager) addJob(ctx context.Context, job func()) {
func (bsm *blockstoreManager) addJob(job func(error)) {
select {
case <-ctx.Done():
case <-bsm.px.Closing():
job(scheduleErr)
case bsm.jobs <- job:
}
}

func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) map[cid.Cid]int {
func (bsm *blockstoreManager) getBlockSizes(ks []cid.Cid) map[cid.Cid]int {
res := make(map[cid.Cid]int)
if len(ks) == 0 {
return res
}

var lk sync.Mutex
bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
bsm.jobPerKey(ks, func(c cid.Cid) {
size, err := bsm.bs.GetSize(c)
if err != nil {
if err != bstore.ErrNotFound {
@@ -81,14 +96,14 @@ func (bsm *blockstoreManager) getBlockSizes(ctx context.Context, ks []cid.Cid) m
return res
}

func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) map[cid.Cid]blocks.Block {
func (bsm *blockstoreManager) getBlocks(ks []cid.Cid) map[cid.Cid]blocks.Block {
res := make(map[cid.Cid]blocks.Block)
if len(ks) == 0 {
return res
}

var lk sync.Mutex
bsm.jobPerKey(ctx, ks, func(c cid.Cid) {
bsm.jobPerKey(ks, func(c cid.Cid) {
blk, err := bsm.bs.Get(c)
if err != nil {
if err != bstore.ErrNotFound {
@@ -104,13 +119,16 @@ func (bsm *blockstoreManager) getBlocks(ctx context.Context, ks []cid.Cid) map[c
return res
}

func (bsm *blockstoreManager) jobPerKey(ctx context.Context, ks []cid.Cid, jobFn func(c cid.Cid)) {
func (bsm *blockstoreManager) jobPerKey(ks []cid.Cid, jobFn func(c cid.Cid)) {
wg := sync.WaitGroup{}
for _, k := range ks {
c := k
wg.Add(1)
bsm.addJob(ctx, func() {
jobFn(c)
bsm.addJob(func(err error) {
// Check if there was a scheduling error (because the manager was shut down)
if err == nil {
jobFn(c)
}
wg.Done()
})
}
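The core of this change is that every queued job now takes an error argument: a worker passes nil when the job actually runs, and scheduleErr when the manager is shutting down, so jobPerKey can still call wg.Done() for jobs that never execute. Below is a minimal, self-contained sketch of the same pattern. The names (manager, errShutdown, newManager) and the plain closing channel standing in for the goprocess px.Closing() are illustrative assumptions, not go-bitswap's actual code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errShutdown plays the role of scheduleErr in the diff above.
var errShutdown = errors.New("manager shut down")

// manager is a stripped-down stand-in for blockstoreManager: a pool of
// workers pulling func(error) jobs off an unbuffered channel.
type manager struct {
	jobs    chan func(error)
	closing chan struct{}
}

func newManager(workers int) *manager {
	m := &manager{
		jobs:    make(chan func(error)),
		closing: make(chan struct{}),
	}
	for i := 0; i < workers; i++ {
		go m.worker()
	}
	return m
}

func (m *manager) worker() {
	for {
		select {
		case <-m.closing:
			// Shutting down: hand any still-queued jobs a shutdown error
			// so their callers can unblock, then exit.
			for {
				select {
				case job := <-m.jobs:
					job(errShutdown)
				default:
					return
				}
			}
		case job := <-m.jobs:
			job(nil)
		}
	}
}

func (m *manager) addJob(job func(error)) {
	select {
	case <-m.closing:
		// Manager already closed: report the scheduling failure instead
		// of blocking forever on the jobs channel.
		job(errShutdown)
	case m.jobs <- job:
	}
}

func main() {
	m := newManager(2)
	var wg sync.WaitGroup

	submit := func(i int) {
		wg.Add(1)
		m.addJob(func(err error) {
			defer wg.Done() // always runs, scheduled or not
			if err != nil {
				fmt.Println("job", i, "not scheduled:", err)
				return
			}
			fmt.Println("job", i, "ran")
		})
	}

	submit(1)
	submit(2)
	close(m.closing) // shut the manager down
	submit(3)        // usually rejected with errShutdown; either way it returns
	wg.Wait()        // returns instead of hanging
}
```

The drain step matters because a job may already have been handed to the channel just before the workers observe the close; if those callbacks were never invoked, a sync.WaitGroup waiting on them would block forever, which is exactly the hang this PR removes.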
64 changes: 16 additions & 48 deletions decision/blockstoremanager_test.go
@@ -21,16 +21,15 @@ import (
)

func TestBlockstoreManagerNotFoundKey(t *testing.T) {
ctx := context.Background()
bsdelay := delay.Fixed(3 * time.Millisecond)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(ctx, bstore, 5)
bsm := newBlockstoreManager(bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))

cids := testutil.GenerateCids(4)
sizes := bsm.getBlockSizes(ctx, cids)
sizes := bsm.getBlockSizes(cids)
if len(sizes) != 0 {
t.Fatal("Wrong response length")
}
@@ -41,7 +40,7 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
}
}

blks := bsm.getBlocks(ctx, cids)
blks := bsm.getBlocks(cids)
if len(blks) != 0 {
t.Fatal("Wrong response length")
}
@@ -54,12 +53,11 @@ func TestBlockstoreManagerNotFoundKey(t *testing.T) {
}

func TestBlockstoreManager(t *testing.T) {
ctx := context.Background()
bsdelay := delay.Fixed(3 * time.Millisecond)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(ctx, bstore, 5)
bsm := newBlockstoreManager(bstore, 5)
bsm.start(process.WithTeardown(func() error { return nil }))

exp := make(map[cid.Cid]blocks.Block)
@@ -82,7 +80,7 @@ func TestBlockstoreManager(t *testing.T) {
cids = append(cids, b.Cid())
}

sizes := bsm.getBlockSizes(ctx, cids)
sizes := bsm.getBlockSizes(cids)
if len(sizes) != len(blks)-1 {
t.Fatal("Wrong response length")
}
@@ -106,7 +104,7 @@ func TestBlockstoreManager(t *testing.T) {
}
}

fetched := bsm.getBlocks(ctx, cids)
fetched := bsm.getBlocks(cids)
if len(fetched) != len(blks)-1 {
t.Fatal("Wrong response length")
}
@@ -131,13 +129,12 @@ func TestBlockstoreManager(t *testing.T) {
}

func TestBlockstoreManagerConcurrency(t *testing.T) {
ctx := context.Background()
bsdelay := delay.Fixed(3 * time.Millisecond)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

workerCount := 5
bsm := newBlockstoreManager(ctx, bstore, workerCount)
bsm := newBlockstoreManager(bstore, workerCount)
bsm.start(process.WithTeardown(func() error { return nil }))

blkSize := int64(8 * 1024)
@@ -160,7 +157,7 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
go func(t *testing.T) {
defer wg.Done()

sizes := bsm.getBlockSizes(ctx, ks)
sizes := bsm.getBlockSizes(ks)
if len(sizes) != len(blks) {
err = errors.New("Wrong response length")
}
@@ -174,13 +171,12 @@ func TestBlockstoreManagerConcurrency(t *testing.T) {
}

func TestBlockstoreManagerClose(t *testing.T) {
ctx := context.Background()
delayTime := 20 * time.Millisecond
bsdelay := delay.Fixed(delayTime)
dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(ctx, bstore, 3)
bsm := newBlockstoreManager(bstore, 3)
px := process.WithTeardown(func() error { return nil })
bsm.start(px)

@@ -201,51 +197,23 @@ func TestBlockstoreManagerClose(t *testing.T) {

fnCallDone := make(chan struct{})
go func() {
bsm.getBlockSizes(ctx, ks)
bsm.getBlockSizes(ks)
fnCallDone <- struct{}{}
}()

select {
case <-fnCallDone:
t.Fatal("call to BlockstoreManager should be cancelled")
case <-px.Closed():
// px should be closed before getBlockSizes() returns
}
}

func TestBlockstoreManagerCtxDone(t *testing.T) {
delayTime := 20 * time.Millisecond
ctx := context.Background()
ctx, cancel := context.WithTimeout(context.Background(), delayTime/2)
defer cancel()
bsdelay := delay.Fixed(delayTime)

dstore := ds_sync.MutexWrap(delayed.New(ds.NewMapDatastore(), bsdelay))
bstore := blockstore.NewBlockstore(ds_sync.MutexWrap(dstore))

bsm := newBlockstoreManager(ctx, bstore, 3)
proc := process.WithTeardown(func() error { return nil })
bsm.start(proc)

blks := testutil.GenerateBlocksOfSize(3, 1024)
var ks []cid.Cid
for _, b := range blks {
ks = append(ks, b.Cid())
}

err := bstore.PutMany(blks)
if err != nil {
t.Fatal(err)
}

fnCallDone := make(chan struct{})
go func() {
bsm.getBlockSizes(ctx, ks)
fnCallDone <- struct{}{}
}()

ctx2, cancel2 := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel2()
select {
case <-fnCallDone:
t.Fatal("call to BlockstoreManager should be cancelled")
case <-ctx.Done():
// getBlockSizes() should return before context times out
case <-ctx2.Done():
t.Fatal("call to BlockstoreManager should eventually return")
}
}
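With the context parameters gone, the removed context-cancellation test no longer applies; the remaining TestBlockstoreManagerClose only asserts that the process is closed before getBlockSizes() returns. A hypothetical caller-side check of the no-hang guarantee, written against the sketch manager above (not code from this PR), could look like this as a companion _test.go file:

```go
package main

import (
	"sync"
	"testing"
	"time"
)

// TestManagerShutdownDoesNotHang: after shutdown, a caller blocked on
// queued work must return promptly rather than hang.
func TestManagerShutdownDoesNotHang(t *testing.T) {
	m := newManager(2)
	close(m.closing) // shut down before any work is submitted

	done := make(chan struct{})
	go func() {
		var wg sync.WaitGroup
		wg.Add(1)
		// The error is ignored here; all that matters is that the
		// callback fires so wg.Wait() can return.
		m.addJob(func(err error) { wg.Done() })
		wg.Wait()
		close(done)
	}()

	select {
	case <-done:
		// The callback was invoked (typically with the shutdown error)
		// and the caller returned.
	case <-time.After(100 * time.Millisecond):
		t.Fatal("caller is still blocked after manager shutdown")
	}
}
```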
8 changes: 4 additions & 4 deletions decision/engine.go
@@ -151,7 +151,7 @@ type Engine struct {
func NewEngine(ctx context.Context, bs bstore.Blockstore, peerTagger PeerTagger) *Engine {
e := &Engine{
ledgerMap: make(map[peer.ID]*ledger),
bsm: newBlockstoreManager(ctx, bs, blockstoreWorkerCount),
bsm: newBlockstoreManager(bs, blockstoreWorkerCount),
peerTagger: peerTagger,
outbox: make(chan (<-chan *Envelope), outboxChanBuffer),
workSignal: make(chan struct{}, 1),
@@ -367,7 +367,7 @@ func (e *Engine) nextEnvelope(ctx context.Context) (*Envelope, error) {
for _, t := range nextTask.Tasks {
blockCids.Add(t.Identifier.(cid.Cid))
}
blks := e.bsm.getBlocks(ctx, blockCids.Keys())
blks := e.bsm.getBlocks(blockCids.Keys())

msg := bsmsg.New(true)
for _, b := range blks {
@@ -417,7 +417,7 @@ func (e *Engine) Peers() []peer.ID {

// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) {
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) {
if m.Empty() {
log.Debugf("received empty message from %s", p)
}
@@ -437,7 +437,7 @@ func (e *Engine) MessageReceived(ctx context.Context, p peer.ID, m bsmsg.BitSwap
wantKs.Add(entry.Cid)
}
}
blockSizes := e.bsm.getBlockSizes(ctx, wantKs.Keys())
blockSizes := e.bsm.getBlockSizes(wantKs.Keys())

l := e.findOrCreate(p)
l.lk.Lock()
8 changes: 4 additions & 4 deletions decision/engine_test.go
@@ -114,7 +114,7 @@ func TestConsistentAccounting(t *testing.T) {
m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " "))))

sender.Engine.MessageSent(receiver.Peer, m)
receiver.Engine.MessageReceived(ctx, sender.Peer, m)
receiver.Engine.MessageReceived(sender.Peer, m)
}

// Ensure sender records the change
@@ -144,7 +144,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) {
m := message.New(true)

sanfrancisco.Engine.MessageSent(seattle.Peer, m)
seattle.Engine.MessageReceived(ctx, sanfrancisco.Peer, m)
seattle.Engine.MessageReceived(sanfrancisco.Peer, m)

if seattle.Peer == sanfrancisco.Peer {
t.Fatal("Sanity Check: Peers have same Key!")
@@ -328,7 +328,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) {
block := blocks.NewBlock([]byte(letter))
add.AddEntry(block.Cid(), len(keys)-i)
}
e.MessageReceived(context.Background(), partner, add)
e.MessageReceived(partner, add)
}

func partnerCancels(e *Engine, keys []string, partner peer.ID) {
@@ -337,7 +337,7 @@ func partnerCancels(e *Engine, keys []string, partner peer.ID) {
block := blocks.NewBlock([]byte(k))
cancels.Cancel(block.Cid())
}
e.MessageReceived(context.Background(), partner, cancels)
e.MessageReceived(partner, cancels)
}

func checkHandledInOrder(t *testing.T, e *Engine, expected [][]string) error {