Skip to content

Commit

Permalink
Configure message parameters (#247)
Browse files Browse the repository at this point in the history
* feat(graphsync): configure message parameters

allow setting custom values for timing out messages and number of retries

* Update impl/graphsync.go

Co-authored-by: Rod Vagg <[email protected]>

* Update impl/graphsync.go

Co-authored-by: Rod Vagg <[email protected]>

Co-authored-by: Rod Vagg <[email protected]>
  • Loading branch information
hannahhoward and rvagg authored Oct 13, 2021
1 parent 138183a commit 174c575
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 28 deletions.
44 changes: 23 additions & 21 deletions messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ import (

var log = logging.Logger("graphsync")

const maxRetries = 10

// max block size is the maximum size for batching blocks in a single payload
const maxBlockSize uint64 = 512 * 1024

Expand All @@ -38,7 +36,7 @@ type Event struct {
// MessageNetwork is any network that can connect peers and generate a message
// sender.
type MessageNetwork interface {
NewMessageSender(context.Context, peer.ID) (gsnet.MessageSender, error)
NewMessageSender(context.Context, peer.ID, gsnet.MessageSenderOpts) (gsnet.MessageSender, error)
ConnectTo(context.Context, peer.ID) error
}

Expand All @@ -58,24 +56,28 @@ type MessageQueue struct {
done chan struct{}

// internal do not touch outside go routines
sender gsnet.MessageSender
eventPublisher notifications.Publisher
buildersLk sync.RWMutex
builders []*gsmsg.Builder
nextBuilderTopic gsmsg.Topic
allocator Allocator
sender gsnet.MessageSender
eventPublisher notifications.Publisher
buildersLk sync.RWMutex
builders []*gsmsg.Builder
nextBuilderTopic gsmsg.Topic
allocator Allocator
maxRetries int
sendMessageTimeout time.Duration
}

// New creats a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork, allocator Allocator) *MessageQueue {
func New(ctx context.Context, p peer.ID, network MessageNetwork, allocator Allocator, maxRetries int, sendMessageTimeout time.Duration) *MessageQueue {
return &MessageQueue{
ctx: ctx,
network: network,
p: p,
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
eventPublisher: notifications.NewPublisher(),
allocator: allocator,
ctx: ctx,
network: network,
p: p,
outgoingWork: make(chan struct{}, 1),
done: make(chan struct{}),
eventPublisher: notifications.NewPublisher(),
allocator: allocator,
maxRetries: maxRetries,
sendMessageTimeout: sendMessageTimeout,
}
}

Expand Down Expand Up @@ -220,7 +222,7 @@ func (mq *MessageQueue) sendMessage() {
return
}

for i := 0; i < maxRetries; i++ { // try to send this message until we fail.
for i := 0; i < mq.maxRetries; i++ { // try to send this message until we fail.
if mq.attemptSendAndRecovery(message, publisher) {
return
}
Expand All @@ -232,7 +234,7 @@ func (mq *MessageQueue) initializeSender() error {
if mq.sender != nil {
return nil
}
nsender, err := openSender(mq.ctx, mq.network, mq.p)
nsender, err := openSender(mq.ctx, mq.network, mq.p, mq.sendMessageTimeout)
if err != nil {
return err
}
Expand Down Expand Up @@ -277,7 +279,7 @@ func (mq *MessageQueue) attemptSendAndRecovery(message gsmsg.GraphSyncMessage, p
return false
}

func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (gsnet.MessageSender, error) {
func openSender(ctx context.Context, network MessageNetwork, p peer.ID, sendTimeout time.Duration) (gsnet.MessageSender, error) {
// allow ten minutes for connections this includes looking them up in the
// dht dialing them, and handshaking
conctx, cancel := context.WithTimeout(ctx, time.Minute*10)
Expand All @@ -288,7 +290,7 @@ func openSender(ctx context.Context, network MessageNetwork, p peer.ID) (gsnet.M
return nil, err
}

nsender, err := network.NewMessageSender(ctx, p)
nsender, err := network.NewMessageSender(ctx, p, gsnet.MessageSenderOpts{SendTimeout: sendTimeout})
if err != nil {
return nil, err
}
Expand Down
17 changes: 10 additions & 7 deletions messagequeue/messagequeue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ import (
"github.com/ipfs/go-graphsync/testutil"
)

const sendMessageTimeout = 10 * time.Minute
const messageSendRetries = 10

type fakeMessageNetwork struct {
connectError error
messageSenderError error
Expand All @@ -32,7 +35,7 @@ func (fmn *fakeMessageNetwork) ConnectTo(context.Context, peer.ID) error {
return fmn.connectError
}

func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID) (gsnet.MessageSender, error) {
func (fmn *fakeMessageNetwork) NewMessageSender(context.Context, peer.ID, gsnet.MessageSenderOpts) (gsnet.MessageSender, error) {
fmn.wait.Done()
if fmn.messageSenderError == nil {
return fmn.messageSender, nil
Expand Down Expand Up @@ -68,7 +71,7 @@ func TestStartupAndShutdown(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
id := graphsync.RequestID(rand.Int31())
priority := graphsync.Priority(rand.Int31())
Expand Down Expand Up @@ -106,7 +109,7 @@ func TestShutdownDuringMessageSend(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
id := graphsync.RequestID(rand.Int31())
priority := graphsync.Priority(rand.Int31())
Expand Down Expand Up @@ -154,7 +157,7 @@ func TestProcessingNotification(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
waitGroup.Add(1)
blks := testutil.GenerateBlocksOfSize(3, 128)
Expand Down Expand Up @@ -210,7 +213,7 @@ func TestDedupingMessages(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
waitGroup.Add(1)
id := graphsync.RequestID(rand.Int31())
Expand Down Expand Up @@ -282,7 +285,7 @@ func TestSendsVeryLargeBlocksResponses(t *testing.T) {
messageNetwork := &fakeMessageNetwork{nil, nil, messageSender, &waitGroup}
allocator := allocator2.NewAllocator(1<<30, 1<<30)

messageQueue := New(ctx, peer, messageNetwork, allocator)
messageQueue := New(ctx, peer, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
waitGroup.Add(1)

Expand Down Expand Up @@ -342,7 +345,7 @@ func TestSendsResponsesMemoryPressure(t *testing.T) {
// use allocator with very small limit
allocator := allocator2.NewAllocator(1000, 1000)

messageQueue := New(ctx, p, messageNetwork, allocator)
messageQueue := New(ctx, p, messageNetwork, allocator, messageSendRetries, sendMessageTimeout)
messageQueue.Startup()
waitGroup.Add(1)

Expand Down

0 comments on commit 174c575

Please sign in to comment.