Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

Commit

Permalink
feat: bitswap protocol extensions
Browse files Browse the repository at this point in the history
This commit extends the bitswap protocol with two additional wantlist properties:

* WANT_HAVE/HAVE: Instead of asking for a block, a node can specify that they
  want to know if any peers "have" the block.
* WANT_HAVE_NOT/HAVE_NOT: Instead of waiting for a timeout, a node can explicitly
  request to be told immediately if their peers don't currently have the given
  block.

Additionally, nodes now tell their peers how much data they have queued to send
them when sending messages. This allows peers to better distribute requests,
keeping all peers busy but not overloaded.

Changes in this PR are described in: #186
  • Loading branch information
dirkmc authored and Stebalien committed Jan 30, 2020
1 parent dcfe40e commit b3a47bc
Show file tree
Hide file tree
Showing 60 changed files with 8,216 additions and 1,857 deletions.
434 changes: 350 additions & 84 deletions benchmarks_test.go

Large diffs are not rendered by default.

125 changes: 66 additions & 59 deletions bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@ package bitswap
import (
"context"
"errors"

"sync"
"time"

bssrs "github.com/ipfs/go-bitswap/sessionrequestsplitter"
delay "github.com/ipfs/go-ipfs-delay"

bsbpm "github.com/ipfs/go-bitswap/blockpresencemanager"
decision "github.com/ipfs/go-bitswap/decision"
bsgetter "github.com/ipfs/go-bitswap/getter"
bsmsg "github.com/ipfs/go-bitswap/message"
Expand All @@ -20,6 +21,7 @@ import (
bspm "github.com/ipfs/go-bitswap/peermanager"
bspqm "github.com/ipfs/go-bitswap/providerquerymanager"
bssession "github.com/ipfs/go-bitswap/session"
bssim "github.com/ipfs/go-bitswap/sessioninterestmanager"
bssm "github.com/ipfs/go-bitswap/sessionmanager"
bsspm "github.com/ipfs/go-bitswap/sessionpeermanager"
bswm "github.com/ipfs/go-bitswap/wantmanager"
Expand Down Expand Up @@ -113,24 +115,30 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
return bsmq.New(ctx, p, network)
}

wm := bswm.New(ctx, bspm.New(ctx, peerQueueFactory))
sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
wm := bswm.New(ctx, pm, sim, bpm)
pqm := bspqm.New(ctx, network)

sessionFactory := func(ctx context.Context, id uint64, pm bssession.PeerManager, srs bssession.RequestSplitter,
sessionFactory := func(ctx context.Context, id uint64, spm bssession.SessionPeerManager,
sim *bssim.SessionInterestManager,
pm bssession.PeerManager,
bpm *bsbpm.BlockPresenceManager,
notif notifications.PubSub,
provSearchDelay time.Duration,
rebroadcastDelay delay.D) bssm.Session {
return bssession.New(ctx, id, wm, pm, srs, notif, provSearchDelay, rebroadcastDelay)
rebroadcastDelay delay.D,
self peer.ID) bssm.Session {
return bssession.New(ctx, id, wm, spm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.PeerManager {
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(ctx, id, network.ConnectionManager(), pqm)
}
sessionRequestSplitterFactory := func(ctx context.Context) bssession.RequestSplitter {
return bssrs.New(ctx)
}
notif := notifications.New()
sm := bssm.New(ctx, sessionFactory, sim, sessionPeerManagerFactory, bpm, pm, notif, network.Self())
wm.SetSessionManager(sm)
engine := decision.NewEngine(ctx, bstore, network.ConnectionManager(), network.Self())

engine := decision.NewEngine(ctx, bstore, network.ConnectionManager()) // TODO close the engine with Close() method
bs := &Bitswap{
blockstore: bstore,
engine: engine,
Expand All @@ -139,8 +147,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
newBlocks: make(chan cid.Cid, HasBlockBufferSize),
provideKeys: make(chan cid.Cid, provideKeysBufferSize),
wm: wm,
pm: pm,
pqm: pqm,
sm: bssm.New(ctx, sessionFactory, sessionPeerManagerFactory, sessionRequestSplitterFactory, notif),
sm: sm,
sim: sim,
notif: notif,
counters: new(counters),
dupMetric: dupHist,
Expand All @@ -156,7 +166,6 @@ func New(parent context.Context, network bsnet.BitSwapNetwork,
option(bs)
}

bs.wm.Startup()
bs.pqm.Startup()
network.SetDelegate(bs)

Expand All @@ -181,6 +190,8 @@ type Bitswap struct {
// the wantlist tracks global wants for bitswap
wm *bswm.WantManager

pm *bspm.PeerManager

// the provider query manager manages requests to find providers
pqm *bspqm.ProviderQueryManager

Expand Down Expand Up @@ -215,9 +226,13 @@ type Bitswap struct {
allMetric metrics.Histogram
sentHistogram metrics.Histogram

// the sessionmanager manages tracking sessions
// the SessionManager routes requests to interested sessions
sm *bssm.SessionManager

// the SessionInterestManager keeps track of which sessions are interested
// in which CIDs
sim *bssim.SessionInterestManager

// whether or not to make provide announcements
provideEnabled bool

Expand Down Expand Up @@ -275,14 +290,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks
// HasBlock announces the existence of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *Bitswap) HasBlock(blk blocks.Block) error {
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk})
return bs.receiveBlocksFrom(context.Background(), "", []blocks.Block{blk}, nil, nil)
}

// TODO: Some of this stuff really only needs to be done when adding a block
// from the user, not when receiving it from the network.
// In case you run `git blame` on this comment, I'll save you some time: ask
// @whyrusleeping, I don't know the answers you seek.
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block) error {
func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []blocks.Block, haves []cid.Cid, dontHaves []cid.Cid) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
Expand All @@ -293,22 +308,20 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b

// If blocks came from the network
if from != "" {
// Split blocks into wanted blocks vs duplicates
wanted = make([]blocks.Block, 0, len(blks))
for _, b := range blks {
if bs.sm.IsWanted(b.Cid()) {
wanted = append(wanted, b)
} else {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}
var notWanted []blocks.Block
wanted, notWanted = bs.sim.SplitWantedUnwanted(blks)
for _, b := range notWanted {
log.Debugf("[recv] block not in wantlist; cid=%s, peer=%s", b.Cid(), from)
}
}

// Put wanted blocks into blockstore
err := bs.blockstore.PutMany(wanted)
if err != nil {
log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
return err
if len(wanted) > 0 {
err := bs.blockstore.PutMany(wanted)
if err != nil {
log.Errorf("Error writing %d blocks to datastore: %s", len(wanted), err)
return err
}
}

// NOTE: There exists the possiblity for a race condition here. If a user
Expand All @@ -322,33 +335,25 @@ func (bs *Bitswap) receiveBlocksFrom(ctx context.Context, from peer.ID, blks []b
allKs = append(allKs, b.Cid())
}

wantedKs := allKs
if len(blks) != len(wanted) {
wantedKs = make([]cid.Cid, 0, len(wanted))
for _, b := range wanted {
wantedKs = append(wantedKs, b.Cid())
}
}

// Send all block keys (including duplicates) to any sessions that want them.
// (The duplicates are needed by sessions for accounting purposes)
bs.sm.ReceiveFrom(from, allKs)
bs.wm.ReceiveFrom(ctx, from, allKs, haves, dontHaves)

// Send wanted block keys to decision engine
bs.engine.AddBlocks(wantedKs)
// Send wanted blocks to decision engine
bs.engine.ReceiveFrom(from, wanted, haves)

// Publish the block to any Bitswap clients that had requested blocks.
// (the sessions use this pubsub mechanism to inform clients of received
// (the sessions use this pubsub mechanism to inform clients of incoming
// blocks)
for _, b := range wanted {
bs.notif.Publish(b)
}

// If the reprovider is enabled, send wanted blocks to reprovider
if bs.provideEnabled {
for _, k := range wantedKs {
for _, blk := range wanted {
select {
case bs.newBlocks <- k:
case bs.newBlocks <- blk.Cid():
// send block off to be reprovided
case <-bs.process.Closing():
return bs.process.Close()
Expand Down Expand Up @@ -380,20 +385,22 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg

iblocks := incoming.Blocks()

if len(iblocks) == 0 {
return
}

bs.updateReceiveCounters(iblocks)
for _, b := range iblocks {
log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
if len(iblocks) > 0 {
bs.updateReceiveCounters(iblocks)
for _, b := range iblocks {
log.Debugf("[recv] block; cid=%s, peer=%s", b.Cid(), p)
}
}

// Process blocks
err := bs.receiveBlocksFrom(ctx, p, iblocks)
if err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
return
haves := incoming.Haves()
dontHaves := incoming.DontHaves()
if len(iblocks) > 0 || len(haves) > 0 || len(dontHaves) > 0 {
// Process blocks
err := bs.receiveBlocksFrom(ctx, p, iblocks, haves, dontHaves)
if err != nil {
log.Warningf("ReceiveMessage recvBlockFrom error: %s", err)
return
}
}
}

Expand Down Expand Up @@ -479,12 +486,12 @@ func (bs *Bitswap) Close() error {

// GetWantlist returns the current local wantlist.
func (bs *Bitswap) GetWantlist() []cid.Cid {
entries := bs.wm.CurrentWants()
out := make([]cid.Cid, 0, len(entries))
for _, e := range entries {
out = append(out, e.Cid)
}
return out
return bs.pm.CurrentWants()
}

// GetWanthaves returns the current list of want-haves.
func (bs *Bitswap) GetWantHaves() []cid.Cid {
return bs.pm.CurrentWantHaves()
}

// IsOnline is needed to match go-ipfs-exchange-interface
Expand Down
25 changes: 18 additions & 7 deletions bitswap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,15 +571,17 @@ func TestWantlistCleanup(t *testing.T) {
defer ig.Close()
bg := blocksutil.NewBlockGenerator()

instances := ig.Instances(1)[0]
bswap := instances.Exchange
instances := ig.Instances(2)
instance := instances[0]
bswap := instance.Exchange
blocks := bg.Blocks(20)

var keys []cid.Cid
for _, b := range blocks {
keys = append(keys, b.Cid())
}

// Once context times out, key should be removed from wantlist
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err := bswap.GetBlock(ctx, keys[0])
Expand All @@ -589,10 +591,11 @@ func TestWantlistCleanup(t *testing.T) {

time.Sleep(time.Millisecond * 50)

if len(bswap.GetWantlist()) > 0 {
if len(bswap.GetWantHaves()) > 0 {
t.Fatal("should not have anyting in wantlist")
}

// Once context times out, keys should be removed from wantlist
ctx, cancel = context.WithTimeout(context.Background(), time.Millisecond*50)
defer cancel()
_, err = bswap.GetBlocks(ctx, keys[:10])
Expand All @@ -603,29 +606,37 @@ func TestWantlistCleanup(t *testing.T) {
<-ctx.Done()
time.Sleep(time.Millisecond * 50)

if len(bswap.GetWantlist()) > 0 {
if len(bswap.GetWantHaves()) > 0 {
t.Fatal("should not have anyting in wantlist")
}

// Send want for single block, with no timeout
_, err = bswap.GetBlocks(context.Background(), keys[:1])
if err != nil {
t.Fatal(err)
}

// Send want for 10 blocks
ctx, cancel = context.WithCancel(context.Background())
_, err = bswap.GetBlocks(ctx, keys[10:])
if err != nil {
t.Fatal(err)
}

// Even after 50 milli-seconds we haven't explicitly cancelled anything
// and no timeouts have expired, so we should have 11 want-haves
time.Sleep(time.Millisecond * 50)
if len(bswap.GetWantlist()) != 5 {
t.Fatal("should have 5 keys in wantlist")
if len(bswap.GetWantHaves()) != 11 {
t.Fatal("should have 11 keys in wantlist")
}

// Cancel the timeout for the request for 10 blocks. This should remove
// the want-haves
cancel()

// Once the cancel is processed, we are left with the request for 1 block
time.Sleep(time.Millisecond * 50)
if !(len(bswap.GetWantlist()) == 1 && bswap.GetWantlist()[0] == keys[0]) {
if !(len(bswap.GetWantHaves()) == 1 && bswap.GetWantHaves()[0] == keys[0]) {
t.Fatal("should only have keys[0] in wantlist")
}
}
Expand Down
Loading

0 comments on commit b3a47bc

Please sign in to comment.