Skip to content

Commit

Permalink
feat(graphsync): configure message parameters
Browse files Browse the repository at this point in the history
allow setting custom values for timing out messages and number of retries
  • Loading branch information
hannahhoward committed Oct 13, 2021
1 parent 749e517 commit c684447
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 38 deletions.
2 changes: 1 addition & 1 deletion benchmarks/testnet/virtual.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ func (mp *messagePasser) Reset() error {
return nil
}

func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID) (gsnet.MessageSender, error) {
func (nc *networkClient) NewMessageSender(ctx context.Context, p peer.ID, _ gsnet.MessageSenderOpts) (gsnet.MessageSender, error) {
return &messagePasser{
net: nc,
target: p,
Expand Down
30 changes: 29 additions & 1 deletion impl/graphsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package graphsync

import (
"context"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/ipfs/go-peertaskqueue"
Expand Down Expand Up @@ -33,6 +34,8 @@ const maxRecursionDepth = 100
const defaultTotalMaxMemory = uint64(256 << 20)
const defaultMaxMemoryPerPeer = uint64(16 << 20)
const defaultMaxInProgressRequests = uint64(6)
const defaultMessageSendRetries = 10
const defaultSendMessageTimeout = 10 * time.Minute

// GraphSync is an instance of a GraphSync exchange that implements
// the graphsync protocol.
Expand Down Expand Up @@ -77,6 +80,8 @@ type graphsyncConfigOptions struct {
registerDefaultValidator bool
maxLinksPerOutgoingRequest uint64
maxLinksPerIncomingRequest uint64
messageSendRetries int
sendMessageTimeout time.Duration
}

// Option defines the functional option type that can be used to configure
Expand Down Expand Up @@ -171,6 +176,27 @@ func MaxLinksPerIncomingRequests(maxLinksPerIncomingRequest uint64) Option {
}
}

// MessageSendRetries configures how many times graphsync tries to
// send a message to a peer before it gives up on that message.
// Use a lower value to detect an unresponsive peer more quickly.
//
// NOTE(review): the message queue uses this value as the upper bound of
// its send loop, so a value below 1 appears to mean no send is attempted
// at all — confirm before passing 0.
func MessageSendRetries(messageSendRetries int) Option {
	apply := func(cfg *graphsyncConfigOptions) {
		cfg.messageSendRetries = messageSendRetries
	}
	return apply
}

// SendMessageTimeout configures how long graphsync waits for a single
// message to go across the wire before it gives up on that attempt and
// tries again (up to the configured number of retries).
// Use a lower value to detect an unresponsive peer more quickly.
//
// A zero duration is not treated as "no timeout": the libp2p message
// sender replaces it with the network package's default send timeout.
func SendMessageTimeout(sendMessageTimeout time.Duration) Option {
	apply := func(cfg *graphsyncConfigOptions) {
		cfg.sendMessageTimeout = sendMessageTimeout
	}
	return apply
}

// New creates a new GraphSync Exchange on the given network,
// and the given link loader+storer.
func New(parent context.Context, network gsnet.GraphSyncNetwork,
Expand All @@ -185,6 +211,8 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
maxInProgressIncomingRequests: defaultMaxInProgressRequests,
maxInProgressOutgoingRequests: defaultMaxInProgressRequests,
registerDefaultValidator: true,
messageSendRetries: defaultMessageSendRetries,
sendMessageTimeout: defaultSendMessageTimeout,
}
for _, option := range options {
option(gsConfig)
Expand All @@ -207,7 +235,7 @@ func New(parent context.Context, network gsnet.GraphSyncNetwork,
}
responseAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryResponder, gsConfig.maxMemoryPerPeerResponder)
createMessageQueue := func(ctx context.Context, p peer.ID) peermanager.PeerQueue {
return messagequeue.New(ctx, p, network, responseAllocator)
return messagequeue.New(ctx, p, network, responseAllocator, gsConfig.messageSendRetries, gsConfig.sendMessageTimeout)
}
peerManager := peermanager.NewMessageManager(ctx, createMessageQueue)
requestAllocator := allocator.NewAllocator(gsConfig.totalMaxMemoryRequestor, gsConfig.maxMemoryPerPeerRequestor)
Expand Down
44 changes: 23 additions & 21 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (

var log = logging.Logger("graphsync")

const maxRetries = 10

// max block size is the maximum size for batching blocks in a single payload
const maxBlockSize uint64 = 512 * 1024

Expand All @@ -38,7 +36,7 @@ type Event struct {
// MessageNetwork is any network that can connect peers and generate a message
// sender.
type MessageNetwork interface {
NewMessageSender(context.Context, peer.ID) (gsnet.MessageSender, error)
NewMessageSender(context.Context, peer.ID, gsnet.MessageSenderOpts) (gsnet.MessageSender, error)
ConnectTo(context.Context, peer.ID) error
}

Expand All @@ -58,24 +56,28 @@ type MessageQueue struct {
done chan struct{}

// internal do not touch outside go routines
sender gsnet.MessageSender
eventPublisher notifications.Publisher
buildersLk sync.RWMutex
builders []*gsmsg.Builder
nextBuilderTopic gsmsg.Topic
allocator Allocator
sender gsnet.MessageSender
eventPublisher notifications.Publisher
buildersLk sync.RWMutex
builders []*gsmsg.Builder
nextBuilderTopic gsmsg.Topic
allocator Allocator
maxRetries int
sendMessageTimeout time.Duration
}

// New creates a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork, allocator Allocator) *MessageQueue {
func New(ctx context.Context, p peer.ID, network MessageNetwork, allocator Allocator, maxRetries int, sendMessageTimeout time.Duration) *MessageQueue {
return &MessageQueue{
ctx: ctx,
network: network,
p: p,
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
eventPublisher: notifications.NewPublisher(),
allocator: allocator,
ctx: ctx,
network: network,
p: p,
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
eventPublisher: notifications.NewPublisher(),
allocator: allocator,
maxRetries: maxRetries,
sendMessageTimeout: sendMessageTimeout,
}
}

Expand Down Expand Up @@ -220,7 +222,7 @@ func (mq *MessageQueue) sendMessage() {
return
}

for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
for i := 0; i < mq.maxRetries; i++ { // try to send this message until we fail.
if mq.attemptSendAndRecovery(message, publisher) {
return
}
Expand All @@ -232,7 +234,7 @@ func (mq *MessageQueue) initializeSender() error {
if mq.sender != nil {
return nil
}
nsender, err := openSender(mq.ctx, mq.network, mq.p)
nsender, err := openSender(mq.ctx, mq.network, mq.p, mq.sendMessageTimeout)
if err != nil {
return err
}
Expand Down Expand Up @@ -277,7 +279,7 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage, p
return false
}

func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (gsnet.MessageSender, error) {
func openSender(ctx context.Context, network MessageNetwork, p peer.ID, sendTimeout time.Duration) (gsnet.MessageSender, error) {
// allow ten minutes for connections; this includes looking them up in the
// dht, dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
Expand All @@ -288,7 +290,7 @@ func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (gsnet.M
return nil, err
}

nsender, err := network.NewMessageSender(ctx, p)
nsender, err := network.NewMessageSender(ctx, p, gsnet.MessageSenderOpts{SendTimeout: sendTimeout})
if err != nil {
return nil, err
}
Expand Down
17 changes: 10 additions & 7 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"github.com/ipfs/go-graphsync/testutil"
)

const sendMessageTimeout = 10 * time.Minute
const messageSendRetries = 10

type fakeMessageNetwork struct {
connectError error
messageSenderError error
Expand All @@ -32,7 +35,7 @@ func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error {
return fmn.connectError
}

func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (gsnet.MessageSender, error) {
func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID, gsnet.MessageSenderOpts) (gsnet.MessageSender, error) {
fmn.wait.Done()
if fmn.messageSenderError == nil {
return fmn.messageSender, nil
Expand Down Expand Up @@ -68,7 +71,7 @@ func TestStartupAndShutdown(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
id := graphsync.RequestID(rand.Int31())
priority := graphsync.Priority(rand.Int31())
Expand Down Expand Up @@ -106,7 +109,7 @@ func TestShutdownDuringMessageSend(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
id := graphsync.RequestID(rand.Int31())
priority := graphsync.Priority(rand.Int31())
Expand Down Expand Up @@ -154,7 +157,7 @@ func TestProcessingNotification(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
waitGroup.Add(1)
blks := testutil.GenerateBlocksOfSize(3, 128)
Expand Down Expand Up @@ -210,7 +213,7 @@ func TestDedupingMessages(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
waitGroup.Add(1)
id := graphsync.RequestID(rand.Int31())
Expand Down Expand Up @@ -282,7 +285,7 @@ func TestSendsVeryLargeBlocksResponses(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
waitGroup.Add(1)

Expand Down Expand Up @@ -342,7 +345,7 @@ func TestSendsResponsesMemoryPressure(t *testing.T) {
// use allocator with very small limit
allocator := allocator2.NewAllocator(1000, 1000)

messageQueue := New(ctx, p, messageNetwork, allocator)
messageQueue := New(ctx, p, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
waitGroup.Add(1)

Expand Down
8 changes: 7 additions & 1 deletion network/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package network

import (
"context"
"time"

"github.com/libp2p/go-libp2p-core/peer"
"github.com/libp2p/go-libp2p-core/protocol"
Expand Down Expand Up @@ -30,11 +31,16 @@ type GraphSyncNetwork interface {
// ConnectTo establishes a connection to the given peer
ConnectTo(context.Context, peer.ID) error

NewMessageSender(context.Context, peer.ID) (MessageSender, error)
NewMessageSender(context.Context, peer.ID, MessageSenderOpts) (MessageSender, error)

ConnectionManager() ConnManager
}

// MessageSenderOpts sets parameters for a message sender
type MessageSenderOpts struct {
// SendTimeout is the time allowed for sending a single message. The
// libp2p sender adds it to the current time to form the send deadline,
// unless the send context carries its own deadline, which takes
// precedence. A zero value is replaced with the package default when
// the libp2p sender is created.
SendTimeout time.Duration
}

// ConnManager provides the methods needed to protect and unprotect connections
type ConnManager interface {
Protect(peer.ID, string)
Expand Down
23 changes: 16 additions & 7 deletions network/libp2p_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ type libp2pGraphSyncNetwork struct {
}

type streamMessageSender struct {
s network.Stream
s network.Stream
opts MessageSenderOpts
}

func (s *streamMessageSender) Close() error {
Expand All @@ -50,14 +51,14 @@ func (s *streamMessageSender) Reset() error {
}

func (s *streamMessageSender) SendMsg(ctx context.Context, msg gsmsg.GraphSyncMessage) error {
return msgToStream(ctx, s.s, msg)
return msgToStream(ctx, s.s, msg, s.opts.SendTimeout)
}

func msgToStream(ctx context.Context, s network.Stream, msg gsmsg.GraphSyncMessage) error {
func msgToStream(ctx context.Context, s network.Stream, msg gsmsg.GraphSyncMessage, timeout time.Duration) error {
log.Debugf("Outgoing message with %d requests, %d responses, and %d blocks",
len(msg.Requests()), len(msg.Responses()), len(msg.Blocks()))

deadline := time.Now().Add(sendMessageTimeout)
deadline := time.Now().Add(timeout)
if dl, ok := ctx.Deadline(); ok {
deadline = dl
}
Expand All @@ -81,13 +82,13 @@ func msgToStream(ctx context.Context, s network.Stream, msg gsmsg.GraphSyncMessa
return nil
}

func (gsnet *libp2pGraphSyncNetwork) NewMessageSender(ctx context.Context, p peer.ID) (MessageSender, error) {
func (gsnet *libp2pGraphSyncNetwork) NewMessageSender(ctx context.Context, p peer.ID, opts MessageSenderOpts) (MessageSender, error) {
s, err := gsnet.newStreamToPeer(ctx, p)
if err != nil {
return nil, err
}

return &streamMessageSender{s: s}, nil
return &streamMessageSender{s: s, opts: setDefaults(opts)}, nil
}

func (gsnet *libp2pGraphSyncNetwork) newStreamToPeer(ctx context.Context, p peer.ID) (network.Stream, error) {
Expand All @@ -104,7 +105,7 @@ func (gsnet *libp2pGraphSyncNetwork) SendMessage(
return err
}

if err = msgToStream(ctx, s, outgoing); err != nil {
if err = msgToStream(ctx, s, outgoing, sendMessageTimeout); err != nil {
_ = s.Reset()
return err
}
Expand Down Expand Up @@ -173,3 +174,11 @@ func (nn *libp2pGraphSyncNotifee) OpenedStream(n network.Network, v network.Stre
func (nn *libp2pGraphSyncNotifee) ClosedStream(n network.Network, v network.Stream) {}
func (nn *libp2pGraphSyncNotifee) Listen(n network.Network, a ma.Multiaddr) {}
func (nn *libp2pGraphSyncNotifee) ListenClose(n network.Network, a ma.Multiaddr) {}

// setDefaults returns opts with every unset field filled in with the
// package default. Currently that means a zero SendTimeout is replaced by
// sendMessageTimeout; any non-zero value is returned unchanged.
//
// opts is received by value, so it is already a private copy: modifying it
// here never affects the caller's struct, and no additional local copy
// (which previously shadowed the predeclared copy builtin) is needed.
func setDefaults(opts MessageSenderOpts) MessageSenderOpts {
	if opts.SendTimeout == 0 {
		opts.SendTimeout = sendMessageTimeout
	}
	return opts
}

0 comments on commit c684447

Please sign in to comment.