Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

storage: sync Raft log writes #13481

Merged
merged 1 commit into from
Feb 8, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/ccl/storageccl/writebatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func evalWriteBatch(
return storage.EvalResult{}, errors.New("WriteBatch can only be called on empty ranges")
}

if err := batch.ApplyBatchRepr(args.Data); err != nil {
if err := batch.ApplyBatchRepr(args.Data, false /* !sync */); err != nil {
return storage.EvalResult{}, err
}
return storage.EvalResult{}, nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/abort_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func (sc *AbortCache) ClearData(e engine.Engine) error {
if err != nil {
return err
}
return b.Commit()
return b.Commit(false /* !sync */)
}

// Get looks up an abort cache entry recorded for this transaction ID.
Expand Down
18 changes: 9 additions & 9 deletions pkg/storage/engine/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func testBatchBasics(t *testing.T, writeOnly bool, commit func(e Engine, b Batch
func TestBatchBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
testBatchBasics(t, false /* writeOnly */, func(e Engine, b Batch) error {
return b.Commit()
return b.Commit(false /* !sync */)
})
}

Expand Down Expand Up @@ -231,14 +231,14 @@ func TestBatchRepr(t *testing.T) {
t.Fatalf("expected %v, but found %v", expOps, ops)
}

return e.ApplyBatchRepr(repr)
return e.ApplyBatchRepr(repr, false /* !sync */)
})
}

func TestWriteBatchBasics(t *testing.T) {
defer leaktest.AfterTest(t)()
testBatchBasics(t, true /* writeOnly */, func(e Engine, b Batch) error {
return b.Commit()
return b.Commit(false /* !sync */)
})
}

Expand All @@ -264,7 +264,7 @@ func TestApplyBatchRepr(t *testing.T) {

b2 := e.NewBatch()
defer b2.Close()
if err := b2.ApplyBatchRepr(repr1); err != nil {
if err := b2.ApplyBatchRepr(repr1, false /* !sync */); err != nil {
t.Fatal(err)
}
repr2 := b2.Repr()
Expand All @@ -290,11 +290,11 @@ func TestApplyBatchRepr(t *testing.T) {

b4 := e.NewBatch()
defer b4.Close()
if err := b4.ApplyBatchRepr(repr); err != nil {
if err := b4.ApplyBatchRepr(repr, false /* !sync */); err != nil {
t.Fatal(err)
}
// Intentionally don't call Repr() because the expected user wouldn't.
if err := b4.Commit(); err != nil {
if err := b4.Commit(false /* !sync */); err != nil {
t.Fatal(err)
}

Expand Down Expand Up @@ -456,7 +456,7 @@ func TestBatchProto(t *testing.T) {
t.Fatalf("expected GetProto to fail ok=%t: %s", ok, err)
}
// Commit and verify the proto can be read directly from the engine.
if err := b.Commit(); err != nil {
if err := b.Commit(false /* !sync */); err != nil {
t.Fatal(err)
}
if ok, _, _, err := e.GetProto(mvccKey("proto"), getVal); !ok || err != nil {
Expand Down Expand Up @@ -545,7 +545,7 @@ func TestBatchScan(t *testing.T) {
}

// Now, commit batch and re-scan using engine direct to compare results.
if err := b.Commit(); err != nil {
if err := b.Commit(false /* !sync */); err != nil {
t.Fatal(err)
}
for i, scan := range scans {
Expand Down Expand Up @@ -912,7 +912,7 @@ func TestBatchDistinctPanics(t *testing.T) {
func() { _ = batch.Put(a, nil) },
func() { _ = batch.Merge(a, nil) },
func() { _ = batch.Clear(a) },
func() { _ = batch.ApplyBatchRepr(nil) },
func() { _ = batch.ApplyBatchRepr(nil, false) },
func() { _, _ = batch.Get(a) },
func() { _, _, _, _ = batch.GetProto(a, nil) },
func() { _ = batch.Iterate(a, a, nil) },
Expand Down
10 changes: 5 additions & 5 deletions pkg/storage/engine/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func setupMVCCData(
// sstables.
if scaled := len(order) / 20; i > 0 && (i%scaled) == 0 {
log.Infof(context.Background(), "committing (%d/~%d)", i/scaled, 20)
if err := batch.Commit(); err != nil {
if err := batch.Commit(false /* !sync */); err != nil {
b.Fatal(err)
}
batch.Close()
Expand All @@ -143,7 +143,7 @@ func setupMVCCData(
b.Fatal(err)
}
}
if err := batch.Commit(); err != nil {
if err := batch.Commit(false /* !sync */); err != nil {
b.Fatal(err)
}
batch.Close()
Expand Down Expand Up @@ -352,7 +352,7 @@ func runMVCCBatchPut(emk engineMaker, valueSize, batchSize int, b *testing.B) {
}
}

if err := batch.Commit(); err != nil {
if err := batch.Commit(false /* !sync */); err != nil {
b.Fatal(err)
}

Expand Down Expand Up @@ -403,7 +403,7 @@ func runMVCCBatchTimeSeries(emk engineMaker, batchSize int, b *testing.B) {
}
}

if err := batch.Commit(); err != nil {
if err := batch.Commit(false /* !sync */); err != nil {
b.Fatal(err)
}
batch.Close()
Expand Down Expand Up @@ -561,7 +561,7 @@ func runBatchApplyBatchRepr(
} else {
batch = eng.NewBatch()
}
if err := batch.ApplyBatchRepr(repr); err != nil {
if err := batch.ApplyBatchRepr(repr, false /* !sync */); err != nil {
b.Fatal(err)
}
batch.Close()
Expand Down
11 changes: 7 additions & 4 deletions pkg/storage/engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,10 @@ type Reader interface {
type Writer interface {
// ApplyBatchRepr atomically applies a set of batched updates. Created by
// calling Repr() on a batch. Using this method is equivalent to constructing
// and committing a batch whose Repr() equals repr.
ApplyBatchRepr(repr []byte) error
// and committing a batch whose Repr() equals repr. If sync is true, the
// batch is synchronously written to disk. It is an error to specify
// sync=true if the Writer is a Batch.
ApplyBatchRepr(repr []byte, sync bool) error
// Clear removes the item from the db with the given key.
// Note that clear actually removes entries from the storage
// engine, rather than inserting tombstones.
Expand Down Expand Up @@ -202,8 +204,9 @@ type Engine interface {
type Batch interface {
ReadWriter
// Commit atomically applies any batched updates to the underlying
// engine. This is a noop unless the engine was created via NewBatch().
Commit() error
// engine. This is a noop unless the engine was created via NewBatch(). If
// sync is true, the batch is synchronously committed to disk.
Commit(sync bool) error
// Distinct returns a view of the existing batch which only sees writes that
// were performed before the Distinct batch was created. That is, the
// returned batch will not read its own writes, but it will read writes to
Expand Down
4 changes: 2 additions & 2 deletions pkg/storage/engine/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ func TestEngineBatchCommit(t *testing.T) {
t.Fatal(err)
}
}
if err := batch.Commit(); err != nil {
if err := batch.Commit(false /* !sync */); err != nil {
t.Fatal(err)
}
close(writesDone)
Expand Down Expand Up @@ -320,7 +320,7 @@ func TestEngineBatch(t *testing.T) {
}
iter.Close()
// Commit the batch and try getting the value from the engine.
if err := b.Commit(); err != nil {
if err := b.Commit(false /* !sync */); err != nil {
t.Errorf("%d: %v", i, err)
continue
}
Expand Down
20 changes: 10 additions & 10 deletions pkg/storage/engine/rocksdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,8 +441,8 @@ func (r *RocksDB) Merge(key MVCCKey, value []byte) error {
// ApplyBatchRepr atomically applies a set of batched updates. Created by
// calling Repr() on a batch. Using this method is equivalent to constructing
// and committing a batch whose Repr() equals repr.
func (r *RocksDB) ApplyBatchRepr(repr []byte) error {
return dbApplyBatchRepr(r.rdb, repr)
func (r *RocksDB) ApplyBatchRepr(repr []byte, sync bool) error {
return dbApplyBatchRepr(r.rdb, repr, sync)
}

// Get returns the value for the given key.
Expand Down Expand Up @@ -965,13 +965,13 @@ func (r *rocksDBBatch) Merge(key MVCCKey, value []byte) error {

// ApplyBatchRepr atomically applies a set of batched updates to the current
// batch (the receiver).
func (r *rocksDBBatch) ApplyBatchRepr(repr []byte) error {
func (r *rocksDBBatch) ApplyBatchRepr(repr []byte, sync bool) error {
if r.distinctOpen {
panic("distinct batch open")
}
r.flushMutations()
r.flushes++ // make sure that Repr() doesn't take a shortcut
return dbApplyBatchRepr(r.batch, repr)
return dbApplyBatchRepr(r.batch, repr, sync)
}

func (r *rocksDBBatch) Get(key MVCCKey) ([]byte, error) {
Expand Down Expand Up @@ -1065,7 +1065,7 @@ func (r *rocksDBBatch) NewIterator(prefix bool) Iterator {
return iter
}

func (r *rocksDBBatch) Commit() error {
func (r *rocksDBBatch) Commit(sync bool) error {
if r.closed() {
panic("this batch was already committed")
}
Expand All @@ -1078,7 +1078,7 @@ func (r *rocksDBBatch) Commit() error {
// We've previously flushed mutations to the C++ batch, so we have to flush
// any remaining mutations as well and then commit the batch.
r.flushMutations()
if err := statusToError(C.DBCommitAndCloseBatch(r.batch)); err != nil {
if err := statusToError(C.DBCommitAndCloseBatch(r.batch, C.bool(sync))); err != nil {
return err
}
r.batch = nil
Expand All @@ -1088,7 +1088,7 @@ func (r *rocksDBBatch) Commit() error {

// Fast-path which avoids flushing mutations to the C++ batch. Instead, we
// directly apply the mutations to the database.
if err := r.parent.ApplyBatchRepr(r.builder.Finish()); err != nil {
if err := r.parent.ApplyBatchRepr(r.builder.Finish(), sync); err != nil {
return err
}
C.DBClose(r.batch)
Expand Down Expand Up @@ -1132,7 +1132,7 @@ func (r *rocksDBBatch) flushMutations() {
r.flushes++
r.flushedCount += r.builder.count
r.flushedSize += len(r.builder.repr)
if err := r.ApplyBatchRepr(r.builder.Finish()); err != nil {
if err := r.ApplyBatchRepr(r.builder.Finish(), false); err != nil {
panic(err)
}
// Force a seek of the underlying iterator on the next Seek/ReverseSeek.
Expand Down Expand Up @@ -1471,8 +1471,8 @@ func dbMerge(rdb *C.DBEngine, key MVCCKey, value []byte) error {
return statusToError(C.DBMerge(rdb, goToCKey(key), goToCSlice(value)))
}

func dbApplyBatchRepr(rdb *C.DBEngine, repr []byte) error {
return statusToError(C.DBApplyBatchRepr(rdb, goToCSlice(repr)))
func dbApplyBatchRepr(rdb *C.DBEngine, repr []byte, sync bool) error {
return statusToError(C.DBApplyBatchRepr(rdb, goToCSlice(repr), C.bool(sync)))
}

// dbGet returns the value for the given key.
Expand Down
Loading