Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DNM] storage: factor out evaluation of BatchRequest #18796

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 7 additions & 6 deletions pkg/ccl/storageccl/add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
Expand All @@ -31,8 +32,8 @@ func init() {
}

func evalAddSSTable(
ctx context.Context, batch engine.ReadWriter, cArgs storage.CommandArgs, _ roachpb.Response,
) (storage.EvalResult, error) {
ctx context.Context, batch engine.ReadWriter, cArgs batcheval.CommandArgs, _ roachpb.Response,
) (batcheval.Result, error) {
args := cArgs.Args.(*roachpb.AddSSTableRequest)
h := cArgs.Header
ms := cArgs.Stats
Expand All @@ -52,14 +53,14 @@ func evalAddSSTable(
defer existingIter.Close()
existingIter.Seek(mvccStartKey)
if ok, err := existingIter.Valid(); err != nil {
return storage.EvalResult{}, errors.Wrap(err, "computing existing stats")
return batcheval.Result{}, errors.Wrap(err, "computing existing stats")
} else if ok && existingIter.UnsafeKey().Less(mvccEndKey) {
log.Eventf(ctx, "target key range not empty, will merge existing data with sstable")
}
// This ComputeStats is cheap if the span is empty.
existingStats, err := existingIter.ComputeStats(mvccStartKey, mvccEndKey, h.Timestamp.WallTime)
if err != nil {
return storage.EvalResult{}, errors.Wrap(err, "computing existing stats")
return batcheval.Result{}, errors.Wrap(err, "computing existing stats")
}
ms.Subtract(existingStats)

Expand All @@ -69,11 +70,11 @@ func evalAddSSTable(
stats, err := verifySSTable(
existingIter, args.Data, mvccStartKey, mvccEndKey, h.Timestamp.WallTime)
if err != nil {
return storage.EvalResult{}, errors.Wrap(err, "verifying sstable data")
return batcheval.Result{}, errors.Wrap(err, "verifying sstable data")
}
ms.Add(stats)

return storage.EvalResult{
return batcheval.Result{
Replicated: storagebase.ReplicatedEvalResult{
AddSSTable: &storagebase.ReplicatedEvalResult_AddSSTable{
Data: args.Data,
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/add_sstable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -290,7 +290,7 @@ func TestAddSSTableMVCCStats(t *testing.T) {
}()

nowNanos += rng.Int63n(1e9)
cArgs := storage.CommandArgs{
cArgs := batcheval.CommandArgs{
Header: roachpb.Header{
Timestamp: hlc.Timestamp{WallTime: nowNanos},
},
Expand Down
31 changes: 16 additions & 15 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/parser"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand Down Expand Up @@ -98,8 +99,8 @@ func (r *rowCounter) count(key roachpb.Key) error {
// evalExport dumps the requested keys into files of non-overlapping key ranges
// in a format suitable for bulk ingest.
func evalExport(
ctx context.Context, batch engine.ReadWriter, cArgs storage.CommandArgs, resp roachpb.Response,
) (storage.EvalResult, error) {
ctx context.Context, batch engine.ReadWriter, cArgs batcheval.CommandArgs, resp roachpb.Response,
) (batcheval.Result, error) {
args := cArgs.Args.(*roachpb.ExportRequest)
h := cArgs.Header
reply := resp.(*roachpb.ExportResponse)
Expand All @@ -114,29 +115,29 @@ func evalExport(
// deletions in the incremental backup.
gcThreshold, err := cArgs.EvalCtx.GCThreshold()
if err != nil {
return storage.EvalResult{}, err
return batcheval.Result{}, err
}
if args.StartTime != (hlc.Timestamp{}) {
if !gcThreshold.Less(args.StartTime) {
return storage.EvalResult{}, errors.Errorf("start timestamp %v must be after replica GC threshold %v", args.StartTime, gcThreshold)
return batcheval.Result{}, errors.Errorf("start timestamp %v must be after replica GC threshold %v", args.StartTime, gcThreshold)
}
}

if err := exportRequestLimiter.beginLimitedRequest(ctx); err != nil {
return storage.EvalResult{}, err
return batcheval.Result{}, err
}
defer exportRequestLimiter.endLimitedRequest()
log.Infof(ctx, "export [%s,%s)", args.Key, args.EndKey)

exportStore, err := MakeExportStorage(ctx, args.Storage)
if err != nil {
return storage.EvalResult{}, err
return batcheval.Result{}, err
}
defer exportStore.Close()

sst, err := engine.MakeRocksDBSstFileWriter()
if err != nil {
return storage.EvalResult{}, err
return batcheval.Result{}, err
}
defer sst.Close()

Expand All @@ -150,7 +151,7 @@ func evalExport(
if err != nil {
// The error may be a WriteIntentError. In which case, returning it will
// cause this command to be retried.
return storage.EvalResult{}, err
return batcheval.Result{}, err
}
if !ok || iter.UnsafeKey().Key.Compare(args.EndKey) >= 0 {
break
Expand All @@ -162,34 +163,34 @@ func evalExport(
}

if err := rows.count(iter.UnsafeKey().Key); err != nil {
return storage.EvalResult{}, errors.Wrapf(err, "decoding %s", iter.UnsafeKey())
return batcheval.Result{}, errors.Wrapf(err, "decoding %s", iter.UnsafeKey())
}
if err := sst.Add(engine.MVCCKeyValue{Key: iter.UnsafeKey(), Value: iter.UnsafeValue()}); err != nil {
return storage.EvalResult{}, errors.Wrapf(err, "adding key %s", iter.UnsafeKey())
return batcheval.Result{}, errors.Wrapf(err, "adding key %s", iter.UnsafeKey())
}
}

if sst.DataSize == 0 {
// Let the defer Close the sstable.
reply.Files = []roachpb.ExportResponse_File{}
return storage.EvalResult{}, nil
return batcheval.Result{}, nil
}
rows.BulkOpSummary.DataSize = sst.DataSize

sstContents, err := sst.Finish()
if err != nil {
return storage.EvalResult{}, err
return batcheval.Result{}, err
}

// Compute the checksum before we upload and remove the local file.
checksum, err := SHA512ChecksumData(sstContents)
if err != nil {
return storage.EvalResult{}, err
return batcheval.Result{}, err
}

filename := fmt.Sprintf("%d.sst", parser.GenerateUniqueInt(cArgs.EvalCtx.NodeID()))
if err := exportStore.WriteFile(ctx, filename, bytes.NewReader(sstContents)); err != nil {
return storage.EvalResult{}, err
return batcheval.Result{}, err
}

reply.Files = []roachpb.ExportResponse_File{{
Expand All @@ -199,7 +200,7 @@ func evalExport(
Sha512: checksum,
}}

return storage.EvalResult{}, nil
return batcheval.Result{}, nil
}

// SHA512ChecksumData returns the SHA512 checksum of data.
Expand Down
3 changes: 2 additions & 1 deletion pkg/ccl/storageccl/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/util/humanizeutil"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -203,7 +204,7 @@ func (b *sstBatcher) Close() {
}

// evalImport bulk loads key/value entries.
func evalImport(ctx context.Context, cArgs storage.CommandArgs) (*roachpb.ImportResponse, error) {
func evalImport(ctx context.Context, cArgs batcheval.CommandArgs) (*roachpb.ImportResponse, error) {
args := cArgs.Args.(*roachpb.ImportRequest)
db := cArgs.EvalCtx.DB()
kr, err := MakeKeyRewriter(args.Rekeys)
Expand Down
15 changes: 8 additions & 7 deletions pkg/ccl/storageccl/writebatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand All @@ -34,8 +35,8 @@ func init() {
// data in the affected keyrange is first cleared (not tombstoned), which makes
// this command idempotent.
func evalWriteBatch(
ctx context.Context, batch engine.ReadWriter, cArgs storage.CommandArgs, _ roachpb.Response,
) (storage.EvalResult, error) {
ctx context.Context, batch engine.ReadWriter, cArgs batcheval.CommandArgs, _ roachpb.Response,
) (batcheval.Result, error) {

args := cArgs.Args.(*roachpb.WriteBatchRequest)
h := cArgs.Header
Expand All @@ -52,7 +53,7 @@ func evalWriteBatch(
if args.DataSpan.Key.Compare(args.Key) < 0 || args.DataSpan.EndKey.Compare(args.EndKey) > 0 {
// TODO(dan): Add a new field in roachpb.Error, so the client can catch
// this and retry.
return storage.EvalResult{}, errors.New("data spans multiple ranges")
return batcheval.Result{}, errors.New("data spans multiple ranges")
}

mvccStartKey := engine.MVCCKey{Key: args.Key}
Expand All @@ -62,22 +63,22 @@ func evalWriteBatch(
// request header.
msBatch, err := engineccl.VerifyBatchRepr(args.Data, mvccStartKey, mvccEndKey, h.Timestamp.WallTime)
if err != nil {
return storage.EvalResult{}, err
return batcheval.Result{}, err
}
ms.Add(msBatch)

// Check if there was data in the affected keyrange. If so, delete it (and
// adjust the MVCCStats) before applying the WriteBatch data.
existingStats, err := clearExistingData(ctx, batch, mvccStartKey, mvccEndKey, h.Timestamp.WallTime)
if err != nil {
return storage.EvalResult{}, errors.Wrap(err, "clearing existing data")
return batcheval.Result{}, errors.Wrap(err, "clearing existing data")
}
ms.Subtract(existingStats)

if err := batch.ApplyBatchRepr(args.Data, false /* !sync */); err != nil {
return storage.EvalResult{}, err
return batcheval.Result{}, err
}
return storage.EvalResult{}, nil
return batcheval.Result{}, nil
}

func clearExistingData(
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/storageccl/writebatch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
Expand Down Expand Up @@ -133,7 +133,7 @@ func TestWriteBatchMVCCStats(t *testing.T) {
}
}

cArgs := storage.CommandArgs{
cArgs := batcheval.CommandArgs{
Args: &roachpb.WriteBatchRequest{
Span: span,
DataSpan: span,
Expand Down
Loading