From 89161b2673ca9ff532ed9e67fb0c4b923062534e Mon Sep 17 00:00:00 2001 From: Jack Kleeman Date: Mon, 11 May 2020 17:37:17 +0100 Subject: [PATCH] mvcc: avoid negative watcher count metrics The watch count metrics are not robust to duplicate cancellations. These cause the count to be decremented twice, leading eventually to negative counts. We are seeing this in production. The duplicate cancellations are not themselves a big problem (except performance), but they are caused by the new proactive cancellation logic (#11850) which cancels proactively even immediately before initiating a Close, thus nearly guaranteeing a Close-cancel race, as discussed in watchable_store.go. We can avoid this in most cases by not sending a cancellation when we are going to Close. --- clientv3/watch.go | 12 ++++++------ integration/v3_watch_test.go | 29 +++++++++++++++++++++++++++++ mvcc/watchable_store.go | 5 ++++- 3 files changed, 39 insertions(+), 7 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 9b0cc036af3..b70d8c3bf86 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -511,6 +511,12 @@ func (w *watchGrpcStream) run() { case <-w.ctx.Done(): return case ws := <-w.closingc: + w.closeSubstream(ws) + delete(closing, ws) + if len(w.substreams)+len(w.resuming) == 0 { + // no more watchers on this stream, shutdown, skip cancellation + return + } if ws.id != -1 { // client is closing an established watch; close it on the server proactively instead of waiting // to close when the next message arrives @@ -523,12 +529,6 @@ func (w *watchGrpcStream) run() { req := &pb.WatchRequest{RequestUnion: cr} wc.Send(req) } - w.closeSubstream(ws) - delete(closing, ws) - if len(w.substreams)+len(w.resuming) == 0 { - // no more watchers on this stream, shutdown - return - } } } } diff --git a/integration/v3_watch_test.go b/integration/v3_watch_test.go index 2aca66605b1..042057d9a71 100644 --- a/integration/v3_watch_test.go +++ b/integration/v3_watch_test.go @@ -1233,3 
+1233,32 @@ func TestV3WatchCancellation(t *testing.T) { t.Fatalf("expected one watch, got %s", minWatches) } } + +// TestV3WatchCloseCancelRace ensures that watch close doesn't decrement the watcher total too far. +func TestV3WatchCloseCancelRace(t *testing.T) { + clus := NewClusterV3(t, &ClusterConfig{Size: 1}) + defer clus.Terminate(t) + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + cli := clus.RandClient() + + for i := 0; i < 1000; i++ { + ctx, cancel := context.WithCancel(ctx) + cli.Watch(ctx, "/foo") + cancel() + } + + // Wait a little for cancellations to take hold + time.Sleep(3 * time.Second) + + minWatches, err := clus.Members[0].Metric("etcd_debugging_mvcc_watcher_total") + if err != nil { + t.Fatal(err) + } + + if minWatches != "0" { + t.Fatalf("expected zero watches, got %s", minWatches) + } +} diff --git a/mvcc/watchable_store.go b/mvcc/watchable_store.go index 78df19326b9..9ac418205c0 100644 --- a/mvcc/watchable_store.go +++ b/mvcc/watchable_store.go @@ -146,10 +146,13 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { s.mu.Lock() if s.unsynced.delete(wa) { slowWatcherGauge.Dec() + watcherGauge.Dec() break } else if s.synced.delete(wa) { + watcherGauge.Dec() break } else if wa.compacted { + watcherGauge.Dec() break } else if wa.ch == nil { // already canceled (e.g., cancel/close race) @@ -169,6 +172,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { } if victimBatch != nil { slowWatcherGauge.Dec() + watcherGauge.Dec() delete(victimBatch, wa) break } @@ -178,7 +182,6 @@ func (s *watchableStore) cancelWatcher(wa *watcher) { time.Sleep(time.Millisecond) } - watcherGauge.Dec() wa.ch = nil s.mu.Unlock() }