diff --git a/pkg/ccl/backupccl/BUILD.bazel b/pkg/ccl/backupccl/BUILD.bazel index 8d7fefc04b7e..ce2c66460e8d 100644 --- a/pkg/ccl/backupccl/BUILD.bazel +++ b/pkg/ccl/backupccl/BUILD.bazel @@ -116,6 +116,7 @@ go_test( srcs = [ "backup_cloud_test.go", "backup_destination_test.go", + "backup_intents_test.go", "backup_test.go", "bench_test.go", "create_scheduled_backup_test.go", diff --git a/pkg/ccl/backupccl/backup_intents_test.go b/pkg/ccl/backupccl/backup_intents_test.go new file mode 100644 index 000000000000..6fd1bb193ea8 --- /dev/null +++ b/pkg/ccl/backupccl/backup_intents_test.go @@ -0,0 +1,85 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func TestCleanupIntentsDuringBackupPerformanceRegression(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer utilccl.TestingEnableEnterprise()() + + skip.UnderRace(t, "measures backup times not to regress, can't work under race") + + // Time to create backup in presence of intents differs roughly 10x so some + // arbitrary number is picked which is 2x higher than current backup time on + // current (laptop) 
hardware. + const backupTimeout = time.Second * 10 + + const totalRowCount = 10000 + const perTransactionRowCount = 10 + + // Interceptor catches requests that clean up transactions of size perTransactionRowCount, which are + // test data transactions. All other transaction commits pass through. + interceptor := func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error { + endTxn := req.Requests[0].GetEndTxn() + if endTxn != nil && !endTxn.Commit && len(endTxn.LockSpans) == perTransactionRowCount { + // If this is a rollback of one of the test's SQL transactions, allow the + // EndTxn to proceed and mark the transaction record as ABORTED, but strip + // the request of its lock spans so that no intents are recorded into the + // transaction record or eagerly resolved. This is a bit of a hack, but it + // mimics the behavior of an abandoned transaction which is aborted by a + // pusher after expiring due to an absence of heartbeats. + endTxn.LockSpans = nil + } + return nil + } + serverKnobs := kvserver.StoreTestingKnobs{TestingRequestFilter: interceptor} + + s, sqlDb, _ := serverutils.StartServer(t, + base.TestServerArgs{Knobs: base.TestingKnobs{Store: &serverKnobs}}) + defer s.Stopper().Stop(context.Background()) + + _, err := sqlDb.Exec("create table foo(v int not null)") + require.NoError(t, err) + + for i := 0; i < totalRowCount; i += perTransactionRowCount { + tx, err := sqlDb.Begin() + require.NoError(t, err) + for j := 0; j < perTransactionRowCount; j += 1 { + statement := fmt.Sprintf("insert into foo (v) values (%d)", i+j) + _, err = tx.Exec(statement) + require.NoError(t, err) + } + require.NoError(t, tx.Rollback()) + } + + start := timeutil.Now() + _, err = sqlDb.Exec("backup table foo to 'userfile:///test.foo'") + stop := timeutil.Now() + require.NoError(t, err, "Failed to run backup") + t.Logf("Backup took %s", stop.Sub(start)) + require.WithinDuration(t, stop, start, backupTimeout, "Time to make backup") +} diff --git a/pkg/storage/mvcc_incremental_iterator.go 
b/pkg/storage/mvcc_incremental_iterator.go index 1186e4c16429..099827dc5209 100644 --- a/pkg/storage/mvcc_incremental_iterator.go +++ b/pkg/storage/mvcc_incremental_iterator.go @@ -86,6 +86,12 @@ type MVCCIncrementalIterator struct { // For allocation avoidance, meta is used to store the timestamp of keys // regardless if they are metakeys. meta enginepb.MVCCMetadata + + // Intent aggregation options. + // Configuration passed in MVCCIncrementalIterOptions. + enableWriteIntentAggregation bool + // Optional collection of intents created on demand when first intent encountered. + intents []roachpb.Intent } var _ SimpleMVCCIterator = &MVCCIncrementalIterator{} @@ -100,6 +106,12 @@ type MVCCIncrementalIterOptions struct { // time. StartTime hlc.Timestamp EndTime hlc.Timestamp + // If intent aggregation is enabled, iterator will not fail on first encountered + // intent, but will proceed further. All found intents will be aggregated into + // a single WriteIntentError which would be updated during iteration. Consumer + // would be free to decide if it wants to keep collecting entries and intents or + // skip entries. + EnableWriteIntentAggregation bool } // NewMVCCIncrementalIterator creates an MVCCIncrementalIterator with the @@ -135,10 +147,11 @@ func NewMVCCIncrementalIterator( } return &MVCCIncrementalIterator{ - iter: iter, - startTime: opts.StartTime, - endTime: opts.EndTime, - timeBoundIter: timeBoundIter, + iter: iter, + startTime: opts.StartTime, + endTime: opts.EndTime, + timeBoundIter: timeBoundIter, + enableWriteIntentAggregation: opts.EnableWriteIntentAggregation, } } @@ -185,12 +198,22 @@ func (i *MVCCIncrementalIterator) Close() { // key. func (i *MVCCIncrementalIterator) Next() { i.iter.Next() + if !i.checkValidAndSaveErr() { + return + } + i.advance() +} + +// checkValidAndSaveErr checks if the underlying iter is valid after the operation +// and saves the error and validity state. Returns true if the underlying iterator +// is valid. 
+func (i *MVCCIncrementalIterator) checkValidAndSaveErr() bool { if ok, err := i.iter.Valid(); !ok { i.err = err i.valid = false - return + return false } - i.advance() + return true } // NextKey advances the iterator to the next key. This operation is distinct @@ -198,9 +221,7 @@ func (i *MVCCIncrementalIterator) Next() { // key if the iterator is currently located at the last version for a key. func (i *MVCCIncrementalIterator) NextKey() { i.iter.NextKey() - if ok, err := i.iter.Valid(); !ok { - i.err = err - i.valid = false + if !i.checkValidAndSaveErr() { return } i.advance() @@ -265,9 +286,7 @@ func (i *MVCCIncrementalIterator) maybeSkipKeys() { // expensive than a Next call. seekKey := MakeMVCCMetadataKey(tbiKey) i.iter.SeekGE(seekKey) - if ok, err := i.iter.Valid(); !ok { - i.err = err - i.valid = false + if !i.checkValidAndSaveErr() { return } } @@ -313,13 +332,20 @@ func (i *MVCCIncrementalIterator) initMetaAndCheckForIntentOrInlineError() error } if i.startTime.Less(metaTimestamp) && metaTimestamp.LessEq(i.endTime) { - i.err = &roachpb.WriteIntentError{ - Intents: []roachpb.Intent{ - roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key), - }, + if !i.enableWriteIntentAggregation { + // If we don't plan to collect intents for resolving, we bail out here with a single intent. + i.err = &roachpb.WriteIntentError{ + Intents: []roachpb.Intent{ + roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key), + }, + } + i.valid = false + return i.err } - i.valid = false - return i.err + // We are collecting intents, so we need to save it and advance to its proposed value. + // Caller could then use a value key to update proposed row counters for the sake of bookkeeping + // and advance more. + i.intents = append(i.intents, roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key)) } return nil } @@ -345,9 +371,7 @@ func (i *MVCCIncrementalIterator) advance() { // the next valid KV. 
if i.meta.Txn != nil { i.iter.Next() - if ok, err := i.iter.Valid(); !ok { - i.err = err - i.valid = false + if !i.checkValidAndSaveErr() { return } continue @@ -435,3 +459,22 @@ func (i *MVCCIncrementalIterator) NextIgnoringTime() { return } } + +// NumCollectedIntents returns number of intents encountered during iteration. +// This is only the case when intent aggregation is enabled, otherwise it is +// always 0. +func (i *MVCCIncrementalIterator) NumCollectedIntents() int { + return len(i.intents) +} + +// TryGetIntentError returns roachpb.WriteIntentError if intents were encountered +// during iteration and intent aggregation is enabled. Otherwise function +// returns nil. roachpb.WriteIntentError will contain all encountered intents. +func (i *MVCCIncrementalIterator) TryGetIntentError() error { + if len(i.intents) == 0 { + return nil + } + return &roachpb.WriteIntentError{ + Intents: i.intents, + } +} diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index f15d9656a035..6a9698c1977b 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -39,31 +39,133 @@ func iterateExpectErr( startKey, endKey roachpb.Key, startTime, endTime hlc.Timestamp, revisions bool, - errString string, + intents []roachpb.Intent, ) func(*testing.T) { return func(t *testing.T) { t.Helper() - iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ - EndKey: endKey, - StartTime: startTime, - EndTime: endTime, + t.Run("aggregate-intents", func(t *testing.T) { + assertExpectErrs(t, e, startKey, endKey, startTime, endTime, revisions, intents) }) - defer iter.Close() - var iterFn func() - if revisions { - iterFn = iter.Next - } else { - iterFn = iter.NextKey + t.Run("first-intent", func(t *testing.T) { + assertExpectErr(t, e, startKey, endKey, startTime, endTime, revisions, intents[0]) + }) + t.Run("export-intents", func(t *testing.T) { + assertExportedErrs(t, e, 
startKey, endKey, startTime, endTime, revisions, intents, false) + }) + t.Run("export-intents-tbi", func(t *testing.T) { + assertExportedErrs(t, e, startKey, endKey, startTime, endTime, revisions, intents, true) + }) + } +} + +func assertExpectErr( + t *testing.T, + e Engine, + startKey, endKey roachpb.Key, + startTime, endTime hlc.Timestamp, + revisions bool, + expectedIntent roachpb.Intent, +) { + iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ + EndKey: endKey, + StartTime: startTime, + EndTime: endTime, + }) + defer iter.Close() + var iterFn func() + if revisions { + iterFn = iter.Next + } else { + iterFn = iter.NextKey + } + for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; iterFn() { + if ok, _ := iter.Valid(); !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { + break } - for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; iterFn() { - if ok, _ := iter.Valid(); !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { - break + // pass + } + + _, err := iter.Valid() + if intentErr := (*roachpb.WriteIntentError)(nil); errors.As(err, &intentErr) { + if !expectedIntent.Key.Equal(intentErr.Intents[0].Key) { + t.Fatalf("Expected intent key %v, but got %v", expectedIntent.Key, intentErr.Intents[0].Key) + } + } else { + t.Fatalf("expected error with intent %v but got %v", expectedIntent, err) + } +} + +func assertExpectErrs( + t *testing.T, + e Engine, + startKey, endKey roachpb.Key, + startTime, endTime hlc.Timestamp, + revisions bool, + expectedIntents []roachpb.Intent, +) { + iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ + EndKey: endKey, + StartTime: startTime, + EndTime: endTime, + EnableWriteIntentAggregation: true, + }) + defer iter.Close() + var iterFn func() + if revisions { + iterFn = iter.Next + } else { + iterFn = iter.NextKey + } + for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; iterFn() { + if ok, _ := iter.Valid(); !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { + break + } + // pass + } + + if 
iter.NumCollectedIntents() != len(expectedIntents) { + t.Fatalf("Expected %d intents but found %d", len(expectedIntents), iter.NumCollectedIntents()) + } + err := iter.TryGetIntentError() + if intentErr := (*roachpb.WriteIntentError)(nil); errors.As(err, &intentErr) { + for i := range expectedIntents { + if !expectedIntents[i].Key.Equal(intentErr.Intents[i].Key) { + t.Fatalf("%d intent key: got %v, expected %v", i, intentErr.Intents[i].Key, expectedIntents[i].Key) + } + if !expectedIntents[i].Txn.ID.Equal(intentErr.Intents[i].Txn.ID) { + t.Fatalf("%d intent key: got %v, expected %v", i, intentErr.Intents[i].Txn.ID, expectedIntents[i].Txn.ID) } - // pass } - if _, err := iter.Valid(); !testutils.IsError(err, errString) { - t.Fatalf("expected error %q but got %v", errString, err) + } else { + t.Fatalf("Expected roachpb.WriteIntentError, found %T", err) + } +} + +func assertExportedErrs( + t *testing.T, + e Engine, + startKey, endKey roachpb.Key, + startTime, endTime hlc.Timestamp, + revisions bool, + expectedIntents []roachpb.Intent, + useTBI bool, +) { + const big = 1 << 30 + _, _, _, err := e.ExportMVCCToSst(startKey, endKey, startTime, endTime, revisions, big, big, + useTBI) + require.Error(t, err) + + if intentErr := (*roachpb.WriteIntentError)(nil); errors.As(err, &intentErr) { + for i := range expectedIntents { + if !expectedIntents[i].Key.Equal(intentErr.Intents[i].Key) { + t.Fatalf("%d intent key: got %v, expected %v", i, intentErr.Intents[i].Key, expectedIntents[i].Key) + } + if !expectedIntents[i].Txn.ID.Equal(intentErr.Intents[i].Txn.ID) { + t.Fatalf("%d intent key: got %v, expected %v", i, intentErr.Intents[i].Txn.ID, expectedIntents[i].Txn.ID) + } } + } else { + t.Fatalf("Expected roachpb.WriteIntentError, found %T", err) } } @@ -195,6 +297,7 @@ func assertIteratedKVs( EnableTimeBoundIteratorOptimization: useTBI, StartTime: startTime, EndTime: endTime, + EnableWriteIntentAggregation: true, }) defer iter.Close() var iterFn func() @@ -210,6 +313,9 @@ func 
assertIteratedKVs( } else if !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { break } + if iter.NumCollectedIntents() > 0 { + t.Fatal("got unexpected intent error") + } kvs = append(kvs, MVCCKeyValue{Key: iter.Key(), Value: iter.Value()}) } @@ -427,16 +533,33 @@ func TestMVCCIncrementalIterator(t *testing.T) { makeKVT := func(key roachpb.Key, value []byte, ts hlc.Timestamp) MVCCKeyValue { return MVCCKeyValue{Key: MVCCKey{Key: key, Timestamp: ts}, Value: value} } + makeTxn := func(key roachpb.Key, val []byte, ts hlc.Timestamp) (roachpb.Transaction, roachpb.Value, roachpb.Intent) { + txnID := uuid.MakeV4() + txnMeta := enginepb.TxnMeta{ + Key: key, + ID: txnID, + Epoch: 1, + WriteTimestamp: ts, + } + return roachpb.Transaction{ + TxnMeta: txnMeta, + ReadTimestamp: ts, + }, roachpb.Value{ + RawBytes: val, + }, roachpb.MakeIntent(&txnMeta, key) + } + intents := func(intents ...roachpb.Intent) []roachpb.Intent { return intents } + // Keys are named as kv<key>_<value>_<ts>. kv1_1_1 := makeKVT(testKey1, testValue1, ts1) kv1_4_4 := makeKVT(testKey1, testValue4, ts4) kv1_2_2 := makeKVT(testKey1, testValue2, ts2) kv2_2_2 := makeKVT(testKey2, testValue3, ts2) - kv1_3Deleted := makeKVT(testKey1, nil, ts3) + kv1Deleted3 := makeKVT(testKey1, nil, ts3) kvs := func(kvs ...MVCCKeyValue) []MVCCKeyValue { return kvs } for _, engineImpl := range mvccEngineImpls { - t.Run(engineImpl.name, func(t *testing.T) { + t.Run(engineImpl.name+"-latest", func(t *testing.T) { e := engineImpl.create() defer e.Close() @@ -465,49 +588,29 @@ func TestMVCCIncrementalIterator(t *testing.T) { if err := MVCCDelete(ctx, e, nil, testKey1, ts3, nil); err != nil { t.Fatal(err) } - t.Run("del", assertEqualKVs(e, localMax, keyMax, ts1, tsMax, latest, kvs(kv1_3Deleted, kv2_2_2))) + t.Run("del", assertEqualKVs(e, localMax, keyMax, ts1, tsMax, latest, kvs(kv1Deleted3, kv2_2_2))) // Exercise intent handling. 
- txn1ID := uuid.MakeV4() - txn1 := roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - Key: testKey1, - ID: txn1ID, - Epoch: 1, - WriteTimestamp: ts4, - }, - ReadTimestamp: ts4, - } - txn1Val := roachpb.Value{RawBytes: testValue4} + txn1, txn1Val, intentErr1 := makeTxn(testKey1, testValue4, ts4) if err := MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.ReadTimestamp, txn1Val, &txn1); err != nil { t.Fatal(err) } - txn2ID := uuid.MakeV4() - txn2 := roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - Key: testKey2, - ID: txn2ID, - Epoch: 1, - WriteTimestamp: ts4, - }, - ReadTimestamp: ts4, - } - txn2Val := roachpb.Value{RawBytes: testValue4} + txn2, txn2Val, intentErr2 := makeTxn(testKey2, testValue4, ts4) if err := MVCCPut(ctx, e, nil, txn2.TxnMeta.Key, txn2.ReadTimestamp, txn2Val, &txn2); err != nil { t.Fatal(err) } - t.Run("intents", - iterateExpectErr(e, testKey1, testKey1.PrefixEnd(), tsMin, tsMax, latest, "conflicting intents")) - t.Run("intents", - iterateExpectErr(e, testKey2, testKey2.PrefixEnd(), tsMin, tsMax, latest, "conflicting intents")) - t.Run("intents", - iterateExpectErr(e, localMax, keyMax, tsMin, ts4, latest, "conflicting intents")) + t.Run("intents-1", + iterateExpectErr(e, testKey1, testKey1.PrefixEnd(), tsMin, tsMax, latest, intents(intentErr1))) + t.Run("intents-2", + iterateExpectErr(e, testKey2, testKey2.PrefixEnd(), tsMin, tsMax, latest, intents(intentErr2))) + t.Run("intents-multi", + iterateExpectErr(e, localMax, keyMax, tsMin, ts4, latest, intents(intentErr1, intentErr2))) // Intents above the upper time bound or beneath the lower time bound must // be ignored (#28358). Note that the lower time bound is exclusive while // the upper time bound is inclusive. 
- t.Run("intents", assertEqualKVs(e, localMax, keyMax, tsMin, ts3, latest, kvs(kv1_3Deleted, kv2_2_2))) - t.Run("intents", assertEqualKVs(e, localMax, keyMax, ts4, tsMax, latest, kvs())) - t.Run("intents", assertEqualKVs(e, localMax, keyMax, ts4.Next(), tsMax, latest, kvs())) + t.Run("intents-filtered-1", assertEqualKVs(e, localMax, keyMax, tsMin, ts3, latest, kvs(kv1Deleted3, kv2_2_2))) + t.Run("intents-filtered-2", assertEqualKVs(e, localMax, keyMax, ts4, tsMax, latest, kvs())) + t.Run("intents-filtered-3", assertEqualKVs(e, localMax, keyMax, ts4.Next(), tsMax, latest, kvs())) intent1 := roachpb.MakeLockUpdate(&txn1, roachpb.Span{Key: testKey1}) intent1.Status = roachpb.COMMITTED @@ -519,12 +622,12 @@ func TestMVCCIncrementalIterator(t *testing.T) { if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2); err != nil { t.Fatal(err) } - t.Run("intents", assertEqualKVs(e, localMax, keyMax, tsMin, tsMax, latest, kvs(kv1_4_4, kv2_2_2))) + t.Run("intents-resolved", assertEqualKVs(e, localMax, keyMax, tsMin, tsMax, latest, kvs(kv1_4_4, kv2_2_2))) }) } for _, engineImpl := range mvccEngineImpls { - t.Run(engineImpl.name, func(t *testing.T) { + t.Run(engineImpl.name+"-all", func(t *testing.T) { e := engineImpl.create() defer e.Close() @@ -553,49 +656,30 @@ func TestMVCCIncrementalIterator(t *testing.T) { if err := MVCCDelete(ctx, e, nil, testKey1, ts3, nil); err != nil { t.Fatal(err) } - t.Run("del", assertEqualKVs(e, localMax, keyMax, ts1, tsMax, all, kvs(kv1_3Deleted, kv1_2_2, kv2_2_2))) + t.Run("del", assertEqualKVs(e, localMax, keyMax, ts1, tsMax, all, kvs(kv1Deleted3, kv1_2_2, kv2_2_2))) // Exercise intent handling. 
- txn1ID := uuid.MakeV4() - txn1 := roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - Key: testKey1, - ID: txn1ID, - Epoch: 1, - WriteTimestamp: ts4, - }, - ReadTimestamp: ts4, - } - txn1Val := roachpb.Value{RawBytes: testValue4} + txn1, txn1Val, intentErr1 := makeTxn(testKey1, testValue4, ts4) if err := MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.ReadTimestamp, txn1Val, &txn1); err != nil { t.Fatal(err) } - txn2ID := uuid.MakeV4() - txn2 := roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - Key: testKey2, - ID: txn2ID, - Epoch: 1, - WriteTimestamp: ts4, - }, - ReadTimestamp: ts4, - } - txn2Val := roachpb.Value{RawBytes: testValue4} + txn2, txn2Val, intentErr2 := makeTxn(testKey2, testValue4, ts4) if err := MVCCPut(ctx, e, nil, txn2.TxnMeta.Key, txn2.ReadTimestamp, txn2Val, &txn2); err != nil { t.Fatal(err) } - t.Run("intents", - iterateExpectErr(e, testKey1, testKey1.PrefixEnd(), tsMin, tsMax, all, "conflicting intents")) - t.Run("intents", - iterateExpectErr(e, testKey2, testKey2.PrefixEnd(), tsMin, tsMax, all, "conflicting intents")) - t.Run("intents", - iterateExpectErr(e, localMax, keyMax, tsMin, ts4, all, "conflicting intents")) + // Single intent tests are verifying behavior when intent collection is not enabled. + t.Run("intents-1", + iterateExpectErr(e, testKey1, testKey1.PrefixEnd(), tsMin, tsMax, all, intents(intentErr1))) + t.Run("intents-2", + iterateExpectErr(e, testKey2, testKey2.PrefixEnd(), tsMin, tsMax, all, intents(intentErr2))) + t.Run("intents-multi", + iterateExpectErr(e, localMax, keyMax, tsMin, ts4, all, intents(intentErr1, intentErr2))) // Intents above the upper time bound or beneath the lower time bound must // be ignored (#28358). Note that the lower time bound is exclusive while // the upper time bound is inclusive. 
- t.Run("intents", assertEqualKVs(e, localMax, keyMax, tsMin, ts3, all, kvs(kv1_3Deleted, kv1_2_2, kv1_1_1, kv2_2_2))) - t.Run("intents", assertEqualKVs(e, localMax, keyMax, ts4, tsMax, all, kvs())) - t.Run("intents", assertEqualKVs(e, localMax, keyMax, ts4.Next(), tsMax, all, kvs())) + t.Run("intents-filtered-1", assertEqualKVs(e, localMax, keyMax, tsMin, ts3, all, kvs(kv1Deleted3, kv1_2_2, kv1_1_1, kv2_2_2))) + t.Run("intents-filtered-2", assertEqualKVs(e, localMax, keyMax, ts4, tsMax, all, kvs())) + t.Run("intents-filtered-3", assertEqualKVs(e, localMax, keyMax, ts4.Next(), tsMax, all, kvs())) intent1 := roachpb.MakeLockUpdate(&txn1, roachpb.Span{Key: testKey1}) intent1.Status = roachpb.COMMITTED @@ -607,7 +691,7 @@ func TestMVCCIncrementalIterator(t *testing.T) { if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2); err != nil { t.Fatal(err) } - t.Run("intents", assertEqualKVs(e, localMax, keyMax, tsMin, tsMax, all, kvs(kv1_4_4, kv1_3Deleted, kv1_2_2, kv1_1_1, kv2_2_2))) + t.Run("intents-resolved", assertEqualKVs(e, localMax, keyMax, tsMin, tsMax, all, kvs(kv1_4_4, kv1Deleted3, kv1_2_2, kv1_1_1, kv2_2_2))) }) } } diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 4a500d9d19e0..40cfa24cc0b8 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -47,6 +47,12 @@ import ( const ( maxSyncDurationFatalOnExceededDefault = true + + // Default value for maximum number of intents reported by ExportToSST + // in WriteIntentError is set to half of the maximum lock table size. + // This value is subject to tuning in real environment as we have more + // data available. + maxIntentsPerSstExportErrorDefault = 5000 ) // Default for MaxSyncDuration below. 
@@ -69,6 +75,11 @@ var MaxSyncDurationFatalOnExceeded = settings.RegisterBoolSetting( maxSyncDurationFatalOnExceededDefault, ) +var maxIntentsPerSstExportError = settings.RegisterIntSetting( + "storage.sst_export.max_intents_per_error", + "maximum number of intents returned in error when sst export fails", + maxIntentsPerSstExportErrorDefault) + // EngineKeyCompare compares cockroach keys, including the version (which // could be MVCC timestamps). func EngineKeyCompare(a, b []byte) int { @@ -655,8 +666,9 @@ func (p *Pebble) ExportMVCCToSst( ) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { r := wrapReader(p) // Doing defer r.Free() does not inline. + maxIntentCount := maxIntentsPerSstExportError.Get(&p.settings.SV) b, summary, k, err := pebbleExportToSst(r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, - maxSize, useTBI) + maxSize, useTBI, maxIntentCount) r.Free() return b, summary, k, err } @@ -1087,6 +1099,7 @@ func (p *Pebble) NewUnindexedBatch(writeOnly bool) Batch { func (p *Pebble) NewSnapshot() Reader { return &pebbleSnapshot{ snapshot: p.db.NewSnapshot(), + settings: p.settings, } } @@ -1324,8 +1337,9 @@ func (p *pebbleReadOnly) ExportMVCCToSst( ) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { r := wrapReader(p) // Doing defer r.Free() does not inline. + maxIntentCount := maxIntentsPerSstExportError.Get(&p.parent.settings.SV) b, summary, k, err := pebbleExportToSst( - r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, useTBI) + r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, useTBI, maxIntentCount) r.Free() return b, summary, k, err } @@ -1560,6 +1574,7 @@ func (p *pebbleReadOnly) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalO // pebbleSnapshot represents a snapshot created using Pebble.NewSnapshot(). 
type pebbleSnapshot struct { snapshot *pebble.Snapshot + settings *cluster.Settings closed bool } @@ -1586,8 +1601,9 @@ func (p *pebbleSnapshot) ExportMVCCToSst( ) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { r := wrapReader(p) // Doing defer r.Free() does not inline. + maxIntentCount := maxIntentsPerSstExportError.Get(&p.settings.SV) b, summary, k, err := pebbleExportToSst( - r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, useTBI) + r, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, useTBI, maxIntentCount) r.Free() return b, summary, k, err } @@ -1694,6 +1710,7 @@ func pebbleExportToSst( exportAllRevisions bool, targetSize, maxSize uint64, useTBI bool, + maxIntentCount int64, ) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { sstFile := &MemFile{} sstWriter := MakeBackupSSTWriter(sstFile) @@ -1707,6 +1724,7 @@ func pebbleExportToSst( EnableTimeBoundIteratorOptimization: useTBI, StartTime: startTS, EndTime: endTS, + EnableWriteIntentAggregation: true, }) defer iter.Close() var curKey roachpb.Key // only used if exportAllRevisions @@ -1715,8 +1733,8 @@ func pebbleExportToSst( for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; { ok, err := iter.Valid() if err != nil { - // The error may be a WriteIntentError. In which case, returning it will - // cause this command to be retried. + // This is an underlying iterator error, return it to the caller to deal + // with. return nil, roachpb.BulkOpSummary{}, nil, err } if !ok { @@ -1726,6 +1744,11 @@ func pebbleExportToSst( if unsafeKey.Key.Compare(endKey) >= 0 { break } + + if iter.NumCollectedIntents() > 0 { + break + } + unsafeValue := iter.UnsafeValue() isNewKey := !exportAllRevisions || !unsafeKey.Key.Equal(curKey) if paginated && exportAllRevisions && isNewKey { @@ -1772,6 +1795,23 @@ func pebbleExportToSst( } } + // First check if we encountered an intent while iterating the data. + // If we do it means this export can't complete and is aborted. 
We need to loop over remaining data + // to collect all matching intents before returning them in an error to the caller. + if iter.NumCollectedIntents() > 0 { + for int64(iter.NumCollectedIntents()) < maxIntentCount { + iter.NextKey() + // If we encounter other errors during intent collection, we return our original write intent failure. + // We would find this new error again upon retry. + ok, _ := iter.Valid() + if !ok { + break + } + } + err := iter.TryGetIntentError() + return nil, roachpb.BulkOpSummary{}, nil, err + } + if rows.BulkOpSummary.DataSize == 0 { // If no records were added to the sstable, skip completing it and return a // nil slice – the export code will discard it anyway (based on 0 DataSize). diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index 8e8d4a2d9e2f..7155559b1f49 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -32,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" "github.com/stretchr/testify/require" ) @@ -542,3 +543,90 @@ func BenchmarkMVCCKeyCompare(b *testing.B) { fmt.Fprint(ioutil.Discard, c) } } + +type testValue struct { + key roachpb.Key + value roachpb.Value + timestamp hlc.Timestamp + txn *roachpb.Transaction +} + +func intent(key roachpb.Key, val string, ts hlc.Timestamp) testValue { + var value = roachpb.MakeValueFromString(val) + value.InitChecksum(key) + tx := roachpb.MakeTransaction(fmt.Sprintf("txn-%v", key), key, roachpb.NormalUserPriority, ts, 1000) + var txn = &tx + return testValue{key, value, ts, txn} +} + +func value(key roachpb.Key, val string, ts hlc.Timestamp) testValue { + var value = roachpb.MakeValueFromString(val) + value.InitChecksum(key) + return testValue{key, value, ts, nil} +} + +func fillInData(ctx context.Context, engine Engine, data []testValue) error { + batch := 
engine.NewBatch() + for _, val := range data { + if err := MVCCPut(ctx, batch, nil, val.key, val.timestamp, val.value, val.txn); err != nil { + return err + } + } + return batch.Commit(true) +} + +func ts(ts int64) hlc.Timestamp { + return hlc.Timestamp{WallTime: ts} +} + +func key(k int) roachpb.Key { + return []byte(fmt.Sprintf("%05d", k)) +} + +func requireTxnForValue(t *testing.T, val testValue, intent roachpb.Intent) { + require.Equal(t, val.txn.Key, intent.Txn.Key) +} + +func TestSstExportFailureIntentBatching(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Test function uses a fixed time and key range to produce SST. + // Use varying inserted keys for values and intents to put them in and out of range. + checkReportedErrors := func(data []testValue, expectedIntentIndices []int) func(*testing.T) { + return func(t *testing.T) { + ctx := context.Background() + + engine := createTestPebbleEngine() + defer engine.Close() + + require.NoError(t, fillInData(ctx, engine, data)) + + _, _, _, err := engine.ExportMVCCToSst(key(10), key(20000), ts(999), ts(2000), + true, 0, 0, true) + if len(expectedIntentIndices) == 0 { + require.NoError(t, err) + } else { + require.Error(t, err) + e := (*roachpb.WriteIntentError)(nil) + if !errors.As(err, &e) { + require.Fail(t, "Expected WriteIntentFailure, got %T", err) + } + require.Equal(t, len(expectedIntentIndices), len(e.Intents)) + for i, dataIdx := range expectedIntentIndices { + requireTxnForValue(t, data[dataIdx], e.Intents[i]) + } + } + } + } + + // Export range is fixed to k:["00010", "20000"), ts:(999, 2000] for all tests. 
+ testDataCount := int(maxIntentsPerSstExportError.Default() + 1) + testData := make([]testValue, testDataCount*2) + expectedErrors := make([]int, testDataCount) + for i := 0; i < testDataCount; i++ { + testData[i*2] = value(key(i*2+11), "value", ts(1000)) + testData[i*2+1] = intent(key(i*2+12), "intent", ts(1001)) + expectedErrors[i] = i*2 + 1 + } + t.Run("Receive no more than limit intents", checkReportedErrors(testData, expectedErrors[:maxIntentsPerSstExportError.Default()])) +}