From 12362d292dbd2c8998995f1623ebab0abfbd39cd Mon Sep 17 00:00:00 2001 From: Gyu-Ho Lee Date: Tue, 19 Jan 2016 14:13:09 -0800 Subject: [PATCH] integration: add TestV3WatchMultipleStreams Related https://github.com/coreos/etcd/issues/4216. --- integration/v3_grpc_test.go | 73 +++++++++++++++++++++++++++++++++++-- 1 file changed, 70 insertions(+), 3 deletions(-) diff --git a/integration/v3_grpc_test.go b/integration/v3_grpc_test.go index 2cc899b6d4b..1496ff42648 100644 --- a/integration/v3_grpc_test.go +++ b/integration/v3_grpc_test.go @@ -19,6 +19,7 @@ import ( "math/rand" "reflect" "sort" + "sync" "testing" "time" @@ -569,9 +570,9 @@ func TestV3WatchMultipleEventsFromCurrentRevision(t *testing.T) { clus := newClusterGRPC(t, &clusterConfig{size: 3}) wAPI := pb.NewWatchClient(clus.RandConn()) - wStream, err := wAPI.Watch(context.TODO()) - if err != nil { - t.Fatalf("wAPI.Watch error: %v", err) + wStream, wErr := wAPI.Watch(context.TODO()) + if wErr != nil { + t.Fatalf("wAPI.Watch error: %v", wErr) } if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Prefix: []byte("foo")}}); err != nil { @@ -641,6 +642,72 @@ func (evs eventsSortByKey) Len() int { return len(evs) } func (evs eventsSortByKey) Swap(i, j int) { evs[i], evs[j] = evs[j], evs[i] } func (evs eventsSortByKey) Less(i, j int) bool { return bytes.Compare(evs[i].Kv.Key, evs[j].Kv.Key) < 0 } +// TestV3WatchMultipleStreams tests multiple watchers on the same key on multiple streams. +func TestV3WatchMultipleStreams(t *testing.T) { + clus := newClusterGRPC(t, &clusterConfig{size: 3}) + wAPI := pb.NewWatchClient(clus.RandConn()) + kvc := pb.NewKVClient(clus.RandConn()) + + streams := make([]pb.Watch_WatchClient, 5) + for i := range streams { + wStream, errW := wAPI.Watch(context.TODO()) + if errW != nil { + t.Fatalf("wAPI.Watch error: %v", errW) + } + if err := wStream.Send(&pb.WatchRequest{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}}); err != nil { + t.Fatalf("wStream.Send error: %v", err) + } + streams[i] = wStream + } + + for _, wStream := range streams { + wresp, err := wStream.Recv() + if err != nil { + t.Fatalf("wStream.Recv error: %v", err) + } + if !wresp.Created { + t.Fatalf("wresp.Created got = %v, want = true", wresp.Created) + } + } + + if _, err := kvc.Put(context.TODO(), &pb.PutRequest{Key: []byte("foo"), Value: []byte("bar")}); err != nil { + t.Fatalf("couldn't put key (%v)", err) + } + + var wg sync.WaitGroup + wg.Add(len(streams)) + wevents := []*storagepb.Event{ + { + Type: storagepb.PUT, + Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1}, + }, + } + for i := range streams { + go func(i int) { + defer wg.Done() + wStream := streams[i] + wresp, err := wStream.Recv() + if err != nil { + t.Fatalf("wStream.Recv error: %v", err) + } + if wresp.WatchId != 0 { + t.Errorf("watchId got = %d, want = 0", wresp.WatchId) + } + if !reflect.DeepEqual(wresp.Events, wevents) { + t.Errorf("wresp.Events got = %+v, want = %+v", wresp.Events, wevents) + } + // now Recv should block because there is no more events coming + rok, nr := WaitResponse(wStream, 1*time.Second) + if !rok { + t.Errorf("unexpected pb.WatchResponse is received %+v", nr) + } + }(i) + } + wg.Wait() + + clus.Terminate(t) +} + // WaitResponse waits on the given stream for given duration. // If there is no more events, true and a nil response will be // returned closing the WatchClient stream. Or the response will