Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

batcheval: absorb command registration #20329

Merged
merged 1 commit into from
Nov 30, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 1 addition & 5 deletions pkg/ccl/storageccl/add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
Expand All @@ -26,10 +25,7 @@ import (
)

func init() {
storage.SetAddSSTableCmd(storage.Command{
DeclareKeys: batcheval.DefaultDeclareKeys,
Eval: evalAddSSTable,
})
batcheval.RegisterCommand(roachpb.AddSSTable, batcheval.DefaultDeclareKeys, evalAddSSTable)
}

func evalAddSSTable(
Expand Down
6 changes: 1 addition & 5 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
Expand All @@ -43,10 +42,7 @@ const ExportRequestLimit = 5
var exportRequestLimiter = makeConcurrentRequestLimiter(ExportRequestLimit)

func init() {
storage.SetExportCmd(storage.Command{
DeclareKeys: declareKeysExport,
Eval: evalExport,
})
batcheval.RegisterCommand(roachpb.Export, declareKeysExport, evalExport)
}

func declareKeysExport(
Expand Down
11 changes: 4 additions & 7 deletions pkg/ccl/storageccl/writebatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,18 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
)

func init() {
storage.SetWriteBatchCmd(storage.Command{
DeclareKeys: batcheval.DefaultDeclareKeys,
Eval: evalWriteBatch,
})
batcheval.RegisterCommand(roachpb.WriteBatch, batcheval.DefaultDeclareKeys, evalWriteBatch)
}

// evalWriteBatch applies the operations encoded in a BatchRepr. Any existing
Expand Down Expand Up @@ -115,10 +112,10 @@ func clearExistingData(
}

log.Eventf(ctx, "target key range not empty, will clear existing data: %+v", existingStats)
// If this is a SpanSetIterator, we have to unwrap it because
// If this is a Iterator, we have to unwrap it because
// ClearIterRange needs a plain rocksdb iterator (and can't unwrap
// it itself because of import cycles).
if ssi, ok := iter.(*storage.SpanSetIterator); ok {
if ssi, ok := iter.(*spanset.Iterator); ok {
iter = ssi.Iterator()
}
// TODO(dan): Ideally, this would use `batch.ClearRange` but it doesn't
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/batch_spanset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestSpanSetBatch(t *testing.T) {
t.Fatalf("direct write failed: %s", err)
}

batch := makeSpanSetBatch(eng.NewBatch(), &ss)
batch := spanset.NewBatch(eng.NewBatch(), &ss)
defer batch.Close()

// Writes inside the range work. Write twice for later read testing.
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestSpanSetBatch(t *testing.T) {
if err := batch.Commit(true); err != nil {
t.Fatal(err)
}
iter := &SpanSetIterator{eng.NewIterator(false), &ss, nil, false}
iter := spanset.NewIterator(eng.NewIterator(false), &ss)
defer iter.Close()
iter.SeekReverse(outsideKey)
if _, err := iter.Valid(); !isReadSpanErr(err) {
Expand Down
25 changes: 25 additions & 0 deletions pkg/storage/batcheval/cmd_begin_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,34 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func init() {
RegisterCommand(roachpb.BeginTransaction, declareKeysBeginTransaction, BeginTransaction)
}

// DeclareKeysWriteTransaction is the shared portion of
// declareKeys{Begin,End,Heartbeat}Transaction
func DeclareKeysWriteTransaction(
_ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
if header.Txn != nil {
header.Txn.AssertInitialized(context.TODO())
spans.Add(spanset.SpanReadWrite, roachpb.Span{
Key: keys.TransactionKey(req.Header().Key, header.Txn.ID),
})
}
}

func declareKeysBeginTransaction(
desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
DeclareKeysWriteTransaction(desc, header, req, spans)
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeTxnSpanGCThresholdKey(header.RangeID)})
}

// BeginTransaction writes the initial transaction record. Fails in
// the event that a transaction record is already written. This may
// occur if a transaction is started with a batch containing writes
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_compute_checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
)

func init() {
RegisterCommand(roachpb.ComputeChecksum, DefaultDeclareKeys, ComputeChecksum)
}

// Version numbers for Replica checksum computation. Requests fail unless the
// versions are compatible.
const (
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_conditional_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.ConditionalPut, DefaultDeclareKeys, ConditionalPut)
}

// ConditionalPut sets the value for a specified key only if
// the expected value matches. If not, the return value contains
// the actual value.
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.Delete, DefaultDeclareKeys, Delete)
}

// Delete deletes the key and value specified by key.
func Delete(
ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func init() {
RegisterCommand(roachpb.DeleteRange, DefaultDeclareKeys, DeleteRange)
}

// DeleteRange deletes the range of key/value pairs specified by
// start and end keys.
func DeleteRange(
Expand Down
33 changes: 33 additions & 0 deletions pkg/storage/batcheval/cmd_deprecated_verify_checksum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package batcheval

import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.DeprecatedVerifyChecksum, DefaultDeclareKeys, deprecatedVerifyChecksum)
}

func deprecatedVerifyChecksum(
context.Context, engine.ReadWriter, CommandArgs, roachpb.Response,
) (result.Result, error) {
return result.Result{}, nil
}
35 changes: 35 additions & 0 deletions pkg/storage/batcheval/cmd_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,54 @@ package batcheval
import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func init() {
RegisterCommand(roachpb.GC, declareKeysGC, GC)
}

var gcBatchSize = settings.RegisterIntSetting("kv.gc.batch_size",
"maximum number of keys in a batch for MVCC garbage collection",
100000,
)

func declareKeysGC(
desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
// Intentionally don't call DefaultDeclareKeys: the key range in the header
// is usually the whole range (pending resolution of #7880).
gcr := req.(*roachpb.GCRequest)
for _, key := range gcr.Keys {
spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: key.Key})
}
// Be smart here about blocking on the threshold keys. The GC queue can send an empty
// request first to bump the thresholds, and then another one that actually does work
// but can avoid declaring these keys below.
if gcr.Threshold != (hlc.Timestamp{}) {
spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)})
}
if gcr.TxnSpanGCThreshold != (hlc.Timestamp{}) {
spans.Add(spanset.SpanReadWrite, roachpb.Span{
// TODO(bdarnell): since this must be checked by all
// reads, this should be factored out into a separate
// waiter which blocks only those reads far enough in the
// past to be affected by the in-flight GCRequest (i.e.
// normally none). This means this key would be special
// cased and not tracked by the command queue.
Key: keys.RangeTxnSpanGCThresholdKey(header.RangeID),
})
}
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)})
}

// GC iterates through the list of keys to garbage collect
// specified in the arguments. MVCCGarbageCollect is invoked on each
// listed key along with the expiration timestamp. The GC metadata
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.Get, DefaultDeclareKeys, Get)
}

// Get returns the value for a specified key.
func Get(
ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/batcheval/cmd_heartbeat_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,27 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/pkg/errors"
)

func init() {
RegisterCommand(roachpb.HeartbeatTxn, declareKeysHeartbeatTransaction, HeartbeatTxn)
}

func declareKeysHeartbeatTransaction(
desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
DeclareKeysWriteTransaction(desc, header, req, spans)
if header.Txn != nil {
header.Txn.AssertInitialized(context.TODO())
spans.Add(spanset.SpanReadOnly, roachpb.Span{
Key: keys.AbortSpanKey(header.RangeID, header.Txn.ID),
})
}
}

// HeartbeatTxn updates the transaction status and heartbeat
// timestamp after receiving transaction heartbeat messages from
// coordinator. Returns the updated transaction.
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.Increment, DefaultDeclareKeys, Increment)
}

// Increment increments the value (interpreted as varint64 encoded) and
// returns the newly incremented value (encoded as varint64). If no value
// exists for the key, zero is incremented.
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_init_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.InitPut, DefaultDeclareKeys, InitPut)
}

// InitPut sets the value for a specified key only if it doesn't exist. It
// returns a ConditionFailedError if the key exists with an existing value that
// is different from the value provided. If FailOnTombstone is set to true,
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/batcheval/cmd_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@ import (

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
)

func declareKeysRequestLease(
desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)})
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)})
}

func newFailedLeaseTrigger(isTransfer bool) result.Result {
var trigger result.Result
trigger.Local.LeaseMetricsResult = new(result.LeaseMetricsType)
Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/batcheval/cmd_lease_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,23 @@ package batcheval
import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
)

func init() {
RegisterCommand(roachpb.LeaseInfo, declareKeysLeaseInfo, LeaseInfo)
}

func declareKeysLeaseInfo(
_ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)})
}

// LeaseInfo returns information about the lease holder for the range.
func LeaseInfo(
ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_lease_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.RequestLease, declareKeysRequestLease, RequestLease)
}

// RequestLease sets the range lease for this range. The command fails
// only if the desired start timestamp collides with a previous lease.
// Otherwise, the start timestamp is wound back to right after the expiration
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_lease_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
)

func init() {
RegisterCommand(roachpb.TransferLease, declareKeysRequestLease, TransferLease)
}

// TransferLease sets the lease holder for the range.
// Unlike with RequestLease(), the new lease is allowed to overlap the old one,
// the contract being that the transfer must have been initiated by the (soon
Expand Down
Loading