From 4183d2fdf4df12e14696a0b03ffe7c84ec875530 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Fri, 31 Mar 2023 13:39:47 +0800 Subject: [PATCH 1/3] fix watch keyspace revision Signed-off-by: Ryan Leung --- server/keyspace_service.go | 26 ++++++++++++++++++++------ 1 file changed, 20 insertions(+), 6 deletions(-) diff --git a/server/keyspace_service.go b/server/keyspace_service.go index 5255d725815..caf181ce1e3 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -22,9 +22,11 @@ import ( "github.com/gogo/protobuf/proto" "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/log" "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/storage/endpoint" "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" ) // KeyspaceServer wraps GrpcServer to provide keyspace service. @@ -82,16 +84,24 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques ctx, cancel := context.WithCancel(s.Context()) defer cancel() - err := s.sendAllKeyspaceMeta(ctx, stream) + revision, err := s.sendAllKeyspaceMeta(ctx, stream) if err != nil { return err } - watchChan := s.client.Watch(ctx, path.Join(s.rootPath, endpoint.KeyspaceMetaPrefix()), clientv3.WithPrefix()) + for { + watchChan := s.client.Watch(ctx, path.Join(s.rootPath, endpoint.KeyspaceMetaPrefix()), clientv3.WithPrefix(), clientv3.WithRev(revision)) select { case <-ctx.Done(): return nil case res := <-watchChan: + if res.CompactRevision != 0 { + log.Warn("required revision has been compacted, use the compact revision", + zap.Int64("required-revision", revision), + zap.Int64("compact-revision", res.CompactRevision)) + revision = res.CompactRevision + break + } keyspaces := make([]*keyspacepb.KeyspaceMeta, 0, len(res.Events)) for _, event := range res.Events { if event.Type != clientv3.EventTypePut { @@ -112,20 +122,24 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques } } -func (s *KeyspaceServer) sendAllKeyspaceMeta(ctx context.Context, stream keyspacepb.Keyspace_WatchKeyspacesServer) error { +func (s *KeyspaceServer) sendAllKeyspaceMeta(ctx context.Context, stream keyspacepb.Keyspace_WatchKeyspacesServer) (int64, error) { getResp, err := s.client.Get(ctx, path.Join(s.rootPath, endpoint.KeyspaceMetaPrefix()), clientv3.WithPrefix()) if err != nil { - return err + return 0, err } metas := make([]*keyspacepb.KeyspaceMeta, getResp.Count) for i, kv := range getResp.Kvs { meta := &keyspacepb.KeyspaceMeta{} if err = proto.Unmarshal(kv.Value, meta); err != nil { - return err + return 0, err } metas[i] = meta } - return stream.Send(&keyspacepb.WatchKeyspacesResponse{Header: s.header(), Keyspaces: metas}) + var revision int64 + if getResp.Header != nil { + revision = getResp.Header.GetRevision() + } + return revision, stream.Send(&keyspacepb.WatchKeyspacesResponse{Header: s.header(), Keyspaces: metas}) } // UpdateKeyspaceState updates the state of keyspace specified in the request. From c46b84250740a21bc6d9ed2100bf1ba73cc13f04 Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 3 Apr 2023 11:48:09 +0800 Subject: [PATCH 2/3] address the comment Signed-off-by: Ryan Leung --- server/keyspace_service.go | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/server/keyspace_service.go b/server/keyspace_service.go index caf181ce1e3..6190f0f847a 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/kvproto/pkg/keyspacepb" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" + "github.com/tikv/pd/pkg/errs" "github.com/tikv/pd/pkg/keyspace" "github.com/tikv/pd/pkg/storage/endpoint" "go.etcd.io/etcd/clientv3" @@ -89,21 +90,27 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques return err } + watcher := clientv3.NewWatcher(s.client) + defer watcher.Close() + for { - watchChan := s.client.Watch(ctx, path.Join(s.rootPath, endpoint.KeyspaceMetaPrefix()), clientv3.WithPrefix(), clientv3.WithRev(revision)) - select { - case <-ctx.Done(): - return nil - case res := <-watchChan: - if res.CompactRevision != 0 { + rch := watcher.Watch(ctx, path.Join(s.rootPath, endpoint.KeyspaceMetaPrefix()), clientv3.WithPrefix(), clientv3.WithRev(revision)) + for wresp := range rch { + if wresp.CompactRevision != 0 { log.Warn("required revision has been compacted, use the compact revision", zap.Int64("required-revision", revision), - zap.Int64("compact-revision", res.CompactRevision)) - revision = res.CompactRevision + zap.Int64("compact-revision", wresp.CompactRevision)) + revision = wresp.CompactRevision break } - keyspaces := make([]*keyspacepb.KeyspaceMeta, 0, len(res.Events)) - for _, event := range res.Events { + if wresp.Canceled { + log.Error("watcher is canceled with", + zap.Int64("revision", revision), + errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err())) + return nil + } + keyspaces := make([]*keyspacepb.KeyspaceMeta, 0, len(wresp.Events)) + for _, event := range wresp.Events { if event.Type != clientv3.EventTypePut { continue } @@ -119,6 +126,12 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques } } } + select { + case <-ctx.Done(): + // server closed, return + return nil + default: + } } } @@ -137,7 +150,8 @@ func (s *KeyspaceServer) sendAllKeyspaceMeta(ctx context.Context, stream keyspac } var revision int64 if getResp.Header != nil { - revision = getResp.Header.GetRevision() + // start from the next revision + revision = getResp.Header.GetRevision() + 1 } return revision, stream.Send(&keyspacepb.WatchKeyspacesResponse{Header: s.header(), Keyspaces: metas}) } From 2162c82c8e0ec5c155ca2666f7b5f929f07be24a Mon Sep 17 00:00:00 2001 From: Ryan Leung Date: Mon, 3 Apr 2023 14:27:18 +0800 Subject: [PATCH 3/3] address the comment Signed-off-by: Ryan Leung --- server/keyspace_service.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/keyspace_service.go b/server/keyspace_service.go index 6190f0f847a..64e646119ed 100644 --- a/server/keyspace_service.go +++ b/server/keyspace_service.go @@ -107,7 +107,7 @@ func (s *KeyspaceServer) WatchKeyspaces(request *keyspacepb.WatchKeyspacesReques log.Error("watcher is canceled with", zap.Int64("revision", revision), errs.ZapError(errs.ErrEtcdWatcherCancel, wresp.Err())) - return nil + return wresp.Err() } keyspaces := make([]*keyspacepb.KeyspaceMeta, 0, len(wresp.Events)) for _, event := range wresp.Events {