Skip to content
/ etcd Public
forked from etcd-io/etcd

Commit

Permalink
integration: test with multi events in unsynced
Browse files Browse the repository at this point in the history
Related etcd-io#4216.
  • Loading branch information
gyuho committed Jan 20, 2016
1 parent 72195af commit b0e9591
Showing 1 changed file with 97 additions and 12 deletions.
109 changes: 97 additions & 12 deletions integration/v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,6 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
},
},
},

// TODO: watch and receive multiple-events from synced (need Txn)
}

for i, tt := range tests {
Expand Down Expand Up @@ -565,7 +563,7 @@ func TestV3WatchMultiple(t *testing.T) {
}

// TestV3WatchMultipleEventsFromCurrentRevision tests Watch APIs from current revision
// in cases it receives multiple events.
// when it receives multiple events.
func TestV3WatchMultipleEventsFromCurrentRevision(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})

Expand All @@ -579,14 +577,13 @@ func TestV3WatchMultipleEventsFromCurrentRevision(t *testing.T) {
t.Fatalf("wStream.Send error: %v", err)
}

kvc := pb.NewKVClient(clus.RandConn())
txn := pb.TxnRequest{}
for i := 0; i < 3; i++ {
for i := 0; i < 2; i++ {
ru := &pb.RequestUnion{}
ru.RequestPut = &pb.PutRequest{Key: []byte(fmt.Sprintf("foo%d", i)), Value: []byte("bar")}
txn.Success = append(txn.Success, ru)
}

kvc := pb.NewKVClient(clus.RandConn())
tresp, err := kvc.Txn(context.Background(), &txn)
if err != nil {
t.Fatalf("kvc.Txn error: %v", err)
Expand All @@ -595,8 +592,18 @@ func TestV3WatchMultipleEventsFromCurrentRevision(t *testing.T) {
t.Fatalf("kvc.Txn failed: %+v", tresp)
}

wevents := []*storagepb.Event{
{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
},
{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
},
}
events := []*storagepb.Event{}
for len(events) < 3 {
for len(events) < 2 {
resp, err := wStream.Recv()
if err != nil {
t.Errorf("wStream.Recv error: %v", err)
Expand All @@ -605,26 +612,104 @@ func TestV3WatchMultipleEventsFromCurrentRevision(t *testing.T) {
continue
}
events = append(events, resp.Events...)
break
}
sort.Sort(eventsSortByKey(events))

if !reflect.DeepEqual(events, wevents) {
t.Errorf("events got = %+v, want = %+v", events, wevents)
}

rok, nr := WaitResponse(wStream, 1*time.Second)
if !rok {
t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
}

// can't defer because tcp ports will be in use
clus.Terminate(t)
}

// TestV3WatchMultipleEventsFromUnsynced tests Watch APIs from unsynced watchers
// when it receives multiple events.
func TestV3WatchMultipleEventsFromUnsynced(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})

kvc := pb.NewKVClient(clus.RandConn())
for i := 0; i < 2; i++ {
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte(fmt.Sprintf("foo%d", i)), Value: []byte("val")}); err != nil {
t.Fatalf("couldn't put key (%v)", err)
}
}

wAPI := pb.NewWatchClient(clus.RandConn())
wStream, wErr := wAPI.Watch(context.TODO())
if wErr != nil {
t.Fatalf("wAPI.Watch error: %v", wErr)
}

if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo"), StartRevision: 1}}); err != nil {
t.Fatalf("wStream.Send error: %v", err)
}

for i := 0; i < 2; i++ {
if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte(fmt.Sprintf("foo%d", i)), Value: []byte("bar")}); err != nil {
t.Fatalf("couldn't put key (%v)", err)
}
}

wevents := []*storagepb.Event{
{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
Kv: &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("val"), CreateRevision: 2, ModRevision: 2, Version: 1},
},
{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("val"), CreateRevision: 3, ModRevision: 3, Version: 1},
},
}
events := []*storagepb.Event{}
for len(events) < 2 {
resp, err := wStream.Recv()
if err != nil {
t.Errorf("wStream.Recv error: %v", err)
}
if resp.Created {
continue
}
events = append(events, resp.Events...)
break // should receive multiple events first
}
sort.Sort(eventsSortByKey(events))

if !reflect.DeepEqual(events, wevents) {
t.Errorf("events got = %+v, want = %+v", events, wevents)
}

lastWevents := []*storagepb.Event{
{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: []byte("foo0"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 4, Version: 2},
},
{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: []byte("foo2"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
Kv: &storagepb.KeyValue{Key: []byte("foo1"), Value: []byte("bar"), CreateRevision: 3, ModRevision: 5, Version: 2},
},
}
lastEvents := []*storagepb.Event{}
for len(lastEvents) < 2 {
resp, err := wStream.Recv()
if err != nil {
t.Errorf("wStream.Recv error: %v", err)
}
if resp.Created {
continue
}
lastEvents = append(lastEvents, resp.Events...)
}
sort.Sort(eventsSortByKey(lastEvents))

if !reflect.DeepEqual(events, wevents) {
t.Errorf("events got = %+v, want = %+v", events, wevents)
if !reflect.DeepEqual(lastEvents, lastWevents) {
t.Errorf("lastEvents got = %+v, want = %+v", lastEvents, lastWevents)
}

rok, nr := WaitResponse(wStream, 1*time.Second)
Expand Down

0 comments on commit b0e9591

Please sign in to comment.