Skip to content
/ etcd Public
forked from etcd-io/etcd

Commit

Permalink
integration: watch cancel test
Browse files Browse the repository at this point in the history
Related etcd-io#4216.
  • Loading branch information
gyuho committed Jan 16, 2016
1 parent bc66139 commit f37bc72
Showing 1 changed file with 187 additions and 0 deletions.
187 changes: 187 additions & 0 deletions integration/v3_grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -432,3 +432,190 @@ func TestV3WatchFromCurrentRevision(t *testing.T) {
clus.Terminate(t)
}
}

// TestV3WatchCancel tests Watch APIs cancellation.
func TestV3WatchCancel(t *testing.T) {
tests := []struct { // send Put, WatchRequest, and then receive WatchResponse in order
preqs []*pb.PutRequest
wreqs []*pb.WatchRequest

wresps []*pb.WatchResponse
}{
// one key, one put, one watch, one cancellation
{
[]*pb.PutRequest{
nil,
{Key: []byte("foo"), Value: []byte("bar")},
nil,
nil,
},
[]*pb.WatchRequest{
{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}},
nil,
{CancelRequest: &pb.WatchCancelRequest{WatchId: 0}},
nil,
},

[]*pb.WatchResponse{
{
Header: &pb.ResponseHeader{Revision: 1},
Created: true,
Canceled: false,
},
{
Header: &pb.ResponseHeader{Revision: 2},
Created: false,
Canceled: false,
Events: []*storagepb.Event{
{
Type: storagepb.PUT,
Kv: &storagepb.KeyValue{Key: []byte("foo"), Value: []byte("bar"), CreateRevision: 2, ModRevision: 2, Version: 1},
},
},
},
nil,
{
Header: &pb.ResponseHeader{Revision: 2},
Created: false,
Canceled: true,
},
},
},
// no key, no put, 2 watch, 2 cancellation
{
[]*pb.PutRequest{
nil,
nil,
nil,
nil,
},
[]*pb.WatchRequest{
{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}},
{CreateRequest: &pb.WatchCreateRequest{Key: []byte("foo")}},
{CancelRequest: &pb.WatchCancelRequest{WatchId: 0}},
{CancelRequest: &pb.WatchCancelRequest{WatchId: 0}},
},

[]*pb.WatchResponse{
{
Header: &pb.ResponseHeader{Revision: 1},
Created: true,
Canceled: false,
},
{
Header: &pb.ResponseHeader{Revision: 1},
Created: true,
Canceled: false,
},
{
Header: &pb.ResponseHeader{Revision: 1},
Created: false,
Canceled: true,
},
{
Header: &pb.ResponseHeader{Revision: 1},
Created: false,
Canceled: true,
},
},
},
}

for i, tt := range tests {
clus := newClusterGRPC(t, &clusterConfig{size: 3})

kvc := pb.NewKVClient(clus.RandConn())
wAPI := pb.NewWatchClient(clus.RandConn())

wStream, err := wAPI.Watch(context.TODO())
if err != nil {
t.Fatalf("#%d: wAPI.Watch error: %v", i, err)
}

wcreatedN := 0
for _, wreq := range tt.wreqs {
if wreq == nil {
continue
}
if wreq.CreateRequest != nil {
wcreatedN++
}
}
createdWatchIds := make([]int64, wcreatedN)
createWatchIdx := 0 // increment whenever Watch gets created
cancelWatchIdx := 0 // increment whenever Watch gets canceled

for j := range tt.preqs {
preq := tt.preqs[j]
wreq := tt.wreqs[j]
wresp := tt.wresps[j]

// send PutRequest in the background
if preq != nil {
if _, err := kvc.Put(context.TODO(), preq); err != nil {
t.Errorf("#%d.%d: couldn't put key (%v)", i, j, err)
}
}

// send WatchRequest
if wreq != nil {
if wreq.CancelRequest != nil {
wreq.CancelRequest.WatchId = createdWatchIds[cancelWatchIdx]
cancelWatchIdx++
}
if err := wStream.Send(wreq); err != nil {
t.Fatalf("#%d.%d: wStream.Send error: %v", i, j, err)
}
}

// receive WatchRequest
if wresp != nil {
resp, err := wStream.Recv()
if err != nil {
t.Fatalf("#%d.%d: wStream.Recv error: %v", i, j, err)
}

if resp.Header == nil {
t.Fatalf("#%d.%d: unexpected nil resp.Header", i, j)
}
if resp.Header.Revision != wresp.Header.Revision {
t.Fatalf("#%d.%d: resp.Header.Revision got = %d, want = %d", i, j, resp.Header.Revision, wresp.Header.Revision)
}

if wresp.Created != resp.Created {
t.Fatalf("#%d.%d: resp.Created got = %v, want = %v", i, j, resp.Created, wresp.Created)
}
if resp.Created {
createdWatchIds[createWatchIdx] = resp.WatchId
createWatchIdx++
}

if wresp.Canceled != resp.Canceled {
t.Fatalf("#%d.%d: resp.Canceled got = %v, want = %v", i, j, resp.Canceled, wresp.Canceled)
}

if !reflect.DeepEqual(resp.Events, wresp.Events) {
t.Fatalf("#%d.%d: resp.Events got = %+v, want = %+v", i, j, resp.Events, wresp.Events)
}
}
}

rCh := make(chan *pb.WatchResponse)
go func() {
resp, _ := wStream.Recv()
rCh <- resp
}()
select {
case nr := <-rCh:
t.Errorf("#%d: unexpected response is received %+v", i, nr)
case <-time.After(2 * time.Second):
}
wStream.CloseSend()
rv, ok := <-rCh
if rv != nil || !ok {
t.Errorf("#%d: rv, ok got = %v %v, want = nil true", i, rv, ok)
}

clus.Terminate(t)
}
}

0 comments on commit f37bc72

Please sign in to comment.