From 750dd6b4612c4f8cb6dde161f5ecd60c4993d336 Mon Sep 17 00:00:00 2001 From: Bin Shi <39923490+binshi-bing@users.noreply.github.com> Date: Wed, 19 Apr 2023 19:55:20 -0700 Subject: [PATCH] Fixed bugs in tso service registry watching loop. (#6346) ref tikv/pd#6343 Fixed the following two bugs: 1. When re-watching a range, to continue from where the last watch left off, the revision must be wresp.Header.Revision + 1 instead of wresp.Header.Revision, where wresp.Header.Revision is the revision indicated in the response of the last watch. Because of this bug, the watch loop was processing the same event endlessly, as you can see from the log below. 2. In the tso service watch loop in pkg/keyspace/tso_keyspace_group.go, if the event is a delete event, json.Unmarshal(event.Kv.Value, s) will fail with the error "unexpected end of JSON input", so there is no way to get s.ServiceAddr from the result of json.Unmarshal. Signed-off-by: Bin Shi --- pkg/keyspace/tso_keyspace_group.go | 25 ++++++++++++++++++++----- pkg/tso/keyspace_group_manager.go | 3 ++- server/server.go | 2 +- 3 files changed, 23 insertions(+), 7 deletions(-) diff --git a/pkg/keyspace/tso_keyspace_group.go b/pkg/keyspace/tso_keyspace_group.go index f18b80ed305..bb0f413fb2d 100644 --- a/pkg/keyspace/tso_keyspace_group.go +++ b/pkg/keyspace/tso_keyspace_group.go @@ -76,6 +76,8 @@ type GroupManager struct { // TODO: add user kind with different balancer // when we ensure where the correspondence between tso node and user kind will be found nodesBalancer balancer.Balancer[string] + // serviceRegistryMap stores the mapping from the service registry key to the service address. + serviceRegistryMap map[string]string } // NewKeyspaceGroupManager creates a Manager of keyspace group related data. @@ -131,6 +133,7 @@ func (m *GroupManager) Bootstrap() error { // If the etcd client is not nil, start the watch loop. 
if m.client != nil { m.nodesBalancer = balancer.GenByPolicy[string](m.policy) + m.serviceRegistryMap = make(map[string]string) m.wg.Add(1) go m.startWatchLoop() } @@ -169,6 +172,7 @@ func (m *GroupManager) startWatchLoop() { continue } m.nodesBalancer.Put(s.ServiceAddr) + m.serviceRegistryMap[string(item.Key)] = s.ServiceAddr } break } @@ -219,17 +223,28 @@ func (m *GroupManager) watchServiceAddrs(ctx context.Context, revision int64) (i return revision, wresp.Err() } for _, event := range wresp.Events { - s := &discovery.ServiceRegistryEntry{} - if err := json.Unmarshal(event.Kv.Value, s); err != nil { - log.Warn("failed to unmarshal service registry entry", zap.Error(err)) - } switch event.Type { case clientv3.EventTypePut: + s := &discovery.ServiceRegistryEntry{} + if err := json.Unmarshal(event.Kv.Value, s); err != nil { + log.Warn("failed to unmarshal service registry entry", + zap.String("event-kv-key", string(event.Kv.Key)), zap.Error(err)) + break + } m.nodesBalancer.Put(s.ServiceAddr) + m.serviceRegistryMap[string(event.Kv.Key)] = s.ServiceAddr case clientv3.EventTypeDelete: - m.nodesBalancer.Delete(s.ServiceAddr) + key := string(event.Kv.Key) + if serviceAddr, ok := m.serviceRegistryMap[key]; ok { + delete(m.serviceRegistryMap, key) + m.nodesBalancer.Delete(serviceAddr) + } else { + log.Warn("can't retrieve service addr from service registry map", + zap.String("event-kv-key", key)) + } } } + revision = wresp.Header.Revision + 1 } } } diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 8aba66c5412..febbbdabe41 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -485,6 +485,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) ( log.Warn("failed to unmarshal keyspace group", zap.Uint32("keyspace-group-id", groupID), zap.Error(errs.ErrJSONUnmarshal.Wrap(err).FastGenWithCause())) + break } kgm.updateKeyspaceGroup(group) case clientv3.EventTypeDelete: @@ 
-499,7 +500,7 @@ func (kgm *KeyspaceGroupManager) watchKeyspaceGroupsMetaChange(revision int64) ( } } } - revision = wresp.Header.Revision + revision = wresp.Header.Revision + 1 } select { diff --git a/server/server.go b/server/server.go index 36a3cefd0af..808e01bb659 100644 --- a/server/server.go +++ b/server/server.go @@ -1886,7 +1886,7 @@ func (s *Server) watchServicePrimaryAddr(ctx context.Context, serviceName string s.servicePrimaryMap.Delete(serviceName) } } - revision = wresp.Header.Revision + revision = wresp.Header.Revision + 1 } } }