Skip to content

Commit

Permalink
Merge pull request #5883 from westhood/master
Browse files Browse the repository at this point in the history
clientv3: fix sync base
  • Loading branch information
xiang90 authored Jul 7, 2016
2 parents c4a280e + 16b0c1d commit b6a4972
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 1 deletion.
54 changes: 54 additions & 0 deletions clientv3/integration/mirror_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@
package integration

import (
"fmt"
"reflect"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -69,3 +71,55 @@ func TestMirrorSync(t *testing.T) {
t.Fatal("failed to receive update in one second")
}
}

func TestMirrorSyncBase(t *testing.T) {
cluster := integration.NewClusterV3(nil, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(nil)

cli := cluster.Client(0)
ctx := context.TODO()

keyCh := make(chan string)
var wg sync.WaitGroup

for i := 0; i < 50; i++ {
wg.Add(1)

go func() {
defer wg.Done()

for key := range keyCh {
if _, err := cli.Put(ctx, key, "test"); err != nil {
t.Fatal(err)
}
}
}()
}

for i := 0; i < 2000; i++ {
keyCh <- fmt.Sprintf("test%d", i)
}

close(keyCh)
wg.Wait()

syncer := mirror.NewSyncer(cli, "test", 0)
respCh, errCh := syncer.SyncBase(ctx)

count := 0

for resp := range respCh {
count = count + len(resp.Kvs)
if !resp.More {
break
}
}

for err := range errCh {
t.Fatalf("unexpected error %v", err)
}

if count != 2000 {
t.Errorf("unexpected kv count: %d", count)
}
}
2 changes: 1 addition & 1 deletion clientv3/mirror/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (s *syncer) SyncBase(ctx context.Context) (<-chan clientv3.GetResponse, cha
// If len(s.prefix) != 0, we will sync key-value space with given prefix.
// We then range from the prefix to the next prefix if exists. Or we will
// range from the prefix to the end if the next prefix does not exists.
opts = append(opts, clientv3.WithPrefix())
opts = append(opts, clientv3.WithRange(clientv3.GetPrefixRangeEnd(s.prefix)))
key = s.prefix
}

Expand Down
6 changes: 6 additions & 0 deletions clientv3/op.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,12 @@ func WithSort(target SortTarget, order SortOrder) OpOption {
}
}

// GetPrefixRangeEnd gets the range end of the prefix.
// 'Get(foo, WithPrefix())' is equal to 'Get(foo, WithRange(GetPrefixRangeEnd(foo))'.
func GetPrefixRangeEnd(prefix string) string {
return string(getPrefix([]byte(prefix)))
}

func getPrefix(key []byte) []byte {
end := make([]byte, len(key))
copy(end, key)
Expand Down

0 comments on commit b6a4972

Please sign in to comment.