diff --git a/pkg/ccl/storageccl/add_sstable.go b/pkg/ccl/storageccl/add_sstable.go index 29fd883c8eeb..5bf33eeec896 100644 --- a/pkg/ccl/storageccl/add_sstable.go +++ b/pkg/ccl/storageccl/add_sstable.go @@ -14,7 +14,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl" "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" @@ -26,10 +25,7 @@ import ( ) func init() { - storage.SetAddSSTableCmd(storage.Command{ - DeclareKeys: batcheval.DefaultDeclareKeys, - Eval: evalAddSSTable, - }) + batcheval.RegisterCommand(roachpb.AddSSTable, batcheval.DefaultDeclareKeys, evalAddSSTable) } func evalAddSSTable( diff --git a/pkg/ccl/storageccl/export.go b/pkg/ccl/storageccl/export.go index e89a74b4ac52..a5f20be1d78c 100644 --- a/pkg/ccl/storageccl/export.go +++ b/pkg/ccl/storageccl/export.go @@ -19,7 +19,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/sem/builtins" - "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" @@ -43,10 +42,7 @@ const ExportRequestLimit = 5 var exportRequestLimiter = makeConcurrentRequestLimiter(ExportRequestLimit) func init() { - storage.SetExportCmd(storage.Command{ - DeclareKeys: declareKeysExport, - Eval: evalExport, - }) + batcheval.RegisterCommand(roachpb.Export, declareKeysExport, evalExport) } func declareKeysExport( diff --git a/pkg/ccl/storageccl/writebatch.go b/pkg/ccl/storageccl/writebatch.go index 42208dbbc5ba..5ce796e7f186 100644 --- a/pkg/ccl/storageccl/writebatch.go +++ b/pkg/ccl/storageccl/writebatch.go @@ -15,21 +15,18 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/storageccl/engineccl" "github.com/cockroachdb/cockroach/pkg/roachpb" - "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/cockroachdb/cockroach/pkg/util/tracing" "github.com/pkg/errors" ) func init() { - storage.SetWriteBatchCmd(storage.Command{ - DeclareKeys: batcheval.DefaultDeclareKeys, - Eval: evalWriteBatch, - }) + batcheval.RegisterCommand(roachpb.WriteBatch, batcheval.DefaultDeclareKeys, evalWriteBatch) } // evalWriteBatch applies the operations encoded in a BatchRepr. Any existing @@ -115,10 +112,10 @@ func clearExistingData( } log.Eventf(ctx, "target key range not empty, will clear existing data: %+v", existingStats) - // If this is a SpanSetIterator, we have to unwrap it because + // If this is a Iterator, we have to unwrap it because // ClearIterRange needs a plain rocksdb iterator (and can't unwrap // it itself because of import cycles). - if ssi, ok := iter.(*storage.SpanSetIterator); ok { + if ssi, ok := iter.(*spanset.Iterator); ok { iter = ssi.Iterator() } // TODO(dan): Ideally, this would use `batch.ClearRange` but it doesn't diff --git a/pkg/storage/batch_spanset_test.go b/pkg/storage/batch_spanset_test.go index ff913dc502b3..19e6766e006b 100644 --- a/pkg/storage/batch_spanset_test.go +++ b/pkg/storage/batch_spanset_test.go @@ -49,7 +49,7 @@ func TestSpanSetBatch(t *testing.T) { t.Fatalf("direct write failed: %s", err) } - batch := makeSpanSetBatch(eng.NewBatch(), &ss) + batch := spanset.NewBatch(eng.NewBatch(), &ss) defer batch.Close() // Writes inside the range work. Write twice for later read testing. @@ -153,7 +153,7 @@ func TestSpanSetBatch(t *testing.T) { if err := batch.Commit(true); err != nil { t.Fatal(err) } - iter := &SpanSetIterator{eng.NewIterator(false), &ss, nil, false} + iter := spanset.NewIterator(eng.NewIterator(false), &ss) defer iter.Close() iter.SeekReverse(outsideKey) if _, err := iter.Valid(); !isReadSpanErr(err) { diff --git a/pkg/storage/batcheval/cmd_begin_transaction.go b/pkg/storage/batcheval/cmd_begin_transaction.go index 7a21fd6ec78c..dabff7cd68fa 100644 --- a/pkg/storage/batcheval/cmd_begin_transaction.go +++ b/pkg/storage/batcheval/cmd_begin_transaction.go @@ -23,9 +23,34 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) +func init() { + RegisterCommand(roachpb.BeginTransaction, declareKeysBeginTransaction, BeginTransaction) +} + +// DeclareKeysWriteTransaction is the shared portion of +// declareKeys{Begin,End,Heartbeat}Transaction +func DeclareKeysWriteTransaction( + _ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + if header.Txn != nil { + header.Txn.AssertInitialized(context.TODO()) + spans.Add(spanset.SpanReadWrite, roachpb.Span{ + Key: keys.TransactionKey(req.Header().Key, header.Txn.ID), + }) + } +} + +func declareKeysBeginTransaction( + desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + DeclareKeysWriteTransaction(desc, header, req, spans) + spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeTxnSpanGCThresholdKey(header.RangeID)}) +} + // BeginTransaction writes the initial transaction record. Fails in // the event that a transaction record is already written. This may // occur if a transaction is started with a batch containing writes diff --git a/pkg/storage/batcheval/cmd_compute_checksum.go b/pkg/storage/batcheval/cmd_compute_checksum.go index 42327c2b1bbd..85df01acb22e 100644 --- a/pkg/storage/batcheval/cmd_compute_checksum.go +++ b/pkg/storage/batcheval/cmd_compute_checksum.go @@ -25,6 +25,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" ) +func init() { + RegisterCommand(roachpb.ComputeChecksum, DefaultDeclareKeys, ComputeChecksum) +} + // Version numbers for Replica checksum computation. Requests fail unless the // versions are compatible. const ( diff --git a/pkg/storage/batcheval/cmd_conditional_put.go b/pkg/storage/batcheval/cmd_conditional_put.go index ecb452c91cf2..9f615babc5b0 100644 --- a/pkg/storage/batcheval/cmd_conditional_put.go +++ b/pkg/storage/batcheval/cmd_conditional_put.go @@ -22,6 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" ) +func init() { + RegisterCommand(roachpb.ConditionalPut, DefaultDeclareKeys, ConditionalPut) +} + // ConditionalPut sets the value for a specified key only if // the expected value matches. If not, the return value contains // the actual value. diff --git a/pkg/storage/batcheval/cmd_delete.go b/pkg/storage/batcheval/cmd_delete.go index f5002ab6f719..dac27bb472aa 100644 --- a/pkg/storage/batcheval/cmd_delete.go +++ b/pkg/storage/batcheval/cmd_delete.go @@ -22,6 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" ) +func init() { + RegisterCommand(roachpb.Delete, DefaultDeclareKeys, Delete) +} + // Delete deletes the key and value specified by key. func Delete( ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response, diff --git a/pkg/storage/batcheval/cmd_delete_range.go b/pkg/storage/batcheval/cmd_delete_range.go index cd623d2e8eee..23fe23ea7723 100644 --- a/pkg/storage/batcheval/cmd_delete_range.go +++ b/pkg/storage/batcheval/cmd_delete_range.go @@ -23,6 +23,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" ) +func init() { + RegisterCommand(roachpb.DeleteRange, DefaultDeclareKeys, DeleteRange) +} + // DeleteRange deletes the range of key/value pairs specified by // start and end keys. func DeleteRange( diff --git a/pkg/storage/batcheval/cmd_deprecated_verify_checksum.go b/pkg/storage/batcheval/cmd_deprecated_verify_checksum.go new file mode 100644 index 000000000000..e4b095c264d5 --- /dev/null +++ b/pkg/storage/batcheval/cmd_deprecated_verify_checksum.go @@ -0,0 +1,33 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package batcheval + +import ( + "golang.org/x/net/context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/storage/engine" +) + +func init() { + RegisterCommand(roachpb.DeprecatedVerifyChecksum, DefaultDeclareKeys, deprecatedVerifyChecksum) +} + +func deprecatedVerifyChecksum( + context.Context, engine.ReadWriter, CommandArgs, roachpb.Response, +) (result.Result, error) { + return result.Result{}, nil +} diff --git a/pkg/storage/batcheval/cmd_gc.go b/pkg/storage/batcheval/cmd_gc.go index 7448e74ec3f2..75fa09707213 100644 --- a/pkg/storage/batcheval/cmd_gc.go +++ b/pkg/storage/batcheval/cmd_gc.go @@ -17,19 +17,54 @@ package batcheval import ( "golang.org/x/net/context" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/settings" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util/hlc" ) +func init() { + RegisterCommand(roachpb.GC, declareKeysGC, GC) +} + var gcBatchSize = settings.RegisterIntSetting("kv.gc.batch_size", "maximum number of keys in a batch for MVCC garbage collection", 100000, ) +func declareKeysGC( + desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + // Intentionally don't call DefaultDeclareKeys: the key range in the header + // is usually the whole range (pending resolution of #7880). + gcr := req.(*roachpb.GCRequest) + for _, key := range gcr.Keys { + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}) + } + // Be smart here about blocking on the threshold keys. The GC queue can send an empty + // request first to bump the thresholds, and then another one that actually does work + // but can avoid declaring these keys below. + if gcr.Threshold != (hlc.Timestamp{}) { + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)}) + } + if gcr.TxnSpanGCThreshold != (hlc.Timestamp{}) { + spans.Add(spanset.SpanReadWrite, roachpb.Span{ + // TODO(bdarnell): since this must be checked by all + // reads, this should be factored out into a separate + // waiter which blocks only those reads far enough in the + // past to be affected by the in-flight GCRequest (i.e. + // normally none). This means this key would be special + // cased and not tracked by the command queue. + Key: keys.RangeTxnSpanGCThresholdKey(header.RangeID), + }) + } + spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) +} + // GC iterates through the list of keys to garbage collect // specified in the arguments. MVCCGarbageCollect is invoked on each // listed key along with the expiration timestamp. The GC metadata diff --git a/pkg/storage/batcheval/cmd_get.go b/pkg/storage/batcheval/cmd_get.go index 699b045a7599..7f4330d5e0b7 100644 --- a/pkg/storage/batcheval/cmd_get.go +++ b/pkg/storage/batcheval/cmd_get.go @@ -22,6 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" ) +func init() { + RegisterCommand(roachpb.Get, DefaultDeclareKeys, Get) +} + // Get returns the value for a specified key. func Get( ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response, diff --git a/pkg/storage/batcheval/cmd_heartbeat_txn.go b/pkg/storage/batcheval/cmd_heartbeat_txn.go index a4ec69e8a4d4..f3f98766173b 100644 --- a/pkg/storage/batcheval/cmd_heartbeat_txn.go +++ b/pkg/storage/batcheval/cmd_heartbeat_txn.go @@ -21,10 +21,27 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/pkg/errors" ) +func init() { + RegisterCommand(roachpb.HeartbeatTxn, declareKeysHeartbeatTransaction, HeartbeatTxn) +} + +func declareKeysHeartbeatTransaction( + desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + DeclareKeysWriteTransaction(desc, header, req, spans) + if header.Txn != nil { + header.Txn.AssertInitialized(context.TODO()) + spans.Add(spanset.SpanReadOnly, roachpb.Span{ + Key: keys.AbortSpanKey(header.RangeID, header.Txn.ID), + }) + } +} + // HeartbeatTxn updates the transaction status and heartbeat // timestamp after receiving transaction heartbeat messages from // coordinator. Returns the updated transaction. diff --git a/pkg/storage/batcheval/cmd_increment.go b/pkg/storage/batcheval/cmd_increment.go index a0db34ed2f16..965bc2884b8a 100644 --- a/pkg/storage/batcheval/cmd_increment.go +++ b/pkg/storage/batcheval/cmd_increment.go @@ -22,6 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" ) +func init() { + RegisterCommand(roachpb.Increment, DefaultDeclareKeys, Increment) +} + // Increment increments the value (interpreted as varint64 encoded) and // returns the newly incremented value (encoded as varint64). If no value // exists for the key, zero is incremented. diff --git a/pkg/storage/batcheval/cmd_init_put.go b/pkg/storage/batcheval/cmd_init_put.go index 68163ba957e4..76e27dc203f5 100644 --- a/pkg/storage/batcheval/cmd_init_put.go +++ b/pkg/storage/batcheval/cmd_init_put.go @@ -22,6 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" ) +func init() { + RegisterCommand(roachpb.InitPut, DefaultDeclareKeys, InitPut) +} + // InitPut sets the value for a specified key only if it doesn't exist. It // returns a ConditionFailedError if the key exists with an existing value that // is different from the value provided. If FailOnTombstone is set to true, diff --git a/pkg/storage/batcheval/cmd_lease.go b/pkg/storage/batcheval/cmd_lease.go index 14c912a64eca..5f76be7b8b35 100644 --- a/pkg/storage/batcheval/cmd_lease.go +++ b/pkg/storage/batcheval/cmd_lease.go @@ -19,13 +19,22 @@ import ( "golang.org/x/net/context" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" ) +func declareKeysRequestLease( + desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)}) + spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) +} + func newFailedLeaseTrigger(isTransfer bool) result.Result { var trigger result.Result trigger.Local.LeaseMetricsResult = new(result.LeaseMetricsType) diff --git a/pkg/storage/batcheval/cmd_lease_info.go b/pkg/storage/batcheval/cmd_lease_info.go index 44b059b15f74..d11104c50bcd 100644 --- a/pkg/storage/batcheval/cmd_lease_info.go +++ b/pkg/storage/batcheval/cmd_lease_info.go @@ -17,11 +17,23 @@ package batcheval import ( "golang.org/x/net/context" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" ) +func init() { + RegisterCommand(roachpb.LeaseInfo, declareKeysLeaseInfo, LeaseInfo) +} + +func declareKeysLeaseInfo( + _ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)}) +} + // LeaseInfo returns information about the lease holder for the range. func LeaseInfo( ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response, diff --git a/pkg/storage/batcheval/cmd_lease_request.go b/pkg/storage/batcheval/cmd_lease_request.go index de7f07209cac..f5c84c3e281f 100644 --- a/pkg/storage/batcheval/cmd_lease_request.go +++ b/pkg/storage/batcheval/cmd_lease_request.go @@ -22,6 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" ) +func init() { + RegisterCommand(roachpb.RequestLease, declareKeysRequestLease, RequestLease) +} + // RequestLease sets the range lease for this range. The command fails // only if the desired start timestamp collides with a previous lease. // Otherwise, the start timestamp is wound back to right after the expiration diff --git a/pkg/storage/batcheval/cmd_lease_transfer.go b/pkg/storage/batcheval/cmd_lease_transfer.go index 3b4e4a4650d5..62bcd3d6727d 100644 --- a/pkg/storage/batcheval/cmd_lease_transfer.go +++ b/pkg/storage/batcheval/cmd_lease_transfer.go @@ -23,6 +23,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log" ) +func init() { + RegisterCommand(roachpb.TransferLease, declareKeysRequestLease, TransferLease) +} + // TransferLease sets the lease holder for the range. // Unlike with RequestLease(), the new lease is allowed to overlap the old one, // the contract being that the transfer must have been initiated by the (soon diff --git a/pkg/storage/batcheval/cmd_merge.go b/pkg/storage/batcheval/cmd_merge.go index f00fb81b880d..e1235c0970c8 100644 --- a/pkg/storage/batcheval/cmd_merge.go +++ b/pkg/storage/batcheval/cmd_merge.go @@ -22,6 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" ) +func init() { + RegisterCommand(roachpb.Merge, DefaultDeclareKeys, Merge) +} + // Merge is used to merge a value into an existing key. Merge is an // efficient accumulation operation which is exposed by RocksDB, used // by CockroachDB for the efficient accumulation of certain diff --git a/pkg/storage/batcheval/cmd_push_txn.go b/pkg/storage/batcheval/cmd_push_txn.go index 5e91f480873d..ec111efda549 100644 --- a/pkg/storage/batcheval/cmd_push_txn.go +++ b/pkg/storage/batcheval/cmd_push_txn.go @@ -24,12 +24,25 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" ) +func init() { + RegisterCommand(roachpb.PushTxn, declareKeysPushTransaction, PushTxn) +} + +func declareKeysPushTransaction( + _ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + pr := req.(*roachpb.PushTxnRequest) + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.TransactionKey(pr.PusheeTxn.Key, pr.PusheeTxn.ID)}) + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.AbortSpanKey(header.RangeID, pr.PusheeTxn.ID)}) +} + // PushTxn resolves conflicts between concurrent txns (or // between a non-transactional reader or writer and a txn) in several // ways depending on the statuses and priorities of the conflicting diff --git a/pkg/storage/batcheval/cmd_put.go b/pkg/storage/batcheval/cmd_put.go index 7b5e6b54c577..14af9bf03b95 100644 --- a/pkg/storage/batcheval/cmd_put.go +++ b/pkg/storage/batcheval/cmd_put.go @@ -23,6 +23,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/hlc" ) +func init() { + RegisterCommand(roachpb.Put, DefaultDeclareKeys, Put) +} + // Put sets the value for a specified key. func Put( ctx context.Context, batch engine.ReadWriter, cArgs CommandArgs, resp roachpb.Response, diff --git a/pkg/storage/batcheval/cmd_query_txn.go b/pkg/storage/batcheval/cmd_query_txn.go index f75e43608a51..dd6572ec94da 100644 --- a/pkg/storage/batcheval/cmd_query_txn.go +++ b/pkg/storage/batcheval/cmd_query_txn.go @@ -27,6 +27,10 @@ import ( "github.com/pkg/errors" ) +func init() { + RegisterCommand(roachpb.QueryTxn, DefaultDeclareKeys, QueryTxn) +} + // QueryTxn fetches the current state of a transaction. // This method is used to continually update the state of a txn // which is blocked waiting to resolve a conflicting intent. It diff --git a/pkg/storage/batcheval/cmd_range_lookup.go b/pkg/storage/batcheval/cmd_range_lookup.go index 7d433a839729..b7e18a678b62 100644 --- a/pkg/storage/batcheval/cmd_range_lookup.go +++ b/pkg/storage/batcheval/cmd_range_lookup.go @@ -24,10 +24,22 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" ) +func init() { + RegisterCommand(roachpb.RangeLookup, declareKeysRangeLookup, RangeLookup) +} + +func declareKeysRangeLookup( + desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + DefaultDeclareKeys(desc, header, req, spans) + spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) +} + // RangeLookup is used to look up RangeDescriptors - a RangeDescriptor // is a metadata structure which describes the key range and replica locations // of a distinct range in the cluster. diff --git a/pkg/storage/batcheval/cmd_resolve_intent.go b/pkg/storage/batcheval/cmd_resolve_intent.go index 308be9188848..dd634323b512 100644 --- a/pkg/storage/batcheval/cmd_resolve_intent.go +++ b/pkg/storage/batcheval/cmd_resolve_intent.go @@ -17,11 +17,41 @@ package batcheval import ( "golang.org/x/net/context" + "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" ) +func init() { + RegisterCommand(roachpb.ResolveIntent, declareKeysResolveIntent, ResolveIntent) +} + +func declareKeysResolveIntentCombined( + desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + DefaultDeclareKeys(desc, header, req, spans) + var args *roachpb.ResolveIntentRequest + switch t := req.(type) { + case *roachpb.ResolveIntentRequest: + args = t + case *roachpb.ResolveIntentRangeRequest: + // Ranged and point requests only differ in whether the header's EndKey + // is used, so we can convert them. + args = (*roachpb.ResolveIntentRequest)(t) + } + if WriteAbortSpanOnResolve(args.Status) { + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.AbortSpanKey(header.RangeID, args.IntentTxn.ID)}) + } +} + +func declareKeysResolveIntent( + desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + declareKeysResolveIntentCombined(desc, header, req, spans) +} + // ResolveIntent resolves a write intent from the specified key // according to the status of the transaction which created it. func ResolveIntent( diff --git a/pkg/storage/batcheval/cmd_resolve_intent_range.go b/pkg/storage/batcheval/cmd_resolve_intent_range.go index 6313088d8e6f..2c78d2fa4fb9 100644 --- a/pkg/storage/batcheval/cmd_resolve_intent_range.go +++ b/pkg/storage/batcheval/cmd_resolve_intent_range.go @@ -22,8 +22,19 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" ) +func init() { + RegisterCommand(roachpb.ResolveIntentRange, declareKeysResolveIntentRange, ResolveIntentRange) +} + +func declareKeysResolveIntentRange( + desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + declareKeysResolveIntentCombined(desc, header, req, spans) +} + // ResolveIntentRange resolves write intents in the specified // key range according to the status of the transaction which created it. func ResolveIntentRange( diff --git a/pkg/storage/replica_command_test.go b/pkg/storage/batcheval/cmd_resolve_intent_test.go similarity index 56% rename from pkg/storage/replica_command_test.go rename to pkg/storage/batcheval/cmd_resolve_intent_test.go index df34db9171a8..aba7eb9e9faf 100644 --- a/pkg/storage/replica_command_test.go +++ b/pkg/storage/batcheval/cmd_resolve_intent_test.go @@ -12,7 +12,7 @@ // implied. See the License for the specific language governing // permissions and limitations under the License. -package storage +package batcheval import ( "fmt" @@ -21,17 +21,90 @@ import ( "golang.org/x/net/context" + opentracing "github.com/opentracing/opentracing-go" + + "github.com/cockroachdb/cockroach/pkg/internal/client" "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/storage/abortspan" - "github.com/cockroachdb/cockroach/pkg/storage/batcheval" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/storage/txnwait" "github.com/cockroachdb/cockroach/pkg/testutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/uuid" ) +type mockEvalCtx struct { + abortSpan *abortspan.AbortSpan +} + +func (m *mockEvalCtx) String() string { + return "mock" +} +func (m *mockEvalCtx) ClusterSettings() *cluster.Settings { + panic("unimplemented") +} +func (m *mockEvalCtx) EvalKnobs() TestingKnobs { + panic("unimplemented") +} +func (m *mockEvalCtx) Tracer() opentracing.Tracer { + panic("unimplemented") +} +func (m *mockEvalCtx) Engine() engine.Engine { + panic("unimplemented") +} +func (m *mockEvalCtx) DB() *client.DB { + panic("unimplemented") +} +func (m *mockEvalCtx) AbortSpan() *abortspan.AbortSpan { + return m.abortSpan +} +func (m *mockEvalCtx) GetTxnWaitQueue() *txnwait.Queue { + panic("unimplemented") +} +func (m *mockEvalCtx) NodeID() roachpb.NodeID { + panic("unimplemented") +} +func (m *mockEvalCtx) StoreID() roachpb.StoreID { + panic("unimplemented") +} +func (m *mockEvalCtx) GetRangeID() roachpb.RangeID { + panic("unimplemented") +} +func (m *mockEvalCtx) IsFirstRange() bool { + panic("unimplemented") +} +func (m *mockEvalCtx) GetFirstIndex() (uint64, error) { + panic("unimplemented") +} +func (m *mockEvalCtx) GetTerm(uint64) (uint64, error) { + panic("unimplemented") +} +func (m *mockEvalCtx) Desc() *roachpb.RangeDescriptor { + panic("unimplemented") +} +func (m *mockEvalCtx) ContainsKey(key roachpb.Key) bool { + panic("unimplemented") +} +func (m *mockEvalCtx) GetMVCCStats() enginepb.MVCCStats { + panic("unimplemented") +} +func (m *mockEvalCtx) GetGCThreshold() hlc.Timestamp { + panic("unimplemented") +} +func (m *mockEvalCtx) GetTxnSpanGCThreshold() hlc.Timestamp { + panic("unimplemented") +} +func (m *mockEvalCtx) GetLastReplicaGCTimestamp(context.Context) (hlc.Timestamp, error) { + panic("unimplemented") +} +func (m *mockEvalCtx) GetLease() (roachpb.Lease, *roachpb.Lease) { + panic("unimplemented") +} + func TestDeclareKeysResolveIntent(t *testing.T) { defer leaktest.AfterTest(t)() @@ -95,25 +168,25 @@ func TestDeclareKeysResolveIntent(t *testing.T) { var spans spanset.SpanSet batch := engine.NewBatch() - batch = makeSpanSetBatch(batch, &spans) + batch = spanset.NewBatch(batch, &spans) defer batch.Close() var h roachpb.Header h.RangeID = desc.RangeID - cArgs := batcheval.CommandArgs{Header: h} - cArgs.EvalCtx = &Replica{abortSpan: ac} + cArgs := CommandArgs{Header: h} + cArgs.EvalCtx = &mockEvalCtx{abortSpan: ac} if !ranged { cArgs.Args = &ri declareKeysResolveIntent(desc, h, &ri, &spans) - if _, err := batcheval.ResolveIntent(ctx, batch, cArgs, &roachpb.ResolveIntentResponse{}); err != nil { + if _, err := ResolveIntent(ctx, batch, cArgs, &roachpb.ResolveIntentResponse{}); err != nil { t.Fatal(err) } } else { cArgs.Args = &rir declareKeysResolveIntentRange(desc, h, &rir, &spans) - if _, err := batcheval.ResolveIntentRange(ctx, batch, cArgs, &roachpb.ResolveIntentRangeResponse{}); err != nil { + if _, err := ResolveIntentRange(ctx, batch, cArgs, &roachpb.ResolveIntentRangeResponse{}); err != nil { t.Fatal(err) } } diff --git a/pkg/storage/batcheval/cmd_reverse_scan.go b/pkg/storage/batcheval/cmd_reverse_scan.go index 9ca27ad8a56e..b499a1c111cf 100644 --- a/pkg/storage/batcheval/cmd_reverse_scan.go +++ b/pkg/storage/batcheval/cmd_reverse_scan.go @@ -22,6 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" ) +func init() { + RegisterCommand(roachpb.ReverseScan, DefaultDeclareKeys, ReverseScan) +} + // ReverseScan scans the key range specified by start key through // end key in descending order up to some maximum number of results. // maxKeys stores the number of scan results remaining for this batch diff --git a/pkg/storage/batcheval/cmd_scan.go b/pkg/storage/batcheval/cmd_scan.go index 4707a64e2d66..ce7aacb4c9f0 100644 --- a/pkg/storage/batcheval/cmd_scan.go +++ b/pkg/storage/batcheval/cmd_scan.go @@ -22,6 +22,10 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/engine" ) +func init() { + RegisterCommand(roachpb.Scan, DefaultDeclareKeys, Scan) +} + // Scan scans the key range specified by start key through end key // in ascending order up to some maximum number of results. maxKeys // stores the number of scan results remaining for this batch diff --git a/pkg/storage/batcheval/cmd_truncate_log.go b/pkg/storage/batcheval/cmd_truncate_log.go index 1632bff6794e..a1cb461b74a8 100644 --- a/pkg/storage/batcheval/cmd_truncate_log.go +++ b/pkg/storage/batcheval/cmd_truncate_log.go @@ -25,12 +25,25 @@ import ( "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/storage/storagebase" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/pkg/errors" ) +func init() { + RegisterCommand(roachpb.TruncateLog, declareKeysTruncateLog, TruncateLog) +} + +func declareKeysTruncateLog( + _ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, +) { + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RaftTruncatedStateKey(header.RangeID)}) + prefix := keys.RaftLogPrefix(header.RangeID) + spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}) +} + // TruncateLog discards a prefix of the raft log. Truncating part of a log that // has already been truncated has no effect. If this range is not the one // specified within the request body, the request will also be ignored. diff --git a/pkg/storage/batcheval/command.go b/pkg/storage/batcheval/command.go new file mode 100644 index 000000000000..46c3c726017e --- /dev/null +++ b/pkg/storage/batcheval/command.go @@ -0,0 +1,69 @@ +// Copyright 2014 The Cockroach Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +// implied. See the License for the specific language governing +// permissions and limitations under the License. + +package batcheval + +import ( + "golang.org/x/net/context" + + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" + "github.com/cockroachdb/cockroach/pkg/storage/engine" + "github.com/cockroachdb/cockroach/pkg/storage/spanset" + "github.com/cockroachdb/cockroach/pkg/util/log" +) + +// A Command is the implementation of a single request within a BatchRequest. +type Command struct { + // DeclareKeys adds all keys this command touches to the given spanSet. + DeclareKeys func(roachpb.RangeDescriptor, roachpb.Header, roachpb.Request, *spanset.SpanSet) + + // Eval evaluates a command on the given engine. It should populate + // the supplied response (always a non-nil pointer to the correct + // type) and return special side effects (if any) in the Result. + // If it writes to the engine it should also update + // *CommandArgs.Stats. + Eval func(context.Context, engine.ReadWriter, CommandArgs, roachpb.Response) (result.Result, error) +} + +var cmds = make(map[roachpb.Method]Command) + +// RegisterCommand makes a command available for execution. It must only be +// called before any evaluation takes place. +func RegisterCommand( + method roachpb.Method, + declare func(roachpb.RangeDescriptor, roachpb.Header, roachpb.Request, *spanset.SpanSet), + impl func(context.Context, engine.ReadWriter, CommandArgs, roachpb.Response) (result.Result, error), +) { + if _, ok := cmds[method]; ok { + log.Fatalf(context.TODO(), "cannot overwrite previously registered method %v", method) + } + cmds[method] = Command{ + DeclareKeys: declare, + Eval: impl, + } +} + +// UnregisterCommand is provided for testing and allows removing a command. +// It is a no-op if the command is not registered. +func UnregisterCommand(method roachpb.Method) { + delete(cmds, method) +} + +// LookupCommand returns the command for the given method, with the boolean +// indicating success or failure. +func LookupCommand(method roachpb.Method) (Command, bool) { + cmd, ok := cmds[method] + return cmd, ok +} diff --git a/pkg/storage/cclglue.go b/pkg/storage/cclglue.go index 1be805b6ade9..6c6af6710693 100644 --- a/pkg/storage/cclglue.go +++ b/pkg/storage/cclglue.go @@ -19,52 +19,13 @@ import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/batcheval" - "github.com/cockroachdb/cockroach/pkg/storage/batcheval/result" - "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/pkg/errors" ) -// The variables/methods here are initialized/called at init() time, often from -// the ccl packages. - -func makeUnimplementedCommand(method roachpb.Method) Command { - return Command{ - DeclareKeys: batcheval.DefaultDeclareKeys, - Eval: func( - _ context.Context, _ engine.ReadWriter, _ batcheval.CommandArgs, _ roachpb.Response, - ) (result.Result, error) { - return result.Result{}, errors.Errorf("unimplemented command: %s", method.String()) - }} -} - -var writeBatchCmd = makeUnimplementedCommand(roachpb.WriteBatch) -var addSSTableCmd = makeUnimplementedCommand(roachpb.AddSSTable) -var exportCmd = makeUnimplementedCommand(roachpb.Export) var importCmdFn ImportCmdFunc = func(context.Context, batcheval.CommandArgs) (*roachpb.ImportResponse, error) { return &roachpb.ImportResponse{}, errors.Errorf("unimplemented command: %s", roachpb.Import) } -// SetWriteBatchCmd allows setting the function that will be called as the -// implementation of the WriteBatch command. Only allowed to be called by Init. -func SetWriteBatchCmd(cmd Command) { - // This is safe if SetWriteBatchCmd is only called at init time. - commands[roachpb.WriteBatch] = cmd -} - -// SetAddSSTableCmd allows setting the function that will be called as the -// implementation of the AddSSTable command. Only allowed to be called by Init. -func SetAddSSTableCmd(cmd Command) { - // This is safe if SetAddSSTableCmd is only called at init time. - commands[roachpb.AddSSTable] = cmd -} - -// SetExportCmd allows setting the function that will be called as the -// implementation of the Export command. Only allowed to be called by Init. -func SetExportCmd(cmd Command) { - // This is safe if SetExportCmd is only called at init time. - commands[roachpb.Export] = cmd -} - // ImportCmdFunc is the type of the function that will be called as the // implementation of the Import command. type ImportCmdFunc func(context.Context, batcheval.CommandArgs) (*roachpb.ImportResponse, error) diff --git a/pkg/storage/helpers_test.go b/pkg/storage/helpers_test.go index 8d7e6a2caae5..10bf35ef0358 100644 --- a/pkg/storage/helpers_test.go +++ b/pkg/storage/helpers_test.go @@ -451,7 +451,7 @@ func ProposeAddSSTable(ctx context.Context, key, val string, ts hlc.Timestamp, s } func SetMockAddSSTable() (undo func()) { - prev := commands[roachpb.AddSSTable] + prev, _ := batcheval.LookupCommand(roachpb.AddSSTable) // TODO(tschottdorf): this already does nontrivial work. Worth open-sourcing the relevant // subparts of the real evalAddSSTable to make this test less likely to rot. @@ -471,12 +471,11 @@ func SetMockAddSSTable() (undo func()) { }, nil } - SetAddSSTableCmd(Command{ - DeclareKeys: batcheval.DefaultDeclareKeys, - Eval: evalAddSSTable, - }) + batcheval.UnregisterCommand(roachpb.AddSSTable) + batcheval.RegisterCommand(roachpb.AddSSTable, batcheval.DefaultDeclareKeys, evalAddSSTable) return func() { - SetAddSSTableCmd(prev) + batcheval.UnregisterCommand(roachpb.AddSSTable) + batcheval.RegisterCommand(roachpb.AddSSTable, prev.DeclareKeys, prev.Eval) } } diff --git a/pkg/storage/replica.go b/pkg/storage/replica.go index ad98eaae714a..e5def1762404 100644 --- a/pkg/storage/replica.go +++ b/pkg/storage/replica.go @@ -2164,7 +2164,7 @@ func collectSpans( if _, ok := inner.(*roachpb.NoopRequest); ok { continue } - if cmd, ok := commands[inner.Method()]; ok { + if cmd, ok := batcheval.LookupCommand(inner.Method()); ok { cmd.DeclareKeys(desc, ba.Header, inner, spans) } else { return nil, errors.Errorf("unrecognized command %s", inner.Method()) @@ -5070,7 +5070,7 @@ func (r *Replica) evaluateTxnWriteBatch( // If all writes occurred at the intended timestamp, we've succeeded on the fast path. batch := r.store.Engine().NewBatch() if util.RaceEnabled && spans != nil { - batch = makeSpanSetBatch(batch, spans) + batch = spanset.NewBatch(batch, spans) } rec := NewReplicaEvalContext(r, spans) br, res, pErr := evaluateBatch(ctx, idKey, batch, rec, &ms, strippedBa) @@ -5122,7 +5122,7 @@ func (r *Replica) evaluateTxnWriteBatch( batch := r.store.Engine().NewBatch() if util.RaceEnabled && spans != nil { - batch = makeSpanSetBatch(batch, spans) + batch = spanset.NewBatch(batch, spans) } rec := NewReplicaEvalContext(r, spans) br, result, pErr := evaluateBatch(ctx, idKey, batch, rec, &ms, ba) diff --git a/pkg/storage/replica_command.go b/pkg/storage/replica_command.go index b44ccdfa001f..93b6c6667fbc 100644 --- a/pkg/storage/replica_command.go +++ b/pkg/storage/replica_command.go @@ -60,55 +60,6 @@ const ( collectChecksumTimeout = 5 * time.Second ) -// A Command is the implementation of a single request within a BatchRequest. -type Command struct { - // DeclareKeys adds all keys this command touches to the given spanSet. - DeclareKeys func(roachpb.RangeDescriptor, roachpb.Header, roachpb.Request, *spanset.SpanSet) - - // Eval evaluates a command on the given engine. It should populate - // the supplied response (always a non-nil pointer to the correct - // type) and return special side effects (if any) in the Result. - // If it writes to the engine it should also update - // *CommandArgs.Stats. - Eval func(context.Context, engine.ReadWriter, batcheval.CommandArgs, roachpb.Response) (result.Result, error) -} - -var commands = map[roachpb.Method]Command{ - roachpb.Get: {DeclareKeys: batcheval.DefaultDeclareKeys, Eval: batcheval.Get}, - roachpb.Put: {DeclareKeys: batcheval.DefaultDeclareKeys, Eval: batcheval.Put}, - roachpb.ConditionalPut: {DeclareKeys: batcheval.DefaultDeclareKeys, Eval: batcheval.ConditionalPut}, - roachpb.InitPut: {DeclareKeys: batcheval.DefaultDeclareKeys, Eval: batcheval.InitPut}, - roachpb.Increment: {DeclareKeys: batcheval.DefaultDeclareKeys, Eval: batcheval.Increment}, - roachpb.Delete: {DeclareKeys: batcheval.DefaultDeclareKeys, Eval: batcheval.Delete}, - roachpb.DeleteRange: {DeclareKeys: batcheval.DefaultDeclareKeys, Eval: batcheval.DeleteRange}, - roachpb.Scan: {DeclareKeys: batcheval.DefaultDeclareKeys, Eval: batcheval.Scan}, - roachpb.ReverseScan: {DeclareKeys: batcheval.DefaultDeclareKeys, Eval: batcheval.ReverseScan}, - roachpb.BeginTransaction: {DeclareKeys: declareKeysBeginTransaction, Eval: batcheval.BeginTransaction}, - roachpb.EndTransaction: {DeclareKeys: declareKeysEndTransaction, Eval: evalEndTransaction}, - roachpb.RangeLookup: {DeclareKeys: declareKeysRangeLookup, Eval: batcheval.RangeLookup}, - roachpb.HeartbeatTxn: {DeclareKeys: declareKeysHeartbeatTransaction, Eval: batcheval.HeartbeatTxn}, - roachpb.GC: {DeclareKeys: declareKeysGC, Eval: batcheval.GC}, - roachpb.PushTxn: {DeclareKeys: declareKeysPushTransaction, Eval: batcheval.PushTxn}, - roachpb.QueryTxn: {DeclareKeys: batcheval.DefaultDeclareKeys, Eval: batcheval.QueryTxn}, - roachpb.ResolveIntent: {DeclareKeys: declareKeysResolveIntent, Eval: batcheval.ResolveIntent}, - roachpb.ResolveIntentRange: {DeclareKeys: declareKeysResolveIntentRange, Eval: batcheval.ResolveIntentRange}, - roachpb.Merge: {DeclareKeys: batcheval.DefaultDeclareKeys, Eval: batcheval.Merge}, - roachpb.TruncateLog: {DeclareKeys: declareKeysTruncateLog, Eval: batcheval.TruncateLog}, - roachpb.RequestLease: {DeclareKeys: declareKeysRequestLease, Eval: batcheval.RequestLease}, - roachpb.TransferLease: {DeclareKeys: declareKeysRequestLease, Eval: batcheval.TransferLease}, - roachpb.LeaseInfo: {DeclareKeys: declareKeysLeaseInfo, Eval: batcheval.LeaseInfo}, - roachpb.ComputeChecksum: {DeclareKeys: batcheval.DefaultDeclareKeys, Eval: batcheval.ComputeChecksum}, - roachpb.WriteBatch: writeBatchCmd, - roachpb.Export: exportCmd, - roachpb.AddSSTable: addSSTableCmd, - - roachpb.DeprecatedVerifyChecksum: { - DeclareKeys: batcheval.DefaultDeclareKeys, - Eval: func(context.Context, engine.ReadWriter, batcheval.CommandArgs, roachpb.Response) (result.Result, error) { - return result.Result{}, nil - }}, -} - // evaluateCommand delegates to the eval method for the given // roachpb.Request. The returned Result may be partially valid // even if an error is returned. maxKeys is the number of scan results @@ -149,7 +100,7 @@ func evaluateCommand( var err error var pd result.Result - if cmd, ok := commands[args.Method()]; ok { + if cmd, ok := batcheval.LookupCommand(args.Method()); ok { cArgs := batcheval.CommandArgs{ EvalCtx: rec, Header: h, @@ -205,30 +156,16 @@ func evaluateCommand( return pd, pErr } -// declareKeysWriteTransaction is the shared portion of -// declareKeys{Begin,End,Heartbeat}Transaction -func declareKeysWriteTransaction( - _ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, -) { - if header.Txn != nil { - header.Txn.AssertInitialized(context.TODO()) - spans.Add(spanset.SpanReadWrite, roachpb.Span{ - Key: keys.TransactionKey(req.Header().Key, header.Txn.ID), - }) - } -} - -func declareKeysBeginTransaction( - desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, -) { - declareKeysWriteTransaction(desc, header, req, spans) - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeTxnSpanGCThresholdKey(header.RangeID)}) +func init() { + // TODO(tschottdorf): move EndTransaction into batcheval. In doing so, + // unexport DeclareKeysWriteTransaction. + batcheval.RegisterCommand(roachpb.EndTransaction, declareKeysEndTransaction, evalEndTransaction) } func declareKeysEndTransaction( desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, ) { - declareKeysWriteTransaction(desc, header, req, spans) + batcheval.DeclareKeysWriteTransaction(desc, header, req, spans) et := req.(*roachpb.EndTransactionRequest) // The spans may extend beyond this Range, but it's ok for the // purpose of the command queue. The parts in our Range will @@ -761,107 +698,6 @@ func runCommitTrigger( return result.Result{}, nil } -func declareKeysRangeLookup( - desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, -) { - batcheval.DefaultDeclareKeys(desc, header, req, spans) - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) -} - -func declareKeysHeartbeatTransaction( - desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, -) { - declareKeysWriteTransaction(desc, header, req, spans) - if header.Txn != nil { - header.Txn.AssertInitialized(context.TODO()) - spans.Add(spanset.SpanReadOnly, roachpb.Span{ - Key: keys.AbortSpanKey(header.RangeID, header.Txn.ID), - }) - } -} - -func declareKeysGC( - desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, -) { - // Intentionally don't call batcheval.DefaultDeclareKeys: the key range in the header - // is usually the whole range (pending resolution of #7880). - gcr := req.(*roachpb.GCRequest) - for _, key := range gcr.Keys { - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}) - } - // Be smart here about blocking on the threshold keys. The GC queue can send an empty - // request first to bump the thresholds, and then another one that actually does work - // but can avoid declaring these keys below. - if gcr.Threshold != (hlc.Timestamp{}) { - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLastGCKey(header.RangeID)}) - } - if gcr.TxnSpanGCThreshold != (hlc.Timestamp{}) { - spans.Add(spanset.SpanReadWrite, roachpb.Span{ - // TODO(bdarnell): since this must be checked by all - // reads, this should be factored out into a separate - // waiter which blocks only those reads far enough in the - // past to be affected by the in-flight GCRequest (i.e. - // normally none). This means this key would be special - // cased and not tracked by the command queue. - Key: keys.RangeTxnSpanGCThresholdKey(header.RangeID), - }) - } - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) -} - -func declareKeysPushTransaction( - _ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, -) { - pr := req.(*roachpb.PushTxnRequest) - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.TransactionKey(pr.PusheeTxn.Key, pr.PusheeTxn.ID)}) - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.AbortSpanKey(header.RangeID, pr.PusheeTxn.ID)}) -} - -func declareKeysResolveIntentCombined( - desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, -) { - batcheval.DefaultDeclareKeys(desc, header, req, spans) - var args *roachpb.ResolveIntentRequest - switch t := req.(type) { - case *roachpb.ResolveIntentRequest: - args = t - case *roachpb.ResolveIntentRangeRequest: - // Ranged and point requests only differ in whether the header's EndKey - // is used, so we can convert them. - args = (*roachpb.ResolveIntentRequest)(t) - } - if batcheval.WriteAbortSpanOnResolve(args.Status) { - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.AbortSpanKey(header.RangeID, args.IntentTxn.ID)}) - } -} - -func declareKeysResolveIntent( - desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, -) { - declareKeysResolveIntentCombined(desc, header, req, spans) -} - -func declareKeysResolveIntentRange( - desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, -) { - declareKeysResolveIntentCombined(desc, header, req, spans) -} - -func declareKeysTruncateLog( - _ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, -) { - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RaftTruncatedStateKey(header.RangeID)}) - prefix := keys.RaftLogPrefix(header.RangeID) - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: prefix, EndKey: prefix.PrefixEnd()}) -} - -func declareKeysRequestLease( - desc roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, -) { - spans.Add(spanset.SpanReadWrite, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)}) - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeDescriptorKey(desc.StartKey)}) -} - // CheckConsistency runs a consistency check on the range. It first applies a // ComputeChecksum command on the range. It then issues CollectChecksum commands // to the other replicas. @@ -2516,12 +2352,6 @@ func updateRangeDescriptor( return nil } -func declareKeysLeaseInfo( - _ roachpb.RangeDescriptor, header roachpb.Header, req roachpb.Request, spans *spanset.SpanSet, -) { - spans.Add(spanset.SpanReadOnly, roachpb.Span{Key: keys.RangeLeaseKey(header.RangeID)}) -} - // TestingRelocateRange relocates a given range to a given set of stores. The first // store in the slice becomes the new leaseholder. // diff --git a/pkg/storage/replica_test.go b/pkg/storage/replica_test.go index b3ac79f41dd7..04d701dd10fd 100644 --- a/pkg/storage/replica_test.go +++ b/pkg/storage/replica_test.go @@ -3141,7 +3141,8 @@ func TestReplicaCommandQueueSplitDeclaresWrites(t *testing.T) { defer leaktest.AfterTest(t)() var spans spanset.SpanSet - commands[roachpb.EndTransaction].DeclareKeys( + cmd, _ := batcheval.LookupCommand(roachpb.EndTransaction) + cmd.DeclareKeys( roachpb.RangeDescriptor{StartKey: roachpb.RKey("a"), EndKey: roachpb.RKey("d")}, roachpb.Header{}, &roachpb.EndTransactionRequest{ @@ -8106,7 +8107,8 @@ func TestGCWithoutThreshold(t *testing.T) { gc.Threshold = keyThresh gc.TxnSpanGCThreshold = txnThresh - declareKeysGC(desc, roachpb.Header{RangeID: tc.repl.RangeID}, &gc, &spans) + cmd, _ := batcheval.LookupCommand(roachpb.GC) + cmd.DeclareKeys(desc, roachpb.Header{RangeID: tc.repl.RangeID}, &gc, &spans) if num, exp := spans.Len(), i+j+1; num != exp { t.Fatalf("(%s,%s): expected %d declared keys, found %d", @@ -8118,7 +8120,7 @@ func TestGCWithoutThreshold(t *testing.T) { batch := eng.NewBatch() defer batch.Close() - rw := makeSpanSetBatch(batch, &spans) + rw := spanset.NewBatch(batch, &spans) var resp roachpb.GCResponse diff --git a/pkg/storage/batch_spanset.go b/pkg/storage/spanset/batch.go similarity index 64% rename from pkg/storage/batch_spanset.go rename to pkg/storage/spanset/batch.go index c9ba27d78dfd..bf7214aeba60 100644 --- a/pkg/storage/batch_spanset.go +++ b/pkg/storage/spanset/batch.go @@ -13,22 +13,21 @@ // permissions and limitations under the License. See the AUTHORS file // for names of contributors. -package storage +package spanset import ( "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/storage/engine" "github.com/cockroachdb/cockroach/pkg/storage/engine/enginepb" - "github.com/cockroachdb/cockroach/pkg/storage/spanset" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/protoutil" ) -// SpanSetIterator wraps an engine.Iterator and ensures that it can +// Iterator wraps an engine.Iterator and ensures that it can // only be used to access spans in a SpanSet. -type SpanSetIterator struct { +type Iterator struct { i engine.Iterator - spans *spanset.SpanSet + spans *SpanSet // Seeking to an invalid key puts the iterator in an error state. err error @@ -37,21 +36,30 @@ type SpanSetIterator struct { invalid bool } -var _ engine.Iterator = &SpanSetIterator{} +var _ engine.Iterator = &Iterator{} + +// NewIterator constructs an iterator that verifies access of the underlying +// iterator against the given spans. +func NewIterator(iter engine.Iterator, spans *SpanSet) *Iterator { + return &Iterator{ + i: iter, + spans: spans, + } +} // Close implements engine.Iterator. -func (s *SpanSetIterator) Close() { +func (s *Iterator) Close() { s.i.Close() } // Iterator returns the underlying engine.Iterator. -func (s *SpanSetIterator) Iterator() engine.Iterator { +func (s *Iterator) Iterator() engine.Iterator { return s.i } // Seek implements engine.Iterator. -func (s *SpanSetIterator) Seek(key engine.MVCCKey) { - s.err = s.spans.CheckAllowed(spanset.SpanReadOnly, roachpb.Span{Key: key.Key}) +func (s *Iterator) Seek(key engine.MVCCKey) { + s.err = s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}) if s.err == nil { s.invalid = false } @@ -59,8 +67,8 @@ func (s *SpanSetIterator) Seek(key engine.MVCCKey) { } // SeekReverse implements engine.Iterator. -func (s *SpanSetIterator) SeekReverse(key engine.MVCCKey) { - s.err = s.spans.CheckAllowed(spanset.SpanReadOnly, roachpb.Span{Key: key.Key}) +func (s *Iterator) SeekReverse(key engine.MVCCKey) { + s.err = s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}) if s.err == nil { s.invalid = false } @@ -68,7 +76,7 @@ func (s *SpanSetIterator) SeekReverse(key engine.MVCCKey) { } // Valid implements engine.Iterator. -func (s *SpanSetIterator) Valid() (bool, error) { +func (s *Iterator) Valid() (bool, error) { if s.err != nil { return false, s.err } @@ -80,82 +88,82 @@ func (s *SpanSetIterator) Valid() (bool, error) { } // Next implements engine.Iterator. -func (s *SpanSetIterator) Next() { +func (s *Iterator) Next() { s.i.Next() - if s.spans.CheckAllowed(spanset.SpanReadOnly, roachpb.Span{Key: s.UnsafeKey().Key}) != nil { + if s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: s.UnsafeKey().Key}) != nil { s.invalid = true } } // Prev implements engine.Iterator. -func (s *SpanSetIterator) Prev() { +func (s *Iterator) Prev() { s.i.Prev() - if s.spans.CheckAllowed(spanset.SpanReadOnly, roachpb.Span{Key: s.UnsafeKey().Key}) != nil { + if s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: s.UnsafeKey().Key}) != nil { s.invalid = true } } // NextKey implements engine.Iterator. -func (s *SpanSetIterator) NextKey() { +func (s *Iterator) NextKey() { s.i.NextKey() - if s.spans.CheckAllowed(spanset.SpanReadOnly, roachpb.Span{Key: s.UnsafeKey().Key}) != nil { + if s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: s.UnsafeKey().Key}) != nil { s.invalid = true } } // PrevKey implements engine.Iterator. -func (s *SpanSetIterator) PrevKey() { +func (s *Iterator) PrevKey() { s.i.PrevKey() - if s.spans.CheckAllowed(spanset.SpanReadOnly, roachpb.Span{Key: s.UnsafeKey().Key}) != nil { + if s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: s.UnsafeKey().Key}) != nil { s.invalid = true } } // Key implements engine.Iterator. -func (s *SpanSetIterator) Key() engine.MVCCKey { +func (s *Iterator) Key() engine.MVCCKey { return s.i.Key() } // Value implements engine.Iterator. -func (s *SpanSetIterator) Value() []byte { +func (s *Iterator) Value() []byte { return s.i.Value() } // ValueProto implements engine.Iterator. -func (s *SpanSetIterator) ValueProto(msg protoutil.Message) error { +func (s *Iterator) ValueProto(msg protoutil.Message) error { return s.i.ValueProto(msg) } // UnsafeKey implements engine.Iterator. -func (s *SpanSetIterator) UnsafeKey() engine.MVCCKey { +func (s *Iterator) UnsafeKey() engine.MVCCKey { return s.i.UnsafeKey() } // UnsafeValue implements engine.Iterator. -func (s *SpanSetIterator) UnsafeValue() []byte { +func (s *Iterator) UnsafeValue() []byte { return s.i.UnsafeValue() } // Less implements engine.Iterator. -func (s *SpanSetIterator) Less(key engine.MVCCKey) bool { +func (s *Iterator) Less(key engine.MVCCKey) bool { return s.i.Less(key) } // ComputeStats implements engine.Iterator. -func (s *SpanSetIterator) ComputeStats( +func (s *Iterator) ComputeStats( start, end engine.MVCCKey, nowNanos int64, ) (enginepb.MVCCStats, error) { - if err := s.spans.CheckAllowed(spanset.SpanReadOnly, roachpb.Span{Key: start.Key, EndKey: end.Key}); err != nil { + if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start.Key, EndKey: end.Key}); err != nil { return enginepb.MVCCStats{}, err } return s.i.ComputeStats(start, end, nowNanos) } // FindSplitKey implements engine.Iterator. -func (s *SpanSetIterator) FindSplitKey( +func (s *Iterator) FindSplitKey( start, end, minSplitKey engine.MVCCKey, targetSize int64, allowMeta2Splits bool, ) (engine.MVCCKey, error) { - if err := s.spans.CheckAllowed(spanset.SpanReadOnly, roachpb.Span{Key: start.Key, EndKey: end.Key}); err != nil { + if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start.Key, EndKey: end.Key}); err != nil { return engine.MVCCKey{}, err } return s.i.FindSplitKey(start, end, minSplitKey, targetSize, allowMeta2Splits) @@ -163,7 +171,7 @@ func (s *SpanSetIterator) FindSplitKey( type spanSetReader struct { r engine.Reader - spans *spanset.SpanSet + spans *SpanSet } var _ engine.Reader = spanSetReader{} @@ -177,7 +185,7 @@ func (s spanSetReader) Closed() bool { } func (s spanSetReader) Get(key engine.MVCCKey) ([]byte, error) { - if err := s.spans.CheckAllowed(spanset.SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil { + if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil { return nil, err } return s.r.Get(key) @@ -186,7 +194,7 @@ func (s spanSetReader) Get(key engine.MVCCKey) ([]byte, error) { func (s spanSetReader) GetProto( key engine.MVCCKey, msg protoutil.Message, ) (bool, int64, int64, error) { - if err := s.spans.CheckAllowed(spanset.SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil { + if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: key.Key}); err != nil { return false, 0, 0, err } return s.r.GetProto(key, msg) @@ -195,14 +203,14 @@ func (s spanSetReader) GetProto( func (s spanSetReader) Iterate( start, end engine.MVCCKey, f func(engine.MVCCKeyValue) (bool, error), ) error { - if err := s.spans.CheckAllowed(spanset.SpanReadOnly, roachpb.Span{Key: start.Key, EndKey: end.Key}); err != nil { + if err := s.spans.CheckAllowed(SpanReadOnly, roachpb.Span{Key: start.Key, EndKey: end.Key}); err != nil { return err } return s.r.Iterate(start, end, f) } func (s spanSetReader) NewIterator(prefix bool) engine.Iterator { - return &SpanSetIterator{s.r.NewIterator(prefix), s.spans, nil, false} + return &Iterator{s.r.NewIterator(prefix), s.spans, nil, false} } func (s spanSetReader) NewTimeBoundIterator(start, end hlc.Timestamp) engine.Iterator { @@ -211,7 +219,7 @@ func (s spanSetReader) NewTimeBoundIterator(start, end hlc.Timestamp) engine.Ite type spanSetWriter struct { w engine.Writer - spans *spanset.SpanSet + spans *SpanSet } var _ engine.Writer = spanSetWriter{} @@ -222,35 +230,35 @@ func (s spanSetWriter) ApplyBatchRepr(repr []byte, sync bool) error { } func (s spanSetWriter) Clear(key engine.MVCCKey) error { - if err := s.spans.CheckAllowed(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { + if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { return err } return s.w.Clear(key) } func (s spanSetWriter) ClearRange(start, end engine.MVCCKey) error { - if err := s.spans.CheckAllowed(spanset.SpanReadWrite, roachpb.Span{Key: start.Key, EndKey: end.Key}); err != nil { + if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: start.Key, EndKey: end.Key}); err != nil { return err } return s.w.ClearRange(start, end) } func (s spanSetWriter) ClearIterRange(iter engine.Iterator, start, end engine.MVCCKey) error { - if err := s.spans.CheckAllowed(spanset.SpanReadWrite, roachpb.Span{Key: start.Key, EndKey: end.Key}); err != nil { + if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: start.Key, EndKey: end.Key}); err != nil { return err } return s.w.ClearIterRange(iter, start, end) } func (s spanSetWriter) Merge(key engine.MVCCKey, value []byte) error { - if err := s.spans.CheckAllowed(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { + if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { return err } return s.w.Merge(key, value) } func (s spanSetWriter) Put(key engine.MVCCKey, value []byte) error { - if err := s.spans.CheckAllowed(spanset.SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { + if err := s.spans.CheckAllowed(SpanReadWrite, roachpb.Span{Key: key.Key}); err != nil { return err } return s.w.Put(key, value) @@ -263,7 +271,7 @@ type spanSetReadWriter struct { var _ engine.ReadWriter = spanSetReadWriter{} -func makeSpanSetReadWriter(rw engine.ReadWriter, spans *spanset.SpanSet) spanSetReadWriter { +func makeSpanSetReadWriter(rw engine.ReadWriter, spans *SpanSet) spanSetReadWriter { return spanSetReadWriter{ spanSetReader{ r: rw, @@ -279,7 +287,7 @@ func makeSpanSetReadWriter(rw engine.ReadWriter, spans *spanset.SpanSet) spanSet type spanSetBatch struct { spanSetReadWriter b engine.Batch - spans *spanset.SpanSet + spans *SpanSet } var _ engine.Batch = spanSetBatch{} @@ -296,7 +304,9 @@ func (s spanSetBatch) Repr() []byte { return s.b.Repr() } -func makeSpanSetBatch(b engine.Batch, spans *spanset.SpanSet) engine.Batch { +// NewBatch returns an engine.Batch that asserts access of the underlying +// Batch against the given SpanSet. +func NewBatch(b engine.Batch, spans *SpanSet) engine.Batch { return &spanSetBatch{ makeSpanSetReadWriter(b, spans), b,