diff --git a/pkg/storage/endpoint/key_path.go b/pkg/storage/endpoint/key_path.go index 2eca0e6bf17..86022b273f7 100644 --- a/pkg/storage/endpoint/key_path.go +++ b/pkg/storage/endpoint/key_path.go @@ -53,8 +53,10 @@ const ( resourceGroupStatesPath = "states" controllerConfigPath = "controller" // tso storage endpoint has prefix `tso` - tsoServiceKey = utils.TSOServiceName - timestampKey = "timestamp" + tsoServiceKey = utils.TSOServiceName + globalTSOAllocatorEtcdPrefix = "gta" + // TimestampKey is the key of timestamp oracle used for the suffix. + TimestampKey = "timestamp" tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey keyspaceGroupMembershipKey = "membership" @@ -261,3 +263,20 @@ func buildPath(withSuffix bool, str ...string) string { } return sb.String() } + +// GetKeyspaceGroupTSPath constructs the timestampOracle path prefix, which is: +// 1. for the default keyspace group: +// "" in /pd/{cluster_id}/timestamp +// 2. for the non-default keyspace groups: +// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp +func GetKeyspaceGroupTSPath(groupID uint32) string { + if groupID == utils.DefaultKeyspaceGroupID { + return "" + } + return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix) +} + +// GetTimestampPath returns the timestamp path for the given timestamp oracle path prefix. 
+func GetTimestampPath(tsPath string) string { + return path.Join(tsPath, TimestampKey) +} diff --git a/pkg/storage/endpoint/tso.go b/pkg/storage/endpoint/tso.go index e67f5d33f4f..f0aa9d8cde0 100644 --- a/pkg/storage/endpoint/tso.go +++ b/pkg/storage/endpoint/tso.go @@ -31,6 +31,7 @@ import ( type TSOStorage interface { LoadTimestamp(prefix string) (time.Time, error) SaveTimestamp(key string, ts time.Time) error + DeleteTimestamp(key string) error } var _ TSOStorage = (*StorageEndpoint)(nil) @@ -51,7 +52,7 @@ func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) { maxTSWindow := typeutil.ZeroTime for i, key := range keys { key := strings.TrimSpace(key) - if !strings.HasSuffix(key, timestampKey) { + if !strings.HasSuffix(key, TimestampKey) { continue } tsWindow, err := typeutil.ParseTimestamp([]byte(values[i])) @@ -89,3 +90,10 @@ func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error { return txn.Save(key, string(data)) }) } + +// DeleteTimestamp deletes the timestamp from the storage. 
+func (se *StorageEndpoint) DeleteTimestamp(key string) error { + return se.RunInTxn(context.Background(), func(txn kv.Txn) error { + return txn.Remove(key) + }) +} diff --git a/pkg/storage/storage_tso_test.go b/pkg/storage/storage_tso_test.go index b8f31cc377c..1dbba289512 100644 --- a/pkg/storage/storage_tso_test.go +++ b/pkg/storage/storage_tso_test.go @@ -21,6 +21,7 @@ import ( "time" "github.com/stretchr/testify/require" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/embed" @@ -42,9 +43,8 @@ func TestSaveLoadTimestamp(t *testing.T) { rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) storage := NewStorageWithEtcdBackend(client, rootPath) - key := "timestamp" expectedTS := time.Now().Round(0) - err = storage.SaveTimestamp(key, expectedTS) + err = storage.SaveTimestamp(endpoint.TimestampKey, expectedTS) re.NoError(err) ts, err := storage.LoadTimestamp("") re.NoError(err) @@ -68,16 +68,15 @@ func TestGlobalLocalTimestamp(t *testing.T) { storage := NewStorageWithEtcdBackend(client, rootPath) ltaKey := "lta" - timestampKey := "timestamp" dc1LocationKey, dc2LocationKey := "dc1", "dc2" localTS1 := time.Now().Round(0) - l1 := path.Join(ltaKey, dc1LocationKey, timestampKey) - l2 := path.Join(ltaKey, dc2LocationKey, timestampKey) + l1 := path.Join(ltaKey, dc1LocationKey, endpoint.TimestampKey) + l2 := path.Join(ltaKey, dc2LocationKey, endpoint.TimestampKey) err = storage.SaveTimestamp(l1, localTS1) re.NoError(err) globalTS := time.Now().Round(0) - err = storage.SaveTimestamp(timestampKey, globalTS) + err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS) re.NoError(err) localTS2 := time.Now().Round(0) err = storage.SaveTimestamp(l2, localTS2) @@ -108,14 +107,12 @@ func TestTimestampTxn(t *testing.T) { rootPath := path.Join("/pd", strconv.FormatUint(100, 10)) storage := NewStorageWithEtcdBackend(client, rootPath) - timestampKey := "timestamp" - globalTS1 := time.Now().Round(0) - 
err = storage.SaveTimestamp(timestampKey, globalTS1) + err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS1) re.NoError(err) globalTS2 := globalTS1.Add(-time.Millisecond).Round(0) - err = storage.SaveTimestamp(timestampKey, globalTS2) + err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS2) re.Error(err) ts, err := storage.LoadTimestamp("") diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index 6e67ccf6951..d7a8a9eb81d 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -46,13 +46,12 @@ import ( const ( // GlobalDCLocation is the Global TSO Allocator's DC location label. - GlobalDCLocation = "global" - checkStep = time.Minute - patrolStep = time.Second - defaultAllocatorLeaderLease = 3 - globalTSOAllocatorEtcdPrefix = "gta" - localTSOAllocatorEtcdPrefix = "lta" - localTSOSuffixEtcdPrefix = "lts" + GlobalDCLocation = "global" + checkStep = time.Minute + patrolStep = time.Second + defaultAllocatorLeaderLease = 3 + localTSOAllocatorEtcdPrefix = "lta" + localTSOSuffixEtcdPrefix = "lts" ) var ( @@ -1406,16 +1405,3 @@ func (am *AllocatorManager) GetLeaderAddr() string { } return leaderAddrs[0] } - -// Construct the timestampOracle path prefix, which is: -// 1. for the default keyspace group: -// "" in /pd/{cluster_id}/timestamp -// 2. 
for the non-default keyspace groups: -// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp -func (am *AllocatorManager) getKeyspaceGroupTSPath(groupID uint32) string { - tsPath := "" - if am.kgID != mcsutils.DefaultKeyspaceGroupID { - tsPath = path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix) - } - return tsPath -} diff --git a/pkg/tso/global_allocator.go b/pkg/tso/global_allocator.go index 2c715d0cc7c..07eec5490c8 100644 --- a/pkg/tso/global_allocator.go +++ b/pkg/tso/global_allocator.go @@ -28,6 +28,7 @@ import ( "github.com/tikv/pd/pkg/errs" mcsutils "github.com/tikv/pd/pkg/mcs/utils" "github.com/tikv/pd/pkg/slice" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -96,7 +97,7 @@ func NewGlobalTSOAllocator( member: am.member, timestampOracle: &timestampOracle{ client: am.member.GetLeadership().GetClient(), - tsPath: am.getKeyspaceGroupTSPath(am.kgID), + tsPath: endpoint.GetKeyspaceGroupTSPath(am.kgID), storage: am.storage, saveInterval: am.saveInterval, updatePhysicalInterval: am.updatePhysicalInterval, diff --git a/pkg/tso/keyspace_group_manager.go b/pkg/tso/keyspace_group_manager.go index 529e9bcdd37..ec47089405c 100644 --- a/pkg/tso/keyspace_group_manager.go +++ b/pkg/tso/keyspace_group_manager.go @@ -77,11 +77,14 @@ type state struct { keyspaceLookupTable map[uint32]uint32 // splittingGroups is the cache of splitting keyspace group related information. 
+ deletedGroups map[uint32]struct{} } func (s *state) initialize() { s.keyspaceLookupTable = make(map[uint32]uint32) s.splittingGroups = make(map[uint32]struct{}) + s.deletedGroups = make(map[uint32]struct{}) } func (s *state) deInitialize() { @@ -116,6 +119,28 @@ func (s *state) getKeyspaceGroupMeta( return s.ams[groupID], s.kgs[groupID] } +// getSplittingGroups returns the IDs of the splitting keyspace groups. +func (s *state) getSplittingGroups() []uint32 { + s.RLock() + defer s.RUnlock() + groups := make([]uint32, 0, len(s.splittingGroups)) + for groupID := range s.splittingGroups { + groups = append(groups, groupID) + } + return groups +} + +// getDeletedGroups returns the IDs of the deleted keyspace groups. +func (s *state) getDeletedGroups() []uint32 { + s.RLock() + defer s.RUnlock() + groups := make([]uint32, 0, len(s.deletedGroups)) + for groupID := range s.deletedGroups { + groups = append(groups, groupID) + } + return groups +} + func (s *state) checkTSOSplit( targetGroupID uint32, ) (splitTargetAM, splitSourceAM *AllocatorManager, err error) { @@ -402,9 +427,10 @@ func (kgm *KeyspaceGroupManager) Initialize() error { return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err) } - kgm.wg.Add(2) + kgm.wg.Add(3) go kgm.primaryPriorityCheckLoop() go kgm.groupSplitPatroller() + go kgm.deletedGroupCleaner() return nil } @@ -915,10 +941,12 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) { am.close() kgm.ams[groupID] = nil } + + kgm.deletedGroups[groupID] = struct{}{} } // exitElectionMembership exits the election membership of the given keyspace group by -// deinitializing the allocator manager, but still keeps the keyspace group info. +// de-initializing the allocator manager, but still keeps the keyspace group info. 
func (kgm *KeyspaceGroupManager) exitElectionMembership(group *endpoint.KeyspaceGroup) { log.Info("resign election membership", zap.Uint32("keyspace-group-id", group.ID)) @@ -1272,7 +1300,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget // update the newly merged TSO to make sure it is greater than the original ones. var mergedTS time.Time for _, id := range mergeList { - ts, err := kgm.tsoSvcStorage.LoadTimestamp(am.getKeyspaceGroupTSPath(id)) + ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(id)) if err != nil || ts == typeutil.ZeroTime { log.Error("failed to load the keyspace group TSO", zap.String("member", kgm.tsoServiceID.ServiceAddr), @@ -1336,6 +1364,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget // groupSplitPatroller is used to patrol the groups that are in the on-going // split state and to check if we could speed up the split process. func (kgm *KeyspaceGroupManager) groupSplitPatroller() { + defer logutil.LogPanic() defer kgm.wg.Done() patrolInterval := groupPatrolInterval failpoint.Inject("fastGroupSplitPatroller", func() { @@ -1348,21 +1377,11 @@ func (kgm *KeyspaceGroupManager) groupSplitPatroller() { for { select { case <-kgm.ctx.Done(): - log.Info("group split patroller is exiting") + log.Info("group split patroller exited") return case <-ticker.C: } - kgm.RLock() - if len(kgm.splittingGroups) == 0 { - kgm.RUnlock() - continue - } - var splittingGroups []uint32 - for id := range kgm.splittingGroups { - splittingGroups = append(splittingGroups, id) - } - kgm.RUnlock() - for _, groupID := range splittingGroups { + for _, groupID := range kgm.getSplittingGroups() { am, group := kgm.getKeyspaceGroupMeta(groupID) if !am.IsLeader() { continue @@ -1387,3 +1406,57 @@ func (kgm *KeyspaceGroupManager) groupSplitPatroller() { } } } + +// deletedGroupCleaner is used to clean the deleted keyspace groups related data. 
+// For example, the TSO keys of the merged keyspace groups remain in the storage. +func (kgm *KeyspaceGroupManager) deletedGroupCleaner() { + defer logutil.LogPanic() + defer kgm.wg.Done() + patrolInterval := groupPatrolInterval + failpoint.Inject("fastDeletedGroupCleaner", func() { + patrolInterval = 200 * time.Millisecond + }) + ticker := time.NewTicker(patrolInterval) + defer ticker.Stop() + log.Info("deleted group cleaner is started", + zap.Duration("patrol-interval", patrolInterval)) + for { + select { + case <-kgm.ctx.Done(): + log.Info("deleted group cleaner exited") + return + case <-ticker.C: + } + for _, groupID := range kgm.getDeletedGroups() { + // Do not clean the default keyspace group data. + if groupID == mcsutils.DefaultKeyspaceGroupID { + continue + } + // Make sure the allocator and group meta are not in use anymore. + am, _ := kgm.getKeyspaceGroupMeta(groupID) + if am != nil { + log.Info("the keyspace group tso allocator has not been closed yet", + zap.Uint32("keyspace-group-id", groupID)) + continue + } + log.Info("delete the keyspace group tso key", + zap.Uint32("keyspace-group-id", groupID)) + // Clean up the remaining TSO keys. + // TODO: support the Local TSO Allocator clean up. 
+ err := kgm.tsoSvcStorage.DeleteTimestamp( + endpoint.GetTimestampPath( + endpoint.GetKeyspaceGroupTSPath(groupID), + ), + ) + if err != nil { + log.Warn("failed to delete the keyspace group tso key", + zap.Uint32("keyspace-group-id", groupID), + zap.Error(err)) + continue + } + kgm.Lock() + delete(kgm.deletedGroups, groupID) + kgm.Unlock() + } + } +} diff --git a/pkg/tso/keyspace_group_manager_test.go b/pkg/tso/keyspace_group_manager_test.go index 2e03418bae7..09baac9a592 100644 --- a/pkg/tso/keyspace_group_manager_test.go +++ b/pkg/tso/keyspace_group_manager_test.go @@ -38,6 +38,7 @@ import ( "github.com/tikv/pd/pkg/utils/tempurl" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/tsoutil" + "github.com/tikv/pd/pkg/utils/typeutil" "go.etcd.io/etcd/clientv3" "go.etcd.io/etcd/mvcc/mvccpb" "go.uber.org/goleak" @@ -91,6 +92,58 @@ func (suite *keyspaceGroupManagerTestSuite) createConfig() *TestServiceConfig { } } +func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() { + re := suite.Require() + re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/tso/fastDeletedGroupCleaner", "return(true)")) + + // Start with the empty keyspace group assignment. + mgr := suite.newUniqueKeyspaceGroupManager(0) + re.NotNil(mgr) + defer mgr.Close() + err := mgr.Initialize() + re.NoError(err) + + rootPath := mgr.legacySvcRootPath + svcAddr := mgr.tsoServiceID.ServiceAddr + + // Add keyspace group 1. + suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr})}) + // Check if the TSO key is created. + testutil.Eventually(re, func() bool { + ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(1)) + re.NoError(err) + return ts != typeutil.ZeroTime + }) + // Delete keyspace group 1. + suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupDeleteEvent(1)}) + // Check if the TSO key is deleted. 
+ testutil.Eventually(re, func() bool { + ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(1)) + re.NoError(err) + return ts == typeutil.ZeroTime + }) + // Check if the keyspace group is deleted completely. + mgr.RLock() + re.Nil(mgr.ams[1]) + re.Nil(mgr.kgs[1]) + re.NotContains(mgr.deletedGroups, 1) + mgr.RUnlock() + // Try to delete the default keyspace group. + suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupDeleteEvent(mcsutils.DefaultKeyspaceGroupID)}) + // Default keyspace group should NOT be deleted. + mgr.RLock() + re.NotNil(mgr.ams[mcsutils.DefaultKeyspaceGroupID]) + re.NotNil(mgr.kgs[mcsutils.DefaultKeyspaceGroupID]) + re.NotContains(mgr.deletedGroups, mcsutils.DefaultKeyspaceGroupID) + mgr.RUnlock() + // Default keyspace group TSO key should NOT be deleted. + ts, err := mgr.legacySvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(mcsutils.DefaultKeyspaceGroupID)) + re.NoError(err) + re.NotEmpty(ts) + + re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/tso/fastDeletedGroupCleaner")) +} + // TestNewKeyspaceGroupManager tests the initialization of KeyspaceGroupManager. // It should initialize the allocator manager with the desired configurations and parameters. func (suite *keyspaceGroupManagerTestSuite) TestNewKeyspaceGroupManager() { diff --git a/pkg/tso/tso.go b/pkg/tso/tso.go index 54f0cb927be..1ce039a762c 100644 --- a/pkg/tso/tso.go +++ b/pkg/tso/tso.go @@ -16,7 +16,6 @@ package tso import ( "fmt" - "path" "sync/atomic" "time" @@ -34,7 +33,6 @@ import ( ) const ( - timestampKey = "timestamp" // UpdateTimestampGuard is the min timestamp interval. UpdateTimestampGuard = time.Millisecond // maxLogical is the max upper limit for logical time. @@ -142,7 +140,7 @@ func (t *timestampOracle) calibrateLogical(rawLogical int64, suffixBits int) int // GetTimestampPath returns the timestamp path in etcd. 
func (t *timestampOracle) GetTimestampPath() string { - return path.Join(t.tsPath, timestampKey) + return endpoint.GetTimestampPath(t.tsPath) } // SyncTimestamp is used to synchronize the timestamp. diff --git a/tests/integrations/mcs/tso/server_test.go b/tests/integrations/mcs/tso/server_test.go index 94e76d3ea03..6de703741ad 100644 --- a/tests/integrations/mcs/tso/server_test.go +++ b/tests/integrations/mcs/tso/server_test.go @@ -204,7 +204,7 @@ func getEtcdTimestampKeyNum(re *require.Assertions, client *clientv3.Client) int var count int for _, kv := range resp.Kvs { key := strings.TrimSpace(string(kv.Key)) - if !strings.HasSuffix(key, "timestamp") { + if !strings.HasSuffix(key, endpoint.TimestampKey) { continue } count++ diff --git a/tools/pd-backup/pdbackup/backup.go b/tools/pd-backup/pdbackup/backup.go index a645138b68b..eeb5ee3cf30 100644 --- a/tools/pd-backup/pdbackup/backup.go +++ b/tools/pd-backup/pdbackup/backup.go @@ -25,6 +25,7 @@ import ( "path" "strconv" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/typeutil" "github.com/tikv/pd/server/config" @@ -74,8 +75,7 @@ func GetBackupInfo(client *clientv3.Client, pdAddr string) (*BackupInfo, error) backInfo.AllocIDMax = allocIDMax - timestampPath := path.Join(rootPath, "timestamp") - resp, err = etcdutil.EtcdKVGet(client, timestampPath) + resp, err = etcdutil.EtcdKVGet(client, endpoint.GetTimestampPath(rootPath)) if err != nil { return nil, err } diff --git a/tools/pd-backup/pdbackup/backup_test.go b/tools/pd-backup/pdbackup/backup_test.go index c747ace21de..8e3ca1eaaac 100644 --- a/tools/pd-backup/pdbackup/backup_test.go +++ b/tools/pd-backup/pdbackup/backup_test.go @@ -16,6 +16,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/tikv/pd/pkg/storage/endpoint" "github.com/tikv/pd/pkg/utils/etcdutil" "github.com/tikv/pd/pkg/utils/testutil" "github.com/tikv/pd/pkg/utils/typeutil" @@ -133,10 
+134,9 @@ func (s *backupTestSuite) BeforeTest(suiteName, testName string) { var ( rootPath = path.Join(pdRootPath, strconv.FormatUint(clusterID, 10)) - timestampPath = path.Join(rootPath, "timestamp") allocTimestampMaxBytes = typeutil.Uint64ToBytes(allocTimestampMax) ) - _, err = s.etcdClient.Put(ctx, timestampPath, string(allocTimestampMaxBytes)) + _, err = s.etcdClient.Put(ctx, endpoint.GetTimestampPath(rootPath), string(allocTimestampMaxBytes)) s.NoError(err) var (