kvserver/rangefeed: add rangefeed.buffered_sender.queue_size
This patch adds a new metric, rangefeed.buffered_sender.queue_size, to track the
number of entries pending in the buffered sender queue, waiting to be sent to the
RPC stream.

Part of: #129816
Release note: none
wenyihu6 committed Feb 28, 2025
1 parent c7d4c23 commit 9d91bf6
Showing 7 changed files with 48 additions and 8 deletions.
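
For context, the bookkeeping pattern this patch wires in is a gauge that mirrors queue depth: increment once per event pushed into the buffered sender queue, decrement as events are popped for sending, and subtract whatever is still queued when the queue is drained during cleanup. Below is a minimal, self-contained Go sketch of that pattern only — a plain int64 stands in for CockroachDB's metric.Gauge, and all names are illustrative rather than taken from the patch.

package main

import (
	"fmt"
	"sync"
)

// queueWithGauge mirrors the pattern in the patch: a counter that tracks how
// many entries are currently buffered, updated alongside every queue mutation.
type queueWithGauge struct {
	mu      sync.Mutex
	entries []string
	gauge   int64 // stands in for metric.Gauge
}

func (q *queueWithGauge) push(e string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.entries = append(q.entries, e)
	q.gauge++ // BufferedSenderQueueSize.Inc(1) in the patch
}

func (q *queueWithGauge) pop() (string, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.entries) == 0 {
		return "", false
	}
	e := q.entries[0]
	q.entries = q.entries[1:]
	q.gauge-- // BufferedSenderQueueSize.Dec(1) in the patch
	return e, true
}

// drain drops everything still queued and subtracts the remaining count,
// mirroring what cleanup() does in buffered_sender.go.
func (q *queueWithGauge) drain() {
	q.mu.Lock()
	defer q.mu.Unlock()
	remaining := int64(len(q.entries))
	q.entries = nil
	q.gauge -= remaining
}

func main() {
	q := &queueWithGauge{}
	q.push("a")
	q.push("b")
	q.pop()
	fmt.Println(q.gauge) // 1: one entry still pending
	q.drain()
	fmt.Println(q.gauge) // 0: queue fully drained
}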
1 change: 1 addition & 0 deletions docs/generated/metrics/metrics.html
@@ -248,6 +248,7 @@
<tr><td>STORAGE</td><td>kv.rangefeed.budget_allocation_blocked</td><td>Number of times RangeFeed waited for budget availability</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.budget_allocation_failed</td><td>Number of times RangeFeed failed because memory budget was exceeded</td><td>Events</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.buffered_registrations</td><td>Number of active RangeFeed buffered registrations</td><td>Registrations</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.buffered_sender.queue_size</td><td>Number of entries in the buffered sender queue</td><td>Pending Events</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.catchup_scan_nanos</td><td>Time spent in RangeFeed catchup scan</td><td>Nanoseconds</td><td>COUNTER</td><td>NANOSECONDS</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.closed_timestamp.slow_ranges</td><td>Number of ranges that have a closed timestamp lagging by more than 5x target lag. Periodically re-calculated</td><td>Ranges</td><td>GAUGE</td><td>COUNT</td><td>AVG</td><td>NONE</td></tr>
<tr><td>STORAGE</td><td>kv.rangefeed.closed_timestamp.slow_ranges.cancelled</td><td>Number of rangefeeds that were cancelled due to a chronically lagging closed timestamp</td><td>Cancellation Count</td><td>COUNTER</td><td>COUNT</td><td>AVG</td><td>NON_NEGATIVE_DERIVATIVE</td></tr>
16 changes: 14 additions & 2 deletions pkg/kv/kvserver/rangefeed/buffered_sender.go
@@ -64,11 +64,19 @@ type BufferedSender struct {
	// are events to send. Channel is initialised with a buffer of 1 and all writes to it
	// are non-blocking.
	notifyDataC chan struct{}

	// metrics is used to track the set of BufferedSender related metrics for a
	// given node. Note that there could be multiple buffered senders in a node,
	// sharing the metrics.
	metrics *BufferedSenderMetrics
}

func NewBufferedSender(
	sender ServerStreamSender, bsMetrics *BufferedSenderMetrics,
) *BufferedSender {
	bs := &BufferedSender{
		sender:  sender,
		metrics: bsMetrics,
	}
	bs.queueMu.buffer = newEventQueue()
	bs.notifyDataC = make(chan struct{}, 1)
@@ -90,6 +98,7 @@ func (bs *BufferedSender) sendBuffered(
	// TODO(wenyihu6): pass an actual context here
	alloc.Use(context.Background())
	bs.queueMu.buffer.pushBack(sharedMuxEvent{ev, alloc})
	bs.metrics.BufferedSenderQueueSize.Inc(1)
	select {
	case bs.notifyDataC <- struct{}{}:
	default:
@@ -122,6 +131,7 @@ func (bs *BufferedSender) run(
		case <-bs.notifyDataC:
			for {
				e, success := bs.popFront()
				bs.metrics.BufferedSenderQueueSize.Dec(1)
				if !success {
					break
				}
@@ -153,7 +163,9 @@ func (bs *BufferedSender) cleanup(ctx context.Context) {
	bs.queueMu.Lock()
	defer bs.queueMu.Unlock()
	bs.queueMu.stopped = true
	remaining := bs.queueMu.buffer.len()
	bs.queueMu.buffer.drain(ctx)
	bs.metrics.BufferedSenderQueueSize.Dec(remaining)
}

// Used for testing only.
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/buffered_sender_test.go
@@ -34,7 +34,7 @@ func TestBufferedSenderDisconnectStream(t *testing.T) {
	defer stopper.Stop(ctx)
	testServerStream := newTestServerStream()
	smMetrics := NewStreamManagerMetrics()
	bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
	sm := NewStreamManager(bs, smMetrics)
	require.NoError(t, sm.Start(ctx, stopper))
	defer sm.Stop(ctx)
@@ -89,7 +89,7 @@ func TestBufferedSenderChaosWithStop(t *testing.T) {
	testServerStream := newTestServerStream()

	smMetrics := NewStreamManagerMetrics()
	bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
	sm := NewStreamManager(bs, smMetrics)
	require.NoError(t, sm.Start(ctx, stopper))

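The tests above only thread the new metrics object through the constructor; a natural follow-up assertion (not part of this commit, purely a hedged sketch) would be to wait for the gauge to return to zero once the sender has flushed its queue. Here bsMetrics is assumed to be the *BufferedSenderMetrics that was passed to NewBufferedSender, and testutils.SucceedsSoon / Gauge.Value are assumed to be available.

// Hypothetical helper, not part of this commit.
func waitForEmptyQueue(t *testing.T, bsMetrics *BufferedSenderMetrics) {
	testutils.SucceedsSoon(t, func() error {
		if v := bsMetrics.BufferedSenderQueueSize.Value(); v != 0 {
			return errors.Errorf("expected 0 queued entries, found %d", v)
		}
		return nil
	})
}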
23 changes: 23 additions & 0 deletions pkg/kv/kvserver/rangefeed/metrics.go
@@ -266,3 +266,26 @@ func NewStreamManagerMetrics() *StreamManagerMetrics {
		NumMuxRangeFeed: metric.NewCounter(metaTotalMuxRangeFeed),
	}
}

// BufferedSenderMetrics are for monitoring of BufferedSender.
type BufferedSenderMetrics struct {
	BufferedSenderQueueSize *metric.Gauge
}

var (
	metaBufferedSenderQueueSize = metric.Metadata{
		Name:        "kv.rangefeed.buffered_sender.queue_size",
		Help:        `Number of entries in the buffered sender queue`,
		Measurement: "Pending Events",
		Unit:        metric.Unit_COUNT,
	}
)

func (*BufferedSenderMetrics) MetricStruct() {}

// NewBufferedSenderMetrics makes the metrics for BufferedSender.
func NewBufferedSenderMetrics() *BufferedSenderMetrics {
	return &BufferedSenderMetrics{
		BufferedSenderQueueSize: metric.NewGauge(metaBufferedSenderQueueSize),
	}
}
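The empty MetricStruct() method is the usual marker that lets a metric registry treat BufferedSenderMetrics as a metric struct and register every exported metric field in it. A hedged sketch of how such a struct could be hooked up follows; the function and registry here are illustrative only, since in this commit the wiring actually happens through nodeMetrics in pkg/server/node.go.

// Illustrative only, assuming the usual pkg/util/metric registry API.
func registerBufferedSenderMetrics(reg *metric.Registry) *rangefeed.BufferedSenderMetrics {
	bsMetrics := rangefeed.NewBufferedSenderMetrics()
	// AddMetricStruct walks the struct's exported fields and registers each
	// metric, which is why the empty MetricStruct() marker method exists.
	reg.AddMetricStruct(bsMetrics)
	return bsMetrics
}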
2 changes: 1 addition & 1 deletion pkg/kv/kvserver/rangefeed/stream_manager_test.go
@@ -180,7 +180,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
		case scheduledProcessorWithUnbufferedSender:
			s = NewUnbufferedSender(testServerStream)
		case scheduledProcessorWithBufferedSender:
			s = NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
		default:
			t.Fatalf("unknown rangefeed test type %v", rt)
		}
4 changes: 2 additions & 2 deletions pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go
@@ -34,7 +34,7 @@ func TestUnbufferedRegWithStreamManager(t *testing.T) {
	defer stopper.Stop(ctx)
	testServerStream := newTestServerStream()
	smMetrics := NewStreamManagerMetrics()
	bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
	sm := NewStreamManager(bs, smMetrics)
	require.NoError(t, sm.Start(ctx, stopper))

@@ -106,7 +106,7 @@ func TestUnbufferedRegCorrectnessOnDisconnect(t *testing.T) {
	defer stopper.Stop(ctx)
	testServerStream := newTestServerStream()
	smMetrics := NewStreamManagerMetrics()
	bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
	sm := NewStreamManager(bs, smMetrics)
	require.NoError(t, sm.Start(ctx, stopper))
	defer sm.Stop(ctx)
6 changes: 5 additions & 1 deletion pkg/server/node.go
@@ -259,6 +259,9 @@ type nodeMetrics struct {
	// StreamManagerMetrics is for monitoring of StreamManagers for rangefeed.
	// Note that there could be multiple stream managers in a node.
	StreamManagerMetrics *rangefeed.StreamManagerMetrics
	// BufferedSenderMetrics is for monitoring of BufferedSenders for rangefeed.
	// Note that there could be multiple buffered senders in a node.
	BufferedSenderMetrics *rangefeed.BufferedSenderMetrics
}

func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) *nodeMetrics {
@@ -279,6 +282,7 @@ func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) *nodeMetrics {
		CrossZoneBatchRequestBytes:  metric.NewCounter(metaCrossZoneBatchRequest),
		CrossZoneBatchResponseBytes: metric.NewCounter(metaCrossZoneBatchResponse),
		StreamManagerMetrics:        rangefeed.NewStreamManagerMetrics(),
		BufferedSenderMetrics:       rangefeed.NewBufferedSenderMetrics(),
	}

	for i := range nm.MethodCounts {
@@ -2192,7 +2196,7 @@ func (n *Node) MuxRangeFeed(muxStream kvpb.Internal_MuxRangeFeedServer) error {

	sm := &rangefeed.StreamManager{}
	if kvserver.RangefeedUseBufferedSender.Get(&n.storeCfg.Settings.SV) {
		sm = rangefeed.NewStreamManager(rangefeed.NewBufferedSender(lockedMuxStream, n.metrics.BufferedSenderMetrics),
			n.metrics.StreamManagerMetrics)
	} else {
		sm = rangefeed.NewStreamManager(rangefeed.NewUnbufferedSender(lockedMuxStream), n.metrics.StreamManagerMetrics)
