From 7a5a92f4aa83f0c91e9b046d1a299c3efce6c0a3 Mon Sep 17 00:00:00 2001 From: Oleg Afanasyev Date: Fri, 23 Apr 2021 15:22:33 +0100 Subject: [PATCH] storage: report all encountered intents in sst export error Previously, pebbleExportToSst was stopping upon encountering first intent. This was causing backups to be very slow if lots of intents build up. To be able to proceed with export, intent needs to be resolved and export retried. The result of this behaviour is that export would run as many times as there were intents in the table before succeeding. To address this, all intents from the range are collected and reported in WriteIntentError. They could be resolved efficiently as batch similar to how GC operates. Release note (bug fix): Backup no longer resolves intents one by one. This eliminates running a high pri query to cleanup intents to unblock backup in case of intent buildup. --- pkg/ccl/backupccl/backup_intents_test.go | 85 ++++++ pkg/storage/bench_pebble_test.go | 4 +- pkg/storage/mvcc_incremental_iterator.go | 85 ++++-- pkg/storage/mvcc_incremental_iterator_test.go | 269 ++++++++++++------ pkg/storage/mvcc_test.go | 9 +- pkg/storage/pebble.go | 60 +++- pkg/storage/pebble_test.go | 94 ++++++ 7 files changed, 482 insertions(+), 124 deletions(-) create mode 100644 pkg/ccl/backupccl/backup_intents_test.go diff --git a/pkg/ccl/backupccl/backup_intents_test.go b/pkg/ccl/backupccl/backup_intents_test.go new file mode 100644 index 000000000000..6fd1bb193ea8 --- /dev/null +++ b/pkg/ccl/backupccl/backup_intents_test.go @@ -0,0 +1,85 @@ +// Copyright 2021 The Cockroach Authors. +// +// Licensed as a CockroachDB Enterprise file under the Cockroach Community +// License (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt + +package backupccl_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/utilccl" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/skip" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func TestCleanupIntentsDuringBackupPerformanceRegression(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + defer utilccl.TestingEnableEnterprise()() + + skip.UnderRace(t, "measures backup times not to regress, can't work under race") + + // Time to create backup in presence of intents differs roughly 10x so some + // arbitrary number is picked which is 2x higher than current backup time on + // current (laptop) hardware. + const backupTimeout = time.Second * 10 + + const totalRowCount = 10000 + const perTransactionRowCount = 10 + + // Interceptor catches requests that cleanup transactions of size 1000 which are + // test data transactions. All other transaction commits pass though. + interceptor := func(ctx context.Context, req roachpb.BatchRequest) *roachpb.Error { + endTxn := req.Requests[0].GetEndTxn() + if endTxn != nil && !endTxn.Commit && len(endTxn.LockSpans) == perTransactionRowCount { + // If this is a rollback of one the test's SQL transactions, allow the + // EndTxn to proceed and mark the transaction record as ABORTED, but strip + // the request of its lock spans so that no intents are recorded into the + // transaction record or eagerly resolved. This is a bit of a hack, but it + // mimics the behavior of an abandoned transaction which is aborted by a + // pusher after expiring due to an absence of heartbeats. + endTxn.LockSpans = nil + } + return nil + } + serverKnobs := kvserver.StoreTestingKnobs{TestingRequestFilter: interceptor} + + s, sqlDb, _ := serverutils.StartServer(t, + base.TestServerArgs{Knobs: base.TestingKnobs{Store: &serverKnobs}}) + defer s.Stopper().Stop(context.Background()) + + _, err := sqlDb.Exec("create table foo(v int not null)") + require.NoError(t, err) + + for i := 0; i < totalRowCount; i += perTransactionRowCount { + tx, err := sqlDb.Begin() + require.NoError(t, err) + for j := 0; j < perTransactionRowCount; j += 1 { + statement := fmt.Sprintf("insert into foo (v) values (%d)", i+j) + _, err = tx.Exec(statement) + require.NoError(t, err) + } + require.NoError(t, tx.Rollback()) + } + + start := timeutil.Now() + _, err = sqlDb.Exec("backup table foo to 'userfile:///test.foo'") + stop := timeutil.Now() + require.NoError(t, err, "Failed to run backup") + t.Logf("Backup took %s", stop.Sub(start)) + require.WithinDuration(t, stop, start, backupTimeout, "Time to make backup") +} diff --git a/pkg/storage/bench_pebble_test.go b/pkg/storage/bench_pebble_test.go index 908f801a84b3..465d9cdbb733 100644 --- a/pkg/storage/bench_pebble_test.go +++ b/pkg/storage/bench_pebble_test.go @@ -18,6 +18,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/testutils/skip" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" @@ -37,7 +38,8 @@ func setupMVCCPebble(b testing.TB, dir string) Engine { context.Background(), PebbleConfig{ StorageConfig: base.StorageConfig{ - Dir: dir, + Dir: dir, + Settings: cluster.MakeTestingClusterSettings(), }, Opts: opts, }) diff --git a/pkg/storage/mvcc_incremental_iterator.go b/pkg/storage/mvcc_incremental_iterator.go index cd0412f69c78..70c358e2cb30 100644 --- a/pkg/storage/mvcc_incremental_iterator.go +++ b/pkg/storage/mvcc_incremental_iterator.go @@ -86,6 +86,12 @@ type MVCCIncrementalIterator struct { // For allocation avoidance, meta is used to store the timestamp of keys // regardless if they are metakeys. meta enginepb.MVCCMetadata + + // Intent aggregation options. + // Configuration passed in MVCCIncrementalIterOptions. + enableWriteIntentAggregation bool + // Optional collection of intents created on demand when first intent encountered. + intents []roachpb.Intent } var _ SimpleIterator = &MVCCIncrementalIterator{} @@ -99,6 +105,12 @@ type MVCCIncrementalIterOptions struct { // time. StartTime hlc.Timestamp EndTime hlc.Timestamp + // If intent aggregation is enabled, iterator will not fail on first encountered + // intent, but will proceed further. All found intents will be aggregated into + // a single WriteIntentError which would be updated during iteration. Consumer + // would be free to decide if it wants to keep collecting entries and intents or + // skip entries. + EnableWriteIntentAggregation bool } // NewMVCCIncrementalIterator creates an MVCCIncrementalIterator with the @@ -124,10 +136,11 @@ func NewMVCCIncrementalIterator( } return &MVCCIncrementalIterator{ - iter: iter, - startTime: opts.StartTime, - endTime: opts.EndTime, - timeBoundIter: timeBoundIter, + iter: iter, + startTime: opts.StartTime, + endTime: opts.EndTime, + timeBoundIter: timeBoundIter, + enableWriteIntentAggregation: opts.EnableWriteIntentAggregation, } } @@ -174,12 +187,22 @@ func (i *MVCCIncrementalIterator) Close() { // key. func (i *MVCCIncrementalIterator) Next() { i.iter.Next() + if !i.checkValidAndSaveErr() { + return + } + i.advance() +} + +// checkValidAndSaveErr checks if the underlying iter is valid after the operation +// and saves the error and validity state. Returns true if the underlying iterator +// is valid. +func (i *MVCCIncrementalIterator) checkValidAndSaveErr() bool { if ok, err := i.iter.Valid(); !ok { i.err = err i.valid = false - return + return false } - i.advance() + return true } // NextKey advances the iterator to the next key. This operation is distinct @@ -187,9 +210,7 @@ func (i *MVCCIncrementalIterator) Next() { // key if the iterator is currently located at the last version for a key. func (i *MVCCIncrementalIterator) NextKey() { i.iter.NextKey() - if ok, err := i.iter.Valid(); !ok { - i.err = err - i.valid = false + if !i.checkValidAndSaveErr() { return } i.advance() @@ -254,9 +275,7 @@ func (i *MVCCIncrementalIterator) maybeSkipKeys() { // expensive than a Next call. seekKey := MakeMVCCMetadataKey(tbiKey) i.iter.SeekGE(seekKey) - if ok, err := i.iter.Valid(); !ok { - i.err = err - i.valid = false + if !i.checkValidAndSaveErr() { return } } @@ -304,18 +323,23 @@ func (i *MVCCIncrementalIterator) advance() { metaTimestamp := hlc.Timestamp(i.meta.Timestamp) if i.meta.Txn != nil { if i.startTime.Less(metaTimestamp) && metaTimestamp.LessEq(i.endTime) { - i.err = &roachpb.WriteIntentError{ - Intents: []roachpb.Intent{ - roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key), - }, + if !i.enableWriteIntentAggregation { + // If we don't plan to collect intents for resolving, we bail out here with a single intent. + i.err = &roachpb.WriteIntentError{ + Intents: []roachpb.Intent{ + roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key), + }, + } + i.valid = false + return } - i.valid = false - return + // We are collecting intents, so we need to save it and advance to its proposed value. + // Caller could then use a value key to update proposed row counters for the sake of bookkeeping + // and advance more. + i.intents = append(i.intents, roachpb.MakeIntent(i.meta.Txn, i.iter.Key().Key)) } i.iter.Next() - if ok, err := i.iter.Valid(); !ok { - i.err = err - i.valid = false + if !i.checkValidAndSaveErr() { return } continue @@ -373,3 +397,22 @@ func (i *MVCCIncrementalIterator) UnsafeKey() MVCCKey { func (i *MVCCIncrementalIterator) UnsafeValue() []byte { return i.iter.UnsafeValue() } + +// NumCollectedIntents returns number of intents encountered during iteration. +// This is only the case when intent aggregation is enabled, otherwise it is +// always 0. +func (i *MVCCIncrementalIterator) NumCollectedIntents() int { + return len(i.intents) +} + +// TryGetIntentError returns roachpb.WriteIntentError if intents were encountered +// during iteration and intent aggregation is enabled. Otherwise function +// returns nil. roachpb.WriteIntentError will contain all encountered intents. +func (i *MVCCIncrementalIterator) TryGetIntentError() error { + if len(i.intents) == 0 { + return nil + } + return &roachpb.WriteIntentError{ + Intents: i.intents, + } +} diff --git a/pkg/storage/mvcc_incremental_iterator_test.go b/pkg/storage/mvcc_incremental_iterator_test.go index 26b35c85f699..be2491e08a71 100644 --- a/pkg/storage/mvcc_incremental_iterator_test.go +++ b/pkg/storage/mvcc_incremental_iterator_test.go @@ -39,33 +39,138 @@ func iterateExpectErr( startKey, endKey roachpb.Key, startTime, endTime hlc.Timestamp, revisions bool, - errString string, + intents []roachpb.Intent, ) func(*testing.T) { return func(t *testing.T) { t.Helper() - iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ - IterOptions: IterOptions{ - UpperBound: endKey, - }, - StartTime: startTime, - EndTime: endTime, + t.Run("aggregate-intents", func(t *testing.T) { + assertExpectErrs(t, e, startKey, endKey, startTime, endTime, revisions, intents) }) - defer iter.Close() - var iterFn func() - if revisions { - iterFn = iter.Next - } else { - iterFn = iter.NextKey + t.Run("first-intent", func(t *testing.T) { + assertExpectErr(t, e, startKey, endKey, startTime, endTime, revisions, intents[0]) + }) + io := IterOptions{UpperBound: endKey} + t.Run("export-intents", func(t *testing.T) { + assertExportedErrs(t, e, startKey, endKey, startTime, endTime, revisions, io, intents) + }) + io.MinTimestampHint = startTime.Next() + io.MaxTimestampHint = endTime + t.Run("export-intents-tbi", func(t *testing.T) { + assertExportedErrs(t, e, startKey, endKey, startTime, endTime, revisions, io, intents) + }) + } +} + +func assertExpectErr( + t *testing.T, + e Engine, + startKey, endKey roachpb.Key, + startTime, endTime hlc.Timestamp, + revisions bool, + expectedIntent roachpb.Intent, +) { + iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ + IterOptions: IterOptions{ + UpperBound: endKey, + }, + StartTime: startTime, + EndTime: endTime, + }) + defer iter.Close() + var iterFn func() + if revisions { + iterFn = iter.Next + } else { + iterFn = iter.NextKey + } + for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; iterFn() { + if ok, _ := iter.Valid(); !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { + break } - for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; iterFn() { - if ok, _ := iter.Valid(); !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { - break + // pass + } + _, err := iter.Valid() + if intentErr := (*roachpb.WriteIntentError)(nil); errors.As(err, &intentErr) { + if !expectedIntent.Key.Equal(intentErr.Intents[0].Key) { + t.Fatalf("Expected intent key %v, but got %v", expectedIntent.Key, intentErr.Intents[0].Key) + } + } else { + t.Fatalf("expected error with intent %v but got %v", expectedIntent, err) + } +} + +func assertExpectErrs( + t *testing.T, + e Engine, + startKey, endKey roachpb.Key, + startTime, endTime hlc.Timestamp, + revisions bool, + expectedIntents []roachpb.Intent, +) { + iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ + IterOptions: IterOptions{ + UpperBound: endKey, + }, + StartTime: startTime, + EndTime: endTime, + EnableWriteIntentAggregation: true, + }) + defer iter.Close() + var iterFn func() + if revisions { + iterFn = iter.Next + } else { + iterFn = iter.NextKey + } + for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; iterFn() { + if ok, _ := iter.Valid(); !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { + break + } + // pass + } + + if iter.NumCollectedIntents() != len(expectedIntents) { + t.Fatalf("Expected %d intents but found %d", len(expectedIntents), iter.NumCollectedIntents()) + } + err := iter.TryGetIntentError() + if intentErr := (*roachpb.WriteIntentError)(nil); errors.As(err, &intentErr) { + for i := range expectedIntents { + if !expectedIntents[i].Key.Equal(intentErr.Intents[i].Key) { + t.Fatalf("%d intent key: got %v, expected %v", i, intentErr.Intents[i].Key, expectedIntents[i].Key) + } + if !expectedIntents[i].Txn.ID.Equal(intentErr.Intents[i].Txn.ID) { + t.Fatalf("%d intent key: got %v, expected %v", i, intentErr.Intents[i].Txn.ID, expectedIntents[i].Txn.ID) } - // pass } - if _, err := iter.Valid(); !testutils.IsError(err, errString) { - t.Fatalf("expected error %q but got %v", errString, err) + } else { + t.Fatalf("Expected roachpb.WriteIntentError, found %T", err) + } +} + +func assertExportedErrs( + t *testing.T, + e Engine, + startKey, endKey roachpb.Key, + startTime, endTime hlc.Timestamp, + revisions bool, + io IterOptions, + expectedIntents []roachpb.Intent, +) { + const big = 1 << 30 + _, _, _, err := e.ExportToSst(startKey, endKey, startTime, endTime, revisions, big, big, io) + require.Error(t, err) + + if intentErr := (*roachpb.WriteIntentError)(nil); errors.As(err, &intentErr) { + for i := range expectedIntents { + if !expectedIntents[i].Key.Equal(intentErr.Intents[i].Key) { + t.Fatalf("%d intent key: got %v, expected %v", i, intentErr.Intents[i].Key, expectedIntents[i].Key) + } + if !expectedIntents[i].Txn.ID.Equal(intentErr.Intents[i].Txn.ID) { + t.Fatalf("%d intent key: got %v, expected %v", i, intentErr.Intents[i].Txn.ID, expectedIntents[i].Txn.ID) + } } + } else { + t.Fatalf("Expected roachpb.WriteIntentError, found %T", err) } } @@ -118,9 +223,10 @@ func assertIteratedKVs( expected []MVCCKeyValue, ) { iter := NewMVCCIncrementalIterator(e, MVCCIncrementalIterOptions{ - IterOptions: io, - StartTime: startTime, - EndTime: endTime, + IterOptions: io, + StartTime: startTime, + EndTime: endTime, + EnableWriteIntentAggregation: true, }) defer iter.Close() var iterFn func() @@ -136,6 +242,9 @@ func assertIteratedKVs( } else if !ok || iter.UnsafeKey().Key.Compare(endKey) >= 0 { break } + if iter.NumCollectedIntents() > 0 { + t.Fatal("got unexpected intent error") + } kvs = append(kvs, MVCCKeyValue{Key: iter.Key(), Value: iter.Value()}) } @@ -213,16 +322,33 @@ func TestMVCCIncrementalIterator(t *testing.T) { makeKVT := func(key roachpb.Key, value []byte, ts hlc.Timestamp) MVCCKeyValue { return MVCCKeyValue{Key: MVCCKey{Key: key, Timestamp: ts}, Value: value} } + makeTxn := func(key roachpb.Key, val []byte, ts hlc.Timestamp) (roachpb.Transaction, roachpb.Value, roachpb.Intent) { + txnID := uuid.MakeV4() + txnMeta := enginepb.TxnMeta{ + Key: key, + ID: txnID, + Epoch: 1, + WriteTimestamp: ts, + } + return roachpb.Transaction{ + TxnMeta: txnMeta, + ReadTimestamp: ts, + }, roachpb.Value{ + RawBytes: val, + }, roachpb.MakeIntent(&txnMeta, key) + } + intents := func(intents ...roachpb.Intent) []roachpb.Intent { return intents } + // Keys are named as kv__. kv1_1_1 := makeKVT(testKey1, testValue1, ts1) kv1_4_4 := makeKVT(testKey1, testValue4, ts4) kv1_2_2 := makeKVT(testKey1, testValue2, ts2) kv2_2_2 := makeKVT(testKey2, testValue3, ts2) - kv1_3Deleted := makeKVT(testKey1, nil, ts3) + kv1Deleted3 := makeKVT(testKey1, nil, ts3) kvs := func(kvs ...MVCCKeyValue) []MVCCKeyValue { return kvs } for _, engineImpl := range mvccEngineImpls { - t.Run(engineImpl.name, func(t *testing.T) { + t.Run(engineImpl.name+"-latest", func(t *testing.T) { e := engineImpl.create() defer e.Close() @@ -251,49 +377,31 @@ func TestMVCCIncrementalIterator(t *testing.T) { if err := MVCCDelete(ctx, e, nil, testKey1, ts3, nil); err != nil { t.Fatal(err) } - t.Run("del", assertEqualKVs(e, keyMin, keyMax, ts1, tsMax, latest, kvs(kv1_3Deleted, kv2_2_2))) + t.Run("del", assertEqualKVs(e, keyMin, keyMax, ts1, tsMax, latest, kvs(kv1Deleted3, kv2_2_2))) // Exercise intent handling. - txn1ID := uuid.MakeV4() - txn1 := roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - Key: testKey1, - ID: txn1ID, - Epoch: 1, - WriteTimestamp: ts4, - }, - ReadTimestamp: ts4, - } - txn1Val := roachpb.Value{RawBytes: testValue4} + txn1, txn1Val, intentErr1 := makeTxn(testKey1, testValue4, ts4) if err := MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.ReadTimestamp, txn1Val, &txn1); err != nil { t.Fatal(err) } - txn2ID := uuid.MakeV4() - txn2 := roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - Key: testKey2, - ID: txn2ID, - Epoch: 1, - WriteTimestamp: ts4, - }, - ReadTimestamp: ts4, - } - txn2Val := roachpb.Value{RawBytes: testValue4} + txn2, txn2Val, intentErr2 := makeTxn(testKey2, testValue4, ts4) if err := MVCCPut(ctx, e, nil, txn2.TxnMeta.Key, txn2.ReadTimestamp, txn2Val, &txn2); err != nil { t.Fatal(err) } - t.Run("intents", - iterateExpectErr(e, testKey1, testKey1.PrefixEnd(), tsMin, tsMax, latest, "conflicting intents")) - t.Run("intents", - iterateExpectErr(e, testKey2, testKey2.PrefixEnd(), tsMin, tsMax, latest, "conflicting intents")) - t.Run("intents", - iterateExpectErr(e, keyMin, keyMax, tsMin, ts4, latest, "conflicting intents")) + t.Run("intents-1", + iterateExpectErr(e, testKey1, testKey1.PrefixEnd(), tsMin, tsMax, latest, intents(intentErr1))) + t.Run("intents-2", + iterateExpectErr(e, testKey2, testKey2.PrefixEnd(), tsMin, tsMax, latest, intents(intentErr2))) + if engineImpl.supportsMultiIntentError { + t.Run("intents-multi", + iterateExpectErr(e, keyMin, keyMax, tsMin, ts4, latest, intents(intentErr1, intentErr2))) + } // Intents above the upper time bound or beneath the lower time bound must // be ignored (#28358). Note that the lower time bound is exclusive while // the upper time bound is inclusive. - t.Run("intents", assertEqualKVs(e, keyMin, keyMax, tsMin, ts3, latest, kvs(kv1_3Deleted, kv2_2_2))) - t.Run("intents", assertEqualKVs(e, keyMin, keyMax, ts4, tsMax, latest, kvs())) - t.Run("intents", assertEqualKVs(e, keyMin, keyMax, ts4.Next(), tsMax, latest, kvs())) + t.Run("intents-filtered-1", assertEqualKVs(e, keyMin, keyMax, tsMin, ts3, latest, kvs(kv1Deleted3, kv2_2_2))) + t.Run("intents-filtered-2", assertEqualKVs(e, keyMin, keyMax, ts4, tsMax, latest, kvs())) + t.Run("intents-filtered-3", assertEqualKVs(e, keyMin, keyMax, ts4.Next(), tsMax, latest, kvs())) intent1 := roachpb.MakeLockUpdate(&txn1, roachpb.Span{Key: testKey1}) intent1.Status = roachpb.COMMITTED @@ -305,12 +413,12 @@ func TestMVCCIncrementalIterator(t *testing.T) { if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2); err != nil { t.Fatal(err) } - t.Run("intents", assertEqualKVs(e, keyMin, keyMax, tsMin, tsMax, latest, kvs(kv1_4_4, kv2_2_2))) + t.Run("intents-resolved", assertEqualKVs(e, keyMin, keyMax, tsMin, tsMax, latest, kvs(kv1_4_4, kv2_2_2))) }) } for _, engineImpl := range mvccEngineImpls { - t.Run(engineImpl.name, func(t *testing.T) { + t.Run(engineImpl.name+"-all", func(t *testing.T) { e := engineImpl.create() defer e.Close() @@ -339,47 +447,30 @@ func TestMVCCIncrementalIterator(t *testing.T) { if err := MVCCDelete(ctx, e, nil, testKey1, ts3, nil); err != nil { t.Fatal(err) } - t.Run("del", assertEqualKVs(e, keyMin, keyMax, ts1, tsMax, all, kvs(kv1_3Deleted, kv1_2_2, kv2_2_2))) + t.Run("del", assertEqualKVs(e, keyMin, keyMax, ts1, tsMax, all, kvs(kv1Deleted3, kv1_2_2, kv2_2_2))) // Exercise intent handling. - txn1ID := uuid.MakeV4() - txn1 := roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - Key: testKey1, - ID: txn1ID, - Epoch: 1, - WriteTimestamp: ts4, - }, - ReadTimestamp: ts4, - } - txn1Val := roachpb.Value{RawBytes: testValue4} + txn1, txn1Val, intentErr1 := makeTxn(testKey1, testValue4, ts4) if err := MVCCPut(ctx, e, nil, txn1.TxnMeta.Key, txn1.ReadTimestamp, txn1Val, &txn1); err != nil { t.Fatal(err) } - txn2ID := uuid.MakeV4() - txn2 := roachpb.Transaction{ - TxnMeta: enginepb.TxnMeta{ - Key: testKey2, - ID: txn2ID, - Epoch: 1, - WriteTimestamp: ts4, - }, - ReadTimestamp: ts4, - } - txn2Val := roachpb.Value{RawBytes: testValue4} + txn2, txn2Val, intentErr2 := makeTxn(testKey2, testValue4, ts4) if err := MVCCPut(ctx, e, nil, txn2.TxnMeta.Key, txn2.ReadTimestamp, txn2Val, &txn2); err != nil { t.Fatal(err) } - t.Run("intents", - iterateExpectErr(e, testKey1, testKey1.PrefixEnd(), tsMin, tsMax, all, "conflicting intents")) - t.Run("intents", - iterateExpectErr(e, testKey2, testKey2.PrefixEnd(), tsMin, tsMax, all, "conflicting intents")) - t.Run("intents", - iterateExpectErr(e, keyMin, keyMax, tsMin, ts4, all, "conflicting intents")) + // Single intent tests are verifying behavior when intent collection is not enabled. + t.Run("intents-1", + iterateExpectErr(e, testKey1, testKey1.PrefixEnd(), tsMin, tsMax, all, intents(intentErr1))) + t.Run("intents-2", + iterateExpectErr(e, testKey2, testKey2.PrefixEnd(), tsMin, tsMax, all, intents(intentErr2))) + if engineImpl.supportsMultiIntentError { + t.Run("intents-multi", + iterateExpectErr(e, keyMin, keyMax, tsMin, ts4, all, intents(intentErr1, intentErr2))) + } // Intents above the upper time bound or beneath the lower time bound must // be ignored (#28358). Note that the lower time bound is exclusive while // the upper time bound is inclusive. - t.Run("intents", assertEqualKVs(e, keyMin, keyMax, tsMin, ts3, all, kvs(kv1_3Deleted, kv1_2_2, kv1_1_1, kv2_2_2))) + t.Run("intents", assertEqualKVs(e, keyMin, keyMax, tsMin, ts3, all, kvs(kv1Deleted3, kv1_2_2, kv1_1_1, kv2_2_2))) t.Run("intents", assertEqualKVs(e, keyMin, keyMax, ts4, tsMax, all, kvs())) t.Run("intents", assertEqualKVs(e, keyMin, keyMax, ts4.Next(), tsMax, all, kvs())) @@ -393,7 +484,7 @@ func TestMVCCIncrementalIterator(t *testing.T) { if _, err := MVCCResolveWriteIntent(ctx, e, nil, intent2); err != nil { t.Fatal(err) } - t.Run("intents", assertEqualKVs(e, keyMin, keyMax, tsMin, tsMax, all, kvs(kv1_4_4, kv1_3Deleted, kv1_2_2, kv1_1_1, kv2_2_2))) + t.Run("intents-resolved", assertEqualKVs(e, keyMin, keyMax, tsMin, tsMax, all, kvs(kv1_4_4, kv1Deleted3, kv1_2_2, kv1_1_1, kv2_2_2))) }) } } diff --git a/pkg/storage/mvcc_test.go b/pkg/storage/mvcc_test.go index 3001fa7c67c6..623d56504832 100644 --- a/pkg/storage/mvcc_test.go +++ b/pkg/storage/mvcc_test.go @@ -91,11 +91,12 @@ func createTestPebbleEngine() Engine { } var mvccEngineImpls = []struct { - name string - create func() Engine + name string + supportsMultiIntentError bool + create func() Engine }{ - {"rocksdb", createTestRocksDBEngine}, - {"pebble", createTestPebbleEngine}, + {"rocksdb", false, createTestRocksDBEngine}, + {"pebble", true, createTestPebbleEngine}, } // makeTxn creates a new transaction using the specified base diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index 95a27d1f448d..f1c385af1256 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -24,6 +24,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/base" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/fs" @@ -37,6 +38,17 @@ import ( "github.com/cockroachdb/pebble/vfs" ) +// Default value for maximum number of intents reported by ExportToSST +// in WriteIntentError is set to half of the maximum lock table size. +// This value is subject to tuning in real environment as we have more +// data available. +const maxIntentsPerSstExportErrorDefault = 5000 + +var maxIntentsPerSstExportError = settings.RegisterIntSetting( + "storage.sst_export.max_intents_per_error", + "maximum number of intents returned in error when sst export fails", + maxIntentsPerSstExportErrorDefault) + // MVCCKeyCompare compares cockroach keys, including the MVCC timestamps. func MVCCKeyCompare(a, b []byte) int { // NB: For performance, this routine manually splits the key into the @@ -533,7 +545,8 @@ func newPebbleInMem(ctx context.Context, attrs roachpb.Attributes, cacheSize int Attrs: attrs, // TODO(bdarnell): The hard-coded 512 MiB is wrong; see // https://github.com/cockroachdb/cockroach/issues/16750 - MaxSize: 512 << 20, /* 512 MiB */ + MaxSize: 512 << 20, /* 512 MiB */ + Settings: cluster.MakeTestingClusterSettings(), }, Opts: opts, }) @@ -578,7 +591,8 @@ func (p *Pebble) ExportToSst( targetSize, maxSize uint64, io IterOptions, ) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { - return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io) + maxIntentCount := maxIntentsPerSstExportError.Get(&p.settings.SV) + return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io, maxIntentCount) } // Get implements the Engine interface. @@ -882,6 +896,7 @@ func (p *Pebble) NewWriteOnlyBatch() Batch { func (p *Pebble) NewSnapshot() Reader { return &pebbleSnapshot{ snapshot: p.db.NewSnapshot(), + settings: p.settings, } } @@ -1073,7 +1088,8 @@ func (p *pebbleReadOnly) ExportToSst( targetSize, maxSize uint64, io IterOptions, ) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { - return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io) + maxIntentCount := maxIntentsPerSstExportError.Get(&p.parent.settings.SV) + return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io, maxIntentCount) } func (p *pebbleReadOnly) Get(key MVCCKey) ([]byte, error) { @@ -1171,6 +1187,7 @@ func (p *pebbleReadOnly) LogLogicalOp(op MVCCLogicalOpType, details MVCCLogicalO // pebbleSnapshot represents a snapshot created using Pebble.NewSnapshot(). type pebbleSnapshot struct { snapshot *pebble.Snapshot + settings *cluster.Settings closed bool } @@ -1195,7 +1212,8 @@ func (p *pebbleSnapshot) ExportToSst( targetSize, maxSize uint64, io IterOptions, ) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { - return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io) + maxIntentCount := maxIntentsPerSstExportError.Get(&p.settings.SV) + return pebbleExportToSst(p, startKey, endKey, startTS, endTS, exportAllRevisions, targetSize, maxSize, io, maxIntentCount) } // Get implements the Reader interface. @@ -1260,6 +1278,7 @@ func pebbleExportToSst( exportAllRevisions bool, targetSize, maxSize uint64, io IterOptions, + maxIntentCount int64, ) ([]byte, roachpb.BulkOpSummary, roachpb.Key, error) { sstFile := &MemFile{} sstWriter := MakeBackupSSTWriter(sstFile) @@ -1269,9 +1288,10 @@ func pebbleExportToSst( iter := NewMVCCIncrementalIterator( reader, MVCCIncrementalIterOptions{ - IterOptions: io, - StartTime: startTS, - EndTime: endTS, + IterOptions: io, + StartTime: startTS, + EndTime: endTS, + EnableWriteIntentAggregation: true, }) defer iter.Close() var curKey roachpb.Key // only used if exportAllRevisions @@ -1280,8 +1300,8 @@ func pebbleExportToSst( for iter.SeekGE(MakeMVCCMetadataKey(startKey)); ; { ok, err := iter.Valid() if err != nil { - // The error may be a WriteIntentError. In which case, returning it will - // cause this command to be retried. + // This is an underlying iterator error, return it to the caller to deal + // with. return nil, roachpb.BulkOpSummary{}, nil, err } if !ok { @@ -1291,6 +1311,11 @@ func pebbleExportToSst( if unsafeKey.Key.Compare(endKey) >= 0 { break } + + if iter.NumCollectedIntents() > 0 { + break + } + unsafeValue := iter.UnsafeValue() isNewKey := !exportAllRevisions || !unsafeKey.Key.Equal(curKey) if paginated && exportAllRevisions && isNewKey { @@ -1329,6 +1354,23 @@ func pebbleExportToSst( } } + // First check if we encountered an intent while iterating the data. + // If we do it means this export can't complete and is aborted. We need to loop over remaining data + // to collect all matching intents before returning them in an error to the caller. + if iter.NumCollectedIntents() > 0 { + for int64(iter.NumCollectedIntents()) < maxIntentCount { + iter.NextKey() + // If we encounter other errors during intent collection, we return our original write intent failure. + // We would find this new error again upon retry. + ok, _ := iter.Valid() + if !ok { + break + } + } + err := iter.TryGetIntentError() + return nil, roachpb.BulkOpSummary{}, nil, err + } + if rows.BulkOpSummary.DataSize == 0 { // If no records were added to the sstable, skip completing it and return a // nil slice – the export code will discard it anyway (based on 0 DataSize). diff --git a/pkg/storage/pebble_test.go b/pkg/storage/pebble_test.go index def38c398a0a..486bfbeb14d7 100644 --- a/pkg/storage/pebble_test.go +++ b/pkg/storage/pebble_test.go @@ -12,6 +12,7 @@ package storage import ( "bytes" + "context" "fmt" "io/ioutil" "math/rand" @@ -20,6 +21,7 @@ import ( "strings" "testing" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/enginepb" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -28,7 +30,9 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/datadriven" + "github.com/cockroachdb/errors" "github.com/cockroachdb/pebble" + "github.com/stretchr/testify/require" ) func TestPebbleTimeBoundPropCollector(t *testing.T) { @@ -295,3 +299,93 @@ func BenchmarkMVCCKeyCompare(b *testing.B) { fmt.Fprint(ioutil.Discard, c) } } + +type testValue struct { + key roachpb.Key + value roachpb.Value + timestamp hlc.Timestamp + txn *roachpb.Transaction +} + +func intent(key roachpb.Key, val string, ts hlc.Timestamp) testValue { + var value = roachpb.MakeValueFromString(val) + value.InitChecksum(key) + tx := roachpb.MakeTransaction(fmt.Sprintf("txn-%v", key), key, roachpb.NormalUserPriority, ts, 1000) + var txn = &tx + return testValue{key, value, ts, txn} +} + +func value(key roachpb.Key, val string, ts hlc.Timestamp) testValue { + var value = roachpb.MakeValueFromString(val) + value.InitChecksum(key) + return testValue{key, value, ts, nil} +} + +func fillInData(ctx context.Context, engine Engine, data []testValue) error { + batch := engine.NewBatch() + for _, val := range data { + if err := MVCCPut(ctx, batch, nil, val.key, val.timestamp, val.value, val.txn); err != nil { + return err + } + } + return batch.Commit(true) +} + +func ts(ts int64) hlc.Timestamp { + return hlc.Timestamp{WallTime: ts} +} + +func formattedKey(k int) roachpb.Key { + return []byte(fmt.Sprintf("%05d", k)) +} + +func requireTxnForValue(t *testing.T, val testValue, intent roachpb.Intent) { + require.Equal(t, val.txn.Key, intent.Txn.Key) +} + +func TestSstExportFailureIntentBatching(t *testing.T) { + defer leaktest.AfterTest(t)() + + // Test function uses a fixed time and key range to produce SST. + // Use varying inserted keys for values and intents to putting them in and out of ranges. + checkReportedErrors := func(data []testValue, expectedIntentIndices []int) func(*testing.T) { + return func(t *testing.T) { + ctx := context.Background() + + engine := createTestPebbleEngine() + defer engine.Close() + + require.NoError(t, fillInData(ctx, engine, data)) + + minTs := ts(999) + maxTs := ts(2000) + endKey := formattedKey(20000) + _, _, _, err := engine.ExportToSst(formattedKey(10), formattedKey(20000), minTs, maxTs, + true, 0, 0, IterOptions{MinTimestampHint: minTs.Next(), MaxTimestampHint: maxTs, UpperBound: endKey}) + if len(expectedIntentIndices) == 0 { + require.NoError(t, err) + } else { + require.Error(t, err) + e := (*roachpb.WriteIntentError)(nil) + if !errors.As(err, &e) { + require.Fail(t, "Expected WriteIntentFailure, got %T", err) + } + require.Equal(t, len(expectedIntentIndices), len(e.Intents)) + for i, dataIdx := range expectedIntentIndices { + requireTxnForValue(t, data[dataIdx], e.Intents[i]) + } + } + } + } + + // Export range is fixed to k:["00010", "10000"), ts:(999, 2000] for all tests. + testDataCount := int(maxIntentsPerSstExportError.Default() + 1) + testData := make([]testValue, testDataCount*2) + expectedErrors := make([]int, testDataCount) + for i := 0; i < testDataCount; i++ { + testData[i*2] = value(formattedKey(i*2+11), "value", ts(1000)) + testData[i*2+1] = intent(formattedKey(i*2+12), "intent", ts(1001)) + expectedErrors[i] = i*2 + 1 + } + t.Run("Receive no more than limit intents", checkReportedErrors(testData, expectedErrors[:maxIntentsPerSstExportError.Default()])) +}