Skip to content

Commit

Permalink
builtins: add crdb_internal.fingerprint builtin
Browse files Browse the repository at this point in the history
This change adds a `crdb_internal.fingerprint` builtin
that accepts a `startTime`, `endTime`, `startKey` and `endKey`
to define the interval the user wants to fingerprint. The builtin
is powered by sending an ExportRequest with the defined intervals
but with the `ExportFingerprint` option set to true.

Setting this option on the ExportRequest means that instead of
writing all point and rangekeys to an SST and sending them back to
the client, command evaluation will use the newly introduced
`fingerprintWriter` (#90848) when exporting keys. This writer
computes an `fnv64` hash of the key/timestamp and value for each point key
and maintains a running XOR aggregate of all the point keys processed
as part of the ExportRequest. Rangekeys are not fingerprinted during
command evaluation, but instead returned to the client in a
pebble SST. This is because range keys do not have a stable,
discrete identity and so it is up to the caller to define a deterministic
fingerprinting scheme across all returned range keys.

The ExportRequest sent as part of this builtin does not set any DistSender
limit, thereby allowing concurrent execution across ranges. We are not
concerned about the ExportResponses growing too large since the SSTs
will only contain rangekeys that should be few in number. If this assumption
proves incorrect in the future, we can revisit setting a `TargetBytes` limit
on the header of the BatchRequest.

Fixes: #89336

Release note: None
  • Loading branch information
adityamaru committed Nov 17, 2022
1 parent 3830d63 commit 084cc68
Show file tree
Hide file tree
Showing 8 changed files with 503 additions and 21 deletions.
2 changes: 2 additions & 0 deletions docs/generated/sql/functions.md
Original file line number Diff line number Diff line change
Expand Up @@ -3060,6 +3060,8 @@ may increase either contention or retry errors, or both.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.encode_key"></a><code>crdb_internal.encode_key(table_id: <a href="int.html">int</a>, index_id: <a href="int.html">int</a>, row_tuple: anyelement) &rarr; <a href="bytes.html">bytes</a></code></td><td><span class="funcdesc"><p>Generate the key for a row on a particular table and index.</p>
</span></td><td>Stable</td></tr>
<tr><td><a name="crdb_internal.fingerprint"></a><code>crdb_internal.fingerprint(span: <a href="bytes.html">bytes</a>[], start_time: <a href="timestamp.html">timestamptz</a>, all_revisions: <a href="bool.html">bool</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td><td>Immutable</td></tr>
<tr><td><a name="crdb_internal.force_assertion_error"></a><code>crdb_internal.force_assertion_error(msg: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
</span></td><td>Volatile</td></tr>
<tr><td><a name="crdb_internal.force_error"></a><code>crdb_internal.force_error(errorCode: <a href="string.html">string</a>, msg: <a href="string.html">string</a>) &rarr; <a href="int.html">int</a></code></td><td><span class="funcdesc"><p>This function is used only by CockroachDB’s developers for testing purposes.</p>
Expand Down
43 changes: 26 additions & 17 deletions pkg/kv/kvserver/batcheval/cmd_export.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,19 +181,27 @@ func evalExport(
var curSizeOfExportedSSTs int64
for start := args.Key; start != nil; {
destFile := &storage.MemFile{}
summary, resume, err := storage.MVCCExportToSST(ctx, cArgs.EvalCtx.ClusterSettings(), reader,
storage.MVCCExportOptions{
StartKey: storage.MVCCKey{Key: start, Timestamp: resumeKeyTS},
EndKey: args.EndKey,
StartTS: args.StartTime,
EndTS: h.Timestamp,
ExportAllRevisions: exportAllRevisions,
TargetSize: targetSize,
MaxSize: maxSize,
MaxIntents: maxIntents,
StopMidKey: args.SplitMidKey,
ResourceLimiter: storage.NewResourceLimiter(storage.ResourceLimiterOptions{MaxRunTime: maxRunTime}, timeutil.DefaultTimeSource{}),
}, destFile)
opts := storage.MVCCExportOptions{
StartKey: storage.MVCCKey{Key: start, Timestamp: resumeKeyTS},
EndKey: args.EndKey,
StartTS: args.StartTime,
EndTS: h.Timestamp,
ExportAllRevisions: exportAllRevisions,
TargetSize: targetSize,
MaxSize: maxSize,
MaxIntents: maxIntents,
StopMidKey: args.SplitMidKey,
ResourceLimiter: storage.NewResourceLimiter(storage.ResourceLimiterOptions{MaxRunTime: maxRunTime}, timeutil.DefaultTimeSource{}),
}
var summary roachpb.BulkOpSummary
var resume storage.MVCCKey
var fingerprint uint64
var err error
if args.ExportFingerprint {
summary, resume, fingerprint, err = storage.MVCCExportFingerprint(ctx, cArgs.EvalCtx.ClusterSettings(), reader, opts, destFile)
} else {
summary, resume, err = storage.MVCCExportToSST(ctx, cArgs.EvalCtx.ClusterSettings(), reader, opts, destFile)
}
if err != nil {
if errors.HasType(err, (*storage.ExceedMaxSizeError)(nil)) {
err = errors.WithHintf(err,
Expand All @@ -217,10 +225,11 @@ func evalExport(
span.EndKey = args.EndKey
}
exported := roachpb.ExportResponse_File{
Span: span,
EndKeyTS: resume.Timestamp,
Exported: summary,
SST: data,
Span: span,
EndKeyTS: resume.Timestamp,
Exported: summary,
SST: data,
Fingerprint: fingerprint,
}
reply.Files = append(reply.Files, exported)
start = resume.Key
Expand Down
30 changes: 27 additions & 3 deletions pkg/roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -1546,7 +1546,21 @@ message ExportRequest {
// then there is no limit.
int64 target_file_size = 10;

// ExportFingerprint when set to true will result in ExportRequest command
// evaluation generating an fnv64 hash for every key/timestamp and value, for
// point keys encountered in the key/time interval. Each KV hash will be
// combined via a XOR into a running aggregate that is returned as part of the
// ExportResponse.
//
// Range keys are not fingerprinted but instead written to a pebble SST that
// is returned to the caller. This is because range keys do not have a stable,
// discrete identity and so it is up to the caller to define a deterministic
// fingerprinting scheme across all returned range keys.
bool export_fingerprint = 14;

reserved 2, 5, 7, 8, 11;

// Next ID: 15
}

// BulkOpSummary summarizes the data processed by an operation, counting the
Expand Down Expand Up @@ -1586,14 +1600,24 @@ message ExportResponse {
(gogoproto.customname) = "EndKeyTS"
];
string path = 2;
reserved 3;
reserved 4;
reserved 5;

BulkOpSummary exported = 6 [(gogoproto.nullable) = false];

bytes sst = 7 [(gogoproto.customname) = "SST"];
string locality_kv = 8 [(gogoproto.customname) = "LocalityKV"];

// Fingerprint is the XOR aggregate of the fnv64 hash of every point
// key/timestamp and corresponding value that has been exported as part of
// the ExportRequest. This field is only set when the request is sent with
// `ExportFingerprint` set to true.
//
// Range keys are not fingerprinted but instead written to the sst above
// that is returned to the caller. This is because range keys do not have a
// stable, discrete identity and so it is up to the caller to define a
// deterministic fingerprinting scheme across all returned range keys.
uint64 fingerprint = 10;

reserved 3, 4, 5;
}

ResponseHeader header = 1 [(gogoproto.nullable) = false, (gogoproto.embed) = true];
Expand Down
11 changes: 11 additions & 0 deletions pkg/sql/sem/builtins/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ go_library(
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvclient",
"//pkg/kv/kvserver/concurrency/lock",
"//pkg/kv/kvserver/kvserverbase",
"//pkg/repstream/streampb",
"//pkg/roachpb",
Expand Down Expand Up @@ -88,7 +89,9 @@ go_library(
"//pkg/sql/storageparam",
"//pkg/sql/storageparam/indexstorageparam",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/util",
"//pkg/util/admission/admissionpb",
"//pkg/util/arith",
"//pkg/util/bitarray",
"//pkg/util/contextutil",
Expand Down Expand Up @@ -142,6 +145,7 @@ go_test(
"all_builtins_test.go",
"builtins_test.go",
"datums_to_bytes_builtin_test.go",
"fingerprint_builtin_test.go",
"generator_builtins_test.go",
"geo_builtins_test.go",
"help_test.go",
Expand All @@ -158,6 +162,8 @@ go_test(
"//pkg/base",
"//pkg/keys",
"//pkg/kv",
"//pkg/kv/kvserver",
"//pkg/roachpb",
"//pkg/security/securityassets",
"//pkg/security/securitytest",
"//pkg/server",
Expand All @@ -173,16 +179,21 @@ go_test(
"//pkg/sql/sem/tree/treewindow",
"//pkg/sql/sem/volatility",
"//pkg/sql/types",
"//pkg/storage",
"//pkg/testutils",
"//pkg/testutils/serverutils",
"//pkg/testutils/skip",
"//pkg/testutils/sqlutils",
"//pkg/testutils/storageutils",
"//pkg/testutils/testcluster",
"//pkg/util",
"//pkg/util/duration",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
"//pkg/util/mon",
"//pkg/util/randutil",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_lib_pq//:pq",
"@com_github_stretchr_testify//assert",
Expand Down
Loading

0 comments on commit 084cc68

Please sign in to comment.