Skip to content

Commit

Permalink
Merge #135563
Browse files Browse the repository at this point in the history
135563: sql/backfill: Implement retry mechanism during index backfill r=spilchen a=spilchen

During an index backfill, if the bulkio.index_backfill.batch_size setting is not appropriately configured for a table's size or definition, it may consume all available memory before writing the new index entries. This change introduces a retry mechanism to handle out-of-memory scenarios.

Upon encountering memory issues, the batch size is halved on each retry, coupled with an exponential backoff. This backoff period allows the consumer of the index entries to free up memory. The retry mechanism reuses the same bound memory account, making it critical to accurately track memory usage, even during failed attempts. This ensures proper accounting and frees memory consumed during the failed operation.

Epic: CRDB-37796
Closes #130939
Closes #132048
Release note (bug fix): The schema changer's backfill process now includes a retry mechanism that reduces the batch size when memory issues occur. This improves the likelihood of operation success without requiring manual adjustment of the bulkio.index_backfill.batch_size parameter.

Co-authored-by: Matt Spilchen <[email protected]>
  • Loading branch information
craig[bot] and spilchen committed Nov 20, 2024
2 parents 8d089dd + 2bf463b commit 354444d
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 27 deletions.
2 changes: 0 additions & 2 deletions pkg/cmd/roachtest/tests/perturbation/index_backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,6 @@ var _ perturbation = backfill{}

// setup returns the variations for the backfill perturbation. It starts from
// the result of the package-level setup helper called with b and 40.0 (the
// meaning of 40.0 is defined by that helper — TODO confirm), then overrides
// the bulkio.index_backfill.batch_size cluster setting to "5000".
func (b backfill) setup() variations {
v := setup(b, 40.0)
// TODO(#130939): Allow the backfill to complete, without this it can hang indefinitely.
v.clusterSettings["bulkio.index_backfill.batch_size"] = "5000"
return v
}

Expand Down
29 changes: 15 additions & 14 deletions pkg/sql/backfill/backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,9 +795,9 @@ func (ib *IndexBackfiller) init(
// provided, and builds all the added indexes.
// The method accounts for the memory used by the index entries for this chunk
// using the memory monitor associated with ib and returns the amount of memory
// that needs to be freed once the returned IndexEntry slice is freed.
// It is the callers responsibility to clear the associated bound account when
// appropriate.
// that needs to be freed once the returned IndexEntry slice is freed. This is
// returned in both the success and failure cases. It is the caller's
// responsibility to clear the associated bound account when appropriate.
func (ib *IndexBackfiller) BuildIndexEntriesChunk(
ctx context.Context,
txn *kv.Txn,
Expand Down Expand Up @@ -846,7 +846,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
if err := rowenc.InitIndexFetchSpec(
&spec, ib.evalCtx.Codec, tableDesc, tableDesc.GetPrimaryIndex(), fetcherCols,
); err != nil {
return nil, nil, 0, err
return nil, nil, memUsedPerChunk, err
}
var fetcher row.Fetcher
if err := fetcher.Init(
Expand All @@ -860,7 +860,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
ForceProductionKVBatchSize: ib.evalCtx.TestingKnobs.ForceProductionValues,
},
); err != nil {
return nil, nil, 0, err
return nil, nil, memUsedPerChunk, err
}
defer fetcher.Close(ctx)
if err := fetcher.StartScan(
Expand All @@ -869,7 +869,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
initBufferSize,
); err != nil {
log.Errorf(ctx, "scan error: %s", err)
return nil, nil, 0, err
return nil, nil, memUsedPerChunk, err
}

iv := &schemaexpr.RowIndexedVarContainer{
Expand All @@ -880,7 +880,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(

indexEntriesPerRowInitialBufferSize := int64(len(ib.added)) * sizeOfIndexEntry
if err := ib.GrowBoundAccount(ctx, indexEntriesPerRowInitialBufferSize); err != nil {
return nil, nil, 0, errors.Wrap(err,
return nil, nil, memUsedPerChunk, errors.Wrap(err,
"failed to initialize empty buffer to store the index entries of a single row")
}
memUsedPerChunk += indexEntriesPerRowInitialBufferSize
Expand Down Expand Up @@ -920,7 +920,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
for i := int64(0); i < chunkSize; i++ {
ok, err := fetcher.NextRowDecodedInto(ctx, ib.rowVals, ib.colIdxMap)
if err != nil {
return nil, nil, 0, err
return nil, nil, memUsedPerChunk, err
}
if !ok {
break
Expand All @@ -931,10 +931,10 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
// may reference default values.
if len(ib.colExprs) > 0 {
if err := evaluateExprs(ib.addedCols); err != nil {
return nil, nil, 0, err
return nil, nil, memUsedPerChunk, err
}
if err := evaluateExprs(ib.computedCols); err != nil {
return nil, nil, 0, err
return nil, nil, memUsedPerChunk, err
}
}

Expand All @@ -956,7 +956,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(

val, err := eval.Expr(ctx, ib.evalCtx, texpr)
if err != nil {
return nil, nil, 0, err
return nil, nil, memUsedPerChunk, err
}

if val == tree.DBoolTrue {
Expand Down Expand Up @@ -989,10 +989,11 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
&ib.muBoundAccount.boundAccount,
)
}(buffer)
// Account for memory use prior to error checking
memUsedPerChunk += memUsedDuringEncoding
if err != nil {
return nil, nil, 0, err
return nil, nil, memUsedPerChunk, err
}
memUsedPerChunk += memUsedDuringEncoding

// The memory monitor has already accounted for cap(entries). If the number
// of index entries are going to cause the entries buffer to re-slice, then
Expand All @@ -1001,7 +1002,7 @@ func (ib *IndexBackfiller) BuildIndexEntriesChunk(
if cap(entries)-len(entries) < len(buffer) {
resliceSize := sizeOfIndexEntry * int64(cap(entries))
if err := ib.GrowBoundAccount(ctx, resliceSize); err != nil {
return nil, nil, 0, err
return nil, nil, memUsedPerChunk, err
}
memUsedPerChunk += resliceSize
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/sql/rowenc/index_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -1629,7 +1629,7 @@ func EncodeSecondaryIndexes(
if cap(secondaryIndexEntries)-len(secondaryIndexEntries) < len(entries) {
resliceSize := sizeOfIndexEntry * int64(cap(secondaryIndexEntries))
if err := indexBoundAccount.Grow(ctx, resliceSize); err != nil {
return nil, 0, errors.Wrap(err,
return nil, memUsedEncodingSecondaryIdxs, errors.Wrap(err,
"failed to re-slice index entries buffer")
}
memUsedEncodingSecondaryIdxs += resliceSize
Expand All @@ -1641,11 +1641,11 @@ func EncodeSecondaryIndexes(
// non-trivial.
for _, index := range entries {
if err := indexBoundAccount.Grow(ctx, int64(len(index.Key))); err != nil {
return nil, 0, errors.Wrap(err, "failed to allocate space for index keys")
return nil, memUsedEncodingSecondaryIdxs, errors.Wrap(err, "failed to allocate space for index keys")
}
memUsedEncodingSecondaryIdxs += int64(len(index.Key))
if err := indexBoundAccount.Grow(ctx, int64(len(index.Value.RawBytes))); err != nil {
return nil, 0, errors.Wrap(err, "failed to allocate space for index values")
return nil, memUsedEncodingSecondaryIdxs, errors.Wrap(err, "failed to allocate space for index values")
}
memUsedEncodingSecondaryIdxs += int64(len(index.Value.RawBytes))
}
Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/rowexec/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ go_library(
"//pkg/util/optional",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/stringarena",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
Expand All @@ -123,6 +124,7 @@ go_test(
"distinct_test.go",
"filterer_test.go",
"hashjoiner_test.go",
"indexbackfiller_test.go",
"inverted_expr_evaluator_test.go",
"inverted_filterer_test.go",
"inverted_joiner_test.go",
Expand Down Expand Up @@ -191,6 +193,7 @@ go_test(
"//pkg/sql/sem/eval",
"//pkg/sql/sem/tree",
"//pkg/sql/span",
"//pkg/sql/sqlerrors",
"//pkg/sql/stats",
"//pkg/sql/types",
"//pkg/storage",
Expand All @@ -209,6 +212,7 @@ go_test(
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/randutil",
"//pkg/util/retry",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/tracing",
Expand Down
90 changes: 82 additions & 8 deletions pkg/sql/rowexec/indexbackfiller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/ctxgroup"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
Expand Down Expand Up @@ -396,30 +398,64 @@ func (ib *indexBackfiller) buildIndexEntryBatch(
tctx context.Context, sp roachpb.Span, readAsOf hlc.Timestamp,
) (roachpb.Key, []rowenc.IndexEntry, int64, error) {
knobs := &ib.flowCtx.Cfg.TestingKnobs
var memUsedBuildingBatch int64
if knobs.RunBeforeBackfillChunk != nil {
if err := knobs.RunBeforeBackfillChunk(sp); err != nil {
return nil, nil, 0, err
}
}
var key roachpb.Key

ctx, traceSpan := tracing.ChildSpan(tctx, "indexBatch")
defer traceSpan.Finish()
start := timeutil.Now()
var memUsedBuildingBatch int64
var key roachpb.Key
var entries []rowenc.IndexEntry
if err := ib.flowCtx.Cfg.DB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {

br := indexBatchRetry{
nextChunkSize: ib.spec.ChunkSize,
// Memory used while building index entries is released by another goroutine
// once those entries are processed. However, with wide rows and/or limited
// memory, memory pressure issues can arise. To address this, we retry with
// a smaller chunk size after an exponential backoff. Although the maximum
// wait time between retries may seem lengthy, it is significantly faster
// than allowing the entire schema operation to fail and restart.
retryOpts: retry.Options{
InitialBackoff: 500 * time.Millisecond,
Multiplier: 2,
MaxRetries: 15,
MaxBackoff: 1 * time.Minute,
},
resetForNextAttempt: func(ctx context.Context) error {
if entries != nil {
return errors.AssertionFailedf("expected entries to be nil on error")
}
// There is no need to track the memory we allocated in the last failed
// attempt. We will allocate new memory on the next iteration.
if memUsedBuildingBatch > 0 {
ib.ShrinkBoundAccount(ctx, memUsedBuildingBatch)
memUsedBuildingBatch = 0
}
return nil
},
}
br.buildIndexChunk = func(ctx context.Context, txn isql.Txn) error {
if err := txn.KV().SetFixedTimestamp(ctx, readAsOf); err != nil {
return err
}
// If this is a retry that succeeds, save the smaller chunk size to use as
// the starting size for the next batch. If memory pressure occurred with a
// larger batch size, it's prudent not to revert to it for subsequent batches.
ib.spec.ChunkSize = br.nextChunkSize

// TODO(knz): do KV tracing in DistSQL processors.
var err error
entries, key, memUsedBuildingBatch, err = ib.BuildIndexEntriesChunk(
ctx, txn.KV(), ib.desc, sp, ib.spec.ChunkSize, false, /* traceKV */
ctx, txn.KV(), ib.desc, sp, br.nextChunkSize, false, /* traceKV */
)
return err
}); err != nil {
}

ctx, traceSpan := tracing.ChildSpan(tctx, "indexBatch")
defer traceSpan.Finish()
start := timeutil.Now()
if err := br.buildBatchWithRetry(ctx, ib.flowCtx.Cfg.DB); err != nil {
return nil, nil, 0, err
}
prepTime := timeutil.Since(start)
Expand All @@ -438,3 +474,41 @@ func (ib *indexBackfiller) Resume(output execinfra.RowReceiver) {
// Close shuts down the processor by calling Close on its IndexBackfiller
// field with the supplied context.
func (ib *indexBackfiller) Close(ctx context.Context) {
ib.IndexBackfiller.Close(ctx)
}

// indexBatchRetry bundles the state and callbacks that buildBatchWithRetry
// uses to build one batch of index entries, retrying with a smaller chunk
// size after out-of-memory errors.
type indexBatchRetry struct {
// nextChunkSize is the chunk size for the next attempt. buildBatchWithRetry
// halves it (never going below 1) before each retry, and stops retrying
// once it is 1.
nextChunkSize int64
// buildIndexChunk performs a single build attempt; buildBatchWithRetry runs
// it inside a transaction via db.Txn.
buildIndexChunk func(ctx context.Context, txn isql.Txn) error
// resetForNextAttempt is called after an out-of-memory failure and before
// the next attempt so state from the failed attempt can be cleared. A
// non-nil error aborts the retry loop.
resetForNextAttempt func(ctx context.Context) error
// retryOpts is passed to retry.StartWithCtx to drive the retry loop.
retryOpts retry.Options
}

// buildBatchWithRetry runs b.buildIndexChunk in a transaction on db and, when
// an attempt fails with an out-of-memory error, tries again with half the
// chunk size.
//
// An attempt is retried only if its error is an out-of-memory error and
// b.nextChunkSize is still greater than 1; any other error is returned as is.
// Before each retry, b.resetForNextAttempt is invoked to discard state left
// behind by the failed attempt; if it fails, its error is combined with the
// out-of-memory error and returned. Once the retry loop configured by
// b.retryOpts refuses another attempt, the last out-of-memory error is
// returned wrapped with the attempt count.
func (b *indexBatchRetry) buildBatchWithRetry(ctx context.Context, db isql.DB) error {
	attempts := retry.StartWithCtx(ctx, b.retryOpts)
	for {
		err := db.Txn(ctx, b.buildIndexChunk)
		if err == nil {
			// Batch completed successfully, nothing left to retry.
			return nil
		}

		// Only out-of-memory errors are retried, and only while the chunk size
		// can still be reduced. Retrying gives the goroutine that processes
		// the prior batches of index entries a chance to free memory.
		if !sqlerrors.IsOutOfMemoryError(err) || b.nextChunkSize <= 1 {
			return err
		}

		// Let the caller clear any state acquired during the failed attempt.
		if resetErr := b.resetForNextAttempt(ctx); resetErr != nil {
			return errors.CombineErrors(err, resetErr)
		}

		// Out of retries: surface the out-of-memory error.
		if !attempts.Next() {
			return errors.Wrapf(err, "failed after %d retries", attempts.CurrentAttempt())
		}

		// Halve the chunk size for the next attempt, never going below 1.
		b.nextChunkSize = max(1, b.nextChunkSize/2)
		log.Infof(ctx,
			"out of memory while building index entries; retrying with batch size %d. Silencing error: %v",
			b.nextChunkSize, err)
	}
}
81 changes: 81 additions & 0 deletions pkg/sql/rowexec/indexbackfiller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package rowexec

import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlerrors"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/retry"
"github.com/stretchr/testify/require"
)

// TestRetryOfIndexEntryBatch drives indexBatchRetry.buildBatchWithRetry with a
// scripted sequence of per-attempt errors and checks both the error that is
// ultimately returned and the chunk size left in the retry state.
func TestRetryOfIndexEntryBatch(t *testing.T) {
	defer leaktest.AfterTest(t)()
	defer log.Scope(t).Close(t)

	ctx := context.Background()
	srv := serverutils.StartServerOnly(t, base.TestServerArgs{})
	defer srv.Stopper().Stop(ctx)
	db := srv.SystemLayer().InternalDB().(isql.DB)

	const initialChunkSize int64 = 50000
	oomErr := mon.NewMemoryBudgetExceededError(1, 1, 1)
	nonOomErr := sqlerrors.NewUndefinedUserError(username.NodeUserName())

	// Short backoffs keep the test fast; MaxRetries bounds the exhaustive case.
	opts := retry.Options{
		InitialBackoff: 2 * time.Millisecond,
		Multiplier:     2,
		MaxRetries:     2,
		MaxBackoff:     10 * time.Millisecond,
	}

	type testCase struct {
		desc string
		// errs[k] is returned by the build callback after k resets; once the
		// list is exhausted the callback succeeds.
		errs []error
		// retryErr is what the reset callback returns.
		retryErr          error
		expectedErr       error
		expectedChunkSize int64
	}
	testCases := []testCase{
		{
			desc:              "happy-path",
			expectedChunkSize: initialChunkSize,
		},
		{
			desc:              "retry-once",
			errs:              []error{oomErr},
			expectedChunkSize: initialChunkSize >> 1,
		},
		{
			desc:              "retry-then-fail",
			errs:              []error{oomErr, oomErr, nonOomErr},
			expectedErr:       nonOomErr,
			expectedChunkSize: initialChunkSize >> 2,
		},
		{
			desc:              "retry-exhaustive",
			errs:              []error{oomErr, oomErr, oomErr, oomErr},
			expectedErr:       oomErr,
			expectedChunkSize: initialChunkSize >> 3,
		},
		{
			desc:              "retry-error",
			errs:              []error{oomErr},
			retryErr:          nonOomErr,
			expectedErr:       oomErr,
			expectedChunkSize: initialChunkSize,
		},
	}

	for _, tc := range testCases {
		t.Run(tc.desc, func(t *testing.T) {
			// resets counts reset callbacks, which doubles as the index of the
			// scripted error for the next build attempt.
			resets := 0
			br := indexBatchRetry{
				nextChunkSize: initialChunkSize,
				retryOpts:     opts,
				buildIndexChunk: func(ctx context.Context, txn isql.Txn) error {
					if resets >= len(tc.errs) {
						return nil
					}
					return tc.errs[resets]
				},
				resetForNextAttempt: func(ctx context.Context) error {
					resets++
					return tc.retryErr
				},
			}

			err := br.buildBatchWithRetry(ctx, db)
			if tc.expectedErr != nil {
				require.ErrorIs(t, err, tc.expectedErr)
			} else {
				require.NoError(t, err)
			}
			require.Equal(t, tc.expectedChunkSize, br.nextChunkSize)
		})
	}
}

0 comments on commit 354444d

Please sign in to comment.