Skip to content

Commit

Permalink
mvcc: avoid negative watcher count metrics
Browse files Browse the repository at this point in the history
The watch count metrics are not robust to duplicate cancellations. These
cause the count to be decremented twice, leading eventually to negative
counts. We are seeing this in production. The duplicate cancellations
themselves are not a big problem (except for performance), but
they are caused by the new proactive cancellation logic (#11850). As it
turns out, w.closingc seems to receive two messages for a cancellation.
I have added a fix which ensures that we won't send duplicate cancel
requests.
  • Loading branch information
jackkleeman committed May 11, 2020
1 parent c667c14 commit 67e464c
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
6 changes: 5 additions & 1 deletion clientv3/watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -650,7 +650,11 @@ func (w *watchGrpcStream) run() {
return

case ws := <-w.closingc:
if ws.id != -1 {
if ws.id == -1 {
// this stream hasn't actually started, don't attempt to cancel
} else if _, ok := w.substreams[ws.id]; !ok {
// this is a duplicate cancellation, the substream no longer exists
} else {
// client is closing an established watch; close it on the server proactively instead of waiting
// to close when the next message arrives
cancelSet[ws.id] = struct{}{}
Expand Down
5 changes: 4 additions & 1 deletion mvcc/watchable_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,13 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
s.mu.Lock()
if s.unsynced.delete(wa) {
slowWatcherGauge.Dec()
watcherGauge.Dec()
break
} else if s.synced.delete(wa) {
watcherGauge.Dec()
break
} else if wa.compacted {
watcherGauge.Dec()
break
} else if wa.ch == nil {
// already canceled (e.g., cancel/close race)
Expand All @@ -177,6 +180,7 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
}
if victimBatch != nil {
slowWatcherGauge.Dec()
watcherGauge.Dec()
delete(victimBatch, wa)
break
}
Expand All @@ -186,7 +190,6 @@ func (s *watchableStore) cancelWatcher(wa *watcher) {
time.Sleep(time.Millisecond)
}

watcherGauge.Dec()
wa.ch = nil
s.mu.Unlock()
}
Expand Down

0 comments on commit 67e464c

Please sign in to comment.