Skip to content
/ etcd Public
forked from etcd-io/etcd

Commit

Permalink
integration: add TestV3WatchMultipleStreams
Browse files Browse the repository at this point in the history
Related etcd-io#4216.
  • Loading branch information
gyuho committed Jan 19, 2016
1 parent d2e35f6 commit 2bfdc22
Showing 1 changed file with 74 additions and 3 deletions.
77 changes: 74 additions & 3 deletions integration/v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"math/rand"
"reflect"
"sort"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -569,9 +570,9 @@ func TestV3WatchMultipleEventsFromCurrentRevision(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})

wAPI := pb.NewWatchClient(clus.RandConn())
wStream, err := wAPI.Watch(context.TODO())
if err != nil {
t.Fatalf("wAPI.Watch error: %v", err)
wStream, wErr := wAPI.Watch(context.TODO())
if wErr != nil {
t.Fatalf("wAPI.Watch error: %v", wErr)
}

if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}); err != nil {
Expand Down Expand Up @@ -641,6 +642,76 @@ func (evs eventsSortByKey) Len() int { return len(evs) }
func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] }
func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 }

// TestV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams.
func TestV3WatchMultipleStreams(t *testing.T) {
clus := newClusterGRPC(t, &clusterConfig{size: 3})
wAPI := pb.NewWatchClient(clus.RandConn())
kvc := pb.NewKVClient(clus.RandConn())

streams := make([]pb.Watch_WatchClient, 5)
for i := range streams {
wStream, errW := wAPI.Watch(context.TODO())
if errW != nil {
t.Fatalf("wAPI.Watch error: %v", errW)
}
if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}}); err != nil {
t.Fatalf("wStream.Send error: %v", err)
}
streams[i] = wStream
}

for _, wStream := range streams {
wresp, err := wStream.Recv()
if err != nil {
t.Fatalf("wStream.Recv error: %v", err)
}
if !wresp.Created {
t.Fatalf("wresp.Created got = %v, want = true", wresp.Created)
}
}

if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil {
t.Fatalf("couldn't put key (%v)", err)
}

var wg sync.WaitGroup
wg.Add(len(streams))
wevents := []*storagepb.Event{
{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
},
}
for i := range streams {
go func(i int) {
defer wg.Done()
wStream := streams[i]
wresp, err := wStream.Recv()
if err != nil {
t.Fatalf("wStream.Recv error: %v", err)
}
if wresp.WatchId != 0 {
t.Errorf("watchId got = %d, want = 0", wresp.WatchId)
}
if !reflect.DeepEqual(wresp.Events, wevents) {
t.Errorf("wresp.Events got = %+v, want = %+v", wresp.Events, wevents)
}
}(i)
}
wg.Wait()

// now Recv should block because there is no more events coming
for i := range streams {
wStream := streams[i]
rok, nr := WaitResponse(wStream, 1*time.Second)
if !rok {
t.Errorf("unexpected pb.WatchResponse is received %+v", nr)
}
}

clus.Terminate(t)
}

// WaitResponse waits on the given stream for given duration.
// If there is no more events, true and a nil response will be
// returned closing the WatchClient stream. Or the response will
Expand Down

0 comments on commit 2bfdc22

Please sign in to comment.