Skip to content

Commit

Permalink
Extract the timestamp key path constructor
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Jul 4, 2023
1 parent ba75b54 commit 8d4e3d3
Show file tree
Hide file tree
Showing 11 changed files with 51 additions and 45 deletions.
23 changes: 21 additions & 2 deletions pkg/storage/endpoint/key_path.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ const (
resourceGroupStatesPath = "states"
controllerConfigPath = "controller"
// tso storage endpoint has prefix `tso`
tsoServiceKey = utils.TSOServiceName
timestampKey = "timestamp"
tsoServiceKey = utils.TSOServiceName
globalTSOAllocatorEtcdPrefix = "gta"
// TimestampKey is the key of timestamp oracle used for the suffix.
TimestampKey = "timestamp"

tsoKeyspaceGroupPrefix = tsoServiceKey + "/" + utils.KeyspaceGroupsKey
keyspaceGroupMembershipKey = "membership"
Expand Down Expand Up @@ -261,3 +263,20 @@ func buildPath(withSuffix bool, str ...string) string {
}
return sb.String()
}

// GetKeyspaceGroupTSPath constructs the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
func GetKeyspaceGroupTSPath(groupID uint32) string {
if groupID == utils.DefaultKeyspaceGroupID {
return ""
}
return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix)
}

// GetTimestampPath returns the timestamp path for the given timestamp oracle path prefix.
func GetTimestampPath(tsPath string) string {
return path.Join(tsPath, TimestampKey)
}
2 changes: 1 addition & 1 deletion pkg/storage/endpoint/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (se *StorageEndpoint) LoadTimestamp(prefix string) (time.Time, error) {
maxTSWindow := typeutil.ZeroTime
for i, key := range keys {
key := strings.TrimSpace(key)
if !strings.HasSuffix(key, timestampKey) {
if !strings.HasSuffix(key, TimestampKey) {
continue
}
tsWindow, err := typeutil.ParseTimestamp([]byte(values[i]))
Expand Down
17 changes: 7 additions & 10 deletions pkg/storage/storage_tso_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
Expand All @@ -42,9 +43,8 @@ func TestSaveLoadTimestamp(t *testing.T) {
rootPath := path.Join("/pd", strconv.FormatUint(100, 10))
storage := NewStorageWithEtcdBackend(client, rootPath)

key := "timestamp"
expectedTS := time.Now().Round(0)
err = storage.SaveTimestamp(key, expectedTS)
err = storage.SaveTimestamp(endpoint.TimestampKey, expectedTS)
re.NoError(err)
ts, err := storage.LoadTimestamp("")
re.NoError(err)
Expand All @@ -68,16 +68,15 @@ func TestGlobalLocalTimestamp(t *testing.T) {
storage := NewStorageWithEtcdBackend(client, rootPath)

ltaKey := "lta"
timestampKey := "timestamp"
dc1LocationKey, dc2LocationKey := "dc1", "dc2"
localTS1 := time.Now().Round(0)
l1 := path.Join(ltaKey, dc1LocationKey, timestampKey)
l2 := path.Join(ltaKey, dc2LocationKey, timestampKey)
l1 := path.Join(ltaKey, dc1LocationKey, endpoint.TimestampKey)
l2 := path.Join(ltaKey, dc2LocationKey, endpoint.TimestampKey)

err = storage.SaveTimestamp(l1, localTS1)
re.NoError(err)
globalTS := time.Now().Round(0)
err = storage.SaveTimestamp(timestampKey, globalTS)
err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS)
re.NoError(err)
localTS2 := time.Now().Round(0)
err = storage.SaveTimestamp(l2, localTS2)
Expand Down Expand Up @@ -108,14 +107,12 @@ func TestTimestampTxn(t *testing.T) {
rootPath := path.Join("/pd", strconv.FormatUint(100, 10))
storage := NewStorageWithEtcdBackend(client, rootPath)

timestampKey := "timestamp"

globalTS1 := time.Now().Round(0)
err = storage.SaveTimestamp(timestampKey, globalTS1)
err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS1)
re.NoError(err)

globalTS2 := globalTS1.Add(-time.Millisecond).Round(0)
err = storage.SaveTimestamp(timestampKey, globalTS2)
err = storage.SaveTimestamp(endpoint.TimestampKey, globalTS2)
re.Error(err)

ts, err := storage.LoadTimestamp("")
Expand Down
25 changes: 6 additions & 19 deletions pkg/tso/allocator_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,12 @@ import (

const (
// GlobalDCLocation is the Global TSO Allocator's DC location label.
GlobalDCLocation = "global"
checkStep = time.Minute
patrolStep = time.Second
defaultAllocatorLeaderLease = 3
globalTSOAllocatorEtcdPrefix = "gta"
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
GlobalDCLocation = "global"
checkStep = time.Minute
patrolStep = time.Second
defaultAllocatorLeaderLease = 3
localTSOAllocatorEtcdPrefix = "lta"
localTSOSuffixEtcdPrefix = "lts"
)

var (
Expand Down Expand Up @@ -1406,15 +1405,3 @@ func (am *AllocatorManager) GetLeaderAddr() string {
}
return leaderAddrs[0]
}

// Construct the timestampOracle path prefix, which is:
// 1. for the default keyspace group:
// "" in /pd/{cluster_id}/timestamp
// 2. for the non-default keyspace groups:
// {group}/gta in /ms/{cluster_id}/tso/{group}/gta/timestamp
func getKeyspaceGroupTSPath(groupID uint32) string {
if groupID == mcsutils.DefaultKeyspaceGroupID {
return ""
}
return path.Join(fmt.Sprintf("%05d", groupID), globalTSOAllocatorEtcdPrefix)
}
3 changes: 2 additions & 1 deletion pkg/tso/global_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/tikv/pd/pkg/errs"
mcsutils "github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/slice"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/logutil"
"github.com/tikv/pd/pkg/utils/tsoutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -96,7 +97,7 @@ func NewGlobalTSOAllocator(
member: am.member,
timestampOracle: &timestampOracle{
client: am.member.GetLeadership().GetClient(),
tsPath: getKeyspaceGroupTSPath(am.kgID),
tsPath: endpoint.GetKeyspaceGroupTSPath(am.kgID),
storage: am.storage,
saveInterval: am.saveInterval,
updatePhysicalInterval: am.updatePhysicalInterval,
Expand Down
8 changes: 6 additions & 2 deletions pkg/tso/keyspace_group_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1278,7 +1278,7 @@ func (kgm *KeyspaceGroupManager) mergingChecker(ctx context.Context, mergeTarget
// update the newly merged TSO to make sure it is greater than the original ones.
var mergedTS time.Time
for _, id := range mergeList {
ts, err := kgm.tsoSvcStorage.LoadTimestamp(getKeyspaceGroupTSPath(id))
ts, err := kgm.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(id))
if err != nil || ts == typeutil.ZeroTime {
log.Error("failed to load the keyspace group TSO",
zap.String("member", kgm.tsoServiceID.ServiceAddr),
Expand Down Expand Up @@ -1439,7 +1439,11 @@ func (kgm *KeyspaceGroupManager) deletedGroupCleaner() {
zap.Uint32("keyspace-group-id", groupID))
// Clean up the remaining TSO keys.
// TODO: support the Local TSO Allocator.
err := kgm.tsoSvcStorage.DeleteTimestamp(path.Join(getKeyspaceGroupTSPath(groupID), "timestamp"))
err := kgm.tsoSvcStorage.DeleteTimestamp(
endpoint.GetTimestampPath(
endpoint.GetKeyspaceGroupTSPath(groupID),
),
)
if err != nil {
log.Warn("failed to delete the keyspace group tso key",
zap.Uint32("keyspace-group-id", groupID),
Expand Down
4 changes: 2 additions & 2 deletions pkg/tso/keyspace_group_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,15 +110,15 @@ func (suite *keyspaceGroupManagerTestSuite) TestDeletedGroupCleanup() {
suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupPutEvent(1, []uint32{1}, []string{svcAddr})})
// Check if the TSO key is created.
testutil.Eventually(re, func() bool {
ts, err := mgr.tsoSvcStorage.LoadTimestamp(getKeyspaceGroupTSPath(1))
ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(1))
re.NoError(err)
return ts != typeutil.ZeroTime
})
// Delete keyspace group 1.
suite.applyEtcdEvents(re, rootPath, []*etcdEvent{generateKeyspaceGroupDeleteEvent(1)})
// Check if the TSO key is deleted.
testutil.Eventually(re, func() bool {
ts, err := mgr.tsoSvcStorage.LoadTimestamp(getKeyspaceGroupTSPath(1))
ts, err := mgr.tsoSvcStorage.LoadTimestamp(endpoint.GetKeyspaceGroupTSPath(1))
re.NoError(err)
return ts == typeutil.ZeroTime
})
Expand Down
4 changes: 1 addition & 3 deletions pkg/tso/tso.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ package tso

import (
"fmt"
"path"
"sync/atomic"
"time"

Expand All @@ -34,7 +33,6 @@ import (
)

const (
timestampKey = "timestamp"
// UpdateTimestampGuard is the min timestamp interval.
UpdateTimestampGuard = time.Millisecond
// maxLogical is the max upper limit for logical time.
Expand Down Expand Up @@ -142,7 +140,7 @@ func (t *timestampOracle) calibrateLogical(rawLogical int64, suffixBits int) int

// GetTimestampPath returns the timestamp path in etcd.
func (t *timestampOracle) GetTimestampPath() string {
return path.Join(t.tsPath, timestampKey)
return endpoint.GetTimestampPath(t.tsPath)
}

// SyncTimestamp is used to synchronize the timestamp.
Expand Down
2 changes: 1 addition & 1 deletion tests/integrations/mcs/tso/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ func getEtcdTimestampKeyNum(re *require.Assertions, client *clientv3.Client) int
var count int
for _, kv := range resp.Kvs {
key := strings.TrimSpace(string(kv.Key))
if !strings.HasSuffix(key, "timestamp") {
if !strings.HasSuffix(key, endpoint.TimestampKey) {
continue
}
count++
Expand Down
4 changes: 2 additions & 2 deletions tools/pd-backup/pdbackup/backup.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"path"
"strconv"

"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/typeutil"
"github.com/tikv/pd/server/config"
Expand Down Expand Up @@ -74,8 +75,7 @@ func GetBackupInfo(client *clientv3.Client, pdAddr string) (*BackupInfo, error)

backInfo.AllocIDMax = allocIDMax

timestampPath := path.Join(rootPath, "timestamp")
resp, err = etcdutil.EtcdKVGet(client, timestampPath)
resp, err = etcdutil.EtcdKVGet(client, endpoint.GetTimestampPath(rootPath))
if err != nil {
return nil, err
}
Expand Down
4 changes: 2 additions & 2 deletions tools/pd-backup/pdbackup/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/utils/etcdutil"
"github.com/tikv/pd/pkg/utils/testutil"
"github.com/tikv/pd/pkg/utils/typeutil"
Expand Down Expand Up @@ -133,10 +134,9 @@ func (s *backupTestSuite) BeforeTest(suiteName, testName string) {

var (
rootPath = path.Join(pdRootPath, strconv.FormatUint(clusterID, 10))
timestampPath = path.Join(rootPath, "timestamp")
allocTimestampMaxBytes = typeutil.Uint64ToBytes(allocTimestampMax)
)
_, err = s.etcdClient.Put(ctx, timestampPath, string(allocTimestampMaxBytes))
_, err = s.etcdClient.Put(ctx, endpoint.GetTimestampPath(rootPath), string(allocTimestampMaxBytes))
s.NoError(err)

var (
Expand Down

0 comments on commit 8d4e3d3

Please sign in to comment.