diff --git a/docs/generated/metrics/metrics.html b/docs/generated/metrics/metrics.html
index 0d3df5d5abe2..6bc9fdc09b46 100644
--- a/docs/generated/metrics/metrics.html
+++ b/docs/generated/metrics/metrics.html
@@ -248,6 +248,7 @@
STORAGE | kv.rangefeed.budget_allocation_blocked | Number of times RangeFeed waited for budget availability | Events | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | kv.rangefeed.budget_allocation_failed | Number of times RangeFeed failed because memory budget was exceeded | Events | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | kv.rangefeed.buffered_registrations | Number of active RangeFeed buffered registrations | Registrations | GAUGE | COUNT | AVG | NONE |
+STORAGE | kv.rangefeed.buffered_sender.queue_size | Number of entries in the buffered sender queue | Pending Events | GAUGE | COUNT | AVG | NONE |
STORAGE | kv.rangefeed.catchup_scan_nanos | Time spent in RangeFeed catchup scan | Nanoseconds | COUNTER | NANOSECONDS | AVG | NON_NEGATIVE_DERIVATIVE |
STORAGE | kv.rangefeed.closed_timestamp.slow_ranges | Number of ranges that have a closed timestamp lagging by more than 5x target lag. Periodically re-calculated | Ranges | GAUGE | COUNT | AVG | NONE |
STORAGE | kv.rangefeed.closed_timestamp.slow_ranges.cancelled | Number of rangefeeds that were cancelled due to a chronically lagging closed timestamp | Cancellation Count | COUNTER | COUNT | AVG | NON_NEGATIVE_DERIVATIVE |
diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender.go b/pkg/kv/kvserver/rangefeed/buffered_sender.go
index 92c8667d219b..8edeabaa1418 100644
--- a/pkg/kv/kvserver/rangefeed/buffered_sender.go
+++ b/pkg/kv/kvserver/rangefeed/buffered_sender.go
@@ -64,11 +64,19 @@ type BufferedSender struct {
// are events to send. Channel is initialised with a buffer of 1 and all writes to it
// are non-blocking.
notifyDataC chan struct{}
+
+ // metrics is used to track the set of BufferedSender related metrics for a
+ // given node. Note that there could be multiple buffered senders in a node,
+ // sharing the metrics.
+ metrics *BufferedSenderMetrics
}
-func NewBufferedSender(sender ServerStreamSender) *BufferedSender {
+func NewBufferedSender(
+ sender ServerStreamSender, bsMetrics *BufferedSenderMetrics,
+) *BufferedSender {
bs := &BufferedSender{
- sender: sender,
+ sender: sender,
+ metrics: bsMetrics,
}
bs.queueMu.buffer = newEventQueue()
bs.notifyDataC = make(chan struct{}, 1)
@@ -90,6 +98,7 @@ func (bs *BufferedSender) sendBuffered(
// TODO(wenyihu6): pass an actual context here
alloc.Use(context.Background())
bs.queueMu.buffer.pushBack(sharedMuxEvent{ev, alloc})
+ bs.metrics.BufferedSenderQueueSize.Inc(1)
select {
case bs.notifyDataC <- struct{}{}:
default:
@@ -122,6 +131,7 @@ func (bs *BufferedSender) run(
case <-bs.notifyDataC:
for {
e, success := bs.popFront()
+ bs.metrics.BufferedSenderQueueSize.Dec(1)
if !success {
break
}
@@ -153,7 +163,9 @@ func (bs *BufferedSender) cleanup(ctx context.Context) {
bs.queueMu.Lock()
defer bs.queueMu.Unlock()
bs.queueMu.stopped = true
+ remaining := bs.queueMu.buffer.len()
bs.queueMu.buffer.drain(ctx)
+ bs.metrics.BufferedSenderQueueSize.Dec(remaining)
}
// Used for testing only.
diff --git a/pkg/kv/kvserver/rangefeed/buffered_sender_test.go b/pkg/kv/kvserver/rangefeed/buffered_sender_test.go
index bf04299c7aa9..8f2cf53a9cab 100644
--- a/pkg/kv/kvserver/rangefeed/buffered_sender_test.go
+++ b/pkg/kv/kvserver/rangefeed/buffered_sender_test.go
@@ -34,7 +34,7 @@ func TestBufferedSenderDisconnectStream(t *testing.T) {
defer stopper.Stop(ctx)
testServerStream := newTestServerStream()
smMetrics := NewStreamManagerMetrics()
- bs := NewBufferedSender(testServerStream)
+ bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
sm := NewStreamManager(bs, smMetrics)
require.NoError(t, sm.Start(ctx, stopper))
defer sm.Stop(ctx)
@@ -89,7 +89,7 @@ func TestBufferedSenderChaosWithStop(t *testing.T) {
testServerStream := newTestServerStream()
smMetrics := NewStreamManagerMetrics()
- bs := NewBufferedSender(testServerStream)
+ bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
sm := NewStreamManager(bs, smMetrics)
require.NoError(t, sm.Start(ctx, stopper))
diff --git a/pkg/kv/kvserver/rangefeed/metrics.go b/pkg/kv/kvserver/rangefeed/metrics.go
index 22087ad0a1a5..13ec19866229 100644
--- a/pkg/kv/kvserver/rangefeed/metrics.go
+++ b/pkg/kv/kvserver/rangefeed/metrics.go
@@ -266,3 +266,26 @@ func NewStreamManagerMetrics() *StreamManagerMetrics {
NumMuxRangeFeed: metric.NewCounter(metaTotalMuxRangeFeed),
}
}
+
+// BufferedSenderMetrics are for monitoring of BufferedSender.
+type BufferedSenderMetrics struct {
+ BufferedSenderQueueSize *metric.Gauge
+}
+
+var (
+ metaBufferedSenderQueueSize = metric.Metadata{
+ Name: "kv.rangefeed.buffered_sender.queue_size",
+ Help: `Number of entries in the buffered sender queue`,
+ Measurement: "Pending Events",
+ Unit: metric.Unit_COUNT,
+ }
+)
+
+func (*BufferedSenderMetrics) MetricStruct() {}
+
+// NewBufferedSenderMetrics makes the metrics for BufferedSender.
+func NewBufferedSenderMetrics() *BufferedSenderMetrics {
+ return &BufferedSenderMetrics{
+ BufferedSenderQueueSize: metric.NewGauge(metaBufferedSenderQueueSize),
+ }
+}
diff --git a/pkg/kv/kvserver/rangefeed/stream_manager_test.go b/pkg/kv/kvserver/rangefeed/stream_manager_test.go
index d00b42ec1436..d84b4f9dadf8 100644
--- a/pkg/kv/kvserver/rangefeed/stream_manager_test.go
+++ b/pkg/kv/kvserver/rangefeed/stream_manager_test.go
@@ -180,7 +180,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
case scheduledProcessorWithUnbufferedSender:
s = NewUnbufferedSender(testServerStream)
case scheduledProcessorWithBufferedSender:
- s = NewBufferedSender(testServerStream)
+ s = NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
default:
t.Fatalf("unknown rangefeed test type %v", rt)
}
diff --git a/pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go b/pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go
index bea395fb3f51..d07572650e27 100644
--- a/pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go
+++ b/pkg/kv/kvserver/rangefeed/unbuffered_registration_test.go
@@ -34,7 +34,7 @@ func TestUnbufferedRegWithStreamManager(t *testing.T) {
defer stopper.Stop(ctx)
testServerStream := newTestServerStream()
smMetrics := NewStreamManagerMetrics()
- bs := NewBufferedSender(testServerStream)
+ bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
sm := NewStreamManager(bs, smMetrics)
require.NoError(t, sm.Start(ctx, stopper))
@@ -106,7 +106,7 @@ func TestUnbufferedRegCorrectnessOnDisconnect(t *testing.T) {
defer stopper.Stop(ctx)
testServerStream := newTestServerStream()
smMetrics := NewStreamManagerMetrics()
- bs := NewBufferedSender(testServerStream)
+ bs := NewBufferedSender(testServerStream, NewBufferedSenderMetrics())
sm := NewStreamManager(bs, smMetrics)
require.NoError(t, sm.Start(ctx, stopper))
defer sm.Stop(ctx)
diff --git a/pkg/server/node.go b/pkg/server/node.go
index dcf837da74d5..216a65de7d3a 100644
--- a/pkg/server/node.go
+++ b/pkg/server/node.go
@@ -259,6 +259,9 @@ type nodeMetrics struct {
// StreamManagerMetrics is for monitoring of StreamManagers for rangefeed.
// Note that there could be multiple stream managers in a node.
StreamManagerMetrics *rangefeed.StreamManagerMetrics
+ // BufferedSenderMetrics is for monitoring of BufferedSenders for rangefeed.
+ // Note that there could be multiple buffered senders in a node.
+ BufferedSenderMetrics *rangefeed.BufferedSenderMetrics
}
func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) *nodeMetrics {
@@ -279,6 +282,7 @@ func makeNodeMetrics(reg *metric.Registry, histogramWindow time.Duration) *nodeM
CrossZoneBatchRequestBytes: metric.NewCounter(metaCrossZoneBatchRequest),
CrossZoneBatchResponseBytes: metric.NewCounter(metaCrossZoneBatchResponse),
StreamManagerMetrics: rangefeed.NewStreamManagerMetrics(),
+ BufferedSenderMetrics: rangefeed.NewBufferedSenderMetrics(),
}
for i := range nm.MethodCounts {
@@ -2192,7 +2196,7 @@ func (n *Node) MuxRangeFeed(muxStream kvpb.Internal_MuxRangeFeedServer) error {
sm := &rangefeed.StreamManager{}
if kvserver.RangefeedUseBufferedSender.Get(&n.storeCfg.Settings.SV) {
- sm = rangefeed.NewStreamManager(rangefeed.NewBufferedSender(lockedMuxStream),
+ sm = rangefeed.NewStreamManager(rangefeed.NewBufferedSender(lockedMuxStream, n.metrics.BufferedSenderMetrics),
n.metrics.StreamManagerMetrics)
} else {
sm = rangefeed.NewStreamManager(rangefeed.NewUnbufferedSender(lockedMuxStream), n.metrics.StreamManagerMetrics)