From 9b8e21da199554246b126c827e20a67b91a9be7c Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 3 Jul 2023 15:10:56 +0800 Subject: [PATCH 1/3] Allow TSO fallback happens in TestMixedTSODeployment Signed-off-by: JmPotato --- client/client.go | 8 ++++++++ client/option.go | 1 + client/tso_dispatcher.go | 2 +- tests/integrations/tso/client_test.go | 7 +++++-- 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/client/client.go b/client/client.go index 0597f7344ee..95211f4a148 100644 --- a/client/client.go +++ b/client/client.go @@ -265,6 +265,14 @@ func WithInitMetricsOption(initMetrics bool) ClientOption { } } +// WithAllowTSOFallback configures the client with `allowTSOFallback` option. +// NOTICE: This should only be used for testing. +func WithAllowTSOFallback() ClientOption { + return func(c *client) { + c.option.allowTSOFallback = true + } +} + var _ Client = (*client)(nil) // serviceModeKeeper is for service mode switching. diff --git a/client/option.go b/client/option.go index d6a6d61d2f9..9d46c7b1a70 100644 --- a/client/option.go +++ b/client/option.go @@ -54,6 +54,7 @@ type option struct { enableForwarding bool metricsLabels prometheus.Labels initMetrics bool + allowTSOFallback bool // Dynamic options. dynamicOptions [dynamicOptionCount]atomic.Value diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 37bea8db9e5..dfc5fd4a791 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -756,7 +756,7 @@ func (c *tsoClient) compareAndSwapTS( // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned // last time. - if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { + if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) && !c.option.allowTSOFallback { log.Panic("[tso] timestamp fallback", zap.String("dc-location", dcLocation), zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index a1c2ec08565..b282216a0f4 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -436,7 +436,7 @@ func TestMixedTSODeployment(t *testing.T) { ctx1, cancel1 := context.WithCancel(context.Background()) var wg sync.WaitGroup - checkTSO(ctx1, re, &wg, backendEndpoints) + checkTSO(ctx1, re, &wg, backendEndpoints, pd.WithAllowTSOFallback() /* It's expected that the timestamp fallback happens here */) wg.Add(1) go func() { defer wg.Done() @@ -497,7 +497,10 @@ func TestUpgradingAPIandTSOClusters(t *testing.T) { re.NoError(failpoint.Disable("github.com/tikv/pd/client/usePDServiceMode")) } -func checkTSO(ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, backendEndpoints string) { +func checkTSO( + ctx context.Context, re *require.Assertions, wg *sync.WaitGroup, + backendEndpoints string, opts ...pd.ClientOption, +) { wg.Add(tsoRequestConcurrencyNumber) for i := 0; i < tsoRequestConcurrencyNumber; i++ { go func() { From 1290fb042044ce1544cf5d18f12b3e5925901669 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 3 Jul 2023 15:19:22 +0800 Subject: [PATCH 2/3] Refine the log Signed-off-by: JmPotato --- client/tso_dispatcher.go | 22 ++++++++++++++++++---- 1 file changed, 18 insertions(+), 4 deletions(-) diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index dfc5fd4a791..79dbddcb241 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -756,8 +756,23 @@ func (c *tsoClient) compareAndSwapTS( // to compare with the new TSO's first logical. For example, if we have a TSO resp with logical 10, count 5, then // all TSOs we get will be [6, 7, 8, 9, 10]. lastTSOInfo.logical stores the logical part of the largest ts returned // last time. - if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) && !c.option.allowTSOFallback { - log.Panic("[tso] timestamp fallback", + if tsoutil.TSLessEqual(physical, firstLogical, lastTSOInfo.physical, lastTSOInfo.logical) { + if !c.option.allowTSOFallback { + log.Panic("[tso] timestamp fallback", + zap.String("dc-location", dcLocation), + zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), + zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), + zap.String("cur-ts", fmt.Sprintf("(%d, %d)", physical, firstLogical)), + zap.String("last-tso-server", lastTSOInfo.tsoServer), + zap.String("cur-tso-server", curTSOInfo.tsoServer), + zap.Uint32("last-keyspace-group-in-request", lastTSOInfo.reqKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-request", curTSOInfo.reqKeyspaceGroupID), + zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), + zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), + zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), + zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) + } + log.Error("[tso] timestamp fallback", zap.String("dc-location", dcLocation), zap.Uint32("keyspace", c.svcDiscovery.GetKeyspaceID()), zap.String("last-ts", fmt.Sprintf("(%d, %d)", lastTSOInfo.physical, lastTSOInfo.logical)), @@ -769,8 +784,7 @@ func (c *tsoClient) compareAndSwapTS( zap.Uint32("last-keyspace-group-in-response", lastTSOInfo.respKeyspaceGroupID), zap.Uint32("cur-keyspace-group-in-response", curTSOInfo.respKeyspaceGroupID), zap.Time("last-response-received-at", lastTSOInfo.respReceivedAt), - zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt), - ) + zap.Time("cur-response-received-at", curTSOInfo.respReceivedAt)) } lastTSOInfo.tsoServer = curTSOInfo.tsoServer lastTSOInfo.reqKeyspaceGroupID = curTSOInfo.reqKeyspaceGroupID From 9cbdd020aa8e5e99a0ac214aeec6ef7b5b1b5ec8 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Mon, 3 Jul 2023 15:27:23 +0800 Subject: [PATCH 3/3] Fix the code Signed-off-by: JmPotato --- tests/integrations/tso/client_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integrations/tso/client_test.go b/tests/integrations/tso/client_test.go index b282216a0f4..6727877a1c7 100644 --- a/tests/integrations/tso/client_test.go +++ b/tests/integrations/tso/client_test.go @@ -505,7 +505,7 @@ func checkTSO( for i := 0; i < tsoRequestConcurrencyNumber; i++ { go func() { defer wg.Done() - cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV1(), strings.Split(backendEndpoints, ",")) + cli := mcs.SetupClientWithAPIContext(ctx, re, pd.NewAPIContextV1(), strings.Split(backendEndpoints, ","), opts...) defer cli.Close() var ts, lastTS uint64 for {