From 084cc68743bbadb058ee01148a81b88ee9f5cb46 Mon Sep 17 00:00:00 2001 From: adityamaru Date: Wed, 2 Nov 2022 09:24:48 -0400 Subject: [PATCH] builtins: add `crdb_internal.fingerprint` builtin This change adds a `crdb_internal.fingerprint` builtin that accepts a `startTime`, `endTime`, `startKey` and `endKey` to define the interval the user wants to fingerprint. The builtin is powered by sending an ExportRequest with the defined intervals but with the `ExportFingerprint` option set to true. Setting this option on the ExportRequest means that instead of writing all point and rangekeys to an SST and sending them back to the client, command evaluation will use the newly introduced `fingerprintWriter` (#90848) when exporting keys. This writer computes an `fnv64` hash of the key/timestamp, value for each point key and maintains a running XOR aggregate of all the point keys processed as part of the ExportRequest. Rangekeys are not fingerprinted during command evaluation, but instead returned to the client in a pebble SST. This is because range keys do not have a stable, discrete identity and so it is up to the caller to define a deterministic fingerprinting scheme across all returned range keys. The ExportRequest sent as part of this builtin does not set any DistSender limit, thereby allowing concurrent execution across ranges. We are not concerned about the ExportResponses growing too large since the SSTs will only contain rangekeys that should be few in number. If this assumption is proved incorrect in the future, we can revisit setting a `TargetBytes` to the header of the BatchRequest. 
Fixes: #89336 Release note: None --- docs/generated/sql/functions.md | 2 + pkg/kv/kvserver/batcheval/cmd_export.go | 43 ++-- pkg/roachpb/api.proto | 30 ++- pkg/sql/sem/builtins/BUILD.bazel | 11 + pkg/sql/sem/builtins/builtins.go | 223 ++++++++++++++++++ .../sem/builtins/fingerprint_builtin_test.go | 212 +++++++++++++++++ pkg/sql/sem/builtins/fixed_oids.go | 1 + pkg/storage/mvcc.go | 2 +- 8 files changed, 503 insertions(+), 21 deletions(-) create mode 100644 pkg/sql/sem/builtins/fingerprint_builtin_test.go diff --git a/docs/generated/sql/functions.md b/docs/generated/sql/functions.md index be6e29cc0a42..8312c52be755 100644 --- a/docs/generated/sql/functions.md +++ b/docs/generated/sql/functions.md @@ -3060,6 +3060,8 @@ may increase either contention or retry errors, or both.

Volatile crdb_internal.encode_key(table_id: int, index_id: int, row_tuple: anyelement) → bytes

Generate the key for a row on a particular table and index.

Stable +crdb_internal.fingerprint(span: bytes[], start_time: timestamptz, all_revisions: bool) → int

This function is used only by CockroachDB’s developers for testing purposes.

+
Immutable crdb_internal.force_assertion_error(msg: string) → int

This function is used only by CockroachDB’s developers for testing purposes.

Volatile crdb_internal.force_error(errorCode: string, msg: string) → int

This function is used only by CockroachDB’s developers for testing purposes.

diff --git a/pkg/kv/kvserver/batcheval/cmd_export.go b/pkg/kv/kvserver/batcheval/cmd_export.go index 36e450df72c4..db3856342a27 100644 --- a/pkg/kv/kvserver/batcheval/cmd_export.go +++ b/pkg/kv/kvserver/batcheval/cmd_export.go @@ -181,19 +181,27 @@ func evalExport( var curSizeOfExportedSSTs int64 for start := args.Key; start != nil; { destFile := &storage.MemFile{} - summary, resume, err := storage.MVCCExportToSST(ctx, cArgs.EvalCtx.ClusterSettings(), reader, - storage.MVCCExportOptions{ - StartKey: storage.MVCCKey{Key: start, Timestamp: resumeKeyTS}, - EndKey: args.EndKey, - StartTS: args.StartTime, - EndTS: h.Timestamp, - ExportAllRevisions: exportAllRevisions, - TargetSize: targetSize, - MaxSize: maxSize, - MaxIntents: maxIntents, - StopMidKey: args.SplitMidKey, - ResourceLimiter: storage.NewResourceLimiter(storage.ResourceLimiterOptions{MaxRunTime: maxRunTime}, timeutil.DefaultTimeSource{}), - }, destFile) + opts := storage.MVCCExportOptions{ + StartKey: storage.MVCCKey{Key: start, Timestamp: resumeKeyTS}, + EndKey: args.EndKey, + StartTS: args.StartTime, + EndTS: h.Timestamp, + ExportAllRevisions: exportAllRevisions, + TargetSize: targetSize, + MaxSize: maxSize, + MaxIntents: maxIntents, + StopMidKey: args.SplitMidKey, + ResourceLimiter: storage.NewResourceLimiter(storage.ResourceLimiterOptions{MaxRunTime: maxRunTime}, timeutil.DefaultTimeSource{}), + } + var summary roachpb.BulkOpSummary + var resume storage.MVCCKey + var fingerprint uint64 + var err error + if args.ExportFingerprint { + summary, resume, fingerprint, err = storage.MVCCExportFingerprint(ctx, cArgs.EvalCtx.ClusterSettings(), reader, opts, destFile) + } else { + summary, resume, err = storage.MVCCExportToSST(ctx, cArgs.EvalCtx.ClusterSettings(), reader, opts, destFile) + } if err != nil { if errors.HasType(err, (*storage.ExceedMaxSizeError)(nil)) { err = errors.WithHintf(err, @@ -217,10 +225,11 @@ func evalExport( span.EndKey = args.EndKey } exported := roachpb.ExportResponse_File{ - Span: span, 
- EndKeyTS: resume.Timestamp, - Exported: summary, - SST: data, + Span: span, + EndKeyTS: resume.Timestamp, + Exported: summary, + SST: data, + Fingerprint: fingerprint, } reply.Files = append(reply.Files, exported) start = resume.Key diff --git a/pkg/roachpb/api.proto b/pkg/roachpb/api.proto index c9c0754504fd..a44f0798395e 100644 --- a/pkg/roachpb/api.proto +++ b/pkg/roachpb/api.proto @@ -1546,7 +1546,21 @@ message ExportRequest { // then there is no limit. int64 target_file_size = 10; + // ExportFingerprint when set to true will result in ExportRequest command + // evaluation generating an fnv64 hash for every key/timestamp and value, for + // point keys encountered in the key/time interval. Each KV hash will be + // combined via a XOR into a running aggregate that is returned as part of the + // ExportResponse. + // + // Range keys are not fingerprinted but instead written to a pebble SST that + // is returned to the caller. This is because range keys do not have a stable, + // discrete identity and so it is up to the caller to define a deterministic + // fingerprinting scheme across all returned range keys. + bool export_fingerprint = 14; + reserved 2, 5, 7, 8, 11; + + // Next ID: 15 } // BulkOpSummary summarizes the data processed by an operation, counting the @@ -1586,14 +1600,24 @@ message ExportResponse { (gogoproto.customname) = "EndKeyTS" ]; string path = 2; - reserved 3; - reserved 4; - reserved 5; BulkOpSummary exported = 6 [(gogoproto.nullable) = false]; bytes sst = 7 [(gogoproto.customname) = "SST"]; string locality_kv = 8 [(gogoproto.customname) = "LocalityKV"]; + + // Fingerprint is the XOR aggregate of the fnv64 hash of every point + // key/timestamp and corresponding value that has been exported as part of + // the ExportRequest. This field is only set when the request is sent with + // `ExportFingerprint` set to true. + // + // Range keys are not fingerprinted but instead written to the sst above + // that is returned to the caller. 
This is because range keys do not have a + // stable, discrete identity and so it is up to the caller to define a + // deterministic fingerprinting scheme across all returned range keys. + uint64 fingerprint = 10; + + reserved 3, 4, 5; } ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true]; diff --git a/pkg/sql/sem/builtins/BUILD.bazel b/pkg/sql/sem/builtins/BUILD.bazel index 30c4f627bfd2..10d34ed06af1 100644 --- a/pkg/sql/sem/builtins/BUILD.bazel +++ b/pkg/sql/sem/builtins/BUILD.bazel @@ -46,6 +46,7 @@ go_library( "//pkg/keys", "//pkg/kv", "//pkg/kv/kvclient", + "//pkg/kv/kvserver/concurrency/lock", "//pkg/kv/kvserver/kvserverbase", "//pkg/repstream/streampb", "//pkg/roachpb", @@ -88,7 +89,9 @@ go_library( "//pkg/sql/storageparam", "//pkg/sql/storageparam/indexstorageparam", "//pkg/sql/types", + "//pkg/storage", "//pkg/util", + "//pkg/util/admission/admissionpb", "//pkg/util/arith", "//pkg/util/bitarray", "//pkg/util/contextutil", @@ -142,6 +145,7 @@ go_test( "all_builtins_test.go", "builtins_test.go", "datums_to_bytes_builtin_test.go", + "fingerprint_builtin_test.go", "generator_builtins_test.go", "geo_builtins_test.go", "help_test.go", @@ -158,6 +162,8 @@ go_test( "//pkg/base", "//pkg/keys", "//pkg/kv", + "//pkg/kv/kvserver", + "//pkg/roachpb", "//pkg/security/securityassets", "//pkg/security/securitytest", "//pkg/server", @@ -173,16 +179,21 @@ go_test( "//pkg/sql/sem/tree/treewindow", "//pkg/sql/sem/volatility", "//pkg/sql/types", + "//pkg/storage", "//pkg/testutils", "//pkg/testutils/serverutils", "//pkg/testutils/skip", "//pkg/testutils/sqlutils", + "//pkg/testutils/storageutils", "//pkg/testutils/testcluster", "//pkg/util", "//pkg/util/duration", + "//pkg/util/hlc", "//pkg/util/leaktest", + "//pkg/util/log", "//pkg/util/mon", "//pkg/util/randutil", + "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_lib_pq//:pq", "@com_github_stretchr_testify//assert", diff --git a/pkg/sql/sem/builtins/builtins.go 
b/pkg/sql/sem/builtins/builtins.go index e1aeacf4a0f1..a2232a765fd9 100644 --- a/pkg/sql/sem/builtins/builtins.go +++ b/pkg/sql/sem/builtins/builtins.go @@ -40,6 +40,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/config/zonepb" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver/concurrency/lock" "github.com/cockroachdb/cockroach/pkg/kv/kvserver/kvserverbase" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/security/password" @@ -72,11 +73,15 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sqlstats/persistedsqlstats/sqlstatsutil" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/util" + "github.com/cockroachdb/cockroach/pkg/util/admission/admissionpb" + "github.com/cockroachdb/cockroach/pkg/util/contextutil" "github.com/cockroachdb/cockroach/pkg/util/duration" "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/errorutil/unimplemented" "github.com/cockroachdb/cockroach/pkg/util/fuzzystrmatch" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/humanizeutil" "github.com/cockroachdb/cockroach/pkg/util/ipaddr" "github.com/cockroachdb/cockroach/pkg/util/json" @@ -7354,6 +7359,224 @@ expires until the statement bundle is collected`, Volatility: volatility.Volatile, }, ), + "crdb_internal.fingerprint": makeBuiltin( + tree.FunctionProperties{ + Category: builtinconstants.CategorySystemInfo, + }, + tree.Overload{ + Types: tree.ArgTypes{ + {"span", types.BytesArray}, + {"start_time", types.TimestampTZ}, + {"all_revisions", types.Bool}, + }, + ReturnType: tree.FixedReturnType(types.Int), + Fn: func(ctx context.Context, evalCtx *eval.Context, args tree.Datums) (tree.Datum, error) { + ctx, sp := tracing.ChildSpan(ctx, 
"crdb_internal.fingerprint") + defer sp.Finish() + isAdmin, err := evalCtx.SessionAccessor.HasAdminRole(ctx) + if err != nil { + return nil, err + } + if !isAdmin { + return nil, errors.New("crdb_internal.fingerprint() requires admin privilege") + } + arr := tree.MustBeDArray(args[0]) + if arr.Len() != 2 { + return nil, errors.New("expected an array of two elements") + } + startKey := []byte(tree.MustBeDBytes(arr.Array[0])) + endKey := []byte(tree.MustBeDBytes(arr.Array[1])) + endTime := hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + if evalCtx.AsOfSystemTime != nil { + endTime = evalCtx.AsOfSystemTime.Timestamp + } + header := roachpb.Header{ + Timestamp: endTime, + // We set WaitPolicy to Error, so that the export will return an error + // to us instead of a blocking wait if it hits any other txns. + // + // TODO(adityamaru): We might need to handle WriteIntentErrors + // specially in the future so as to allow the fingerprint to complete + // in the face of intents. + WaitPolicy: lock.WaitPolicy_Error, + } + startTime := args[1].(*tree.DTimestampTZ).Time + startTimestamp := hlc.Timestamp{WallTime: startTime.UnixNano()} + allRevisions := *args[2].(*tree.DBool) + filter := roachpb.MVCCFilter_Latest + if allRevisions { + filter = roachpb.MVCCFilter_All + } + req := &roachpb.ExportRequest{ + RequestHeader: roachpb.RequestHeader{Key: startKey, EndKey: endKey}, + StartTime: startTimestamp, + MVCCFilter: filter, + ExportFingerprint: true, + } + admissionHeader := roachpb.AdmissionHeader{ + Priority: int32(admissionpb.BulkNormalPri), + CreateTime: timeutil.Now().UnixNano(), + Source: roachpb.AdmissionHeader_FROM_SQL, + NoMemoryReservedAtSource: true, + } + todo := make(chan *roachpb.ExportRequest, 1) + todo <- req + ctxDone := ctx.Done() + var fingerprint uint64 + ssts := make([][]byte, 0) + for { + select { + case <-ctxDone: + return nil, ctx.Err() + case req := <-todo: + var rawResp roachpb.Response + var pErr *roachpb.Error + exportRequestErr := 
contextutil.RunWithTimeout(ctx, + fmt.Sprintf("ExportRequest fingerprint for span %s", roachpb.Span{Key: startKey, EndKey: endKey}), + 5*time.Minute, func(ctx context.Context) error { + rawResp, pErr = kv.SendWrappedWithAdmission(ctx, + evalCtx.Txn.DB().NonTransactionalSender(), header, admissionHeader, req) + if pErr != nil { + return pErr.GoError() + } + return nil + }) + if exportRequestErr != nil { + return nil, exportRequestErr + } + + resp := rawResp.(*roachpb.ExportResponse) + for _, file := range resp.Files { + fingerprint = fingerprint ^ file.Fingerprint + + // Aggregate all the range keys that need fingerprinting once all + // ExportRequests have been completed. + if len(file.SST) != 0 { + ssts = append(ssts, file.SST) + } + } + if resp.ResumeSpan != nil { + if !resp.ResumeSpan.Valid() { + return nil, errors.Errorf("invalid resume span: %s", resp.ResumeSpan) + } + + resumeReq := req + resumeReq.RequestHeader = roachpb.RequestHeaderFromSpan(*resp.ResumeSpan) + todo <- resumeReq + } + default: + // No ExportRequests left to send. We've aggregated range keys + // across all ExportRequests and can now fingerprint them. + // + // NB: We aggregate rangekeys across ExportRequests and then + // fingerprint them on the client, instead of fingerprinting them as + // part of the ExportRequest command evaluation, because range keys + // do not have a stable, discrete identity. Their fragmentation can + // be influenced by rangekeys outside the time interval that we are + // fingerprinting, or by range splits. So, we need to "defragment" + // all the rangekey stacks we observe such that the fragmentation is + // deterministic on only the data we want to fingerprint in our key + // and time interval. + // + // Egs: + // + // t2 [-----)[----) + // + // t1 [----)[-----) + // a b c d + // + // Assume we have two rangekeys [a, c)@t1 and [b, d)@t2. They will + // fragment as shown in the diagram above. 
If we wish to fingerprint + // key [a-d) in time interval (t1, t2] the fragmented rangekey + // [a, c)@t1 is outside our time interval and should not influence our + // fingerprint. The iterator in `fingerprintRangekeys` will + // "defragment" the rangekey stacks [b-c)@t2 and [c-d)@t2 and + // fingerprint them as a single rangekey with bounds [b-d)@t2. + rangekeyFingerprint, err := fingerprintRangekeys(ctx, ssts) + if err != nil { + return nil, err + } + fingerprint = fingerprint ^ rangekeyFingerprint + return tree.NewDInt(tree.DInt(fingerprint)), nil + } + } + }, + Info: "This function is used only by CockroachDB's developers for testing purposes.", + Volatility: volatility.Immutable, + }, + ), +} + +func hashData(h hash.Hash64, data []byte) error { + if _, err := h.Write(data); err != nil { + return errors.NewAssertionErrorWithWrappedErrf(err, + `"It never returns an error." -- https://golang.org/pkg/hash: %T`, h) + } + + return nil +} + +// fingerprintRangekeys iterates over the provided SSTs, that are expected to +// contain only rangekeys, and maintains a XOR aggregate of each rangekey's +// fingerprint. +func fingerprintRangekeys(ctx context.Context, ssts [][]byte) (uint64, error) { + ctx, sp := tracing.ChildSpan(ctx, "builtins.fingerprintRangekeys") + defer sp.Finish() + _ = ctx // ctx is currently unused, but this new ctx should be used below in the future. 
+ + iterOpts := storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.LocalMax, + UpperBound: keys.MaxKey, + } + var fingerprint uint64 + iter, err := storage.NewMultiMemSSTIterator(ssts, false, iterOpts) + if err != nil { + return fingerprint, err + } + defer iter.Close() + + h := fnv.New64() + var timestampBuf []byte + fingerprintRangeKey := func(bounds roachpb.Span, versions storage.MVCCRangeKeyVersions) (uint64, error) { + defer h.Reset() + if err := hashData(h, bounds.Key); err != nil { + return 0, err + } + if err := hashData(h, bounds.EndKey); err != nil { + return 0, err + } + for _, v := range versions { + timestampBuf = storage.EncodeMVCCTimestampToBuf(timestampBuf, v.Timestamp) + if err := hashData(h, timestampBuf); err != nil { + return 0, err + } + if err := hashData(h, v.Value); err != nil { + return 0, err + } + } + return h.Sum64(), nil + } + + for iter.SeekGE(storage.MVCCKey{Key: keys.MinKey}); ; iter.Next() { + if ok, err := iter.Valid(); err != nil { + return fingerprint, err + } else if !ok { + break + } + hasPoint, _ := iter.HasPointAndRange() + if hasPoint { + return fingerprint, errors.AssertionFailedf("unexpected point key; ssts should only contain range keys") + } + rangeKeyStack := iter.RangeKeys() + rangekeyFingerprint, err := fingerprintRangeKey(rangeKeyStack.Bounds, rangeKeyStack.Versions) + if err != nil { + return fingerprint, err + } + fingerprint = fingerprint ^ rangekeyFingerprint + } + + return fingerprint, nil } var lengthImpls = func(incBitOverload bool) builtinDefinition { diff --git a/pkg/sql/sem/builtins/fingerprint_builtin_test.go b/pkg/sql/sem/builtins/fingerprint_builtin_test.go new file mode 100644 index 000000000000..fbeb66e06c88 --- /dev/null +++ b/pkg/sql/sem/builtins/fingerprint_builtin_test.go @@ -0,0 +1,212 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. 
+// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package builtins_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/kv/kvserver" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/server" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/storageutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/syncutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +// TestFingerprint tests the `crdb_internal.fingerprint` builtin in the presence +// of rangekeys. 
+func TestFingerprint(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + var mu syncutil.Mutex + var numExportResponses int + var numSSTsInExportResponses int + serv, sqlDB, db := serverutils.StartServer(t, base.TestServerArgs{ + Knobs: base.TestingKnobs{ + Store: &kvserver.StoreTestingKnobs{ + TestingResponseFilter: func(ctx context.Context, ba *roachpb.BatchRequest, br *roachpb.BatchResponse) *roachpb.Error { + mu.Lock() + defer mu.Unlock() + for i, ru := range br.Responses { + if _, ok := ba.Requests[i].GetInner().(*roachpb.ExportRequest); ok { + exportResponse := ru.GetInner().(*roachpb.ExportResponse) + numExportResponses++ + numSSTsInExportResponses += len(exportResponse.Files) + } + } + return nil + }, + }, + }, + }) + + resetVars := func() { + mu.Lock() + defer mu.Unlock() + numExportResponses = 0 + numSSTsInExportResponses = 0 + } + + returnPointAndRangeKeys := func(eng storage.Engine) ([]storage.MVCCKeyValue, []storage.MVCCRangeKey) { + var rangeKeys []storage.MVCCRangeKey + var pointKeys []storage.MVCCKeyValue + for _, kvI := range storageutils.ScanKeySpan(t, eng, roachpb.Key("a"), roachpb.Key("z")) { + switch kv := kvI.(type) { + case storage.MVCCRangeKeyValue: + rangeKeys = append(rangeKeys, kv.RangeKey) + + case storage.MVCCKeyValue: + pointKeys = append(pointKeys, kv) + + default: + t.Fatalf("unknown type %t", kvI) + } + } + return pointKeys, rangeKeys + } + + fingerprint := func(t *testing.T, startKey, endKey string, startTime, endTime hlc.Timestamp, allRevisions bool) int64 { + decimal := eval.TimestampToDecimal(endTime) + var fingerprint int64 + query := fmt.Sprintf(`SELECT * FROM crdb_internal.fingerprint(ARRAY[$1::BYTES, $2::BYTES], $3, $4) AS OF SYSTEM TIME %s`, decimal.String()) + require.NoError(t, sqlDB.QueryRow(query, roachpb.Key(startKey), roachpb.Key(endKey), startTime.GoTime(), allRevisions).Scan(&fingerprint)) + return fingerprint + } + + // Disable index recommendation so 
that >5 invocations of + // `crdb_internal.fingerprint` does not result in an additional call for + // generating index recommendations. + _, err := sqlDB.Exec(`SET CLUSTER SETTING sql.metrics.statement_details.index_recommendation_collection.enabled = false`) + require.NoError(t, err) + + t.Run("fingerprint-empty-store", func(t *testing.T) { + fingerprint := fingerprint(t, "a", "z", hlc.Timestamp{WallTime: 0}, + hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}, true /* allRevisions */) + require.Zero(t, fingerprint) + }) + + s := serv.(*server.TestServer) + defer s.Stopper().Stop(ctx) + + store, err := s.Stores().GetStore(s.GetFirstStoreID()) + require.NoError(t, err) + eng := store.Engine() + + // Insert some point keys. + txn := db.NewTxn(ctx, "test-point-keys") + pointKeysTS := hlc.Timestamp{WallTime: timeutil.Now().Round(time.Microsecond).UnixNano()} + require.NoError(t, txn.SetFixedTimestamp(ctx, pointKeysTS)) + require.NoError(t, txn.Put(ctx, "a", "value")) + require.NoError(t, txn.Put(ctx, "b", "value")) + require.NoError(t, txn.Put(ctx, "c", "value")) + require.NoError(t, txn.Put(ctx, "d", "value")) + require.NoError(t, txn.Commit(ctx)) + + // Run a scan to force intent resolution. + _, err = db.Scan(ctx, "a", "z", 0) + require.NoError(t, err) + + pointKeys, rangeKeys := returnPointAndRangeKeys(eng) + require.Len(t, pointKeys, 4) + require.Len(t, rangeKeys, 0) + + // The store will have: + // + // ts2 [----------- rt -------------) + // + // ts1 value value value value + // a b c d + // + // Fingerprint the point keys. 
+ fingerprintPointKeys := fingerprint(t, "a", "z", + pointKeysTS.Add(int64(-time.Microsecond), 0), pointKeysTS, true /* allRevisions */) + + require.NoError(t, db.DelRangeUsingTombstone(ctx, "a", "c")) + pointKeys, rangeKeys = returnPointAndRangeKeys(eng) + require.Len(t, pointKeys, 4) + require.Len(t, rangeKeys, 1) + rangeKey1Timestamp := rangeKeys[0].Timestamp + // Note, the timestamp comparison is a noop here but we need the timestamp for + // future AOST fingerprint queries. + require.Equal(t, []storage.MVCCRangeKey{ + storageutils.RangeKeyWithTS("a", "c", rangeKey1Timestamp), + }, rangeKeys) + + // Fingerprint the point and range keys. + fingerprintPointAndRangeKeys := fingerprint(t, "a", "z", + pointKeysTS.Add(int64(-time.Microsecond), 0), rangeKey1Timestamp, true /* allRevisions */) + require.NotEqual(t, int64(0), fingerprintPointAndRangeKeys) + require.NotEqual(t, fingerprintPointKeys, fingerprintPointAndRangeKeys) + + // Fingerprint only the range key. + fingerprintRangekeys := fingerprint(t, "a", "z", + rangeKey1Timestamp.Add(int64(-time.Microsecond), 0), rangeKey1Timestamp, true /* allRevisions */) + require.NotEqual(t, int64(0), fingerprintRangekeys) + + require.Equal(t, fingerprintPointAndRangeKeys, fingerprintPointKeys^fingerprintRangekeys) + + // The store now has: + // + // ts3 [------)[-------) + // + // ts2 [----------)[------) + // + // ts1 value value value value + // a b c d + require.NoError(t, db.DelRangeUsingTombstone(ctx, "b", "d")) + pointKeys, rangeKeys = returnPointAndRangeKeys(eng) + require.Len(t, pointKeys, 4) + require.Len(t, rangeKeys, 4) + rangeKey2Timestamp := rangeKeys[1].Timestamp + require.Equal(t, []storage.MVCCRangeKey{ + storageutils.RangeKeyWithTS("a", "b", rangeKey1Timestamp), + storageutils.RangeKeyWithTS("b", "c", rangeKey2Timestamp), + storageutils.RangeKeyWithTS("b", "c", rangeKey1Timestamp), + storageutils.RangeKeyWithTS("c", "d", rangeKey2Timestamp), + }, rangeKeys) + + // Even with the fragmentation of the first 
range key, our fingerprint for the + // point keys and first range key should be the same as before. + fingerprintFragmentedPointAndRangeKeys := fingerprint(t, "a", "z", + pointKeysTS.Add(int64(-time.Microsecond), 0), rangeKey1Timestamp, true /* allRevisions */) + require.Equal(t, fingerprintPointAndRangeKeys, fingerprintFragmentedPointAndRangeKeys) + + // Insert a split point so that we're returned 2 SSTs with rangekeys instead + // of one. This should not affect the fingerprint. + resetVars() + fingerprintPreSplit := fingerprint(t, "a", "z", pointKeysTS.Add(int64(-time.Microsecond), 0), + hlc.Timestamp{WallTime: timeutil.Now().Round(time.Microsecond).UnixNano()}, true /* allRevisions */) + require.Equal(t, 1, numSSTsInExportResponses) + require.Equal(t, 1, numExportResponses) + + require.NoError(t, db.AdminSplit(ctx, "c", hlc.MaxTimestamp, roachpb.AdminSplitRequest_INGESTION)) + + resetVars() + fingerprintPostSplit := fingerprint(t, "a", "z", pointKeysTS.Add(int64(-time.Microsecond), 0), + hlc.Timestamp{WallTime: timeutil.Now().Round(time.Microsecond).UnixNano()}, true /* allRevisions */) + require.Equal(t, 2, numSSTsInExportResponses) + require.Equal(t, 2, numExportResponses) + + require.Equal(t, fingerprintPreSplit, fingerprintPostSplit) +} diff --git a/pkg/sql/sem/builtins/fixed_oids.go b/pkg/sql/sem/builtins/fixed_oids.go index 6b339ab1312d..d9cce10acf21 100644 --- a/pkg/sql/sem/builtins/fixed_oids.go +++ b/pkg/sql/sem/builtins/fixed_oids.go @@ -400,6 +400,7 @@ var builtinOidsBySignature = map[string]oid.Oid{ `crdb_internal.destroy_tenant(id: int) -> int`: 1304, `crdb_internal.destroy_tenant(id: int, synchronous: bool) -> int`: 1305, `crdb_internal.encode_key(table_id: int, index_id: int, row_tuple: anyelement) -> bytes`: 1307, + `crdb_internal.fingerprint(span: bytes[], start_time: timestamptz, all_revisions: bool) -> int`: 2045, `crdb_internal.filter_multiregion_fields_from_zone_config_sql(val: string) -> string`: 1366, 
`crdb_internal.force_assertion_error(msg: string) -> int`: 1311, `crdb_internal.force_delete_table_data(id: int) -> bool`: 1369, diff --git a/pkg/storage/mvcc.go b/pkg/storage/mvcc.go index 1f6a7cd7ae85..93a00fd5f6aa 100644 --- a/pkg/storage/mvcc.go +++ b/pkg/storage/mvcc.go @@ -5833,7 +5833,7 @@ func MVCCIsSpanEmpty( func MVCCExportFingerprint( ctx context.Context, cs *cluster.Settings, reader Reader, opts MVCCExportOptions, dest io.Writer, ) (roachpb.BulkOpSummary, MVCCKey, uint64, error) { - ctx, span := tracing.ChildSpan(ctx, "storage.MVCCExportToSST") + ctx, span := tracing.ChildSpan(ctx, "storage.MVCCExportFingerprint") defer span.Finish() hasher := fnv.New64()