diff --git a/CHANGELOG-3.5.md b/CHANGELOG-3.5.md index d4fd7d83cbe..8b816d3b462 100644 --- a/CHANGELOG-3.5.md +++ b/CHANGELOG-3.5.md @@ -177,6 +177,7 @@ Note that any `etcd_debugging_*` metrics are experimental and subject to change. - Fix [`panic on error`](https://github.com/etcd-io/etcd/pull/11694) for metrics handler. - Add [gRPC keepalive related flags](https://github.com/etcd-io/etcd/pull/11711) `grpc-keepalive-min-time`, `grpc-keepalive-interval` and `grpc-keepalive-timeout`. +- [Fix grpc watch proxy hangs when failed to cancel a watcher](https://github.com/etcd-io/etcd/pull/12030) . ### Auth diff --git a/etcdmain/grpc_proxy.go b/etcdmain/grpc_proxy.go index 72bce98aaa8..0e7665f3274 100644 --- a/etcdmain/grpc_proxy.go +++ b/etcdmain/grpc_proxy.go @@ -357,7 +357,7 @@ func newGRPCProxyServer(lg *zap.Logger, client *clientv3.Client) *grpc.Server { } kvp, _ := grpcproxy.NewKvProxy(client) - watchp, _ := grpcproxy.NewWatchProxy(client) + watchp, _ := grpcproxy.NewWatchProxy(lg, client) if grpcProxyResolverPrefix != "" { grpcproxy.Register(lg, client, grpcProxyResolverPrefix, grpcProxyAdvertiseClientURL, grpcProxyResolverTTL) } diff --git a/proxy/grpcproxy/watch.go b/proxy/grpcproxy/watch.go index 417b559dbf3..fc62c4aa399 100644 --- a/proxy/grpcproxy/watch.go +++ b/proxy/grpcproxy/watch.go @@ -23,6 +23,7 @@ import ( "go.etcd.io/etcd/v3/etcdserver/api/v3rpc/rpctypes" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" "google.golang.org/grpc/status" @@ -44,9 +45,10 @@ type watchProxy struct { // kv is used for permission checking kv clientv3.KV + lg *zap.Logger } -func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { +func NewWatchProxy(lg *zap.Logger, c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { cctx, cancel := context.WithCancel(c.Ctx()) wp := &watchProxy{ cw: c.Watcher, @@ -54,6 +56,7 @@ func NewWatchProxy(c *clientv3.Client) (pb.WatchServer, <-chan struct{}) { leader: newLeader(c.Ctx(), c.Watcher), kv: c.KV, // for permission checking + lg: lg, } wp.ranges = newWatchRanges(wp) ch := make(chan struct{}) @@ -99,6 +102,7 @@ func (wp *watchProxy) Watch(stream pb.Watch_WatchServer) (err error) { ctx: ctx, cancel: cancel, kv: wp.kv, + lg: wp.lg, } var lostLeaderC <-chan struct{} @@ -181,6 +185,7 @@ type watchProxyStream struct { // kv is used for permission checking kv clientv3.KV + lg *zap.Logger } func (wps *watchProxyStream) close() { @@ -262,8 +267,10 @@ func (wps *watchProxyStream) recvLoop() error { wps.watchers[w.id] = w wps.ranges.add(w) wps.mu.Unlock() + wps.lg.Debug("create watcher", zap.String("key", w.wr.key), zap.String("end", w.wr.end), zap.Int64("watcherId", wps.nextWatcherID)) case *pb.WatchRequest_CancelRequest: wps.delete(uv.CancelRequest.WatchId) + wps.lg.Debug("cancel watcher", zap.Int64("watcherId", uv.CancelRequest.WatchId)) default: panic("not implemented") } diff --git a/proxy/grpcproxy/watch_broadcast.go b/proxy/grpcproxy/watch_broadcast.go index d43112ec89d..fc62e6dd6cb 100644 --- a/proxy/grpcproxy/watch_broadcast.go +++ b/proxy/grpcproxy/watch_broadcast.go @@ -17,9 +17,12 @@ package grpcproxy import ( "context" "sync" + "time" "go.etcd.io/etcd/v3/clientv3" pb "go.etcd.io/etcd/v3/etcdserver/etcdserverpb" + + "go.uber.org/zap" ) // watchBroadcast broadcasts a server watcher to many client watchers. @@ -36,15 +39,17 @@ type watchBroadcast struct { receivers map[*watcher]struct{} // responses counts the number of responses responses int + lg *zap.Logger } -func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast { +func newWatchBroadcast(lg *zap.Logger, wp *watchProxy, w *watcher, update func(*watchBroadcast)) *watchBroadcast { cctx, cancel := context.WithCancel(wp.ctx) wb := &watchBroadcast{ cancel: cancel, nextrev: w.nextrev, receivers: make(map[*watcher]struct{}), donec: make(chan struct{}), + lg: lg, } wb.add(w) go func() { @@ -61,6 +66,7 @@ func newWatchBroadcast(wp *watchProxy, w *watcher, update func(*watchBroadcast)) cctx = withClientAuthToken(cctx, w.wps.stream.Context()) wch := wp.cw.Watch(cctx, w.wr.key, opts...) + wp.lg.Debug("watch", zap.String("key", w.wr.key)) for wr := range wch { wb.bcast(wr) @@ -148,5 +154,13 @@ func (wb *watchBroadcast) stop() { } wb.cancel() - <-wb.donec + + select { + case <-wb.donec: + // watchProxyStream will hold watchRanges global mutex lock all the time if client failed to cancel etcd watchers. + // and it will cause the watch proxy to not work. + // please see pr https://github.com/etcd-io/etcd/pull/12030 to get more detail info. + case <-time.After(time.Second): + wb.lg.Error("failed to cancel etcd watcher") + } } diff --git a/proxy/grpcproxy/watch_broadcasts.go b/proxy/grpcproxy/watch_broadcasts.go index 8fe9e5f512e..dacd3007d1d 100644 --- a/proxy/grpcproxy/watch_broadcasts.go +++ b/proxy/grpcproxy/watch_broadcasts.go @@ -93,7 +93,7 @@ func (wbs *watchBroadcasts) add(w *watcher) { } } // no fit; create a bcast - wb := newWatchBroadcast(wbs.wp, w, wbs.update) + wb := newWatchBroadcast(wbs.wp.lg, wbs.wp, w, wbs.update) wbs.watchers[w] = wb wbs.bcasts[wb] = struct{}{} } diff --git a/proxy/grpcproxy/watcher.go b/proxy/grpcproxy/watcher.go index b994ec2f76c..879b8179e72 100644 --- a/proxy/grpcproxy/watcher.go +++ b/proxy/grpcproxy/watcher.go @@ -123,6 +123,7 @@ func (w *watcher) post(wr *pb.WatchResponse) bool { case w.wps.watchCh <- wr: case <-time.After(50 * time.Millisecond): w.wps.cancel() + w.wps.lg.Error("failed to put a watch response on the watcher's proxy stream channel,err is timeout") return false } return true