Skip to content

Commit

Permalink
Optimize contiguous runs of put/cput commands if writing to virgin ke…
Browse files Browse the repository at this point in the history
…yspace

If more than 10 contiguous put or cput commands, the keyspace is searched
using a full order iterator to determine whether it's clear, in which case
the "blind" versions of MVCCPut and MVCCConditionalPut are used to more
efficiently write values without requiring a prefix iterator to search for
an existing value on each put.

Stable sorting was discontinued as it showed a performance hit on the
benchmarks, even for randomly ordered insertions.
  • Loading branch information
spencerkimball committed May 3, 2016
1 parent 38488c4 commit a1e8979
Show file tree
Hide file tree
Showing 6 changed files with 522 additions and 210 deletions.
438 changes: 252 additions & 186 deletions roachpb/api.pb.go

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions roachpb/api.proto
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ message PutRequest {
// timestamp. This option should be used with care as it precludes
// the use of this value with transactions.
optional bool inline = 3 [(gogoproto.nullable) = false];
// NOTE: For internal use only! Set to indicate that the put is
// writing to virgin keyspace and no reads are necessary to
// rationalize MVCC.
optional bool blind = 4 [(gogoproto.nullable) = false];
}

// A PutResponse is the return value from the Put() method.
Expand All @@ -93,6 +97,10 @@ message ConditionalPutRequest {
// to indicate there should be no existing entry. This is different
// from the expectation that the value exists but is empty.
optional Value exp_value = 3;
// NOTE: For internal use only! Set to indicate that the put is
// writing to virgin keyspace and no reads are necessary to
// rationalize MVCC.
optional bool blind = 4 [(gogoproto.nullable) = false];
}

// A ConditionalPutResponse is the return value from the
Expand Down
18 changes: 12 additions & 6 deletions sql/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
gosql "database/sql"
"fmt"
"math/rand"
"path/filepath"
"testing"

Expand Down Expand Up @@ -94,11 +95,12 @@ func (kv *kvNative) insert(rows, run int) error {
}

func (kv *kvNative) update(rows, run int) error {
perm := rand.Perm(rows)
pErr := kv.db.Txn(func(txn *client.Txn) *roachpb.Error {
// Read all values in a batch.
b := txn.NewBatch()
for i := 0; i < rows; i++ {
b.Get(fmt.Sprintf("%s%06d", kv.prefix, i))
b.Get(fmt.Sprintf("%s%06d", kv.prefix, perm[i]))
}
if pErr := txn.Run(b); pErr != nil {
return pErr
Expand All @@ -107,7 +109,7 @@ func (kv *kvNative) update(rows, run int) error {
wb := txn.NewBatch()
for i, result := range b.Results {
v := result.Rows[0].ValueInt()
wb.Put(fmt.Sprintf("%s%06d", kv.prefix, i), v+1)
wb.Put(fmt.Sprintf("%s%06d", kv.prefix, perm[i]), v+1)
}
if pErr := txn.CommitInBatch(wb); pErr != nil {
return pErr
Expand All @@ -118,9 +120,11 @@ func (kv *kvNative) update(rows, run int) error {
}

func (kv *kvNative) del(rows, run int) error {
firstRow := rows * run
lastRow := rows * (run + 1)
pErr := kv.db.Txn(func(txn *client.Txn) *roachpb.Error {
b := txn.NewBatch()
for i := 0; i < rows; i++ {
for i := firstRow; i < lastRow; i++ {
b.Del(fmt.Sprintf("%s%06d", kv.prefix, i))
}
return txn.CommitInBatch(b)
Expand Down Expand Up @@ -206,27 +210,29 @@ func (kv *kvSQL) insert(rows, run int) error {
}

func (kv *kvSQL) update(rows, run int) error {
perm := rand.Perm(rows)
var buf bytes.Buffer
buf.WriteString(`UPDATE bench.kv SET v = v + 1 WHERE k IN (`)
for j := 0; j < rows; j++ {
if j > 0 {
buf.WriteString(", ")
}
fmt.Fprintf(&buf, `'%06d'`, j)
fmt.Fprintf(&buf, `'%06d'`, perm[j])
}
buf.WriteString(`)`)
_, err := kv.db.Exec(buf.String())
return err
}

func (kv *kvSQL) del(rows, run int) error {
firstRow := rows * run
var buf bytes.Buffer
buf.WriteString(`DELETE FROM bench.kv WHERE k IN (`)
for j := 0; j < rows; j++ {
if j > 0 {
buf.WriteString(", ")
}
fmt.Fprintf(&buf, `'%06d'`, j)
fmt.Fprintf(&buf, `'%06d'`, j+firstRow)
}
buf.WriteString(`)`)
_, err := kv.db.Exec(buf.String())
Expand Down Expand Up @@ -302,7 +308,7 @@ func runKVBenchmark(b *testing.B, typ, op string, rows int) {
opFn = kv.scan
}

if err := kv.prep(rows, op != "insert"); err != nil {
if err := kv.prep(rows, op != "insert" && op != "delete"); err != nil {
b.Fatal(err)
}
b.ResetTimer()
Expand Down
80 changes: 80 additions & 0 deletions storage/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,11 @@ const (
// need a periodic gossip to safeguard against failure of a leader
// to gossip after performing an update to the map.
configGossipInterval = 1 * time.Minute
// optimizePutThreshold is the minimum length of a contiguous run
// of batched puts or conditional puts, after which the constituent
// put operations will possibly be optimized by determining whether
// the key space being written is starting out empty.
optimizePutThreshold = 10
)

// This flag controls whether Transaction entries are automatically gc'ed
Expand Down Expand Up @@ -1816,6 +1821,76 @@ func isOnePhaseCommit(ba roachpb.BatchRequest) bool {
return !isEndTransactionExceedingDeadline(ba.Header, *etArg)
}

// optimizePuts searches for contiguous runs of Put & CPut commands in
// the supplied request union. Any run which exceeds a minimum length
// threshold employs a full order iterator to determine whether the
// range of keys being written is empty. If so, then the run can be
// set to put "blindly", meaning no iterator need be used to read
// existing values during the MVCC write.
func optimizePuts(batch engine.Engine, reqs []roachpb.RequestUnion) {
var iter engine.Iterator

processPuts := func(startIdx, count int, minKey, maxKey roachpb.Key) {
if count < optimizePutThreshold { // don't bother if below this threshold
return
}
if iter == nil {
iter = batch.NewIterator(false /* total order iterator */)
}

// If there are enough puts in the run to justify calling seek,
// we can determine whether any part of the range being written
// is "virgin" and set the puts to write blindly.
// Find the first non-empty key in the run.
iter.Seek(engine.MakeMVCCMetadataKey(minKey))
var iterKey roachpb.Key
if iter.Valid() && bytes.Compare(iter.Key().Key, maxKey) <= 0 {
iterKey = iter.Key().Key
}
// Set the prefix of the run which is being written to virgin
// keyspace to "blindly" put values.
for _, r := range reqs[startIdx : startIdx+count] {
if iterKey == nil || bytes.Compare(iterKey, r.GetInner().Header().Key) > 0 {
switch t := r.GetInner().(type) {
case *roachpb.PutRequest:
t.Blind = true
case *roachpb.ConditionalPutRequest:
t.Blind = true
}
}
}
}

var putCount int
var minKey, maxKey roachpb.Key
addPut := func(key roachpb.Key) {
putCount++
if minKey == nil || bytes.Compare(key, minKey) < 0 {
minKey = key
}
if maxKey == nil || bytes.Compare(key, maxKey) > 0 {
maxKey = key
}
}
for i, r := range reqs {
switch t := r.GetInner().(type) {
case *roachpb.PutRequest:
addPut(t.Key)
case *roachpb.ConditionalPutRequest:
addPut(t.Key)
default:
processPuts(i-putCount, putCount, minKey, maxKey)
putCount = 0
minKey = nil
maxKey = nil
}
}
processPuts(len(reqs)-putCount, putCount, minKey, maxKey)
if iter != nil {
iter.Close()
}
}

func (r *Replica) executeBatch(
ctx context.Context, idKey storagebase.CmdIDKey,
batch engine.Engine, ms *engine.MVCCStats, ba roachpb.BatchRequest) (
Expand All @@ -1830,6 +1905,11 @@ func (r *Replica) executeBatch(
remScanResults = ba.Header.MaxScanResults
}

// Optimize any contiguous sequences of put and conditional put ops.
if len(ba.Requests) >= optimizePutThreshold {
optimizePuts(batch, ba.Requests)
}

for index, union := range ba.Requests {
// Execute the command.
args := union.GetInner()
Expand Down
7 changes: 6 additions & 1 deletion storage/replica_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,9 @@ func (r *Replica) Put(
if !args.Inline {
ts = h.Timestamp
}
if args.Blind {
return reply, engine.MVCCBlindPut(ctx, batch, ms, args.Key, ts, args.Value, h.Txn)
}
return reply, engine.MVCCPut(ctx, batch, ms, args.Key, ts, args.Value, h.Txn)
}

Expand All @@ -203,7 +206,9 @@ func (r *Replica) ConditionalPut(
ctx context.Context, batch engine.Engine, ms *engine.MVCCStats, h roachpb.Header, args roachpb.ConditionalPutRequest,
) (roachpb.ConditionalPutResponse, error) {
var reply roachpb.ConditionalPutResponse

if args.Blind {
return reply, engine.MVCCBlindConditionalPut(ctx, batch, ms, args.Key, h.Timestamp, args.Value, args.ExpValue, h.Txn)
}
return reply, engine.MVCCConditionalPut(ctx, batch, ms, args.Key, h.Timestamp, args.Value, args.ExpValue, h.Txn)
}

Expand Down
Loading

0 comments on commit a1e8979

Please sign in to comment.