Skip to content

Commit

Permalink
storage: start initial revision at 1
Browse files Browse the repository at this point in the history
When the start revision was 0, there was no way to safely watch
starting from the first store revision.
  • Loading branch information
Anthony Romano committed Jan 15, 2016
1 parent 02ab7be commit ecba9b6
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 97 deletions.
143 changes: 62 additions & 81 deletions storage/kv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,9 @@ func testKVRange(t *testing.T, f rangeFunc) {
s := NewStore(b, &lease.FakeLessor{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar"), 1)
s.Put([]byte("foo1"), []byte("bar1"), 2)
s.Put([]byte("foo2"), []byte("bar2"), 3)
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
}
kvs := put3TestKVs(s)

wrev := int64(3)
wrev := int64(4)
tests := []struct {
key, end []byte
wkvs []storagepb.KeyValue
Expand Down Expand Up @@ -151,25 +144,18 @@ func testKVRangeRev(t *testing.T, f rangeFunc) {
s := NewStore(b, &lease.FakeLessor{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar"), 1)
s.Put([]byte("foo1"), []byte("bar1"), 2)
s.Put([]byte("foo2"), []byte("bar2"), 3)
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
}
kvs := put3TestKVs(s)

tests := []struct {
rev int64
wrev int64
wkvs []storagepb.KeyValue
}{
{-1, 3, kvs},
{0, 3, kvs},
{1, 1, kvs[:1]},
{2, 2, kvs[:2]},
{3, 3, kvs},
{-1, 4, kvs},
{0, 4, kvs},
{2, 2, kvs[:1]},
{3, 3, kvs[:2]},
{4, 4, kvs},
}

for i, tt := range tests {
Expand All @@ -194,10 +180,8 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) {
s := NewStore(b, &lease.FakeLessor{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
s.Put([]byte("foo1"), []byte("bar1"), lease.NoLease)
s.Put([]byte("foo2"), []byte("bar2"), lease.NoLease)
if err := s.Compact(3); err != nil {
put3TestKVs(s)
if err := s.Compact(4); err != nil {
t.Fatalf("compact error (%v)", err)
}

Expand All @@ -206,9 +190,9 @@ func testKVRangeBadRev(t *testing.T, f rangeFunc) {
werr error
}{
{-1, ErrCompacted},
{1, ErrCompacted},
{2, ErrCompacted},
{3, ErrCompacted},
{4, ErrFutureRev},
{5, ErrFutureRev},
{100, ErrFutureRev},
}
for i, tt := range tests {
Expand All @@ -227,16 +211,9 @@ func testKVRangeLimit(t *testing.T, f rangeFunc) {
s := NewStore(b, &lease.FakeLessor{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar"), 1)
s.Put([]byte("foo1"), []byte("bar1"), 2)
s.Put([]byte("foo2"), []byte("bar2"), 3)
kvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
}
kvs := put3TestKVs(s)

wrev := int64(3)
wrev := int64(4)
tests := []struct {
limit int64
wkvs []storagepb.KeyValue
Expand Down Expand Up @@ -276,16 +253,16 @@ func testKVPutMultipleTimes(t *testing.T, f putFunc) {
base := int64(i + 1)

rev := f(s, []byte("foo"), []byte("bar"), lease.LeaseID(base))
if rev != base {
t.Errorf("#%d: rev = %d, want %d", i, rev, base)
if rev != base+1 {
t.Errorf("#%d: rev = %d, want %d", i, rev, base+1)
}

kvs, _, err := s.Range([]byte("foo"), nil, 0, 0)
if err != nil {
t.Fatal(err)
}
wkvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: base, Version: base, Lease: base},
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: base + 1, Version: base, Lease: base},
}
if !reflect.DeepEqual(kvs, wkvs) {
t.Errorf("#%d: kvs = %+v, want %+v", i, kvs, wkvs)
Expand All @@ -305,27 +282,27 @@ func testKVDeleteRange(t *testing.T, f deleteRangeFunc) {
}{
{
[]byte("foo"), nil,
4, 1,
5, 1,
},
{
[]byte("foo"), []byte("foo1"),
4, 1,
5, 1,
},
{
[]byte("foo"), []byte("foo2"),
4, 2,
5, 2,
},
{
[]byte("foo"), []byte("foo3"),
4, 3,
5, 3,
},
{
[]byte("foo3"), []byte("foo8"),
3, 0,
4, 0,
},
{
[]byte("foo3"), nil,
3, 0,
4, 0,
},
}

Expand Down Expand Up @@ -357,14 +334,14 @@ func testKVDeleteMultipleTimes(t *testing.T, f deleteRangeFunc) {
s.Put([]byte("foo"), []byte("bar"), lease.NoLease)

n, rev := f(s, []byte("foo"), nil)
if n != 1 || rev != 2 {
t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 2)
if n != 1 || rev != 3 {
t.Fatalf("n = %d, rev = %d, want (%d, %d)", n, rev, 1, 3)
}

for i := 0; i < 10; i++ {
n, rev := f(s, []byte("foo"), nil)
if n != 0 || rev != 2 {
t.Fatalf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 0, 2)
if n != 0 || rev != 3 {
t.Fatalf("#%d: n = %d, rev = %d, want (%d, %d)", i, n, rev, 0, 3)
}
}
}
Expand All @@ -376,7 +353,7 @@ func TestKVOperationInSequence(t *testing.T) {
defer cleanup(s, b, tmpPath)

for i := 0; i < 10; i++ {
base := int64(i * 2)
base := int64(i*2 + 1)

// put foo
rev := s.Put([]byte("foo"), []byte("bar"), lease.NoLease)
Expand Down Expand Up @@ -417,7 +394,7 @@ func TestKVOperationInSequence(t *testing.T) {
}
}

func TestKVTxnBlockNonTnxOperations(t *testing.T) {
func TestKVTxnBlockNonTxnOperations(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(b, &lease.FakeLessor{})

Expand Down Expand Up @@ -488,14 +465,14 @@ func TestKVTxnWrongID(t *testing.T) {
}

// test that txn range, put, delete on single key in sequence repeatedly works correctly.
func TestKVTnxOperationInSequence(t *testing.T) {
func TestKVTxnOperationInSequence(t *testing.T) {
b, tmpPath := backend.NewDefaultTmpBackend()
s := NewStore(b, &lease.FakeLessor{})
defer cleanup(s, b, tmpPath)

for i := 0; i < 10; i++ {
id := s.TxnBegin()
base := int64(i)
base := int64(i + 1)

// put foo
rev, err := s.TxnPut(id, []byte("foo"), []byte("bar"), lease.NoLease)
Expand Down Expand Up @@ -561,25 +538,25 @@ func TestKVCompactReserveLastValue(t *testing.T) {
wkvs []storagepb.KeyValue
}{
{
0,
1,
[]storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar0"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
{Key: []byte("foo"), Value: []byte("bar0"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 1},
},
},
{
1,
2,
[]storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar1"), CreateRevision: 1, ModRevision: 2, Version: 2, Lease: 2},
{Key: []byte("foo"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 3, Version: 2, Lease: 2},
},
},
{
2,
3,
nil,
},
{
3,
4,
[]storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar2"), CreateRevision: 4, ModRevision: 4, Version: 1, Lease: 3},
{Key: []byte("foo"), Value: []byte("bar2"), CreateRevision: 5, ModRevision: 5, Version: 1, Lease: 3},
},
},
}
Expand Down Expand Up @@ -615,8 +592,8 @@ func TestKVCompactBad(t *testing.T) {
{0, nil},
{1, nil},
{1, ErrCompacted},
{3, nil},
{4, ErrFutureRev},
{4, nil},
{5, ErrFutureRev},
{100, ErrFutureRev},
}
for i, tt := range tests {
Expand Down Expand Up @@ -701,14 +678,7 @@ func TestKVSnapshot(t *testing.T) {
s := NewStore(b, &lease.FakeLessor{})
defer cleanup(s, b, tmpPath)

s.Put([]byte("foo"), []byte("bar"), 1)
s.Put([]byte("foo1"), []byte("bar1"), 2)
s.Put([]byte("foo2"), []byte("bar2"), 3)
wkvs := []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 1, ModRevision: 1, Version: 1, Lease: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 2},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 3},
}
wkvs := put3TestKVs(s)

newPath := "new_test"
f, err := os.Create(newPath)
Expand All @@ -734,8 +704,8 @@ func TestKVSnapshot(t *testing.T) {
if !reflect.DeepEqual(kvs, wkvs) {
t.Errorf("kvs = %+v, want %+v", kvs, wkvs)
}
if rev != 3 {
t.Errorf("rev = %d, want %d", rev, 3)
if rev != 4 {
t.Errorf("rev = %d, want %d", rev, 4)
}
}

Expand All @@ -757,8 +727,8 @@ func TestWatchableKVWatch(t *testing.T) {
Kv: &storagepb.KeyValue{
Key: []byte("foo"),
Value: []byte("bar"),
CreateRevision: 1,
ModRevision: 1,
CreateRevision: 2,
ModRevision: 2,
Version: 1,
Lease: 1,
},
Expand All @@ -783,8 +753,8 @@ func TestWatchableKVWatch(t *testing.T) {
Kv: &storagepb.KeyValue{
Key: []byte("foo1"),
Value: []byte("bar1"),
CreateRevision: 2,
ModRevision: 2,
CreateRevision: 3,
ModRevision: 3,
Version: 1,
Lease: 2,
},
Expand All @@ -810,8 +780,8 @@ func TestWatchableKVWatch(t *testing.T) {
Kv: &storagepb.KeyValue{
Key: []byte("foo1"),
Value: []byte("bar1"),
CreateRevision: 2,
ModRevision: 2,
CreateRevision: 3,
ModRevision: 3,
Version: 1,
Lease: 2,
},
Expand All @@ -835,8 +805,8 @@ func TestWatchableKVWatch(t *testing.T) {
Kv: &storagepb.KeyValue{
Key: []byte("foo1"),
Value: []byte("bar11"),
CreateRevision: 2,
ModRevision: 3,
CreateRevision: 3,
ModRevision: 4,
Version: 2,
Lease: 3,
},
Expand All @@ -858,3 +828,14 @@ func cleanup(s KV, b backend.Backend, path string) {
b.Close()
os.Remove(path)
}

func put3TestKVs(s KV) []storagepb.KeyValue {
s.Put([]byte("foo"), []byte("bar"), 1)
s.Put([]byte("foo1"), []byte("bar1"), 2)
s.Put([]byte("foo2"), []byte("bar2"), 3)
return []storagepb.KeyValue{
{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1, Lease: 1},
{Key: []byte("foo1"), Value: []byte("bar1"), CreateRevision: 3, ModRevision: 3, Version: 1, Lease: 2},
{Key: []byte("foo2"), Value: []byte("bar2"), CreateRevision: 4, ModRevision: 4, Version: 1, Lease: 3},
}
}
6 changes: 3 additions & 3 deletions storage/kvstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewStore(b backend.Backend, le lease.Lessor) *store {

le: le,

currentRev: revision{},
currentRev: revision{main: 1},
compactMainRev: -1,
stopc: make(chan struct{}),
}
Expand Down Expand Up @@ -256,7 +256,7 @@ func (s *store) Restore(b backend.Backend) error {

s.b = b
s.kvindex = newTreeIndex()
s.currentRev = revision{}
s.currentRev = revision{main: 1}
s.compactMainRev = -1
s.tx = b.BatchTx()
s.txnID = -1
Expand All @@ -267,7 +267,7 @@ func (s *store) Restore(b backend.Backend) error {

func (s *store) restore() error {
min, max := newRevBytes(), newRevBytes()
revToBytes(revision{}, min)
revToBytes(revision{main: 1}, min)
revToBytes(revision{main: math.MaxInt64, sub: math.MaxInt64}, max)

// restore index
Expand Down
Loading

0 comments on commit ecba9b6

Please sign in to comment.