Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tso: implement deletedGroupCleaner to clean up the legacy TSO key #6745

Merged
merged 7 commits into from
Jul 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ const (
resourceGroupStatesPath = "states"
controllerConfigPath = "controller"
// tso storage endpoint has prefix `tso`
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"
tsoServiceKey = utils.TSOServiceName
globalTSOAllocatorEtcdPrefix = "gta"
// TimestampKey is the key of timestamp oracle used for the suffix.
TimestampKey = "timestamp"

tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupMembershipKey = "membership"
Expand Down Expand Up @@ -261,3 +263,20 @@ func buildPath(withSuffix bool, str ...string) string {
}
return sb.String()
}

// GetKeyspaceGroupTSPath constructs the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
func GetKeyspaceGroupTSPath(groupID uint32) string {
if groupID == utils.DefaultKeyspaceGroupID {
return ""
}
return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix)
}

// GetTimestampPath returns the timestamp path for the given timestamp oracle path prefix.
func GetTimestampPath(tsPath string) string {
return path.Join(tsPath, TimestampKey)
}
10 changes: 9 additions & 1 deletion pkg/storage/endpoint/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
type TSOStorage interface {
LoadTimestamp(prefix string) (time.Time, error)
SaveTimestamp(key string, ts time.Time) error
DeleteTimestamp(key string) error
}

var _ TSOStorage = (*StorageEndpoint)(nil)
Expand All @@ -51,7 +52,7 @@ func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) {
maxTSWindow := typeutil.ZeroTime
for i, key := range keys {
key := strings.TrimSpace(key)
if !strings.HasSuffix(key, timestampKey) {
if !strings.HasSuffix(key, TimestampKey) {
continue
}
tsWindow, err := typeutil.ParseTimestamp([]byte(values[i]))
Expand Down Expand Up @@ -89,3 +90,10 @@ func (se *StorageEndpoint) SaveTimestamp(key string, ts time.Time) error {
return txn.Save(key, string(data))
})
}

// DeleteTimestamp deletes the timestamp from the storage.
func (se *StorageEndpoint) DeleteTimestamp(key string) error {
return se.RunInTxn(context.Background(), func(txn kv.Txn) error {
return txn.Remove(key)
})
}
17 changes: 7 additions & 10 deletions pkg/storage/storage_tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
Expand All @@ -42,9 +43,8 @@ func TestSaveLoadTimestamp(t *testing.T) {
rootPath := path.Join("/pd", strconv.FormatUint(100, 10))
storage := NewStorageWithEtcdBackend(client, rootPath)

key := "timestamp"
expectedTS := time.Now().Round(0)
err = storage.SaveTimestamp(key, expectedTS)
err = storage.SaveTimestamp(endpoint.TimestampKey, expectedTS)
re.NoError(err)
ts, err := storage.LoadTimestamp("")
re.NoError(err)
Expand All @@ -68,16 +68,15 @@ func TestGlobalLocalTimestamp(t *testing.T) {
storage := NewStorageWithEtcdBackend(client, rootPath)

ltaKey := "lta"
timestampKey := "timestamp"
dc1LocationKey, dc2LocationKey := "dc1", "dc2"
localTS1 := time.Now().Round(0)
l1 := path.Join(ltaKey, dc1LocationKey, timestampKey)
l2 := path.Join(ltaKey, dc2LocationKey, timestampKey)
l1 := path.Join(ltaKey, dc1LocationKey, endpoint.TimestampKey)
l2 := path.Join(ltaKey, dc2LocationKey, endpoint.TimestampKey)

err = storage.SaveTimestamp(l1, localTS1)
re.NoError(err)
globalTS := time.Now().Round(0)
err = storage.SaveTimestamp(timestampKey, globalTS)
err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS)
re.NoError(err)
localTS2 := time.Now().Round(0)
err = storage.SaveTimestamp(l2, localTS2)
Expand Down Expand Up @@ -108,14 +107,12 @@ func TestTimestampTxn(t *testing.T) {
rootPath := path.Join("/pd", strconv.FormatUint(100, 10))
storage := NewStorageWithEtcdBackend(client, rootPath)

timestampKey := "timestamp"

globalTS1 := time.Now().Round(0)
err = storage.SaveTimestamp(timestampKey, globalTS1)
err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS1)
re.NoError(err)

globalTS2 := globalTS1.Add(-time.Millisecond).Round(0)
err = storage.SaveTimestamp(timestampKey, globalTS2)
err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS2)
re.Error(err)

ts, err := storage.LoadTimestamp("")
Expand Down
26 changes: 6 additions & 20 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,12 @@ import (

const (
// GlobalDCLocation is the Global TSO Allocator's DC location label.
GlobalDCLocation = "global"
checkStep = time.Minute
patrolStep = time.Second
defaultAllocatorLeaderLease = 3
globalTSOAllocatorEtcdPrefix = "gta"
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
GlobalDCLocation = "global"
checkStep = time.Minute
patrolStep = time.Second
defaultAllocatorLeaderLease = 3
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
)

var (
Expand Down Expand Up @@ -1406,16 +1405,3 @@ func (am *AllocatorManager) GetLeaderAddr() string {
}
return leaderAddrs[0]
}

// Construct the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
func (am *AllocatorManager) getKeyspaceGroupTSPath(groupID uint32) string {
tsPath := ""
if am.kgID != mcsutils.DefaultKeyspaceGroupID {
tsPath = path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix)
}
return tsPath
}
3 changes: 2 additions & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -96,7 +97,7 @@ func NewGlobalTSOAllocator(
member: am.member,
timestampOracle: &timestampOracle{
client: am.member.GetLeadership().GetClient(),
tsPath: am.getKeyspaceGroupTSPath(am.kgID),
tsPath: endpoint.GetKeyspaceGroupTSPath(am.kgID),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
Expand Down
103 changes: 88 additions & 15 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,14 @@ type state struct {
keyspaceLookupTable map[uint32]uint32
// splittingGroups is the cache of splitting keyspace group related information.
splittingGroups map[uint32]struct{}
// deletedGroups is the cache of deleted keyspace group related information.
deletedGroups map[uint32]struct{}
}

func (s *state) initialize() {
s.keyspaceLookupTable = make(map[uint32]uint32)
s.splittingGroups = make(map[uint32]struct{})
s.deletedGroups = make(map[uint32]struct{})
}

func (s *state) deInitialize() {
Expand Down Expand Up @@ -116,6 +119,28 @@ func (s *state) getKeyspaceGroupMeta(
return s.ams[groupID], s.kgs[groupID]
}

// getSplittingGroups returns the IDs of the splitting keyspace groups.
func (s *state) getSplittingGroups() []uint32 {
s.RLock()
defer s.RUnlock()
groups := make([]uint32, 0, len(s.splittingGroups))
for groupID := range s.splittingGroups {
groups = append(groups, groupID)
}
return groups
}

// getDeletedGroups returns the IDs of the deleted keyspace groups.
func (s *state) getDeletedGroups() []uint32 {
s.RLock()
defer s.RUnlock()
groups := make([]uint32, 0, len(s.deletedGroups))
for groupID := range s.deletedGroups {
groups = append(groups, groupID)
}
return groups
}

func (s *state) checkTSOSplit(
targetGroupID uint32,
) (splitTargetAM, splitSourceAM *AllocatorManager, err error) {
Expand Down Expand Up @@ -402,9 +427,10 @@ func (kgm *KeyspaceGroupManager) Initialize() error {
return errs.ErrLoadKeyspaceGroupsTerminated.Wrap(err)
}

kgm.wg.Add(2)
kgm.wg.Add(3)
go kgm.primaryPriorityCheckLoop()
go kgm.groupSplitPatroller()
go kgm.deletedGroupCleaner()

return nil
}
Expand Down Expand Up @@ -915,10 +941,12 @@ func (kgm *KeyspaceGroupManager) deleteKeyspaceGroup(groupID uint32) {
am.close()
kgm.ams[groupID] = nil
}

kgm.deletedGroups[groupID] = struct{}{}
}

// exitElectionMembership exits the election membership of the given keyspace group by
// deinitializing the allocator manager, but still keeps the keyspace group info.
// de-initializing the allocator manager, but still keeps the keyspace group info.
func (kgm *KeyspaceGroupManager) exitElectionMembership(group *endpoint.KeyspaceGroup) {
log.Info("resign election membership", zap.Uint32("keyspace-group-id", group.ID))

Expand Down Expand Up @@ -1272,7 +1300,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
// update the newly merged TSO to make sure it is greater than the original ones.
var mergedTS time.Time
for _, id := range mergeList {
ts, err := kgm.tsoSvcStorage.LoadTimestamp(am.getKeyspaceGroupTSPath(id))
ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(id))
if err != nil || ts == typeutil.ZeroTime {
log.Error("failed to load the keyspace group TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
Expand Down Expand Up @@ -1336,6 +1364,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
// groupSplitPatroller is used to patrol the groups that are in the on-going
// split state and to check if we could speed up the split process.
func (kgm *KeyspaceGroupManager) groupSplitPatroller() {
defer logutil.LogPanic()
defer kgm.wg.Done()
patrolInterval := groupPatrolInterval
failpoint.Inject("fastGroupSplitPatroller", func() {
Expand All @@ -1348,21 +1377,11 @@ func (kgm *KeyspaceGroupManager) groupSplitPatroller() {
for {
select {
case <-kgm.ctx.Done():
log.Info("group split patroller is exiting")
log.Info("group split patroller exited")
return
case <-ticker.C:
}
kgm.RLock()
if len(kgm.splittingGroups) == 0 {
kgm.RUnlock()
continue
}
var splittingGroups []uint32
for id := range kgm.splittingGroups {
splittingGroups = append(splittingGroups, id)
}
kgm.RUnlock()
for _, groupID := range splittingGroups {
for _, groupID := range kgm.getSplittingGroups() {
am, group := kgm.getKeyspaceGroupMeta(groupID)
if !am.IsLeader() {
continue
Expand All @@ -1387,3 +1406,57 @@ func (kgm *KeyspaceGroupManager) groupSplitPatroller() {
}
}
}

// deletedGroupCleaner is used to clean the deleted keyspace groups related data.
// For example, the TSO keys of the merged keyspace groups remain in the storage.
func (kgm *KeyspaceGroupManager) deletedGroupCleaner() {
defer logutil.LogPanic()
defer kgm.wg.Done()
patrolInterval := groupPatrolInterval
failpoint.Inject("fastDeletedGroupCleaner", func() {
patrolInterval = 200 * time.Millisecond
})
ticker := time.NewTicker(patrolInterval)
defer ticker.Stop()
log.Info("deleted group cleaner is started",
zap.Duration("patrol-interval", patrolInterval))
for {
select {
case <-kgm.ctx.Done():
log.Info("deleted group cleaner exited")
return
case <-ticker.C:
}
for _, groupID := range kgm.getDeletedGroups() {
// Do not clean the default keyspace group data.
if groupID == mcsutils.DefaultKeyspaceGroupID {
continue
}
// Make sure the allocator and group meta are not in use anymore.
am, _ := kgm.getKeyspaceGroupMeta(groupID)
if am != nil {
log.Info("the keyspace group tso allocator has not been closed yet",
zap.Uint32("keyspace-group-id", groupID))
continue
}
log.Info("delete the keyspace group tso key",
zap.Uint32("keyspace-group-id", groupID))
// Clean up the remaining TSO keys.
// TODO: support the Local TSO Allocator clean up.
err := kgm.tsoSvcStorage.DeleteTimestamp(
endpoint.GetTimestampPath(
endpoint.GetKeyspaceGroupTSPath(groupID),
),
)
if err != nil {
log.Warn("failed to delete the keyspace group tso key",
zap.Uint32("keyspace-group-id", groupID),
zap.Error(err))
continue
}
kgm.Lock()
delete(kgm.deletedGroups, groupID)
kgm.Unlock()
}
}
}
Loading