Skip to content
This repository has been archived by the owner on Feb 1, 2023. It is now read-only.

network: Allow specifying protocol prefix #171

Merged
merged 1 commit into from
Aug 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 28 additions & 13 deletions network/ipfs_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,16 @@ import (
"time"

bsmsg "github.com/ipfs/go-bitswap/message"
"github.com/libp2p/go-libp2p-core/helpers"

cid "github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log"
"github.com/libp2p/go-libp2p-core/connmgr"
"github.com/libp2p/go-libp2p-core/helpers"
"github.com/libp2p/go-libp2p-core/host"
"github.com/libp2p/go-libp2p-core/network"
"github.com/libp2p/go-libp2p-core/peer"
peerstore "github.com/libp2p/go-libp2p-core/peerstore"
"github.com/libp2p/go-libp2p-core/protocol"
"github.com/libp2p/go-libp2p-core/routing"
msgio "github.com/libp2p/go-msgio"
ma "github.com/multiformats/go-multiaddr"
Expand All @@ -27,10 +28,19 @@ var log = logging.Logger("bitswap_network")
var sendMessageTimeout = time.Minute * 10

// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host.
func NewFromIpfsHost(host host.Host, r routing.ContentRouting) BitSwapNetwork {
func NewFromIpfsHost(host host.Host, r routing.ContentRouting, opts ...NetOpt) BitSwapNetwork {
s := Settings{}
for _, opt := range opts {
opt(&s)
}

bitswapNetwork := impl{
host: host,
routing: r,

protocolBitswap: s.ProtocolPrefix + ProtocolBitswap,
protocolBitswapOne: s.ProtocolPrefix + ProtocolBitswapOne,
protocolBitswapNoVers: s.ProtocolPrefix + ProtocolBitswapNoVers,
}
return &bitswapNetwork
}
Expand All @@ -41,14 +51,19 @@ type impl struct {
host host.Host
routing routing.ContentRouting

protocolBitswap protocol.ID
protocolBitswapOne protocol.ID
protocolBitswapNoVers protocol.ID

// inbound messages from the network are forwarded to the receiver
receiver Receiver

stats Stats
}

type streamMessageSender struct {
s network.Stream
s network.Stream
bsnet *impl
}

func (s *streamMessageSender) Close() error {
Expand All @@ -60,10 +75,10 @@ func (s *streamMessageSender) Reset() error {
}

func (s *streamMessageSender) SendMsg(ctx context.Context, msg bsmsg.BitSwapMessage) error {
return msgToStream(ctx, s.s, msg)
return s.bsnet.msgToStream(ctx, s.s, msg)
}

func msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error {
func (bsnet *impl) msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage) error {
deadline := time.Now().Add(sendMessageTimeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
Expand All @@ -74,12 +89,12 @@ func msgToStream(ctx context.Context, s network.Stream, msg bsmsg.BitSwapMessage
}

switch s.Protocol() {
case ProtocolBitswap:
case bsnet.protocolBitswap:
if err := msg.ToNetV1(s); err != nil {
log.Debugf("error: %s", err)
return err
}
case ProtocolBitswapOne, ProtocolBitswapNoVers:
case bsnet.protocolBitswapOne, bsnet.protocolBitswapNoVers:
if err := msg.ToNetV0(s); err != nil {
log.Debugf("error: %s", err)
return err
Expand All @@ -100,11 +115,11 @@ func (bsnet *impl) NewMessageSender(ctx context.Context, p peer.ID) (MessageSend
return nil, err
}

return &streamMessageSender{s: s}, nil
return &streamMessageSender{s: s, bsnet: bsnet}, nil
}

func (bsnet *impl) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) {
return bsnet.host.NewStream(ctx, p, ProtocolBitswap, ProtocolBitswapOne, ProtocolBitswapNoVers)
return bsnet.host.NewStream(ctx, p, bsnet.protocolBitswap, bsnet.protocolBitswapOne, bsnet.protocolBitswapNoVers)
}

func (bsnet *impl) SendMessage(
Expand All @@ -117,7 +132,7 @@ func (bsnet *impl) SendMessage(
return err
}

if err = msgToStream(ctx, s, outgoing); err != nil {
if err = bsnet.msgToStream(ctx, s, outgoing); err != nil {
s.Reset()
return err
}
Expand All @@ -131,9 +146,9 @@ func (bsnet *impl) SendMessage(

func (bsnet *impl) SetDelegate(r Receiver) {
bsnet.receiver = r
bsnet.host.SetStreamHandler(ProtocolBitswap, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(ProtocolBitswapOne, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(ProtocolBitswapNoVers, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(bsnet.protocolBitswap, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(bsnet.protocolBitswapOne, bsnet.handleNewStream)
bsnet.host.SetStreamHandler(bsnet.protocolBitswapNoVers, bsnet.handleNewStream)
bsnet.host.Network().Notify((*netNotifiee)(bsnet))
// TODO: StopNotify.

Expand Down
15 changes: 15 additions & 0 deletions network/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package network

import "github.com/libp2p/go-libp2p-core/protocol"

type NetOpt func(*Settings)

type Settings struct {
ProtocolPrefix protocol.ID
}

func Prefix(prefix protocol.ID) NetOpt {
return func(settings *Settings) {
settings.ProtocolPrefix = prefix
}
}