kvserver/rangefeed: add StreamManagerMetrics
This patch refactors the rangefeed metrics tracked in node.Metrics into a dedicated
StreamManagerMetrics struct. This improves modularity and prepares for future
additions such as BufferedSenderMetrics.

Epic: none
Release note: none
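
For orientation, here is an editor's sketch (not part of this commit) of how the new struct is wired up, based on the diff below: NewStreamManagerMetrics constructs the struct and NewStreamManager now takes it directly. The registry plumbing in node.Metrics is an assumption; only the rangefeed-package identifiers come from the diff.

```go
// Editor's sketch, written as if inside pkg/kv/kvserver/rangefeed.
// The *metric.Registry registration is an assumption about how node.Metrics
// would consume the struct via its MetricStruct() marker.
func startStreamManager(
	ctx context.Context, stopper *stop.Stopper, registry *metric.Registry, s sender,
) (*StreamManager, *StreamManagerMetrics, error) {
	smMetrics := NewStreamManagerMetrics()
	registry.AddMetricStruct(smMetrics) // assumed registration call

	sm := NewStreamManager(s, smMetrics)
	if err := sm.Start(ctx, stopper); err != nil {
		return nil, nil, err
	}
	// AddStream increments both NumMuxRangeFeed and ActiveMuxRangeFeed;
	// OnError, DisconnectStream, and Stop decrement only the gauge.
	return sm, smMetrics, nil
}
```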
wenyihu6 committed Feb 28, 2025
1 parent 2288b99 commit c7d4c23
Showing 9 changed files with 99 additions and 129 deletions.
23 changes: 13 additions & 10 deletions pkg/kv/kvserver/rangefeed/buffered_sender_test.go
@@ -33,9 +33,9 @@ func TestBufferedSenderDisconnectStream(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
testServerStream := newTestServerStream()
testRangefeedCounter := newTestRangefeedCounter()
smMetrics := NewStreamManagerMetrics()
bs := NewBufferedSender(testServerStream)
sm := NewStreamManager(bs, testRangefeedCounter)
sm := NewStreamManager(bs, smMetrics)
require.NoError(t, sm.Start(ctx, stopper))
defer sm.Stop(ctx)

@@ -51,13 +51,15 @@ func TestBufferedSenderDisconnectStream(t *testing.T) {
require.NoError(t, sm.sender.sendBuffered(errEvent, nil))
},
})
require.Equal(t, 1, testRangefeedCounter.get())
require.Equal(t, int64(1), smMetrics.ActiveMuxRangeFeed.Value())
require.Equal(t, 0, bs.len())
sm.DisconnectStream(int64(streamID), err)
testServerStream.waitForEvent(t, errEvent)
require.Equal(t, int32(1), num.Load())
require.Equal(t, 1, testServerStream.totalEventsSent())
testRangefeedCounter.waitForRangefeedCount(t, 0)
smMetrics.ActiveMuxRangeFeed.Value()

waitForRangefeedCount(t, smMetrics, 0)
testServerStream.reset()
})
t.Run("disconnect stream on the same stream is idempotent", func(t *testing.T) {
@@ -66,14 +68,14 @@ func TestBufferedSenderDisconnectStream(t *testing.T) {
require.NoError(t, sm.sender.sendBuffered(errEvent, nil))
},
})
require.Equal(t, 1, testRangefeedCounter.get())
require.Equal(t, int64(1), smMetrics.ActiveMuxRangeFeed.Value())
sm.DisconnectStream(int64(streamID), err)
require.NoError(t, bs.waitForEmptyBuffer(ctx))
sm.DisconnectStream(int64(streamID), err)
require.NoError(t, bs.waitForEmptyBuffer(ctx))
require.Equalf(t, 1, testServerStream.totalEventsSent(),
"expected only 1 error event in %s", testServerStream.String())
testRangefeedCounter.waitForRangefeedCount(t, 0)
waitForRangefeedCount(t, smMetrics, 0)
})
}

@@ -85,9 +87,10 @@ func TestBufferedSenderChaosWithStop(t *testing.T) {
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
testServerStream := newTestServerStream()
testRangefeedCounter := newTestRangefeedCounter()

smMetrics := NewStreamManagerMetrics()
bs := NewBufferedSender(testServerStream)
sm := NewStreamManager(bs, testRangefeedCounter)
sm := NewStreamManager(bs, smMetrics)
require.NoError(t, sm.Start(ctx, stopper))

rng, _ := randutil.NewTestRand()
@@ -139,12 +142,12 @@ func TestBufferedSenderChaosWithStop(t *testing.T) {
require.Equal(t, activeStreamStart, int64(testServerStream.totalEventsSent()))
expectedActiveStreams := activeStreamEnd - activeStreamStart
require.Equal(t, int(expectedActiveStreams), sm.activeStreamCount())
testRangefeedCounter.waitForRangefeedCount(t, int(expectedActiveStreams))
waitForRangefeedCount(t, smMetrics, int(expectedActiveStreams))
})

t.Run("stream manager on stop", func(t *testing.T) {
sm.Stop(ctx)
require.Equal(t, 0, testRangefeedCounter.get())
require.Equal(t, int64(0), smMetrics.ActiveMuxRangeFeed.Value())
require.Equal(t, 0, sm.activeStreamCount())
// Cleanup functions should be called for all active streams.
require.Equal(t, int32(activeStreamEnd), actualSum.Load())
32 changes: 32 additions & 0 deletions pkg/kv/kvserver/rangefeed/metrics.go
@@ -234,3 +234,35 @@ func newSchedulerShardMetrics(name string, histogramWindow time.Duration) *Shard
QueueSize: metric.NewGauge(expandTemplate(metaQueueSizeTemplate)),
}
}

// StreamManagerMetrics are for monitoring of a StreamManager. Used by both
// buffered and unbuffered sender.
type StreamManagerMetrics struct {
NumMuxRangeFeed *metric.Counter
ActiveMuxRangeFeed *metric.Gauge
}

var (
metaActiveMuxRangeFeed = metric.Metadata{
Name: "rpc.streams.mux_rangefeed.active",
Help: `Number of currently running MuxRangeFeed streams`,
Measurement: "Streams",
Unit: metric.Unit_COUNT,
}
metaTotalMuxRangeFeed = metric.Metadata{
Name: "rpc.streams.mux_rangefeed.recv",
Help: `Total number of MuxRangeFeed streams`,
Measurement: "Streams",
Unit: metric.Unit_COUNT,
}
)

func (*StreamManagerMetrics) MetricStruct() {}

// NewStreamManagerMetrics creates new metrics for StreamManager.
func NewStreamManagerMetrics() *StreamManagerMetrics {
return &StreamManagerMetrics{
ActiveMuxRangeFeed: metric.NewGauge(metaActiveMuxRangeFeed),
NumMuxRangeFeed: metric.NewCounter(metaTotalMuxRangeFeed),
}
}
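
The counter/gauge split mirrors the metadata above: rpc.streams.mux_rangefeed.recv only ever increases, while rpc.streams.mux_rangefeed.active tracks live streams. A quick sketch of the intended accounting (editor's illustration; the Inc/Dec calls correspond to the StreamManager changes later in this diff):

```go
// Editor's sketch: expected movement of the two metrics across a
// connect/connect/disconnect sequence. Count() on metric.Counter and Value()
// on metric.Gauge are assumed to be the standard accessors.
m := NewStreamManagerMetrics()

m.NumMuxRangeFeed.Inc(1) // stream 1 connects
m.ActiveMuxRangeFeed.Inc(1)
m.NumMuxRangeFeed.Inc(1) // stream 2 connects
m.ActiveMuxRangeFeed.Inc(1)

m.ActiveMuxRangeFeed.Dec(1) // stream 1 disconnects; the total is untouched

fmt.Println(m.NumMuxRangeFeed.Count())    // 2: streams ever opened
fmt.Println(m.ActiveMuxRangeFeed.Value()) // 1: streams currently open
```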
35 changes: 4 additions & 31 deletions pkg/kv/kvserver/rangefeed/sender_helper_test.go
@@ -10,7 +10,6 @@ import (
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"

"github.com/cockroachdb/cockroach/pkg/kv/kvpb"
@@ -19,39 +18,13 @@ import (
"github.com/cockroachdb/errors"
)

// testRangefeedCounter mocks rangefeed metrics for testing.
type testRangefeedCounter struct {
count atomic.Int32
}

var _ RangefeedMetricsRecorder = &testRangefeedCounter{}

func newTestRangefeedCounter() *testRangefeedCounter {
return &testRangefeedCounter{}
}

func (c *testRangefeedCounter) UpdateMetricsOnRangefeedConnect() {
c.count.Add(1)
}

func (c *testRangefeedCounter) UpdateMetricsOnRangefeedDisconnect() {
c.UpdateMetricsOnRangefeedDisconnectBy(1)
}

func (c *testRangefeedCounter) UpdateMetricsOnRangefeedDisconnectBy(num int64) {
c.count.Add(int32(-num))
}

func (c *testRangefeedCounter) get() int {
return int(c.count.Load())
}

func (c *testRangefeedCounter) waitForRangefeedCount(t *testing.T, count int) {
func waitForRangefeedCount(t *testing.T, smm *StreamManagerMetrics, count int) {
testutils.SucceedsSoon(t, func() error {
if c.get() == count {
v := smm.ActiveMuxRangeFeed.Value()
if v == int64(count) {
return nil
}
return errors.Newf("expected %d rangefeeds, found %d", count, c.get())
return errors.Newf("expected %d rangefeeds, found %d", count, v)
})
}

11 changes: 6 additions & 5 deletions pkg/kv/kvserver/rangefeed/stream_manager.go
@@ -58,7 +58,7 @@ type StreamManager struct {

// metrics is used to record rangefeed metrics for the node. It tracks number
// of active rangefeeds.
metrics RangefeedMetricsRecorder
metrics *StreamManagerMetrics
}

// sender is an interface that is implemented by BufferedSender and
@@ -80,7 +80,7 @@ type sender interface {
cleanup(ctx context.Context)
}

func NewStreamManager(sender sender, metrics RangefeedMetricsRecorder) *StreamManager {
func NewStreamManager(sender sender, metrics *StreamManagerMetrics) *StreamManager {
sm := &StreamManager{
sender: sender,
metrics: metrics,
@@ -112,7 +112,7 @@ func (sm *StreamManager) OnError(streamID int64) {
defer sm.streams.Unlock()
if _, ok := sm.streams.m[streamID]; ok {
delete(sm.streams.m, streamID)
sm.metrics.UpdateMetricsOnRangefeedDisconnect()
sm.metrics.ActiveMuxRangeFeed.Dec(1)
}
}

@@ -149,7 +149,8 @@ func (sm *StreamManager) AddStream(streamID int64, d Disconnector) {
log.Fatalf(context.Background(), "stream %d already exists", streamID)
}
sm.streams.m[streamID] = d
sm.metrics.UpdateMetricsOnRangefeedConnect()
sm.metrics.ActiveMuxRangeFeed.Inc(1)
sm.metrics.NumMuxRangeFeed.Inc(1)
}

// Start launches sender.run in the background if no error is returned.
@@ -190,7 +191,7 @@ func (sm *StreamManager) Stop(ctx context.Context) {
defer sm.streams.Unlock()
rangefeedClosedErr := kvpb.NewError(
kvpb.NewRangeFeedRetryError(kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED))
sm.metrics.UpdateMetricsOnRangefeedDisconnectBy(int64(len(sm.streams.m)))
sm.metrics.ActiveMuxRangeFeed.Dec(int64(len(sm.streams.m)))
for _, disconnector := range sm.streams.m {
// Disconnect all streams with a retry error. No rangefeed errors will be
// sent to the client after shutdown, but the gRPC stream will still
34 changes: 17 additions & 17 deletions pkg/kv/kvserver/rangefeed/stream_manager_test.go
@@ -36,7 +36,7 @@ func TestStreamManagerDisconnectStream(t *testing.T) {

testutils.RunValues(t, "feed type", testTypes, func(t *testing.T, rt rangefeedTestType) {
testServerStream := newTestServerStream()
testRangefeedCounter := newTestRangefeedCounter()
smMetrics := NewStreamManagerMetrics()
var s sender
switch rt {
case scheduledProcessorWithUnbufferedSender:
@@ -45,7 +45,7 @@ func TestStreamManagerDisconnectStream(t *testing.T) {
t.Fatalf("unknown rangefeed test type %v", rt)
}

sm := NewStreamManager(s, testRangefeedCounter)
sm := NewStreamManager(s, smMetrics)
require.NoError(t, sm.Start(ctx, stopper))
defer sm.Stop(ctx)

@@ -61,13 +61,13 @@ func TestStreamManagerDisconnectStream(t *testing.T) {
require.NoError(t, sm.sender.sendBuffered(errEvent, nil))
},
})
require.Equal(t, 1, testRangefeedCounter.get())
require.Equal(t, int64(1), smMetrics.ActiveMuxRangeFeed.Value())
require.Equal(t, 0, testServerStream.totalEventsSent())
sm.DisconnectStream(int64(streamID), err)
testServerStream.waitForEvent(t, errEvent)
require.Equal(t, int32(1), num.Load())
require.Equal(t, 1, testServerStream.totalEventsSent())
testRangefeedCounter.waitForRangefeedCount(t, 0)
waitForRangefeedCount(t, smMetrics, 0)
testServerStream.reset()
})
t.Run("disconnect stream on the same stream is idempotent", func(t *testing.T) {
@@ -76,13 +76,13 @@ func TestStreamManagerDisconnectStream(t *testing.T) {
require.NoError(t, sm.sender.sendBuffered(errEvent, nil))
},
})
require.Equal(t, 1, testRangefeedCounter.get())
require.Equal(t, int64(1), smMetrics.ActiveMuxRangeFeed.Value())
sm.DisconnectStream(int64(streamID), err)
sm.DisconnectStream(int64(streamID), err)
testServerStream.waitForEvent(t, errEvent)
require.Equalf(t, 1, testServerStream.totalEventsSent(),
"expected only 1 error event but got %s", testServerStream.String())
testRangefeedCounter.waitForRangefeedCount(t, 0)
waitForRangefeedCount(t, smMetrics, 0)
})
})
}
@@ -98,15 +98,15 @@ func TestStreamManagerChaosWithStop(t *testing.T) {
defer stopper.Stop(ctx)
testutils.RunValues(t, "feed type", testTypes, func(t *testing.T, rt rangefeedTestType) {
testServerStream := newTestServerStream()
testRangefeedCounter := newTestRangefeedCounter()
smMetrics := NewStreamManagerMetrics()
var s sender
switch rt {
case scheduledProcessorWithUnbufferedSender:
s = NewUnbufferedSender(testServerStream)
default:
t.Fatalf("unknown rangefeed test type %v", rt)
}
sm := NewStreamManager(s, testRangefeedCounter)
sm := NewStreamManager(s, smMetrics)
require.NoError(t, sm.Start(ctx, stopper))

rng, _ := randutil.NewTestRand()
@@ -150,12 +150,12 @@ func TestStreamManagerChaosWithStop(t *testing.T) {
testServerStream.waitForEventCount(t, int(activeStreamStart))
expectedActiveStreams := activeStreamEnd - activeStreamStart
require.Equal(t, int(expectedActiveStreams), sm.activeStreamCount())
testRangefeedCounter.waitForRangefeedCount(t, int(expectedActiveStreams))
waitForRangefeedCount(t, smMetrics, int(expectedActiveStreams))
})

t.Run("stream manager on stop", func(t *testing.T) {
sm.Stop(ctx)
require.Equal(t, 0, testRangefeedCounter.get())
require.Equal(t, int64(0), smMetrics.ActiveMuxRangeFeed.Value())
require.Equal(t, 0, sm.activeStreamCount())
// Cleanup functions should be called for all active streams.
require.Equal(t, int32(activeStreamEnd), actualSum.Load())
@@ -174,7 +174,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {

testutils.RunValues(t, "feed type", testTypes, func(t *testing.T, rt rangefeedTestType) {
testServerStream := newTestServerStream()
testRangefeedCounter := newTestRangefeedCounter()
smMetrics := NewStreamManagerMetrics()
var s sender
switch rt {
case scheduledProcessorWithUnbufferedSender:
@@ -185,15 +185,15 @@ func TestStreamManagerErrorHandling(t *testing.T) {
t.Fatalf("unknown rangefeed test type %v", rt)
}

sm := NewStreamManager(s, testRangefeedCounter)
sm := NewStreamManager(s, smMetrics)
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
require.NoError(t, sm.Start(ctx, stopper))
const sID, rID = int64(0), 1
disconnectErr := kvpb.NewError(fmt.Errorf("disconnection error"))

expectErrorHandlingInvariance := func(p Processor) {
testRangefeedCounter.waitForRangefeedCount(t, 0)
waitForRangefeedCount(t, smMetrics, 0)
testutils.SucceedsSoon(t, func() error {
if p.Len() == 0 {
return nil
@@ -226,7 +226,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
stream)
require.True(t, registered)
go p.StopWithErr(disconnectErr)
require.Equal(t, 0, testRangefeedCounter.get())
require.Equal(t, int64(0), smMetrics.ActiveMuxRangeFeed.Value())
sm.AddStream(sID, d)
expectErrorHandlingInvariance(p)
testServerStream.reset()
@@ -241,7 +241,7 @@ func TestStreamManagerErrorHandling(t *testing.T) {
require.True(t, registered)
sm.AddStream(sID, d)
require.Equal(t, 1, p.Len())
require.Equal(t, 1, testRangefeedCounter.get())
require.Equal(t, int64(1), smMetrics.ActiveMuxRangeFeed.Value())
sm.DisconnectStream(sID, disconnectErr)
expectErrorHandlingInvariance(p)
testServerStream.reset()
@@ -255,11 +255,11 @@ func TestStreamManagerErrorHandling(t *testing.T) {
stream)
require.True(t, registered)
sm.AddStream(sID, d)
require.Equal(t, 1, testRangefeedCounter.get())
require.Equal(t, int64(1), smMetrics.ActiveMuxRangeFeed.Value())
require.Equal(t, 1, p.Len())
sm.Stop(ctx)
// No disconnect events should be sent during Stop().
testRangefeedCounter.waitForRangefeedCount(t, 0)
waitForRangefeedCount(t, smMetrics, 0)
testutils.SucceedsSoon(t, func() error {
if p.Len() == 0 {
return nil
(Diff for the remaining 4 changed files is not shown.)
