Skip to content

Commit

Permalink
Merge branch 'master' into fix-watch-ctx
Browse files Browse the repository at this point in the history
  • Loading branch information
ti-chi-bot[bot] authored May 15, 2023
2 parents 38fa449 + 2a82be3 commit b89aaff
Show file tree
Hide file tree
Showing 58 changed files with 1,330 additions and 251 deletions.
72 changes: 55 additions & 17 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,20 @@ func WithMaxErrorRetry(count int) ClientOption {
}
}

// WithMetricsLabels configures the client with metrics labels.
func WithMetricsLabels(labels prometheus.Labels) ClientOption {
return func(c *client) {
c.option.metricsLabels = labels
}
}

// WithInitMetricsOption configures the client with metrics labels.
func WithInitMetricsOption(initMetrics bool) ClientOption {
return func(c *client) {
c.option.initMetrics = initMetrics
}
}

var _ Client = (*client)(nil)

// serviceModeKeeper is for service mode switching.
Expand All @@ -252,20 +266,20 @@ type serviceModeKeeper struct {
// triggering service mode switching concurrently.
sync.RWMutex
serviceMode pdpb.ServiceMode
tsoClient atomic.Value // *tsoClient
tsoClient *tsoClient
tsoSvcDiscovery ServiceDiscovery
}

func (smk *serviceModeKeeper) close() {
smk.Lock()
defer smk.Unlock()
switch smk.serviceMode {
func (k *serviceModeKeeper) close() {
k.Lock()
defer k.Unlock()
switch k.serviceMode {
case pdpb.ServiceMode_API_SVC_MODE:
smk.tsoSvcDiscovery.Close()
k.tsoSvcDiscovery.Close()
fallthrough
case pdpb.ServiceMode_PD_SVC_MODE:
if tsoCli := smk.tsoClient.Load(); tsoCli != nil {
tsoCli.(*tsoClient).Close()
if k.tsoClient != nil {
k.tsoClient.Close()
}
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
}
Expand Down Expand Up @@ -415,6 +429,11 @@ func (c *client) loadKeyspaceMeta(keyspace string) error {
}

func (c *client) setup() error {
// Init the metrics.
if c.option.initMetrics {
initAndRegisterMetrics(c.option.metricsLabels)
}

// Init the client base.
if err := c.pdSvcDiscovery.Init(); err != nil {
return err
Expand Down Expand Up @@ -486,8 +505,8 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
}
newTSOCli.Setup()
// Replace the old TSO client.
oldTSOClient := c.getTSOClient()
c.tsoClient.Store(newTSOCli)
oldTSOClient := c.tsoClient
c.tsoClient = newTSOCli
oldTSOClient.Close()
// Replace the old TSO service discovery if needed.
oldTSOSvcDiscovery := c.tsoSvcDiscovery
Expand All @@ -506,11 +525,10 @@ func (c *client) setServiceMode(newMode pdpb.ServiceMode) {
zap.String("new-mode", newMode.String()))
}

func (c *client) getTSOClient() *tsoClient {
if tsoCli := c.tsoClient.Load(); tsoCli != nil {
return tsoCli.(*tsoClient)
}
return nil
func (c *client) getServiceClientProxy() (*tsoClient, pdpb.ServiceMode) {
c.RLock()
defer c.RUnlock()
return c.tsoClient, c.serviceMode
}

func (c *client) scheduleUpdateTokenConnection() {
Expand Down Expand Up @@ -675,7 +693,7 @@ func (c *client) GetLocalTSAsync(ctx context.Context, dcLocation string) TSFutur
req := tsoReqPool.Get().(*tsoRequest)
req.requestCtx = ctx
req.clientCtx = c.ctx
tsoClient := c.getTSOClient()
tsoClient, _ := c.getServiceClientProxy()
req.start = time.Now()
req.dcLocation = dcLocation

Expand Down Expand Up @@ -704,6 +722,26 @@ func (c *client) GetLocalTS(ctx context.Context, dcLocation string) (physical in
return resp.Wait()
}

func (c *client) GetMinTS(ctx context.Context) (physical int64, logical int64, err error) {
tsoClient, serviceMode := c.getServiceClientProxy()
if tsoClient == nil {
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("tso client is nil")
}

switch serviceMode {
case pdpb.ServiceMode_UNKNOWN_SVC_MODE:
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("unknown service mode")
case pdpb.ServiceMode_PD_SVC_MODE:
// If the service mode is switched to API during GetTS() call, which happens during migration,
// returning the default timeline should be fine.
return c.GetTS(ctx)
case pdpb.ServiceMode_API_SVC_MODE:
return tsoClient.getMinTS(ctx)
default:
return 0, 0, errs.ErrClientGetMinTSO.FastGenByArgs("undefined service mode")
}
}

func handleRegionResponse(res *pdpb.GetRegionResponse) *Region {
if res.Region == nil {
return nil
Expand Down Expand Up @@ -1395,7 +1433,7 @@ func (c *client) respForErr(observer prometheus.Observer, start time.Time, err e
// GetTSOAllocators returns {dc-location -> TSO allocator leader URL} connection map
// For test only.
func (c *client) GetTSOAllocators() *sync.Map {
tsoClient := c.getTSOClient()
tsoClient, _ := c.getServiceClientProxy()
if tsoClient == nil {
return nil
}
Expand Down
13 changes: 7 additions & 6 deletions client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/stretchr/testify/require"
"github.com/tikv/pd/client/testutil"
"github.com/tikv/pd/client/tlsutil"
"github.com/tikv/pd/client/tsoutil"
"go.uber.org/goleak"
"google.golang.org/grpc"
)
Expand All @@ -32,13 +33,13 @@ func TestMain(m *testing.M) {
goleak.VerifyTestMain(m, testutil.LeakOptions...)
}

func TestTsLessEqual(t *testing.T) {
func TestTSLessEqual(t *testing.T) {
re := require.New(t)
re.True(tsLessEqual(9, 9, 9, 9))
re.True(tsLessEqual(8, 9, 9, 8))
re.False(tsLessEqual(9, 8, 8, 9))
re.False(tsLessEqual(9, 8, 9, 6))
re.True(tsLessEqual(9, 6, 9, 8))
re.True(tsoutil.TSLessEqual(9, 9, 9, 9))
re.True(tsoutil.TSLessEqual(8, 9, 9, 8))
re.False(tsoutil.TSLessEqual(9, 8, 8, 9))
re.False(tsoutil.TSLessEqual(9, 8, 9, 6))
re.True(tsoutil.TSLessEqual(9, 6, 9, 8))
}

func TestUpdateURLs(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions client/errs/errno.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ var (
ErrClientTSOStreamClosed = errors.Normalize("encountered TSO stream being closed unexpectedly", errors.RFCCodeText("PD:client:ErrClientTSOStreamClosed"))
ErrClientGetTSOTimeout = errors.Normalize("get TSO timeout", errors.RFCCodeText("PD:client:ErrClientGetTSOTimeout"))
ErrClientGetTSO = errors.Normalize("get TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetTSO"))
ErrClientGetMinTSO = errors.Normalize("get min TSO failed, %v", errors.RFCCodeText("PD:client:ErrClientGetMinTSO"))
ErrClientGetLeader = errors.Normalize("get leader failed, %v", errors.RFCCodeText("PD:client:ErrClientGetLeader"))
ErrClientGetMember = errors.Normalize("get member failed", errors.RFCCodeText("PD:client:ErrClientGetMember"))
ErrClientGetClusterInfo = errors.Normalize("get cluster info failed", errors.RFCCodeText("PD:client:ErrClientGetClusterInfo"))
Expand Down
2 changes: 1 addition & 1 deletion client/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/opentracing/opentracing-go v1.2.0
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3
github.com/prometheus/client_golang v1.11.1
github.com/stretchr/testify v1.8.2
Expand Down
4 changes: 2 additions & 2 deletions client/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm
github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0=
github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew=
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be h1:eHtwHgPzzm8aIZ4x8o7zg1b23cjUl0AikW+SDLpqf3E=
github.com/pingcap/kvproto v0.0.0-20230426023724-d90a321b46be/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1 h1:VXQ6Du/nKZ9IQnI9NWMzKbftWu8NV5pQkSLKIRzzGN4=
github.com/pingcap/kvproto v0.0.0-20230511011722-6e0e8a7deaa1/go.mod h1:guCyM5N+o+ru0TsoZ1hi9lDjUMs2sIBjW3ARTEpVbnk=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3 h1:HR/ylkkLmGdSSDaD8IDP+SZrdhV1Kibl9KrHxJ9eciw=
github.com/pingcap/log v1.1.1-0.20221110025148-ca232912c9f3/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
Expand Down
Loading

0 comments on commit b89aaff

Please sign in to comment.