From 11d65560bf9367726a033daf06c84b4cad80a9b5 Mon Sep 17 00:00:00 2001
From: Yevgeniy Miretskiy
Date: Wed, 23 Aug 2023 11:51:00 -0400
Subject: [PATCH] kvcoord: Pace rangefeed client goroutine creation

Acquire catchup scan quota prior to goroutine creation in order to pace
the goroutine creation rate. This results in smooth growth of the
goroutine count, reducing pressure on the goroutine scheduler, which in
turn reduces the impact on SQL latency during changefeed startup.

This change also improves observability in the rangefeed client by
introducing new metrics:

* `distsender.rangefeed.post_catchup_ranges`: a gauge keeping track of
  the number of ranges which have completed their catchup scan.
* `distsender.rangefeed.retry.<reason>`: counters keeping track of the
  number of ranges that encountered a retryable error of a particular
  type (e.g. slow consumer, range split, etc).

Observability is also enhanced by adding a `catchup` column to the
`crdb_internal.active_range_feeds` virtual table, indicating whether the
range is currently in catchup scan mode.

Fixes #98842

Release note: None
---
 pkg/cli/zip_table_registry.go                 |   1 +
 pkg/kv/kvclient/kvcoord/dist_sender.go        |  43 ++++-
 .../kvcoord/dist_sender_mux_rangefeed.go      |  26 +--
 .../kvclient/kvcoord/dist_sender_rangefeed.go | 171 +++++++++++-------
 .../kvcoord/dist_sender_rangefeed_test.go     |   9 +
 pkg/kv/kvpb/errors.go                         |   6 +
 pkg/sql/crdb_internal.go                      |   2 +
 .../testdata/logic_test/crdb_internal_catalog |   2 +-
 8 files changed, 168 insertions(+), 92 deletions(-)

diff --git a/pkg/cli/zip_table_registry.go b/pkg/cli/zip_table_registry.go
index fc508193712e..5006bc27472a 100644
--- a/pkg/cli/zip_table_registry.go
+++ b/pkg/cli/zip_table_registry.go
@@ -598,6 +598,7 @@ var zipInternalTablesPerNode = DebugZipTableRegistry{
 			"range_end",
 			"resolved",
 			"last_event_utc",
+			"catchup",
 		},
 	},
 	"crdb_internal.feature_usage": {
diff --git a/pkg/kv/kvclient/kvcoord/dist_sender.go b/pkg/kv/kvclient/kvcoord/dist_sender.go
index b6203424d82c..da840b4d46f8 100644
--- a/pkg/kv/kvclient/kvcoord/dist_sender.go
+++ b/pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -206,6 +206,15 @@ This counts the number of ranges with an active rangefeed.
 		Help: `Number of ranges in catchup mode
 This counts the number of ranges with an active rangefeed that are performing catchup scan.
+`,
+		Measurement: "Ranges",
+		Unit:        metric.Unit_COUNT,
+	}
+	metaDistSenderRangefeedPostCatchupRanges = metric.Metadata{
+		Name: "distsender.rangefeed.post_catchup_ranges",
+		Help: `Number of post-catchup ranges
+
+This counts the number of ranges that are active, and have completed their catchup scan
 `,
 		Measurement: "Ranges",
 		Unit:        metric.Unit_COUNT,
 	}
@@ -308,11 +317,13 @@ type DistSenderMetrics struct {
 
 // DistSenderRangeFeedMetrics is a set of rangefeed specific metrics.
type DistSenderRangeFeedMetrics struct { - RangefeedRanges *metric.Gauge - RangefeedCatchupRanges *metric.Gauge - RangefeedErrorCatchup *metric.Counter - RangefeedRestartRanges *metric.Counter - RangefeedRestartStuck *metric.Counter + RangefeedRanges *metric.Gauge + RangefeedCatchupRanges *metric.Gauge + RangefeedPostCatchupRanges *metric.Gauge + RangefeedErrorCatchup *metric.Counter + RangefeedRetryErrors [kvpb.NumRangeFeedRetryErrors]*metric.Counter + RangefeedRestartRanges *metric.Counter + RangefeedRestartStuck *metric.Counter } func makeDistSenderMetrics() DistSenderMetrics { @@ -354,12 +365,24 @@ func makeDistSenderMetrics() DistSenderMetrics { } func makeDistSenderRangeFeedMetrics() DistSenderRangeFeedMetrics { + var retryCounters [kvpb.NumRangeFeedRetryErrors]*metric.Counter + for idx, name := range kvpb.RangeFeedRetryError_Reason_name { + retryCounters[idx] = metric.NewCounter(metric.Metadata{ + Name: fmt.Sprintf("distsender.rangefeed.retry.%s", strings.ToLower(name)), + Help: `Number of ranges in retried due to rangefeed retry error`, + Measurement: "Ranges", + Unit: metric.Unit_COUNT, + }) + } + return DistSenderRangeFeedMetrics{ - RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), - RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), - RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), - RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), - RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), + RangefeedRanges: metric.NewGauge(metaDistSenderRangefeedTotalRanges), + RangefeedCatchupRanges: metric.NewGauge(metaDistSenderRangefeedCatchupRanges), + RangefeedPostCatchupRanges: metric.NewGauge(metaDistSenderRangefeedPostCatchupRanges), + RangefeedRetryErrors: retryCounters, + RangefeedErrorCatchup: metric.NewCounter(metaDistSenderRangefeedErrorCatchupRanges), + RangefeedRestartRanges: metric.NewCounter(metaDistSenderRangefeedRestartRanges), + RangefeedRestartStuck: metric.NewCounter(metaDistSenderRangefeedRestartStuck), } } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go index 4c0e2105bd59..8ba27b031526 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_mux_rangefeed.go @@ -95,7 +95,10 @@ func muxRangeFeed( m.metrics = cfg.knobs.metrics } - divideAllSpansOnRangeBoundaries(spans, m.startSingleRangeFeed, ds, &m.g) + m.g.GoCtx(func(ctx context.Context) error { + return divideAllSpansOnRangeBoundaries(ctx, spans, m.startSingleRangeFeed, ds, &m.g) + }) + return errors.CombineErrors(m.g.Wait(), ctx.Err()) } @@ -165,12 +168,6 @@ type activeMuxRangeFeed struct { roachpb.ReplicaDescriptor startAfter hlc.Timestamp - // catchupRes is the catchup scan quota acquired upon the - // start of rangefeed. - // It is released when this stream receives first non-empty checkpoint - // (meaning: catchup scan completes). - catchupRes catchupAlloc - // State pertaining to execution of rangefeed call. token rangecache.EvictionToken transport Transport @@ -218,7 +215,7 @@ func (m *rangefeedMuxer) startSingleRangeFeed( // Register active mux range feed. 
stream := &activeMuxRangeFeed{ - activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics.RangefeedRanges), + activeRangeFeed: newActiveRangeFeed(span, startAfter, m.registry, m.metrics), rSpan: rs, startAfter: startAfter, token: token, @@ -409,7 +406,7 @@ func (m *rangefeedMuxer) startNodeMuxRangeFeed( // make sure that the underlying error is not fatal. If it is, there is no // reason to restart each rangefeed, so just bail out. - if _, err := handleRangefeedError(ctx, recvErr); err != nil { + if _, err := handleRangefeedError(ctx, m.metrics, recvErr); err != nil { // Regardless of an error, release any resources (i.e. metrics) still // being held by active stream. for _, s := range toRestart { @@ -468,8 +465,8 @@ func (m *rangefeedMuxer) receiveEventsFromNode( if t.Span.Contains(active.Span) { // If we see the first non-empty checkpoint, we know we're done with the catchup scan. if !t.ResolvedTS.IsEmpty() && active.catchupRes != nil { - active.catchupRes.Release() - active.catchupRes = nil + active.releaseCatchupScan() + m.metrics.RangefeedPostCatchupRanges.Inc(1) } // Note that this timestamp means that all rows in the span with // writes at or before the timestamp have now been seen. The @@ -524,10 +521,7 @@ func (m *rangefeedMuxer) restartActiveRangeFeed( // Release catchup scan reservation if any -- we will acquire another // one when we restart. - if active.catchupRes != nil { - active.catchupRes.Release() - active.catchupRes = nil - } + active.releaseCatchupScan() doRelease := true defer func() { @@ -536,7 +530,7 @@ func (m *rangefeedMuxer) restartActiveRangeFeed( } }() - errInfo, err := handleRangefeedError(ctx, reason) + errInfo, err := handleRangefeedError(ctx, m.metrics, reason) if err != nil { // If this is an error we cannot recover from, terminate the rangefeed. return err diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 6bc6c66fd504..49a82380f900 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -40,7 +40,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/iterutil" "github.com/cockroachdb/cockroach/pkg/util/limit" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/pprofutil" "github.com/cockroachdb/cockroach/pkg/util/retry" "github.com/cockroachdb/cockroach/pkg/util/syncutil" @@ -228,10 +227,37 @@ func (ds *DistSender) RangeFeedSpans( for { select { case sri := <-rangeCh: + // Bound the partial rangefeed to the partial span. + span := sri.rs.AsRawSpanWithNoLocals() + + // Register partial range feed with registry. We do this prior to acquiring + // catchup scan quota so that we have some observability into the ranges + // that are blocked, waiting for quota. + active := newActiveRangeFeed(span, sri.startAfter, rr, metrics) + + acquireStart := timeutil.Now() + if log.V(1) { + log.Infof(ctx, "RangeFeed starting for span %s@%s (quota acquisition)", span, sri.startAfter) + } + // Prior to spawning goroutine to process this feed, acquire catchup scan quota. + // This quota acquisition paces the rate of new goroutine creation. 
+ catchupRes, err := acquireCatchupScanQuota(ctx, &ds.st.SV, &catchupSem, metrics) + if err != nil { + active.release() + return err + } + active.catchupRes = catchupRes + if log.V(1) { + log.Infof(ctx, "RangeFeed starting for span %s@%s (quota acquired in %s)", + span, sri.startAfter, timeutil.Since(acquireStart)) + } + // Spawn a child goroutine to process this feed. g.GoCtx(func(ctx context.Context) error { - return ds.partialRangeFeed(ctx, rr, sri.rs, sri.startAfter, - sri.token, &catchupSem, rangeCh, eventCh, cfg, metrics) + defer active.release() + + return ds.partialRangeFeed(ctx, active, sri.rs, sri.startAfter, + sri.token, rangeCh, eventCh, cfg, metrics) }) case <-ctx.Done(): return ctx.Err() @@ -240,7 +266,9 @@ func (ds *DistSender) RangeFeedSpans( }) // Kick off the initial set of ranges. - divideAllSpansOnRangeBoundaries(spans, sendSingleRangeInfo(rangeCh), ds, &g) + g.GoCtx(func(ctx context.Context) error { + return divideAllSpansOnRangeBoundaries(ctx, spans, sendSingleRangeInfo(rangeCh), ds, &g) + }) return g.Wait() } @@ -249,8 +277,8 @@ func (ds *DistSender) RangeFeedSpans( // provided onRange function for each range. Resolution happens concurrently using provided // context group. func divideAllSpansOnRangeBoundaries( - spans []SpanTimePair, onRange onRangeFn, ds *DistSender, g *ctxgroup.Group, -) { + ctx context.Context, spans []SpanTimePair, onRange onRangeFn, ds *DistSender, g *ctxgroup.Group, +) error { // Sort input spans based on their start time -- older spans first. // Starting rangefeed over large number of spans is an expensive proposition, // since this involves initiating catch-up scan operation for each span. These @@ -265,17 +293,16 @@ func divideAllSpansOnRangeBoundaries( return spans[i].StartAfter.Less(spans[j].StartAfter) }) - for _, s := range spans { - func(stp SpanTimePair) { - g.GoCtx(func(ctx context.Context) error { - rs, err := keys.SpanAddr(stp.Span) - if err != nil { - return err - } - return divideSpanOnRangeBoundaries(ctx, ds, rs, stp.StartAfter, onRange) - }) - }(s) + for _, stp := range spans { + rs, err := keys.SpanAddr(stp.Span) + if err != nil { + return err + } + if err := divideSpanOnRangeBoundaries(ctx, ds, rs, stp.StartAfter, onRange); err != nil { + return err + } } + return nil } // RangeFeedContext is the structure containing arguments passed to @@ -297,6 +324,7 @@ type PartialRangeFeed struct { CreatedTime time.Time LastValueReceived time.Time Resolved hlc.Timestamp + InCatchup bool NumErrs int LastErr error } @@ -311,10 +339,12 @@ func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr const continueIter = true const stopIter = false - partialRangeFeed := func(active *activeRangeFeed) PartialRangeFeed { + partialRangeFeed := func(active *activeRangeFeed) (p PartialRangeFeed) { active.Lock() defer active.Unlock() - return active.PartialRangeFeed + p = active.PartialRangeFeed + p.InCatchup = active.catchupRes != nil + return p } ds.activeRangeFeeds.Range(func(k, v interface{}) bool { @@ -336,6 +366,13 @@ func (ds *DistSender) ForEachActiveRangeFeed(fn ActiveRangeFeedIterFn) (iterErr // activeRangeFeed is a thread safe PartialRangeFeed. type activeRangeFeed struct { release func() + + // catchupRes is the catchup scan quota acquired upon the + // start of rangefeed. + // It is released when this stream receives first non-empty checkpoint + // (meaning: catchup scan completes). 
+ catchupRes catchupAlloc + syncutil.Mutex PartialRangeFeed } @@ -436,7 +473,10 @@ func divideSpanOnRangeBoundaries( // newActiveRangeFeed registers active rangefeed with rangefeedRegistry. // The caller must call active.release() in order to cleanup. func newActiveRangeFeed( - span roachpb.Span, startAfter hlc.Timestamp, rr *rangeFeedRegistry, c *metric.Gauge, + span roachpb.Span, + startAfter hlc.Timestamp, + rr *rangeFeedRegistry, + metrics *DistSenderRangeFeedMetrics, ) *activeRangeFeed { // Register partial range feed with registry. active := &activeRangeFeed{ @@ -445,28 +485,46 @@ func newActiveRangeFeed( StartAfter: startAfter, CreatedTime: timeutil.Now(), }, - release: func() { - rr.ranges.Delete(active) - c.Dec(1) - }, } + + active.release = func() { + active.releaseCatchupScan() + rr.ranges.Delete(active) + metrics.RangefeedRanges.Dec(1) + + active.Lock() + defer active.Unlock() + if active.Resolved.IsSet() { + // Range reached post-catchup phase. + metrics.RangefeedPostCatchupRanges.Dec(1) + } + } + rr.ranges.Store(active, nil) - c.Inc(1) + metrics.RangefeedRanges.Inc(1) return active } +// releaseCatchupScan releases catchup scan allocation, if any. +// safe to call multiple times. +func (a *activeRangeFeed) releaseCatchupScan() { + if a.catchupRes != nil { + a.catchupRes.Release() + a.catchupRes = nil + } +} + // partialRangeFeed establishes a RangeFeed to the range specified by desc. It // manages lifecycle events of the range in order to maintain the RangeFeed // connection; this may involve instructing higher-level functions to retry // this rangefeed, or subdividing the range further in the event of a split. func (ds *DistSender) partialRangeFeed( ctx context.Context, - rr *rangeFeedRegistry, + active *activeRangeFeed, rs roachpb.RSpan, startAfter hlc.Timestamp, token rangecache.EvictionToken, - catchupSem *limit.ConcurrentRequestLimiter, rangeCh chan<- singleRangeInfo, eventCh chan<- RangeFeedMessage, cfg rangeFeedConfig, @@ -475,10 +533,6 @@ func (ds *DistSender) partialRangeFeed( // Bound the partial rangefeed to the partial span. span := rs.AsRawSpanWithNoLocals() - // Register partial range feed with registry. - active := newActiveRangeFeed(span, startAfter, rr, metrics.RangefeedRanges) - defer active.release() - // Start a retry loop for sending the batch to the range. for r := retry.StartWithCtx(ctx, ds.rpcRetryOptions); r.Next(); { // If we've cleared the descriptor on a send failure, re-lookup. @@ -500,20 +554,18 @@ func (ds *DistSender) partialRangeFeed( log.Infof(ctx, "RangeFeed starting for range %d@%s (%s)", token.Desc().RangeID, startAfter, span) } - maxTS, err := ds.singleRangeFeed( - ctx, span, startAfter, token.Desc(), - catchupSem, eventCh, active.onRangeEvent, cfg, metrics) + maxTS, err := ds.singleRangeFeed(ctx, active, span, startAfter, token.Desc(), eventCh, cfg, metrics) // Forward the timestamp in case we end up sending it again. 
startAfter.Forward(maxTS) if log.V(1) { - log.Infof(ctx, "RangeFeed %s@%s disconnected with last checkpoint %s ago: %v", - active.Span, active.StartAfter, timeutil.Since(active.Resolved.GoTime()), err) + log.Infof(ctx, "RangeFeed %d@%s (%s) disconnected with last checkpoint %s ago: %v", + token.Desc().RangeID, active.StartAfter, active.Span, timeutil.Since(active.Resolved.GoTime()), err) } active.setLastError(err) - errInfo, err := handleRangefeedError(ctx, err) + errInfo, err := handleRangefeedError(ctx, metrics, err) if err != nil { return err } @@ -522,7 +574,12 @@ func (ds *DistSender) partialRangeFeed( token.Evict(ctx) token = rangecache.EvictionToken{} } + if errInfo.resolveSpan { + // We must release catchup scan reservation prior to attempt to + // re-resolve since this will attempt to acquire 1 or more catchup + // scan reservations. + active.releaseCatchupScan() return divideSpanOnRangeBoundaries(ctx, ds, rs, startAfter, sendSingleRangeInfo(rangeCh)) } } @@ -537,7 +594,9 @@ type rangefeedErrorInfo struct { // handleRangefeedError handles an error that occurred while running rangefeed. // Returns rangefeedErrorInfo describing how the error should be handled for the // range. Returns an error if the entire rangefeed should terminate. -func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, error) { +func handleRangefeedError( + ctx context.Context, metrics *DistSenderRangeFeedMetrics, err error, +) (rangefeedErrorInfo, error) { if err == nil { return rangefeedErrorInfo{}, nil } @@ -578,10 +637,12 @@ func handleRangefeedError(ctx context.Context, err error) (rangefeedErrorInfo, e kvpb.RangeFeedRetryError_REASON_RANGEFEED_CLOSED: // Try again with same descriptor. These are transient // errors that should not show up again. + metrics.RangefeedRetryErrors[t.Reason].Inc(1) return rangefeedErrorInfo{}, nil case kvpb.RangeFeedRetryError_REASON_RANGE_SPLIT, kvpb.RangeFeedRetryError_REASON_RANGE_MERGED, kvpb.RangeFeedRetryError_REASON_NO_LEASEHOLDER: + metrics.RangefeedRetryErrors[t.Reason].Inc(1) return rangefeedErrorInfo{evict: true, resolveSpan: true}, nil default: return rangefeedErrorInfo{}, errors.AssertionFailedf("unrecognized retryable error type: %T", err) @@ -636,10 +697,6 @@ func newTransportForRange( return ds.transportFactory(opts, ds.nodeDialer, replicas) } -// onRangeEventCb is invoked for each non-error range event. -// nodeID identifies the node ID which generated the event. -type onRangeEventCb func(nodeID roachpb.NodeID, rangeID roachpb.RangeID, event *kvpb.RangeFeedEvent) - // makeRangeFeedRequest constructs kvpb.RangeFeedRequest for specified span and // rangeID. Request is constructed to watch event after specified timestamp, and // with optional diff. If the request corresponds to a system range, request @@ -702,12 +759,11 @@ func defaultStuckRangeThreshold(st *cluster.Settings) func() time.Duration { // request's timestamp if not checkpoints are seen. func (ds *DistSender) singleRangeFeed( ctx context.Context, + active *activeRangeFeed, span roachpb.Span, startAfter hlc.Timestamp, desc *roachpb.RangeDescriptor, - catchupSem *limit.ConcurrentRequestLimiter, eventCh chan<- RangeFeedMessage, - onRangeEvent onRangeEventCb, cfg rangeFeedConfig, metrics *DistSenderRangeFeedMetrics, ) (_ hlc.Timestamp, retErr error) { @@ -727,22 +783,6 @@ func (ds *DistSender) singleRangeFeed( } defer transport.Release() - // Indicate catchup scan is starting; Before potentially blocking on a semaphore, take - // opportunity to update semaphore limit. 
- catchupRes, err := acquireCatchupScanQuota(ctx, &ds.st.SV, catchupSem, metrics) - if err != nil { - return hlc.Timestamp{}, err - } - - finishCatchupScan := func() { - if catchupRes != nil { - catchupRes.Release() - catchupRes = nil - } - } - // cleanup catchup reservation in case of early termination. - defer finishCatchupScan() - stuckWatcher := newStuckRangeFeedCanceler(cancelFeed, defaultStuckRangeThreshold(ds.st)) defer stuckWatcher.stop() @@ -797,7 +837,7 @@ func (ds *DistSender) singleRangeFeed( }); err != nil { log.VErrEventf(ctx, 2, "RPC error: %s", err) if stuckWatcher.stuck() { - afterCatchUpScan := catchupRes == nil + afterCatchUpScan := active.catchupRes == nil return args.Timestamp, handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold(), metrics) } return args.Timestamp, err @@ -818,8 +858,9 @@ func (ds *DistSender) singleRangeFeed( case *kvpb.RangeFeedCheckpoint: if t.Span.Contains(args.Span) { // If we see the first non-empty checkpoint, we know we're done with the catchup scan. - if !t.ResolvedTS.IsEmpty() && catchupRes != nil { - finishCatchupScan() + if !t.ResolvedTS.IsEmpty() && active.catchupRes != nil { + active.releaseCatchupScan() + metrics.RangefeedPostCatchupRanges.Inc(1) } // Note that this timestamp means that all rows in the span with // writes at or before the timestamp have now been seen. The @@ -830,19 +871,19 @@ func (ds *DistSender) singleRangeFeed( } case *kvpb.RangeFeedError: log.VErrEventf(ctx, 2, "RangeFeedError: %s", t.Error.GoError()) - if catchupRes != nil { + if active.catchupRes != nil { metrics.RangefeedErrorCatchup.Inc(1) } if stuckWatcher.stuck() { // When the stuck watcher fired, and the rangefeed call is local, // the remote might notice the cancellation first and return from // Recv with an error that we need to special-case here. - afterCatchUpScan := catchupRes == nil + afterCatchUpScan := active.catchupRes == nil return args.Timestamp, handleStuckEvent(&args, afterCatchUpScan, stuckWatcher.threshold(), metrics) } return args.Timestamp, t.Error.GoError() } - onRangeEvent(args.Replica.NodeID, desc.RangeID, event) + active.onRangeEvent(args.Replica.NodeID, desc.RangeID, event) select { case eventCh <- msg: diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go index 2452db21890b..c41ea0ca7da2 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed_test.go @@ -682,6 +682,7 @@ func TestRangeFeedMetricsManagement(t *testing.T) { // Upon shutdown, make sure the metrics have correct values. defer func() { require.EqualValues(t, 0, metrics.RangefeedRanges.Value()) + require.EqualValues(t, 0, metrics.RangefeedPostCatchupRanges.Value()) require.EqualValues(t, 0, metrics.RangefeedRestartStuck.Count()) // We injected numRangesToRetry transient errors during catchup scan. @@ -802,6 +803,14 @@ func TestRangeFeedMetricsManagement(t *testing.T) { // We also know that we have blocked numCatchupToBlock ranges in their catchup scan. require.EqualValues(t, numCatchupToBlock, metrics.RangefeedCatchupRanges.Value()) + + // All but numCatchupToBlock ranges are also in the post catchup state. 
+ testutils.SucceedsWithin(t, func() error { + if metrics.RangefeedPostCatchupRanges.Value() == numRanges-numCatchupToBlock { + return nil + } + return errors.New("waiting for post catchup counter") + }, 10*time.Second) }) } diff --git a/pkg/kv/kvpb/errors.go b/pkg/kv/kvpb/errors.go index 213019010bf1..251f11e15af2 100644 --- a/pkg/kv/kvpb/errors.go +++ b/pkg/kv/kvpb/errors.go @@ -307,6 +307,8 @@ const ( InternalErrType ErrorDetailType = 25 NumErrors int = 45 + + NumRangeFeedRetryErrors = 8 ) // Register the migration of all errors that used to be in the roachpb package @@ -348,6 +350,10 @@ func init() { errors.RegisterTypeMigration(roachpbPath, "*roachpb.RefreshFailedError", &RefreshFailedError{}) errors.RegisterTypeMigration(roachpbPath, "*roachpb.MVCCHistoryMutationError", &MVCCHistoryMutationError{}) errors.RegisterTypeMigration(roachpbPath, "*roachpb.InsufficientSpaceError", &InsufficientSpaceError{}) + + if len(RangeFeedRetryError_Reason_value) != NumRangeFeedRetryErrors { + panic("NumRangeFeedRetryErrors constant must be updated whenever new rangefeed retry reason added") + } } // GoError returns a Go error converted from Error. If the error is a transaction diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index 9c748f959694..c78246a8dbfb 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -7095,6 +7095,7 @@ CREATE TABLE crdb_internal.active_range_feeds ( range_end STRING, resolved STRING, last_event_utc INT, + catchup BOOL, num_errs INT, last_err STRING );`, @@ -7126,6 +7127,7 @@ CREATE TABLE crdb_internal.active_range_feeds ( tree.NewDString(keys.PrettyPrint(nil /* valDirs */, rf.Span.EndKey)), tree.NewDString(rf.Resolved.AsOfSystemTime()), lastEvent, + tree.MakeDBool(tree.DBool(rf.InCatchup)), tree.NewDInt(tree.DInt(rf.NumErrs)), lastErr, ) diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog index 77f1b20bbbcf..62dea292c8cf 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal_catalog @@ -404,7 +404,7 @@ SELECT id, strip_volatile(descriptor) FROM crdb_internal.kv_catalog_descriptor O 4294967207 {"table": {"columns": [{"id": 1, "name": "id", "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "database_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "super_region_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "regions", "nullable": true, "type": {"arrayContents": {"family": "StringFamily", "oid": 25}, "arrayElemType": "StringFamily", "family": "ArrayFamily", "oid": 1009}}], "formatVersion": 3, "id": 4294967207, "name": "super_regions", "nextColumnId": 5, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967208 {"table": {"columns": [{"id": 1, "name": "name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 2, "name": "implemented", "nullable": true, "type": {"oid": 16}}], "formatVersion": 3, "id": 4294967208, "name": "pg_catalog_table_is_implemented", "nextColumnId": 3, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, 
"foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967209 {"table": {"columns": [{"id": 1, "name": "tenant_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "total_ru", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 3, "name": "total_read_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 4, "name": "total_read_requests", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 5, "name": "total_write_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "total_write_requests", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "total_sql_pod_seconds", "nullable": true, "type": {"family": "FloatFamily", "oid": 701, "width": 64}}, {"id": 8, "name": "total_pgwire_egress_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 9, "name": "total_external_io_ingress_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 10, "name": "total_external_io_egress_bytes", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}], "formatVersion": 3, "id": 4294967209, "name": "tenant_usage_details", "nextColumnId": 11, "nextConstraintId": 1, "nextMutationId": 1, "primaryIndex": {"foreignKey": {}, "geoConfig": {}, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1", "viewQuery": "SELECT tenant_id, (j->>'rU')::FLOAT8 AS total_ru, (j->>'readBytes')::INT8 AS total_read_bytes, (j->>'readRequests')::INT8 AS total_read_requests, (j->>'writeBytes')::INT8 AS total_write_bytes, (j->>'writeRequests')::INT8 AS total_write_requests, (j->>'sqlPodsCpuSeconds')::FLOAT8 AS total_sql_pod_seconds, (j->>'pgwireEgressBytes')::INT8 AS total_pgwire_egress_bytes, (j->>'externalIOIngressBytes')::INT8 AS total_external_io_ingress_bytes, (j->>'externalIOEgressBytes')::INT8 AS total_external_io_egress_bytes FROM (SELECT tenant_id, crdb_internal.pb_to_json('cockroach.roachpb.TenantConsumption', total_consumption) AS j FROM system.tenant_usage WHERE instance_id = 0)"}} -4294967210 {"table": {"columns": [{"id": 1, "name": "id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "tags", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "startts", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "diff", "nullable": true, "type": {"oid": 16}}, {"id": 5, "name": "node_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "range_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "created", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "range_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 9, "name": "range_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 10, "name": 
"resolved", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 11, "name": "last_event_utc", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 12, "name": "num_errs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 13, "name": "last_err", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967210, "name": "active_range_feeds", "nextColumnId": 14, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} +4294967210 {"table": {"columns": [{"id": 1, "name": "id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "tags", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "startts", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "diff", "nullable": true, "type": {"oid": 16}}, {"id": 5, "name": "node_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 6, "name": "range_id", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 7, "name": "created", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 8, "name": "range_start", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 9, "name": "range_end", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 10, "name": "resolved", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 11, "name": "last_event_utc", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 12, "name": "catchup", "nullable": true, "type": {"oid": 16}}, {"id": 13, "name": "num_errs", "nullable": true, "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 14, "name": "last_err", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967210, "name": "active_range_feeds", "nextColumnId": 15, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967211 {"table": {"columns": [{"id": 1, "name": "database_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 2, "name": "schema_name", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 3, "name": "role", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "for_all_roles", "nullable": true, "type": {"oid": 16}}, {"id": 5, "name": "object_type", "type": {"family": "StringFamily", "oid": 25}}, {"id": 6, "name": "grantee", "type": {"family": "StringFamily", "oid": 25}}, {"id": 7, "name": "privilege_type", "type": {"family": "StringFamily", "oid": 25}}, {"id": 8, "name": "is_grantable", "nullable": true, "type": {"oid": 16}}], "formatVersion": 3, "id": 4294967211, "name": "default_privileges", "nextColumnId": 9, "nextConstraintId": 2, 
"nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967212 {"table": {"columns": [{"id": 1, "name": "region", "type": {"family": "StringFamily", "oid": 25}}, {"id": 2, "name": "zones", "type": {"arrayContents": {"family": "StringFamily", "oid": 25}, "arrayElemType": "StringFamily", "family": "ArrayFamily", "oid": 1009}}], "formatVersion": 3, "id": 4294967212, "name": "regions", "nextColumnId": 3, "nextConstraintId": 2, "nextIndexId": 2, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}} 4294967213 {"table": {"columns": [{"id": 1, "name": "trace_id", "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 2, "name": "node_id", "type": {"family": "IntFamily", "oid": 20, "width": 64}}, {"id": 3, "name": "root_op_name", "type": {"family": "StringFamily", "oid": 25}}, {"id": 4, "name": "trace_str", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}, {"id": 5, "name": "jaeger_json", "nullable": true, "type": {"family": "StringFamily", "oid": 25}}], "formatVersion": 3, "id": 4294967213, "indexes": [{"foreignKey": {}, "geoConfig": {}, "id": 2, "interleave": {}, "keyColumnDirections": ["ASC"], "keyColumnIds": [1], "keyColumnNames": ["trace_id"], "name": "cluster_inflight_traces_trace_id_idx", "partitioning": {}, "sharded": {}, "storeColumnIds": [2, 3, 4, 5], "storeColumnNames": ["node_id", "root_op_name", "trace_str", "jaeger_json"], "version": 3}], "name": "cluster_inflight_traces", "nextColumnId": 6, "nextConstraintId": 2, "nextIndexId": 3, "nextMutationId": 1, "primaryIndex": {"constraintId": 1, "foreignKey": {}, "geoConfig": {}, "id": 1, "interleave": {}, "partitioning": {}, "sharded": {}}, "privileges": {"ownerProto": "node", "users": [{"privileges": "32", "userProto": "public"}], "version": 2}, "replacementOf": {"time": {}}, "unexposedParentSchemaId": 4294967295, "version": "1"}}