Skip to content

Commit

Permalink
Bitswap: make ProviderFinder a param for client.New(). Fix tests.
Browse files Browse the repository at this point in the history
  • Loading branch information
hsanjuan committed Nov 19, 2024
1 parent b4ab3e9 commit 78e7a7c
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 16 deletions.
4 changes: 2 additions & 2 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Bitswap struct {
net network.BitSwapNetwork
}

func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Blockstore, options ...Option) *Bitswap {
func New(ctx context.Context, net network.BitSwapNetwork, providerFinder client.ProviderFinder, bstore blockstore.Blockstore, options ...Option) *Bitswap {
bs := &Bitswap{
net: net,
}
Expand Down Expand Up @@ -86,7 +86,7 @@ func New(ctx context.Context, net network.BitSwapNetwork, bstore blockstore.Bloc
ctx = metrics.CtxSubScope(ctx, "bitswap")

bs.Server = server.New(ctx, net, bstore, serverOptions...)
bs.Client = client.New(ctx, net, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...)
bs.Client = client.New(ctx, net, providerFinder, bstore, append(clientOptions, client.WithBlockReceivedNotifier(bs.Server))...)
net.Start(bs) // use the polyfill receiver to log received errors and trace messages only once

return bs
Expand Down
5 changes: 4 additions & 1 deletion bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
if err != nil {
t.Fatal(err)
}
err = inst.Routing.Provide(ctx, blk.Cid(), true)
if err != nil {
t.Fatal(err)
}
}

func TestBasicSessions(t *testing.T) {
Expand Down Expand Up @@ -236,7 +240,6 @@ func TestFetchNotConnected(t *testing.T) {
thisNode := ig.Next()
ses := thisNode.Exchange.NewSession(ctx).(*session.Session)
ses.SetBaseTickDelay(time.Millisecond * 10)

ch, err := ses.GetBlocks(ctx, cids)
if err != nil {
t.Fatal(err)
Expand Down
32 changes: 23 additions & 9 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
bsmq "github.com/ipfs/boxo/bitswap/client/internal/messagequeue"
"github.com/ipfs/boxo/bitswap/client/internal/notifications"
bspm "github.com/ipfs/boxo/bitswap/client/internal/peermanager"
"github.com/ipfs/boxo/bitswap/client/internal/session"

Check failure on line 16 in bitswap/client/client.go

View workflow job for this annotation

GitHub Actions / go-check / All

package "github.com/ipfs/boxo/bitswap/client/internal/session" is being imported more than once (ST1019)
bssession "github.com/ipfs/boxo/bitswap/client/internal/session"
bssim "github.com/ipfs/boxo/bitswap/client/internal/sessioninterestmanager"
bssm "github.com/ipfs/boxo/bitswap/client/internal/sessionmanager"
Expand All @@ -32,7 +33,6 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -118,10 +118,16 @@ type BlockReceivedNotifier interface {
ReceivedBlocks(peer.ID, []blocks.Block)
}

// ProviderFinder is a subset of
// https://pkg.go.dev/github.com/libp2p/[email protected]/core/routing#ContentRouting
type ProviderFinder interface {
FindProvidersAsync(context.Context, cid.Cid, int) <-chan peer.AddrInfo
}

// New initializes a Bitswap client that runs until client.Close is called.
// The Content router paramteter can be nil to disable content-routing
// The Content providerFinder paramteter can be nil to disable content-routing
// lookups for content (rely only on bitswap for discovery).
func New(parent context.Context, network bsnet.BitSwapNetwork, router routing.Routing, bstore blockstore.Blockstore, options ...Option) *Client {
func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder ProviderFinder, bstore blockstore.Blockstore, options ...Option) *Client {
// important to use provided parent context (since it may include important
// loggable data). It's probably not a good idea to allow bitswap to be
// coupled to the concerns of the ipfs daemon in this way.
Expand All @@ -133,7 +139,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, router routing.Ro

bs := &Client{
network: network,
router: router,
providerFinder: providerFinder,
blockstore: bstore,
cancel: cancelFunc,
closing: make(chan struct{}),
Expand Down Expand Up @@ -168,9 +174,10 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, router routing.Ro
sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
if bs.router != nil && bs.defaultProviderQueryManager {

if bs.providerFinder != nil && bs.defaultProviderQueryManager {
// network can do dialing.
pqm, err := rpqm.New(ctx, network, bs.router, rpqm.WithMaxProviders(10))
pqm, err := rpqm.New(ctx, network, bs.providerFinder, rpqm.WithMaxProviders(10))
if err != nil {
// Should not be possible to hit this
panic(err)
Expand All @@ -192,7 +199,15 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, router routing.Ro
rebroadcastDelay delay.D,
self peer.ID,
) bssm.Session {
return bssession.New(sessctx, sessmgr, id, spm, bs.pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
// careful when bs.pqm is nil. Since we are type-casting it
// into session.ProviderFinder when passing it, it will become
// not nil. Related:
// https://groups.google.com/g/golang-nuts/c/wnH302gBa4I?pli=1
var pqm session.ProviderFinder
if bs.pqm != nil {
pqm = bs.pqm
}
return bssession.New(sessctx, sessmgr, id, spm, pqm, sim, pm, bpm, notif, provSearchDelay, rebroadcastDelay, self)
}
sessionPeerManagerFactory := func(ctx context.Context, id uint64) bssession.SessionPeerManager {
return bsspm.New(id, network.ConnectionManager())
Expand All @@ -212,8 +227,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, router routing.Ro
type Client struct {
pm *bspm.PeerManager

// content router
router routing.Routing
providerFinder ProviderFinder

// the provider query manager manages requests to find providers
pqm *rpqm.ProviderQueryManager
Expand Down
1 change: 0 additions & 1 deletion bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,6 @@ func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) {
if s.providerFinder == nil {
return
}

go func(k cid.Cid) {
ctx, span := internal.StartSpan(ctx, "Session.FindMorePeers")
defer span.End()
Expand Down
12 changes: 9 additions & 3 deletions bitswap/testinstance/testinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ import (
bsnet "github.com/ipfs/boxo/bitswap/network"
tn "github.com/ipfs/boxo/bitswap/testnet"
blockstore "github.com/ipfs/boxo/blockstore"
mockrouting "github.com/ipfs/boxo/routing/mock"
ds "github.com/ipfs/go-datastore"
delayed "github.com/ipfs/go-datastore/delayed"
ds_sync "github.com/ipfs/go-datastore/sync"
delay "github.com/ipfs/go-ipfs-delay"
tnet "github.com/libp2p/go-libp2p-testing/net"
p2ptestutil "github.com/libp2p/go-libp2p-testing/netutil"
peer "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
)

// NewTestInstanceGenerator generates a new InstanceGenerator for the given
Expand All @@ -28,6 +30,7 @@ func NewTestInstanceGenerator(net tn.Network, netOptions []bsnet.NetOpt, bsOptio
cancel: cancel,
bsOptions: bsOptions,
netOptions: netOptions,
routing: mockrouting.NewServer(),
}
}

Expand All @@ -39,6 +42,7 @@ type InstanceGenerator struct {
cancel context.CancelFunc
bsOptions []bitswap.Option
netOptions []bsnet.NetOpt
routing mockrouting.Server
}

// Close closes the clobal context, shutting down all test instances
Expand All @@ -54,7 +58,7 @@ func (g *InstanceGenerator) Next() Instance {
if err != nil {
panic("FIXME") // TODO change signature
}
return NewInstance(g.ctx, g.net, p, g.netOptions, g.bsOptions)
return NewInstance(g.ctx, g.net, g.routing.Client(p), p, g.netOptions, g.bsOptions)
}

// Instances creates N test instances of bitswap + dependencies and connects
Expand Down Expand Up @@ -88,6 +92,7 @@ type Instance struct {
Exchange *bitswap.Bitswap
blockstore blockstore.Blockstore
Adapter bsnet.BitSwapNetwork
Routing routing.Routing
blockstoreDelay delay.D
}

Expand All @@ -107,7 +112,7 @@ func (i *Instance) SetBlockstoreLatency(t time.Duration) time.Duration {
// NB: It's easy make mistakes by providing the same peer ID to two different
// instances. To safeguard, use the InstanceGenerator to generate instances. It's
// just a much better idea.
func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, netOptions []bsnet.NetOpt, bsOptions []bitswap.Option) Instance {
func NewInstance(ctx context.Context, net tn.Network, router routing.Routing, p tnet.Identity, netOptions []bsnet.NetOpt, bsOptions []bitswap.Option) Instance {
bsdelay := delay.Fixed(0)

adapter := net.Adapter(p, netOptions...)
Expand All @@ -120,12 +125,13 @@ func NewInstance(ctx context.Context, net tn.Network, p tnet.Identity, netOption
panic(err.Error()) // FIXME perhaps change signature and return error.
}

bs := bitswap.New(ctx, adapter, bstore, bsOptions...)
bs := bitswap.New(ctx, adapter, router, bstore, bsOptions...)

return Instance{
Adapter: adapter,
Peer: p.ID(),
Exchange: bs,
Routing: router,
blockstore: bstore,
blockstoreDelay: bsdelay,
}
Expand Down
1 change: 1 addition & 0 deletions routing/mock/centralized_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ func (c *client) FindPeer(ctx context.Context, pid peer.ID) (peer.AddrInfo, erro
}

func (c *client) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo {
log.Debugf("FindProvidersAsync: %s %d", k, max)
out := make(chan peer.AddrInfo)
go func() {
defer close(out)
Expand Down

0 comments on commit 78e7a7c

Please sign in to comment.