diff --git a/clientv3/watch.go b/clientv3/watch.go index 87d222d1d68..bec01993687 100644 --- a/clientv3/watch.go +++ b/clientv3/watch.go @@ -25,6 +25,7 @@ import ( pb "go.etcd.io/etcd/etcdserver/etcdserverpb" mvccpb "go.etcd.io/etcd/mvcc/mvccpb" + "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" @@ -140,6 +141,7 @@ type watcher struct { // streams holds all the active grpc streams keyed by ctx value. streams map[string]*watchGrpcStream + lg *zap.Logger } // watchGrpcStream tracks all watch resources attached to a single grpc stream. @@ -242,6 +244,7 @@ func NewWatchFromWatchClient(wc pb.WatchClient, c *Client) Watcher { } if c != nil { w.callOpts = c.callOpts + w.lg = c.lg } return w } @@ -544,10 +547,14 @@ func (w *watchGrpcStream) run() { w.resuming = append(w.resuming, ws) if len(w.resuming) == 1 { // head of resume queue, can register a new watcher - wc.Send(ws.initReq.toPB()) + if err := wc.Send(ws.initReq.toPB()); err != nil { + lg.Warningf("error when sending request: %v", err) + } } case *progressRequest: - wc.Send(wreq.toPB()) + if err := wc.Send(wreq.toPB()); err != nil { + lg.Warningf("error when sending request: %v", err) + } } // new events from the watch client @@ -571,7 +578,9 @@ func (w *watchGrpcStream) run() { } if ws := w.nextResume(); ws != nil { - wc.Send(ws.initReq.toPB()) + if err := wc.Send(ws.initReq.toPB()); err != nil { + lg.Warningf("error when sending request: %v", err) + } } // reset for next iteration @@ -616,7 +625,9 @@ func (w *watchGrpcStream) run() { }, } req := &pb.WatchRequest{RequestUnion: cr} - wc.Send(req) + if err := wc.Send(req); err != nil { + lg.Warningf("error when sending request: %v", err) + } } // watch client failed on Recv; spawn another if possible @@ -629,7 +640,9 @@ func (w *watchGrpcStream) run() { return } if ws := w.nextResume(); ws != nil { - wc.Send(ws.initReq.toPB()) + if err := wc.Send(ws.initReq.toPB()); err != nil { + lg.Warningf("error when sending request: %v", err) + } } cancelSet = make(map[int64]struct{})