diff --git a/clientv3/watch.go b/clientv3/watch.go index 36bf1a71820..3612d35f165 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -650,6 +650,12 @@ func (w *watchGrpcStream) run() { return case ws := <-w.closingc: + w.closeSubstream(ws) + delete(closing, ws) + // no more watchers on this stream, shutdown, skip cancellation + if len(w.substreams)+len(w.resuming) == 0 { + return + } if ws.id != -1 { // client is closing an established watch; close it on the server proactively instead of waiting // to close when the next message arrives @@ -665,12 +671,6 @@ func (w *watchGrpcStream) run() { lg.Warning("failed to send watch cancel request", zap.Int64("watch-id", ws.id), zap.Error(err)) } } - w.closeSubstream(ws) - delete(closing, ws) - // no more watchers on this stream, shutdown - if len(w.substreams)+len(w.resuming) == 0 { - return - } } } } diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index de1ee4f295c..4ae371973c3 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -1245,3 +1245,32 @@ func TestV3WatchCancellation(t *testing.T) { t.Fatalf("expected one watch, got %s", minWatches) } } + +// TestV3WatchCloseCancelRace ensures that watch close doesn't decrement the watcher total too far.
+func TestV3WatchCloseCancelRace(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cli := clus.RandClient() + + for i := 0; i < 1000; i++ { + ctx, cancel := context.WithCancel(ctx) + cli.Watch(ctx, "/foo") + cancel() + } + + // Wait a little for cancellations to take hold + time.Sleep(3 * time.Second) + + minWatches, err := clus.Members[0].Metric("etcd_debugging_mvcc_watcher_total") + if err != nil { + t.Fatal(err) + } + + if minWatches != "0" { + t.Fatalf("expected zero watches, got %s", minWatches) + } +} diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 15e2c55f5c2..b74fa375f95 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -153,10 +153,13 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { s.mu.Lock() if s.unsynced.delete(wa) { slowWatcherGauge.Dec() + watcherGauge.Dec() break } else if s.synced.delete(wa) { + watcherGauge.Dec() break } else if wa.compacted { + watcherGauge.Dec() break } else if wa.ch == nil { // already canceled (e.g., cancel/close race) @@ -177,6 +180,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { } if victimBatch != nil { slowWatcherGauge.Dec() + watcherGauge.Dec() delete(victimBatch, wa) break } @@ -186,7 +190,6 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { time.Sleep(time.Millisecond) } - watcherGauge.Dec() wa.ch = nil s.mu.Unlock() }