Skip to content

Commit

Permalink
bitswap: remove content routing from network
Browse files Browse the repository at this point in the history
* Remove ContentRouter for network, as no longer used.
* Add options to bitswap-client for content-router and defaultProviderQueryManager.
* Initialize providerQueryManager with the given content router.
* Allow providerQueries to noop when content-routing in bitswap is disabled.
  • Loading branch information
hsanjuan committed Nov 18, 2024
1 parent 43913da commit 870ab41
Show file tree
Hide file tree
Showing 7 changed files with 52 additions and 62 deletions.
4 changes: 0 additions & 4 deletions bitswap/client/bitswap_with_sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@ func addBlock(t *testing.T, ctx context.Context, inst testinstance.Instance, blk
if err != nil {
t.Fatal(err)
}
err = inst.Adapter.Provide(ctx, blk.Cid())
if err != nil {
t.Fatal(err)
}
}

func TestBasicSessions(t *testing.T) {
Expand Down
59 changes: 37 additions & 22 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-metrics-interface"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/routing"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)
Expand Down Expand Up @@ -97,15 +98,24 @@ func WithoutDuplicatedBlockStats() Option {
}
}

// WithCustomLookupManagement indicates we should use a custom
// ProviderQueryManager, which provides a handling layer on top of the content
// routing performed by the bitswap Network for finding providers for data.
// The ProviderQueryManager provides bounded paralelism and limits for these
// lookups. The ProviderQueryManager setup by default uses
// maxInProcessRequests = 6 and maxProviders = 10.
func WithCustomLookupManagement(pqm *rpqm.ProviderQueryManager) Option {
// WithContentRouting enables content.Routing lookups to find providers for
// the content to be retrieved using Bitswap.
func WithContentRouting(router routing.Routing) Option {
return func(bs *Client) {
bs.pqm = pqm
bs.router = router
}
}

// WithDefaultProviderQueryManager indicates wether we should use a the
// default ProviderQueryManager, a wrapper of the content Router which
// provides bounded paralelism and limits for these lookups. The
// ProviderQueryManager setup by default uses maxInProcessRequests = 6 and
// maxProviders = 10. To use a custom ProviderQueryManager, set to false and
// wrap directly the content router provided with the WithContentRouting()
// option. Only takes effect if WithContentRouting is set.
func WithDefaultProviderQueryManager(defaultProviderQueryManager bool) Option {
return func(bs *Client) {
bs.defaultProviderQueryManager = defaultProviderQueryManager
}
}

Expand All @@ -128,16 +138,17 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
ctx, cancelFunc := context.WithCancel(parent)

bs := &Client{
blockstore: bstore,
network: network,
cancel: cancelFunc,
closing: make(chan struct{}),
counters: new(counters),
dupMetric: bmetrics.DupHist(ctx),
allMetric: bmetrics.AllHist(ctx),
provSearchDelay: defaults.ProvSearchDelay,
rebroadcastDelay: delay.Fixed(defaults.RebroadcastDelay),
simulateDontHavesOnTimeout: true,
blockstore: bstore,
network: network,
cancel: cancelFunc,
closing: make(chan struct{}),
counters: new(counters),
dupMetric: bmetrics.DupHist(ctx),
allMetric: bmetrics.AllHist(ctx),
provSearchDelay: defaults.ProvSearchDelay,
rebroadcastDelay: delay.Fixed(defaults.RebroadcastDelay),
simulateDontHavesOnTimeout: true,
defaultProviderQueryManager: true,
}

// apply functional options before starting and running bitswap
Expand All @@ -162,9 +173,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
sim := bssim.New()
bpm := bsbpm.New()
pm := bspm.New(ctx, peerQueueFactory, network.Self())
if bs.pqm == nil { // not set with the options
// network can do dialing and also content routing.
pqm, err := rpqm.New(ctx, network, network, rpqm.WithMaxProviders(10))
if bs.router != nil && bs.defaultProviderQueryManager {
// network can do dialing.
pqm, err := rpqm.New(ctx, network, bs.router, rpqm.WithMaxProviders(10))
if err != nil {
// Should not be possible to hit this
panic(err)
Expand Down Expand Up @@ -206,8 +217,12 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore
type Client struct {
pm *bspm.PeerManager

// content router
router routing.Routing

// the provider query manager manages requests to find providers
pqm *rpqm.ProviderQueryManager
pqm *rpqm.ProviderQueryManager
defaultProviderQueryManager bool

// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork
Expand Down
5 changes: 5 additions & 0 deletions bitswap/client/internal/session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,11 @@ func (s *Session) handlePeriodicSearch(ctx context.Context) {
// findMorePeers attempts to find more peers for a session by searching for
// providers for the given Cid
func (s *Session) findMorePeers(ctx context.Context, c cid.Cid) {
// noop when provider finder is disabled
if s.providerFinder == nil {
return
}

go func(k cid.Cid) {
ctx, span := internal.StartSpan(ctx, "Session.FindMorePeers")
defer span.End()
Expand Down
2 changes: 0 additions & 2 deletions bitswap/network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,6 @@ type BitSwapNetwork interface {

Stats() Stats

Routing

Pinger
}

Expand Down
17 changes: 2 additions & 15 deletions bitswap/network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,12 @@ import (
bsmsg "github.com/ipfs/boxo/bitswap/message"
"github.com/ipfs/boxo/bitswap/network/internal"

"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/core/routing"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
"github.com/libp2p/go-msgio"
ma "github.com/multiformats/go-multiaddr"
Expand All @@ -35,12 +33,11 @@ var (
)

// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host.
func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) BitSwapNetwork {
func NewFromIpfsHost(host host.Host, opts ...NetOpt) BitSwapNetwork {
s := processSettings(opts...)

bitswapNetwork := impl{
host: host,
routing: r,
host: host,

protocolBitswapNoVers: s.ProtocolPrefix + ProtocolBitswapNoVers,
protocolBitswapOneZero: s.ProtocolPrefix + ProtocolBitswapOneZero,
Expand Down Expand Up @@ -72,7 +69,6 @@ type impl struct {
stats Stats

host host.Host
routing routing.ContentRouting
connectEvtMgr *connectEventManager

protocolBitswapNoVers protocol.ID
Expand Down Expand Up @@ -370,15 +366,6 @@ func (bsnet *impl) DisconnectFrom(ctx context.Context, p peer.ID) error {
return bsnet.host.Network().ClosePeer(p)
}

func (bsnet *impl) FindProvidersAsync(ctx context.Context, k cid.Cid, max int) <-chan peer.AddrInfo {
return bsnet.routing.FindProvidersAsync(ctx, k, max)
}

// Provide provides the key to the network
func (bsnet *impl) Provide(ctx context.Context, k cid.Cid) error {
return bsnet.routing.Provide(ctx, k, true)
}

// handleNewStream receives a new stream from the network.
func (bsnet *impl) handleNewStream(s network.Stream) {
defer s.Close()
Expand Down
15 changes: 4 additions & 11 deletions bitswap/network/ipfs_impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ import (
bsnet "github.com/ipfs/boxo/bitswap/network"
"github.com/ipfs/boxo/bitswap/network/internal"
tn "github.com/ipfs/boxo/bitswap/testnet"
mockrouting "github.com/ipfs/boxo/routing/mock"
ds "github.com/ipfs/go-datastore"
"github.com/ipfs/go-test/random"
tnet "github.com/libp2p/go-libp2p-testing/net"
"github.com/libp2p/go-libp2p/core/host"
Expand Down Expand Up @@ -170,8 +168,7 @@ func TestMessageSendAndReceive(t *testing.T) {
defer cancel()
mn := mocknet.New()
defer mn.Close()
mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
streamNet, err := tn.StreamNet(ctx, mn)
if err != nil {
t.Fatal("Unable to setup network")
}
Expand Down Expand Up @@ -275,16 +272,14 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec
// create network
mn := mocknet.New()
defer mn.Close()
mr := mockrouting.NewServer()

// Host 1
h1, err := mn.AddPeer(p1.PrivateKey(), p1.Address())
if err != nil {
t.Fatal(err)
}
eh1 := &ErrHost{Host: h1}
routing1 := mr.ClientWithDatastore(context.TODO(), p1, ds.NewMapDatastore())
bsnet1 := bsnet.NewFromIpfsHost(eh1, routing1)
bsnet1 := bsnet.NewFromIpfsHost(eh1)
bsnet1.Start(r1)
t.Cleanup(bsnet1.Stop)
if r1.listener != nil {
Expand All @@ -297,8 +292,7 @@ func prepareNetwork(t *testing.T, ctx context.Context, p1 tnet.Identity, r1 *rec
t.Fatal(err)
}
eh2 := &ErrHost{Host: h2}
routing2 := mr.ClientWithDatastore(context.TODO(), p2, ds.NewMapDatastore())
bsnet2 := bsnet.NewFromIpfsHost(eh2, routing2)
bsnet2 := bsnet.NewFromIpfsHost(eh2)
bsnet2.Start(r2)
t.Cleanup(bsnet2.Stop)
if r2.listener != nil {
Expand Down Expand Up @@ -454,8 +448,7 @@ func TestSupportsHave(t *testing.T) {
ctx := context.Background()
mn := mocknet.New()
defer mn.Close()
mr := mockrouting.NewServer()
streamNet, err := tn.StreamNet(ctx, mn, mr)
streamNet, err := tn.StreamNet(ctx, mn)
if err != nil {
t.Fatalf("Unable to setup network: %s", err)
}
Expand Down
12 changes: 4 additions & 8 deletions bitswap/testnet/peernet.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,27 @@ import (

bsnet "github.com/ipfs/boxo/bitswap/network"

mockrouting "github.com/ipfs/boxo/routing/mock"
ds "github.com/ipfs/go-datastore"

tnet "github.com/libp2p/go-libp2p-testing/net"
"github.com/libp2p/go-libp2p/core/peer"
mockpeernet "github.com/libp2p/go-libp2p/p2p/net/mock"
)

type peernet struct {
mockpeernet.Mocknet
routingserver mockrouting.Server
}

// StreamNet is a testnet that uses libp2p's MockNet
func StreamNet(ctx context.Context, net mockpeernet.Mocknet, rs mockrouting.Server) (Network, error) {
return &peernet{net, rs}, nil
func StreamNet(ctx context.Context, net mockpeernet.Mocknet) (Network, error) {
return &peernet{net}, nil
}

func (pn *peernet) Adapter(p tnet.Identity, opts ...bsnet.NetOpt) bsnet.BitSwapNetwork {
client, err := pn.Mocknet.AddPeer(p.PrivateKey(), p.Address())
if err != nil {
panic(err.Error())
}
routing := pn.routingserver.ClientWithDatastore(context.TODO(), p, ds.NewMapDatastore())
return bsnet.NewFromIpfsHost(client, routing, opts...)

return bsnet.NewFromIpfsHost(client, opts...)
}

func (pn *peernet) HasPeer(p peer.ID) bool {
Expand Down

0 comments on commit 870ab41

Please sign in to comment.