Skip to content

Commit

Permalink
feat: timeout when trying to enqueue incoming mempool messages
Browse files Browse the repository at this point in the history
  • Loading branch information
lklimek committed Feb 2, 2024
1 parent e0c447c commit cfa50c2
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 13 deletions.
4 changes: 3 additions & 1 deletion internal/p2p/channel_params.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package p2p

import (
"fmt"
"time"

"github.com/gogo/protobuf/proto"

Expand Down Expand Up @@ -77,10 +78,11 @@ func ChannelDescriptors(cfg *config.Config) map[ChannelID]*ChannelDescriptor {
ID: MempoolChannel,
Priority: 2, // 5
RecvMessageCapacity: mempoolBatchSize(cfg.Mempool.MaxTxBytes),
RecvBufferCapacity: cfg.Mempool.Size,
RecvBufferCapacity: 1000,
Name: "mempool",
SendRateLimit: 5,
SendRateBurst: 20,
EnqueueTimeout: 1 * time.Millisecond,
},
}

Expand Down
7 changes: 7 additions & 0 deletions internal/p2p/conn/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -620,6 +620,12 @@ type ChannelDescriptor struct {
// Human readable name of the channel, used in logging and
// diagnostics.
Name string

// Timeout for enqueue operations on the incoming queue.
// When timeout expires, messages will be silently dropped.
//
// If zero, enqueue operations will not time out.
EnqueueTimeout time.Duration
}

func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
Expand All @@ -632,6 +638,7 @@ func (chDesc ChannelDescriptor) FillDefaults() (filled ChannelDescriptor) {
if chDesc.RecvMessageCapacity == 0 {
chDesc.RecvMessageCapacity = defaultRecvMessageCapacity
}

filled = chDesc
return
}
Expand Down
7 changes: 5 additions & 2 deletions internal/p2p/pqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,16 @@ func newPQScheduler(
logger log.Logger,
m *Metrics,
lc *metricsLabelCache,
chDescs []*ChannelDescriptor,
chDescs map[ChannelID]*ChannelDescriptor,
enqueueBuf, dequeueBuf, capacity uint,
) *pqScheduler {

// copy each ChannelDescriptor and sort them by ascending channel priority
chDescsCopy := make([]*ChannelDescriptor, len(chDescs))
copy(chDescsCopy, chDescs)
for _, chDesc := range chDescs {
chDescsCopy = append(chDescsCopy, chDesc)
}

sort.Slice(chDescsCopy, func(i, j int) bool { return chDescsCopy[i].Priority < chDescsCopy[j].Priority })

var (
Expand Down
4 changes: 2 additions & 2 deletions internal/p2p/pqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ type testMessage = gogotypes.StringValue

func TestCloseWhileDequeueFull(t *testing.T) {
enqueueLength := 5
chDescs := []*ChannelDescriptor{
{ID: 0x01, Priority: 1},
chDescs := map[ChannelID]*ChannelDescriptor{
0x01: {ID: 0x01, Priority: 1},
}
pqueue := newPQScheduler(log.NewNopLogger(), NopMetrics(), newMetricsLabelCache(), chDescs, uint(enqueueLength), 1, 120)

Expand Down
35 changes: 29 additions & 6 deletions internal/p2p/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"io"
"net"
"reflect"
"runtime"
"time"

Expand Down Expand Up @@ -151,7 +152,7 @@ type Router struct {
options RouterOptions
privKey crypto.PrivKey
peerManager *PeerManager
chDescs []*ChannelDescriptor
chDescs map[ChannelID]*ChannelDescriptor
transport Transport
endpoint *Endpoint
connTracker connectionTracker
Expand Down Expand Up @@ -198,7 +199,7 @@ func NewRouter(
options.MaxIncomingConnectionAttempts,
options.IncomingConnectionWindow,
),
chDescs: make([]*ChannelDescriptor, 0),
chDescs: make(map[ChannelID]*ChannelDescriptor, 0),
transport: transport,
endpoint: endpoint,
peerManager: peerManager,
Expand Down Expand Up @@ -256,7 +257,7 @@ func (r *Router) OpenChannel(ctx context.Context, chDesc *ChannelDescriptor) (Ch
if _, ok := r.channelQueues[id]; ok {
return nil, fmt.Errorf("channel %v already exists", id)
}
r.chDescs = append(r.chDescs, chDesc)
r.chDescs[id] = chDesc

queue := r.queueFactory(chDesc.RecvBufferCapacity)
outCh := make(chan Envelope, chDesc.RecvBufferCapacity)
Expand Down Expand Up @@ -771,18 +772,25 @@ func (r *Router) routePeer(ctx context.Context, peerID types.NodeID, conn Connec
// receivePeer receives inbound messages from a peer, deserializes them and
// passes them on to the appropriate channel.
func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Connection) error {
// default timeout; by default, we set it to ~ 10 years so that it will practically never fire
timeout := time.NewTimer(24 * 30 * 12 * 10 * time.Hour)
defer timeout.Stop()

for {
chID, bz, err := conn.ReceiveMessage(ctx)
if err != nil {
return err
}

r.channelMtx.RLock()
queue, ok := r.channelQueues[chID]
queue, queueOk := r.channelQueues[chID]
chDesc, chDescOk := r.chDescs[chID]
r.channelMtx.RUnlock()

if !ok {
r.logger.Debug("dropping message for unknown channel", "peer", peerID, "channel", chID)
if !queueOk || !chDescOk {
r.logger.Debug("dropping message for unknown channel",
"peer", peerID, "channel", chID,
"queue", queueOk, "chDesc", chDescOk)
continue
}

Expand All @@ -799,6 +807,11 @@ func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Conn
}
envelope.From = peerID
envelope.ChannelID = chID

if chDesc.EnqueueTimeout > 0 {
timeout.Reset(chDesc.EnqueueTimeout)
}

select {
case queue.enqueue() <- envelope:
r.metrics.PeerReceiveBytesTotal.With(
Expand All @@ -811,6 +824,16 @@ func (r *Router) receivePeer(ctx context.Context, peerID types.NodeID, conn Conn
case <-queue.closed():
r.logger.Debug("channel closed, dropping message", "peer", peerID, "channel", chID)

case <-timeout.C:
// TODO change to Trace
r.logger.Debug("dropping message from peer due to enqueue timeout",
"peer", peerID,
"channel", chID,
"channel_name", chDesc.Name,
"timeout", chDesc.EnqueueTimeout,
"type", reflect.TypeOf((envelope.Message)).Name(),
)

case <-ctx.Done():
return nil
}
Expand Down
5 changes: 3 additions & 2 deletions internal/p2p/throttled_channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,11 @@ import (
"testing"
"time"

"github.com/dashpay/tenderdash/internal/p2p"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/time/rate"

"github.com/dashpay/tenderdash/internal/p2p"
)

type mockChannel struct {
Expand All @@ -21,7 +22,7 @@ func (c *mockChannel) Len() int {
return int(c.counter.Load())
}

func (c *mockChannel) Send(_ context.Context, e p2p.Envelope) error {
func (c *mockChannel) Send(_ context.Context, _e p2p.Envelope) error {
c.counter.Add(1)
return nil
}
Expand Down

0 comments on commit cfa50c2

Please sign in to comment.