Skip to content

Commit

Permalink
storage: report all encountered intents in sst export error
Browse files Browse the repository at this point in the history
Previously, pebbleExportToSst was stopping upon encountering first
intent.

This was causing backups to be very slow if lots of intents build up.
To be able to proceed with export, intent needs to be resolved and
export retried. The result of this behaviour is that export would run
as many times as there were intents in the table before succeeding.

To address this, all intents from the range are collected and reported
in WriteIntentError. They could be resolved efficiently as batch
similar to how GC operates.

Release note (bug fix): Backup no longer resolves intents one by one.
This eliminates running a high pri query to cleanup intents to unblock
backup in case of intent buildup.
  • Loading branch information
aliher1911 committed May 4, 2021
1 parent e3327f0 commit 82e0fab
Show file tree
Hide file tree
Showing 6 changed files with 474 additions and 112 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/backupccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ go_test(
srcs = [
"backup_cloud_test.go",
"backup_destination_test.go",
"backup_intents_test.go",
"backup_test.go",
"bench_test.go",
"create_scheduled_backup_test.go",
Expand Down
109 changes: 109 additions & 0 deletions pkg/ccl/backupccl/backup_intents_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2016 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package backupccl_test

import (
"context"
"errors"
"fmt"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

func rangeFromValue(from, size int) []interface{} {
result := make([]interface{}, size)
for i := 0; i < size; i++ {
result[i] = from + i
}
return result
}

func formatSlice(values []interface{}, format string) []string {
result := make([]string, len(values))
for i, v := range values {
result[i] = fmt.Sprintf(format, v)
}
return result
}

func TestCleanupIntentsDuringBackupPerformanceRegression(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
defer utilccl.TestingEnableEnterprise()()

skip.UnderRace(t, "measures backup times not to regress, can't work under race")

// Time to cleanup intents differs roughly 10x so some arbitrary number is picked which is 2x higher
// than current backup time on current (laptop) hardware.
const testTimeout = time.Second * 10

const rowCount = 10000
const transactionRowCount = 1000
const statementRowCount = 100

// Interceptor catches requests that cleanup transactions of size 1000 which are
// test data transactions. All other transaction commits pass though.
interceptor := func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error {
if req.Txn == nil || req.Txn.Name != "sql txn" {
return nil
}
endTxn := req.Requests[0].GetEndTxn()
if endTxn == nil {
return nil
}
if endTxn.InFlightWrites != nil {
return nil
}
if len(endTxn.LockSpans) != transactionRowCount {
return nil
}
return roachpb.NewError(errors.New("blocking intent cleanup for our transaction only"))
}
serverKnobs := kvserver.StoreTestingKnobs{TestingRequestFilter: interceptor}

s, sqlDb, _ := serverutils.StartServer(t,
base.TestServerArgs{Knobs: base.TestingKnobs{Store: &serverKnobs}})
defer s.Stopper().Stop(context.Background())

_, err := sqlDb.Exec("create table foo(v int)")
require.NoError(t, err)

placeholders := formatSlice(rangeFromValue(1, statementRowCount), "($%d)")
prepStatement := fmt.Sprintf("insert into foo (v) values %s", strings.Join(placeholders, ", "))

for i := 0; i < rowCount; {
tx, err := sqlDb.Begin()
require.NoError(t, err)
ps, err := tx.Prepare(prepStatement)
require.NoError(t, err)
for j := 0; j < transactionRowCount; j += statementRowCount {
_, err = ps.Exec(rangeFromValue(i, statementRowCount)...)
require.NoError(t, err)
i += statementRowCount
}
require.NoError(t, tx.Rollback())
}

start := timeutil.Now()
_, err = sqlDb.Exec("backup table foo to 'userfile:///test.foo'")
require.NoError(t, err, "Failed to run backup")
require.WithinDuration(t, timeutil.Now(), start, testTimeout, "Time to make backup")
}
85 changes: 64 additions & 21 deletions pkg/storage/mvcc_incremental_iterator.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ type MVCCIncrementalIterator struct {
// For allocation avoidance, meta is used to store the timestamp of keys
// regardless if they are metakeys.
meta enginepb.MVCCMetadata

// Intent aggregation options.
// Configuration passed in MVCCIncrementalIterOptions.
enableWriteIntentAggregation bool
// Optional collection of intents created on demand when first intent encountered.
intents []roachpb.Intent
}

var _ SimpleMVCCIterator = &MVCCIncrementalIterator{}
Expand All @@ -100,6 +106,12 @@ type MVCCIncrementalIterOptions struct {
// time.
StartTime hlc.Timestamp
EndTime hlc.Timestamp
// If intent aggregation is enabled, iterator will not fail on first encountered
// intent, but will proceed further. All found intents will be aggregated into
// a single WriteIntentError which would be updated during iteration. Consumer
// would be free to decide if it wants to keep collecting entries and intents or
// skip entries.
EnableWriteIntentAggregation bool
}

// NewMVCCIncrementalIterator creates an MVCCIncrementalIterator with the
Expand Down Expand Up @@ -135,10 +147,11 @@ func NewMVCCIncrementalIterator(
}

return &MVCCIncrementalIterator{
iter: iter,
startTime: opts.StartTime,
endTime: opts.EndTime,
timeBoundIter: timeBoundIter,
iter: iter,
startTime: opts.StartTime,
endTime: opts.EndTime,
timeBoundIter: timeBoundIter,
enableWriteIntentAggregation: opts.EnableWriteIntentAggregation,
}
}

Expand Down Expand Up @@ -185,22 +198,30 @@ func (i *MVCCIncrementalIterator) Close() {
// key.
func (i *MVCCIncrementalIterator) Next() {
i.iter.Next()
if !i.checkValidAndSaveErr() {
return
}
i.advance()
}

// checkValidAndSaveErr checks if the underlying iter is valid after the operation
// and saves the error and validity state. Returns true if the underlying iterator
// is valid.
func (i *MVCCIncrementalIterator) checkValidAndSaveErr() bool {
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
return
return false
}
i.advance()
return true
}

// NextKey advances the iterator to the next key. This operation is distinct
// from Next which advances to the next version of the current key or the next
// key if the iterator is currently located at the last version for a key.
func (i *MVCCIncrementalIterator) NextKey() {
i.iter.NextKey()
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
if !i.checkValidAndSaveErr() {
return
}
i.advance()
Expand Down Expand Up @@ -265,9 +286,7 @@ func (i *MVCCIncrementalIterator) maybeSkipKeys() {
// expensive than a Next call.
seekKey := MakeMVCCMetadataKey(tbiKey)
i.iter.SeekGE(seekKey)
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
if !i.checkValidAndSaveErr() {
return
}
}
Expand Down Expand Up @@ -313,13 +332,20 @@ func (i *MVCCIncrementalIterator) initMetaAndCheckForIntentOrInlineError() error
}

if i.startTime.Less(metaTimestamp) && metaTimestamp.LessEq(i.endTime) {
i.err = &roachpb.WriteIntentError{
Intents: []roachpb.Intent{
roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key),
},
if !i.enableWriteIntentAggregation {
// If we don't plan to collect intents for resolving, we bail out here with a single intent.
i.err = &roachpb.WriteIntentError{
Intents: []roachpb.Intent{
roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key),
},
}
i.valid = false
return i.err
}
i.valid = false
return i.err
// We are collecting intents, so we need to save it and advance to its proposed value.
// Caller could then use a value key to update proposed row counters for the sake of bookkeeping
// and advance more.
i.intents = append(i.intents, roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key))
}
return nil
}
Expand All @@ -345,9 +371,7 @@ func (i *MVCCIncrementalIterator) advance() {
// the next valid KV.
if i.meta.Txn != nil {
i.iter.Next()
if ok, err := i.iter.Valid(); !ok {
i.err = err
i.valid = false
if !i.checkValidAndSaveErr() {
return
}
continue
Expand Down Expand Up @@ -435,3 +459,22 @@ func (i *MVCCIncrementalIterator) NextIgnoringTime() {
return
}
}

// NumCollectedIntents returns number of intents encountered during iteration.
// This is only the case when intent aggregation is enabled, otherwise it is
// always 0.
func (i *MVCCIncrementalIterator) NumCollectedIntents() int {
return len(i.intents)
}

// GetIntentError returns (roachpb.WriteIntentError, true) if intents were encountered
// during iteration and intent aggregation is enabled. Otherwise function returns (nil, false).
// roachpb.WriteIntentError will contain all encountered intents.
func (i *MVCCIncrementalIterator) GetIntentError() (bool, error) {
if len(i.intents) == 0 {
return false, nil
}
return true, &roachpb.WriteIntentError{
Intents: i.intents,
}
}
Loading

0 comments on commit 82e0fab

Please sign in to comment.