From f9761381865eac29efa4f409fdc1654a2ef95237 Mon Sep 17 00:00:00 2001 From: Ted Yu Date: Wed, 20 May 2020 15:44:41 -0700 Subject: [PATCH] clientv3: non-recursive watch Signed-off-by: Ted Yu --- clientv3/watch.go | 82 ++++++++++++++++++++++++++--------------------- 1 file changed, 45 insertions(+), 37 deletions(-) diff --git a/clientv3/watch.go b/clientv3/watch.go index 3612d35f165..d3981bfd9d1 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -311,55 +311,63 @@ func (w *watcher) Watch(ctx context.Context, key string, opts ...OpOption) Watch ok := false ctxKey := streamKeyFromCtx(ctx) - // find or allocate appropriate grpc watch stream - w.mu.Lock() - if w.streams == nil { - // closed + var closeCh chan WatchResponse + for { + // find or allocate appropriate grpc watch stream + w.mu.Lock() + if w.streams == nil { + // closed + w.mu.Unlock() + ch := make(chan WatchResponse) + close(ch) + return ch + } + wgs := w.streams[ctxKey] + if wgs == nil { + wgs = w.newWatcherGrpcStream(ctx) + w.streams[ctxKey] = wgs + } + donec := wgs.donec + reqc := wgs.reqc w.mu.Unlock() - ch := make(chan WatchResponse) - close(ch) - return ch - } - wgs := w.streams[ctxKey] - if wgs == nil { - wgs = w.newWatcherGrpcStream(ctx) - w.streams[ctxKey] = wgs - } - donec := wgs.donec - reqc := wgs.reqc - w.mu.Unlock() - - // couldn't create channel; return closed channel - closeCh := make(chan WatchResponse, 1) - // submit request - select { - case reqc <- wr: - ok = true - case <-wr.ctx.Done(): - case <-donec: - if wgs.closeErr != nil { - closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} - break + // couldn't create channel; return closed channel + if closeCh == nil { + closeCh = make(chan WatchResponse, 1) } - // retry; may have dropped stream from no ctxs - return w.Watch(ctx, key, opts...) - } - // receive channel - if ok { + // submit request select { - case ret := <-wr.retc: - return ret - case <-ctx.Done(): + case reqc <- wr: + ok = true + case <-wr.ctx.Done(): + ok = false case <-donec: + ok = false if wgs.closeErr != nil { closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} break } // retry; may have dropped stream from no ctxs - return w.Watch(ctx, key, opts...) + continue + } + + // receive channel + if ok { + select { + case ret := <-wr.retc: + return ret + case <-ctx.Done(): + case <-donec: + if wgs.closeErr != nil { + closeCh <- WatchResponse{Canceled: true, closeErr: wgs.closeErr} + break + } + // retry; may have dropped stream from no ctxs + continue + } } + break } close(closeCh)