Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
113966: kvcoord: Reintroduce catchup scan semaphore for regular rangefeed r=miretskiy a=miretskiy

Re-introduce the catchup scan semaphore limit, removed by #110919, for regular rangefeed. This hard limit on the number of concurrent catchup scans is necessary to avoid OOMs when handling large scan rangefeeds (large fan-in factor) that execute over many non-local ranges.

Fixes #113489

Release note: None

114000: colfetcher: disable metamorphic randomization for direct scans r=yuzefovich a=yuzefovich

This commit makes it so that we no longer - for now - use metamorphic randomization for the default value of the
`sql.distsql.direct_columnar_scans.enabled` cluster setting that controls whether direct columnar scans (aka "KV projection pushdown") are enabled. It appears that we might be missing some memory accounting in the local fast path of this feature, and some backup-related roachtests run into OOMs with binaries built with "enabled assertions". Disabling this metamorphization for now seems good to silence failures caused by this now-known issue.

Informs: #113816

Epic: None

Release note: None

114026: kvnemesis: bump default steps to 100 r=erikgrinaker a=erikgrinaker

50 steps is usually too small to trigger interesting behaviors. Bump it to 100, which is still small enough to be easily debuggable.

The nightlies already run with 1000 steps.

Epic: none
Release note: None

Co-authored-by: Yevgeniy Miretskiy <[email protected]>
Co-authored-by: Yahor Yuzefovich <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
  • Loading branch information
4 people committed Nov 8, 2023
4 parents 46220d9 + d870c93 + 9c8d9f9 + fd68f26 commit 9743b28
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 16 deletions.
1 change: 1 addition & 0 deletions pkg/kv/kvclient/kvcoord/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ go_library(
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/iterutil",
"//pkg/util/limit",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/pprofutil",
Expand Down
69 changes: 63 additions & 6 deletions pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/grpcutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/iterutil"
"github.com/cockroachdb/cockroach/pkg/util/limit"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/pprofutil"
"github.com/cockroachdb/cockroach/pkg/util/quotapool"
Expand Down Expand Up @@ -72,6 +73,14 @@ var catchupStartupRate = settings.RegisterIntSetting(
settings.WithPublic,
)

// catchupScanConcurrency is the kv.rangefeed.catchup_scan_concurrency cluster
// setting: the number of catchup scans a single rangefeed may execute
// concurrently. It is registered with a value of 8 and must be non-negative;
// a value of 0 means unlimited.
var catchupScanConcurrency = settings.RegisterIntSetting(
settings.ApplicationLevel,
"kv.rangefeed.catchup_scan_concurrency",
"number of catchup scans that a single rangefeed can execute concurrently; 0 implies unlimited",
8,
settings.NonNegativeInt,
)

var rangefeedRangeStuckThreshold = settings.RegisterDurationSetting(
settings.ApplicationLevel,
"kv.rangefeed.range_stuck_threshold",
Expand Down Expand Up @@ -220,7 +229,7 @@ func (ds *DistSender) RangeFeedSpans(
cfg.rangeObserver(rr.ForEachPartialRangefeed)
}

rl := newCatchupScanRateLimiter(&ds.st.SV)
rl := newCatchupScanRateLimiter(&ds.st.SV, cfg.useMuxRangeFeed)

if enableMuxRangeFeed && cfg.useMuxRangeFeed {
return muxRangeFeed(ctx, cfg, spans, ds, rr, rl, eventCh)
Expand Down Expand Up @@ -694,11 +703,13 @@ func (a *activeRangeFeed) acquireCatchupScanQuota(
ctx context.Context, rl *catchupScanRateLimiter, metrics *DistSenderRangeFeedMetrics,
) error {
// Indicate catchup scan is starting.
if err := rl.Pace(ctx); err != nil {
alloc, err := rl.Pace(ctx)
if err != nil {
return err
}
metrics.RangefeedCatchupRanges.Inc(1)
a.catchupRes = func() {
alloc.Release()
metrics.RangefeedCatchupRanges.Dec(1)
}

Expand Down Expand Up @@ -987,18 +998,48 @@ type catchupScanRateLimiter struct {
pacer *quotapool.RateLimiter
sv *settings.Values
limit quotapool.Limit

// In addition to rate limiting catchup scans, a semaphore is used to restrict
// catchup scan concurrency for regular range feeds (catchupSem is nil for mux
// rangefeed).
// This additional limit is necessary because a regular rangefeed may buffer
// up to 2MB of data (or 128KB if useDedicatedRangefeedConnectionClass is set
// to true) per rangefeed stream in the http2/gRPC buffers -- making OOMs
// likely if the consumer does not consume events quickly enough. See
// https://github.com/cockroachdb/cockroach/issues/74219 for details.
// TODO(yevgeniy): Drop this once regular rangefeed gets deprecated.
catchupSemLimit int
catchupSem *limit.ConcurrentRequestLimiter
}

// newCatchupScanRateLimiter builds a catchupScanRateLimiter whose pacer is
// configured from the catchup rate limit currently stored in sv. When
// useMuxRangeFeed is false it additionally installs a concurrency semaphore
// sized by maxConcurrentCatchupScans; for mux rangefeeds catchupSem stays nil.
func newCatchupScanRateLimiter(sv *settings.Values) *catchupScanRateLimiter {
func newCatchupScanRateLimiter(sv *settings.Values, useMuxRangeFeed bool) *catchupScanRateLimiter {
const slowAcquisitionThreshold = 5 * time.Second
lim := getCatchupRateLimit(sv)
return &catchupScanRateLimiter{

rl := &catchupScanRateLimiter{
sv: sv,
limit: lim,
pacer: quotapool.NewRateLimiter(
"distSenderCatchupLimit", lim, 0, /* smooth rate limit without burst */
quotapool.OnSlowAcquisition(slowAcquisitionThreshold, logSlowCatchupScanAcquisition(slowAcquisitionThreshold))),
}

// Only regular (non-mux) rangefeeds get the semaphore. The limit it was
// created with is remembered in catchupSemLimit so that later setting
// changes can be detected.
if !useMuxRangeFeed {
rl.catchupSemLimit = maxConcurrentCatchupScans(sv)
l := limit.MakeConcurrentRequestLimiter("distSenderCatchupLimit", rl.catchupSemLimit)
rl.catchupSem = &l
}

return rl
}

// maxConcurrentCatchupScans returns the catchup scan concurrency limit read
// from the catchupScanConcurrency setting in sv. A setting value of 0 means
// "unlimited" and is reported as math.MaxInt.
func maxConcurrentCatchupScans(sv *settings.Values) int {
	if configured := catchupScanConcurrency.Get(sv); configured != 0 {
		return int(configured)
	}
	return math.MaxInt
}

func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
Expand All @@ -1009,15 +1050,31 @@ func getCatchupRateLimit(sv *settings.Values) quotapool.Limit {
}

// Pace paces the catchup scan startup.
func (rl *catchupScanRateLimiter) Pace(ctx context.Context) error {
func (rl *catchupScanRateLimiter) Pace(ctx context.Context) (limit.Reservation, error) {
// Take opportunity to update limits if they have changed.
if lim := getCatchupRateLimit(rl.sv); lim != rl.limit {
rl.limit = lim
rl.pacer.UpdateLimit(lim, 0 /* smooth rate limit without burst */)
}
return rl.pacer.WaitN(ctx, 1)

if err := rl.pacer.WaitN(ctx, 1); err != nil {
return nil, err
}

// Regular rangefeed, in addition to pacing also acquires catchup scan quota.
if rl.catchupSem != nil {
// Take opportunity to update limits if they have changed.
if lim := maxConcurrentCatchupScans(rl.sv); lim != rl.catchupSemLimit {
rl.catchupSem.SetLimit(lim)
}
return rl.catchupSem.Begin(ctx)
}

return catchupAlloc(releaseNothing), nil
}

// releaseNothing is a no-op. Pace wraps it in a catchupAlloc to return a
// reservation whose release does nothing when no semaphore slot was acquired.
func releaseNothing() {}

// logSlowCatchupScanAcquisition is a function returning a quotapool.SlowAcquisitionFunction.
// It differs from the quotapool.LogSlowAcquisition in that only some of slow acquisition
// events are logged to reduce log spam.
Expand Down
2 changes: 1 addition & 1 deletion pkg/kv/kvnemesis/kvnemesis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ import (
"github.com/stretchr/testify/require"
)

var defaultNumSteps = envutil.EnvOrDefaultInt("COCKROACH_KVNEMESIS_STEPS", 50)
var defaultNumSteps = envutil.EnvOrDefaultInt("COCKROACH_KVNEMESIS_STEPS", 100)

func (cfg kvnemesisTestCfg) testClusterArgs(tr *SeqTracker) base.TestClusterArgs {
storeKnobs := &kvserver.StoreTestingKnobs{
Expand Down
1 change: 0 additions & 1 deletion pkg/settings/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,6 @@ var retiredSettings = map[InternalKey]struct{}{
"jobs.trace.force_dump_mode": {},
"timeseries.storage.30m_resolution_ttl": {},
"server.cpu_profile.enabled": {},
"kv.rangefeed.catchup_scan_concurrency": {},
"changefeed.lagging_ranges_threshold": {},
"changefeed.lagging_ranges_polling_rate": {},
"trace.jaeger.agent": {},
Expand Down
9 changes: 1 addition & 8 deletions pkg/sql/colfetcher/cfetcher_wrapper.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/cockroach/pkg/util/buildutil"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
Expand All @@ -41,13 +40,7 @@ var DirectScansEnabled = settings.RegisterBoolSetting(
settings.ApplicationLevel,
"sql.distsql.direct_columnar_scans.enabled",
"set to true to enable the 'direct' columnar scans in the KV layer",
directScansEnabledDefault,
)

var directScansEnabledDefault = util.ConstantWithMetamorphicTestBool(
"direct-scans-enabled",
// TODO(yuzefovich, 23.1): update the default to 'true' for multi-tenant
// setups.
// TODO(yuzefovich): make this a metamorphic constant again once #113816 is fixed.
false,
)

Expand Down

0 comments on commit 9743b28

Please sign in to comment.