From c3c41234f17d71c99178a3d904076fdc2e8be5e3 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 8 Aug 2016 23:45:50 -0700 Subject: [PATCH 1/3] integration: support querying member metrics --- integration/cluster.go | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/integration/cluster.go b/integration/cluster.go index 80d84504da8..78e1f469405 100644 --- a/integration/cluster.go +++ b/integration/cluster.go @@ -708,6 +708,32 @@ func (m *member) Terminate(t *testing.T) { plog.Printf("terminated %s (%s)", m.Name, m.grpcAddr) } +// Metric gets the metric value for a member +func (m *member) Metric(metricName string) (string, error) { + cfgtls := transport.TLSInfo{} + tr, err := transport.NewTimeoutTransport(cfgtls, time.Second, time.Second, time.Second) + if err != nil { + return "", err + } + cli := &http.Client{Transport: tr} + resp, err := cli.Get(m.ClientURLs[0].String() + "/metrics") + if err != nil { + return "", err + } + defer resp.Body.Close() + b, rerr := ioutil.ReadAll(resp.Body) + if rerr != nil { + return "", rerr + } + lines := strings.Split(string(b), "\n") + for _, l := range lines { + if strings.HasPrefix(l, metricName) { + return strings.Split(l, " ")[1], nil + } + } + return "", nil +} + func MustNewHTTPClient(t *testing.T, eps []string, tls *transport.TLSInfo) client.Client { cfgtls := transport.TLSInfo{} if tls != nil { From 5e651a0d0d23785a68503a337059ba00d967e2a1 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Tue, 9 Aug 2016 00:08:28 -0700 Subject: [PATCH 2/3] clientv3: close watcher stream once all watchers detach Fixes #6134 --- clientv3/watch.go | 25 ++++++++++++++++++------- 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 4b73446150d..ef4aa5304e7 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -311,7 +311,12 @@ func (w *watcher) Close() (err error) { } func (w *watchGrpcStream) Close() (err error) { - close(w.stopc) + w.mu.Lock() + if w.stopc != nil { + close(w.stopc) + w.stopc = nil + } + w.mu.Unlock() <-w.donec select { case err = <-w.errc: @@ -374,11 +379,17 @@ func (w *watchGrpcStream) addStream(resp *pb.WatchResponse, pendingReq *watchReq // closeStream closes the watcher resources and removes it func (w *watchGrpcStream) closeStream(ws *watcherStream) { + w.mu.Lock() // cancels request stream; subscriber receives nil channel close(ws.initReq.retc) // close subscriber's channel close(ws.outc) delete(w.streams, ws.id) + if len(w.streams) == 0 && w.stopc != nil { + close(w.stopc) + w.stopc = nil + } + w.mu.Unlock() } // run is the root of the goroutines for managing a watcher client @@ -404,6 +415,7 @@ func (w *watchGrpcStream) run() { var pendingReq, failedReq *watchRequest curReqC := w.reqc + stopc := w.stopc cancelSet := make(map[int64]struct{}) for { @@ -473,7 +485,7 @@ func (w *watchGrpcStream) run() { failedReq = pendingReq } cancelSet = make(map[int64]struct{}) - case <-w.stopc: + case <-stopc: return } @@ -625,9 +637,7 @@ func (w *watchGrpcStream) serveStream(ws *watcherStream) { } } - w.mu.Lock() w.closeStream(ws) - w.mu.Unlock() // lazily send cancel message if events on missing id } @@ -655,13 +665,14 @@ func (w *watchGrpcStream) resume() (ws pb.Watch_WatchClient, err error) { // openWatchClient retries opening a watchclient until retryConnection fails func (w *watchGrpcStream) openWatchClient() (ws pb.Watch_WatchClient, err error) { for { - select { - case <-w.stopc: + w.mu.Lock() + stopc := w.stopc + w.mu.Unlock() + if stopc == nil { if err == nil { err = context.Canceled } return nil, err - default: } if ws, err = w.remote.Watch(w.ctx, grpc.FailFast(false)); ws != nil && err == nil { break From 8c1c291332275fdb77432b3874119e461bca7614 Mon Sep 17 00:00:00 2001 From: Anthony Romano Date: Mon, 8 Aug 2016 23:47:16 -0700 Subject: [PATCH 3/3] clientv3/integration: test watcher cancelation propagation to server --- clientv3/integration/watch_test.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/clientv3/integration/watch_test.go b/clientv3/integration/watch_test.go index c63866a8ce7..8a768857601 100644 --- a/clientv3/integration/watch_test.go +++ b/clientv3/integration/watch_test.go @@ -727,3 +727,27 @@ func TestWatchWithCreatedNotification(t *testing.T) { t.Fatalf("expected created event, got %v", resp) } } + +// TestWatchCancelOnServer ensures client watcher cancels propagate back to the server. +func TestWatchCancelOnServer(t *testing.T) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + defer cluster.Terminate(t) + + client := cluster.RandClient() + + for i := 0; i < 10; i++ { + ctx, cancel := context.WithTimeout(context.Background(), time.Second) + client.Watch(ctx, "a", clientv3.WithCreatedNotify()) + cancel() + } + // wait for cancels to propagate + time.Sleep(time.Second) + + watchers, err := cluster.Members[0].Metric("etcd_debugging_mvcc_watcher_total") + if err != nil { + t.Fatal(err) + } + if watchers != "0" { + t.Fatalf("expected 0 watchers, got %q", watchers) + } +}