Skip to content

Commit

Permalink
mvcc: add HashByRev to kv.go
Browse files Browse the repository at this point in the history
HashByRev computes the hash of all MVCC keys up to a given revision.
  • Loading branch information
fanminshi committed Jul 14, 2017
1 parent fbb75d2 commit dd1c11e
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 3 deletions.
6 changes: 4 additions & 2 deletions mvcc/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,12 @@ type KV interface {
// Write creates a write transaction.
Write() TxnWrite

// Hash retrieves the hash of KV state and revision.
// This method is designed for consistency checking purposes.
// Hash computes the hash of the KV's backend.
Hash() (hash uint32, revision int64, err error)

// HashByRev computes the hash of all MVCC keys up to a given revision.
HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error)

// Compact frees all superseded keys with revisions less than rev.
Compact(rev int64) (<-chan struct{}, error)

Expand Down
59 changes: 58 additions & 1 deletion mvcc/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package mvcc
import (
"encoding/binary"
"errors"
"hash/crc32"
"math"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -98,11 +99,15 @@ type store struct {
fifoSched schedule.Scheduler

stopc chan struct{}

compactc chan struct{}
}

// NewStore returns a new store. It is useful to create a store inside
// mvcc pkg. It should only be used for testing externally.
func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *store {
closedc := make(chan struct{})
close(closedc)
s := &store{
b: b,
ig: ig,
Expand All @@ -116,7 +121,8 @@ func NewStore(b backend.Backend, le lease.Lessor, ig ConsistentIndexGetter) *sto
bytesBuf8: make([]byte, 8),
fifoSched: schedule.NewFIFOScheduler(),

stopc: make(chan struct{}),
stopc: make(chan struct{}),
compactc: closedc,
}
s.ReadView = &readView{s}
s.WriteView = &writeView{s}
Expand Down Expand Up @@ -160,6 +166,56 @@ func (s *store) Hash() (hash uint32, revision int64, err error) {
return h, s.currentRev, err
}

func (s *store) HashByRev(rev int64) (hash uint32, currentRev int64, compactRev int64, err error) {
readTx := s.b.ReadTx()
for {
s.mu.RLock()
compactRev = s.compactMainRev
currentRev = s.currentRev
compactc := s.compactc
s.mu.RUnlock()
if rev > 0 && rev < compactRev {
return 0, 0, compactRev, ErrCompacted
} else if rev > 0 && rev > currentRev {
return 0, currentRev, 0, ErrFutureRev
}
select {
case <-compactc:
case <-s.stopc:
return 0, 0, 0, ErrClosed
}
s.mu.Lock()
if s.compactc == compactc {
s.b.ForceCommit()
currentRev = s.currentRev
compactRev = s.compactMainRev
readTx.Lock()
defer readTx.Unlock()
s.mu.Unlock()
break
}
s.mu.Unlock()
}

if rev == 0 {
rev = currentRev
}
upper := revision{main: rev + 1}
h := crc32.New(crc32.MakeTable(crc32.Castagnoli))

h.Write(keyBucketName)
err = readTx.UnsafeForEach(keyBucketName, func(k, v []byte) error {
if !upper.GreaterThan(bytesToRev(k)) {
return nil
}
h.Write(k)
h.Write(v)
return nil
})

return h.Sum32(), currentRev, compactRev, err
}

func (s *store) Compact(rev int64) (<-chan struct{}, error) {
s.mu.Lock()
defer s.mu.Unlock()
Expand Down Expand Up @@ -192,6 +248,7 @@ func (s *store) Compact(rev int64) (<-chan struct{}, error) {

keep := s.kvindex.Compact(rev)
ch := make(chan struct{})
s.compactc = ch
var j = func(ctx context.Context) {
if ctx.Err() != nil {
s.compactBarrier(ctx, ch)
Expand Down

0 comments on commit dd1c11e

Please sign in to comment.