Skip to content

Commit

Permalink
batcheval: absorb command registration
Browse files Browse the repository at this point in the history
You'll notice that `EndTransaction` is still lingering in `replica_command.go`,
but that will be done in a follow-up.

Touches cockroachdb#18779.
  • Loading branch information
tbg committed Nov 30, 2017
1 parent 602df28 commit 98419a5
Show file tree
Hide file tree
Showing 37 changed files with 497 additions and 298 deletions.
6 changes: 1 addition & 5 deletions pkg/ccl/storageccl/add_sstable.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
Expand All @@ -26,10 +25,7 @@ import (
)

func init() {
storage.SetAddSSTableCmd(storage.Command{
DeclareKeys: batcheval.DefaultDeclareKeys,
Eval: evalAddSSTable,
})
batcheval.RegisterCommand(roachpb.AddSSTable, batcheval.DefaultDeclareKeys, evalAddSSTable)
}

func evalAddSSTable(
Expand Down
6 changes: 1 addition & 5 deletions pkg/ccl/storageccl/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/sem/builtins"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
Expand All @@ -43,10 +42,7 @@ const ExportRequestLimit = 5
var exportRequestLimiter = makeConcurrentRequestLimiter(ExportRequestLimit)

func init() {
storage.SetExportCmd(storage.Command{
DeclareKeys: declareKeysExport,
Eval: evalExport,
})
batcheval.RegisterCommand(roachpb.Export, declareKeysExport, evalExport)
}

func declareKeysExport(
Expand Down
11 changes: 4 additions & 7 deletions pkg/ccl/storageccl/writebatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,18 @@ import (

"github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/tracing"
"github.com/pkg/errors"
)

func init() {
storage.SetWriteBatchCmd(storage.Command{
DeclareKeys: batcheval.DefaultDeclareKeys,
Eval: evalWriteBatch,
})
batcheval.RegisterCommand(roachpb.WriteBatch, batcheval.DefaultDeclareKeys, evalWriteBatch)
}

// evalWriteBatch applies the operations encoded in a BatchRepr. Any existing
Expand Down Expand Up @@ -115,10 +112,10 @@ func clearExistingData(
}

log.Eventf(ctx, "target key range not empty, will clear existing data: %+v", existingStats)
// If this is a SpanSetIterator, we have to unwrap it because
// If this is a Iterator, we have to unwrap it because
// ClearIterRange needs a plain rocksdb iterator (and can't unwrap
// it itself because of import cycles).
if ssi, ok := iter.(*storage.SpanSetIterator); ok {
if ssi, ok := iter.(*spanset.Iterator); ok {
iter = ssi.Iterator()
}
// TODO(dan): Ideally, this would use `batch.ClearRange` but it doesn't
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/batch_spanset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func TestSpanSetBatch(t *testing.T) {
t.Fatalf("direct write failed: %s", err)
}

batch := makeSpanSetBatch(eng.NewBatch(), &ss)
batch := spanset.NewBatch(eng.NewBatch(), &ss)
defer batch.Close()

// Writes inside the range work. Write twice for later read testing.
Expand Down Expand Up @@ -153,7 +153,7 @@ func TestSpanSetBatch(t *testing.T) {
if err := batch.Commit(true); err != nil {
t.Fatal(err)
}
iter := &SpanSetIterator{eng.NewIterator(false), &ss, nil, false}
iter := spanset.NewIterator(eng.NewIterator(false), &ss)
defer iter.Close()
iter.SeekReverse(outsideKey)
if _, err := iter.Valid(); !isReadSpanErr(err) {
Expand Down
25 changes: 25 additions & 0 deletions pkg/storage/batcheval/cmd_begin_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,34 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func init() {
RegisterCommand(roachpb.BeginTransaction, declareKeysBeginTransaction, BeginTransaction)
}

// DeclareKeysWriteTransaction is the shared portion of
// declareKeys{Begin,End,Heartbeat}Transaction
func DeclareKeysWriteTransaction(
_ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
if header.Txn != nil {
header.Txn.AssertInitialized(context.TODO())
spans.Add(spanset.SpanReadWrite, roachpb.Span{
Key: keys.TransactionKey(req.Header().Key, header.Txn.ID),
})
}
}

func declareKeysBeginTransaction(
desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
DeclareKeysWriteTransaction(desc, header, req, spans)
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeTxnSpanGCThresholdKey(header.RangeID)})
}

// BeginTransaction writes the initial transaction record. Fails in
// the event that a transaction record is already written. This may
// occur if a transaction is started with a batch containing writes
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_compute_checksum.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
)

func init() {
RegisterCommand(roachpb.ComputeChecksum, DefaultDeclareKeys, ComputeChecksum)
}

// Version numbers for Replica checksum computation. Requests fail unless the
// versions are compatible.
const (
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_conditional_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.ConditionalPut, DefaultDeclareKeys, ConditionalPut)
}

// ConditionalPut sets the value for a specified key only if
// the expected value matches. If not, the return value contains
// the actual value.
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.Delete, DefaultDeclareKeys, Delete)
}

// Delete deletes the key and value specified by key.
func Delete(
ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_delete_range.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func init() {
RegisterCommand(roachpb.DeleteRange, DefaultDeclareKeys, DeleteRange)
}

// DeleteRange deletes the range of key/value pairs specified by
// start and end keys.
func DeleteRange(
Expand Down
33 changes: 33 additions & 0 deletions pkg/storage/batcheval/cmd_deprecated_verify_checksum.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2014 The Cockroach Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
// implied. See the License for the specific language governing
// permissions and limitations under the License.

package batcheval

import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.DeprecatedVerifyChecksum, DefaultDeclareKeys, deprecatedVerifyChecksum)
}

func deprecatedVerifyChecksum(
context.Context, engine.ReadWriter, CommandArgs, roachpb.Response,
) (result.Result, error) {
return result.Result{}, nil
}
35 changes: 35 additions & 0 deletions pkg/storage/batcheval/cmd_gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,54 @@ package batcheval
import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
)

func init() {
RegisterCommand(roachpb.GC, declareKeysGC, GC)
}

var gcBatchSize = settings.RegisterIntSetting("kv.gc.batch_size",
"maximum number of keys in a batch for MVCC garbage collection",
100000,
)

func declareKeysGC(
desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
// Intentionally don't call DefaultDeclareKeys: the key range in the header
// is usually the whole range (pending resolution of #7880).
gcr := req.(*roachpb.GCRequest)
for _, key := range gcr.Keys {
spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: key.Key})
}
// Be smart here about blocking on the threshold keys. The GC queue can send an empty
// request first to bump the thresholds, and then another one that actually does work
// but can avoid declaring these keys below.
if gcr.Threshold != (hlc.Timestamp{}) {
spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)})
}
if gcr.TxnSpanGCThreshold != (hlc.Timestamp{}) {
spans.Add(spanset.SpanReadWrite, roachpb.Span{
// TODO(bdarnell): since this must be checked by all
// reads, this should be factored out into a separate
// waiter which blocks only those reads far enough in the
// past to be affected by the in-flight GCRequest (i.e.
// normally none). This means this key would be special
// cased and not tracked by the command queue.
Key: keys.RangeTxnSpanGCThresholdKey(header.RangeID),
})
}
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)})
}

// GC iterates through the list of keys to garbage collect
// specified in the arguments. MVCCGarbageCollect is invoked on each
// listed key along with the expiration timestamp. The GC metadata
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_get.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.Get, DefaultDeclareKeys, Get)
}

// Get returns the value for a specified key.
func Get(
ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
Expand Down
17 changes: 17 additions & 0 deletions pkg/storage/batcheval/cmd_heartbeat_txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,27 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/pkg/errors"
)

func init() {
RegisterCommand(roachpb.HeartbeatTxn, declareKeysHeartbeatTransaction, HeartbeatTxn)
}

func declareKeysHeartbeatTransaction(
desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
DeclareKeysWriteTransaction(desc, header, req, spans)
if header.Txn != nil {
header.Txn.AssertInitialized(context.TODO())
spans.Add(spanset.SpanReadOnly, roachpb.Span{
Key: keys.AbortSpanKey(header.RangeID, header.Txn.ID),
})
}
}

// HeartbeatTxn updates the transaction status and heartbeat
// timestamp after receiving transaction heartbeat messages from
// coordinator. Returns the updated transaction.
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_increment.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.Increment, DefaultDeclareKeys, Increment)
}

// Increment increments the value (interpreted as varint64 encoded) and
// returns the newly incremented value (encoded as varint64). If no value
// exists for the key, zero is incremented.
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_init_put.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.InitPut, DefaultDeclareKeys, InitPut)
}

// InitPut sets the value for a specified key only if it doesn't exist. It
// returns a ConditionFailedError if the key exists with an existing value that
// is different from the value provided. If FailOnTombstone is set to true,
Expand Down
9 changes: 9 additions & 0 deletions pkg/storage/batcheval/cmd_lease.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,22 @@ import (

"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
)

func declareKeysRequestLease(
desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)})
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)})
}

func newFailedLeaseTrigger(isTransfer bool) result.Result {
var trigger result.Result
trigger.Local.LeaseMetricsResult = new(result.LeaseMetricsType)
Expand Down
12 changes: 12 additions & 0 deletions pkg/storage/batcheval/cmd_lease_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,23 @@ package batcheval
import (
"golang.org/x/net/context"

"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/batcheval/result"
"github.com/cockroachdb/cockroach/pkg/storage/engine"
"github.com/cockroachdb/cockroach/pkg/storage/spanset"
)

func init() {
RegisterCommand(roachpb.LeaseInfo, declareKeysLeaseInfo, LeaseInfo)
}

func declareKeysLeaseInfo(
_ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet,
) {
spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)})
}

// LeaseInfo returns information about the lease holder for the range.
func LeaseInfo(
ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response,
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_lease_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/storage/engine"
)

func init() {
RegisterCommand(roachpb.RequestLease, declareKeysRequestLease, RequestLease)
}

// RequestLease sets the range lease for this range. The command fails
// only if the desired start timestamp collides with a previous lease.
// Otherwise, the start timestamp is wound back to right after the expiration
Expand Down
4 changes: 4 additions & 0 deletions pkg/storage/batcheval/cmd_lease_transfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log"
)

func init() {
RegisterCommand(roachpb.TransferLease, declareKeysRequestLease, TransferLease)
}

// TransferLease sets the lease holder for the range.
// Unlike with RequestLease(), the new lease is allowed to overlap the old one,
// the contract being that the transfer must have been initiated by the (soon
Expand Down
Loading

0 comments on commit 98419a5

Please sign in to comment.