mvcc: avoid negative watcher count metrics
The watch count metrics are not robust to duplicate cancellations. These
cause the count to be decremented twice, eventually leading to negative
counts. We are seeing this in production. The duplicate cancellations are
not themselves a big problem (beyond the wasted work), but they are caused
by the new proactive cancellation logic (etcd-io#11850), which cancels
proactively even immediately before initiating a Close, thus nearly
guaranteeing a Close-cancel race, as discussed in watchable_store.go. We
can avoid this in most cases by not sending a cancellation when we are
about to Close.
jackkleeman authored and suhailpatel committed Feb 3, 2022
1 parent e4c4409 commit d256dd0
Showing 3 changed files with 39 additions and 7 deletions.
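
The failure mode described in the commit message is easy to reproduce in miniature. Below is a self-contained Go sketch (toy types, not etcd code): an unconditional decrement on cancel goes negative as soon as the same watcher is cancelled twice, whereas tying the decrement to a successful removal (the approach this commit takes in mvcc/watchable_store.go) leaves duplicates harmless.

package main

import "fmt"

// registry is a toy stand-in for the server-side watcher bookkeeping; gauge
// mirrors a metric such as etcd_debugging_mvcc_watcher_total.
type registry struct {
	watchers map[int]struct{}
	gauge    int
}

// cancelNaive always decrements, so a duplicate cancellation drives the
// gauge below zero (the pre-patch behaviour).
func (r *registry) cancelNaive(id int) {
	delete(r.watchers, id)
	r.gauge--
}

// cancelIdempotent decrements only when the watcher was actually removed,
// so duplicates are no-ops for the metric (the post-patch behaviour).
func (r *registry) cancelIdempotent(id int) {
	if _, ok := r.watchers[id]; ok {
		delete(r.watchers, id)
		r.gauge--
	}
}

func main() {
	r := &registry{watchers: map[int]struct{}{1: {}}, gauge: 1}
	r.cancelNaive(1)
	r.cancelNaive(1)     // duplicate, e.g. a Close racing with a proactive cancel
	fmt.Println(r.gauge) // -1

	r = &registry{watchers: map[int]struct{}{1: {}}, gauge: 1}
	r.cancelIdempotent(1)
	r.cancelIdempotent(1) // duplicate is now harmless
	fmt.Println(r.gauge)  // 0
}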
12 changes: 6 additions & 6 deletions clientv3/watch.go
@@ -637,6 +637,12 @@ func (w *watchGrpcStream) run() {
 			return
 
 		case ws := <-w.closingc:
+			w.closeSubstream(ws)
+			delete(closing, ws)
+			if len(w.substreams)+len(w.resuming) == 0 {
+				// no more watchers on this stream, shutdown, skip cancellation
+				return
+			}
 			if ws.id != -1 {
 				// client is closing an established watch; close it on the server proactively instead of waiting
 				// to close when the next message arrives
@@ -649,12 +655,6 @@ func (w *watchGrpcStream) run() {
 				req := &pb.WatchRequest{RequestUnion: cr}
 				wc.Send(req)
 			}
-			w.closeSubstream(ws)
-			delete(closing, ws)
-			// no more watchers on this stream, shutdown
-			if len(w.substreams)+len(w.resuming) == 0 {
-				return
-			}
 		}
 	}
 }
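
The client-side half of the fix, in miniature: do the local cleanup first and skip the proactive cancel entirely when the whole stream is about to shut down, so the server never sees a cancel racing with the stream Close. A toy sketch under those assumptions (the names below are made up and are not the clientv3 API):

package main

import "fmt"

// stream is a toy model of a client gRPC watch stream tracking substreams by id.
type stream struct {
	substreams map[int]struct{}
	sent       []string // cancel requests "sent" to the server, for illustration
}

// closeSubstream removes local bookkeeping first, then decides whether a
// proactive cancel is still worth sending. It reports whether the stream as
// a whole should now shut down.
func (s *stream) closeSubstream(id int) (shutdown bool) {
	delete(s.substreams, id)
	if len(s.substreams) == 0 {
		// Nothing left on this stream: it will be closed as a whole, so a
		// cancel sent now would only race with the Close.
		return true
	}
	// Other watchers remain, so cancel this one on the server proactively.
	s.sent = append(s.sent, fmt.Sprintf("cancel watcher %d", id))
	return false
}

func main() {
	s := &stream{substreams: map[int]struct{}{1: {}, 2: {}}}
	fmt.Println(s.closeSubstream(1), s.sent) // false [cancel watcher 1]
	fmt.Println(s.closeSubstream(2), s.sent) // true [cancel watcher 1] (no cancel for the last one)
}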
29 changes: 29 additions & 0 deletions integration/v3_watch_test.go
@@ -1233,3 +1233,32 @@ func TestV3WatchCancellation(t *testing.T) {
 		t.Fatalf("expected one watch, got %s", minWatches)
 	}
 }
+
+// TestV3WatchCloseCancelRace ensures that watch close doesn't decrement the watcher total too far.
+func TestV3WatchCloseCancelRace(t *testing.T) {
+	clus := NewClusterV3(t, &ClusterConfig{Size: 1})
+	defer clus.Terminate(t)
+
+	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
+	defer cancel()
+
+	cli := clus.RandClient()
+
+	for i := 0; i < 1000; i++ {
+		ctx, cancel := context.WithCancel(ctx)
+		cli.Watch(ctx, "/foo")
+		cancel()
+	}
+
+	// Wait a little for cancellations to take hold
+	time.Sleep(3 * time.Second)
+
+	minWatches, err := clus.Members[0].Metric("etcd_debugging_mvcc_watcher_total")
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if minWatches != "0" {
+		t.Fatalf("expected zero watches, got %s", minWatches)
+	}
+}
5 changes: 4 additions & 1 deletion mvcc/watchable_store.go
@@ -146,10 +146,13 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
 		s.mu.Lock()
 		if s.unsynced.delete(wa) {
 			slowWatcherGauge.Dec()
+			watcherGauge.Dec()
 			break
 		} else if s.synced.delete(wa) {
+			watcherGauge.Dec()
 			break
 		} else if wa.compacted {
+			watcherGauge.Dec()
 			break
 		} else if wa.ch == nil {
 			// already canceled (e.g., cancel/close race)
@@ -169,6 +172,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
 		}
 		if victimBatch != nil {
 			slowWatcherGauge.Dec()
+			watcherGauge.Dec()
 			delete(victimBatch, wa)
 			break
 		}
@@ -178,7 +182,6 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
 		time.Sleep(time.Millisecond)
 	}
 
-	watcherGauge.Dec()
 	wa.ch = nil
 	s.mu.Unlock()
 }
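
To observe the gauge this commit is protecting, it can be scraped directly from a member's Prometheus endpoint, much as the new test does through Member.Metric. A standalone sketch; the address below is an assumption, so point it at wherever your member serves /metrics:

package main

import (
	"bufio"
	"fmt"
	"net/http"
	"strings"
)

// fetchWatcherTotal scrapes a member's /metrics endpoint and returns the raw
// value of etcd_debugging_mvcc_watcher_total as a string.
func fetchWatcherTotal(metricsURL string) (string, error) {
	resp, err := http.Get(metricsURL)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()

	sc := bufio.NewScanner(resp.Body)
	for sc.Scan() {
		line := sc.Text()
		if strings.HasPrefix(line, "etcd_debugging_mvcc_watcher_total ") {
			return strings.TrimSpace(strings.TrimPrefix(line, "etcd_debugging_mvcc_watcher_total")), nil
		}
	}
	if err := sc.Err(); err != nil {
		return "", err
	}
	return "", fmt.Errorf("etcd_debugging_mvcc_watcher_total not found")
}

func main() {
	// The address is an assumption; use whatever your member serves /metrics on.
	v, err := fetchWatcherTotal("http://127.0.0.1:2379/metrics")
	if err != nil {
		fmt.Println("scrape failed:", err)
		return
	}
	// On an affected server this can read negative after enough duplicate
	// cancellations; with this patch it should settle back to 0.
	fmt.Println("etcd_debugging_mvcc_watcher_total =", v)
}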