From bb23a5c739c0e869062301d3e9d634103d3d6533 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Thu, 10 Mar 2022 02:09:56 +0800 Subject: [PATCH 01/10] ref tikv#4686 Signed-off-by: lhy1024 --- client/client.go | 23 ++++ client/go.mod | 2 +- client/go.sum | 4 +- go.mod | 2 +- go.sum | 4 +- server/api/min_resolved_ts.go | 66 +++++++++ server/api/min_resolved_ts_test.go | 77 +++++++++++ server/api/router.go | 4 + server/api/service_gc_safepoint_test.go | 2 +- server/cluster/cluster.go | 20 ++- server/grpc_service.go | 37 +++++ server/storage/endpoint/key_path.go | 11 ++ server/storage/endpoint/min_resolved_ts.go | 126 ++++++++++++++++++ server/storage/storage.go | 1 + server/storage/storage_test.go | 28 ++++ tests/client/client_test.go | 32 +++++ tests/client/go.mod | 2 +- tests/client/go.sum | 4 +- tests/server/cluster/cluster_test.go | 45 ++++++- tools/pd-ctl/pdctl/command/min_resolved_ts.go | 45 +++++++ tools/pd-ctl/pdctl/ctl.go | 1 + tools/pd-tso-bench/go.sum | 2 + 22 files changed, 525 insertions(+), 13 deletions(-) create mode 100644 server/api/min_resolved_ts.go create mode 100644 server/api/min_resolved_ts_test.go create mode 100644 server/storage/endpoint/min_resolved_ts.go create mode 100644 tools/pd-ctl/pdctl/command/min_resolved_ts.go diff --git a/client/client.go b/client/client.go index 405194b9444..021f2e8ea1b 100644 --- a/client/client.go +++ b/client/client.go @@ -124,6 +124,8 @@ type Client interface { StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error // WatchGlobalConfig returns an stream with all global config and updates WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error) + // ReportMinResolvedTS reports the min resolved ts to pd. + ReportMinResolvedTS(ctx context.Context, storeID, minResolvedTS uint64) error // UpdateOption updates the client option. UpdateOption(option DynamicOption, value interface{}) error // Close closes the client. @@ -1865,3 +1867,24 @@ func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem }() return globalConfigWatcherCh, err } + +func (c *client) ReportMinResolvedTS(ctx context.Context, storeID, minResolvedTS uint64) error { + if span := opentracing.SpanFromContext(ctx); span != nil { + span = opentracing.StartSpan("pdclient.ReportMinResolvedTS", opentracing.ChildOf(span.Context())) + defer span.Finish() + } + ctx, cancel := context.WithTimeout(ctx, c.option.timeout) + req := &pdpb.ReportMinResolvedTsRequest{ + Header: c.requestHeader(), + StoreId: storeID, + MinResolvedTs: minResolvedTS, + } + ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) + _, err := c.getClient().ReportMinResolvedTS(ctx, req) + cancel() + + if err != nil { + return errors.WithStack(err) + } + return nil +} diff --git a/client/go.mod b/client/go.mod index e3b9192bb30..fb1d5a0bae0 100644 --- a/client/go.mod +++ b/client/go.mod @@ -7,7 +7,7 @@ require ( github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b + github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee github.com/prometheus/client_golang v1.11.0 go.uber.org/goleak v1.1.11 diff --git a/client/go.sum b/client/go.sum index c705a06c247..2f27cfbb563 100644 --- a/client/go.sum +++ b/client/go.sum @@ -106,8 +106,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b h1:/OL63rEIcCEivpgTLCkhxVbO3RMxSuHtsKWSgDwS6oY= -github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/go.mod b/go.mod index 2ab84160c59..5f851246a48 100644 --- a/go.mod +++ b/go.mod @@ -27,7 +27,7 @@ require ( github.com/pingcap/errcode v0.3.0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce - github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b + github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a github.com/pingcap/log v0.0.0-20210906054005-afc726e70354 github.com/pingcap/sysutil v0.0.0-20211208032423-041a72e5860d github.com/pingcap/tidb-dashboard v0.0.0-20220117082709-e8076b5c79ba diff --git a/go.sum b/go.sum index cf6a9b525f4..7cf337d336c 100644 --- a/go.sum +++ b/go.sum @@ -398,8 +398,8 @@ github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce h1:Y1kCxlCtlPTMt github.com/pingcap/failpoint v0.0.0-20200702092429-9f69995143ce/go.mod h1:w4PEZ5y16LeofeeGwdgZB4ddv9bLyDuIX+ljstgKZyk= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b h1:/OL63rEIcCEivpgTLCkhxVbO3RMxSuHtsKWSgDwS6oY= -github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= diff --git a/server/api/min_resolved_ts.go b/server/api/min_resolved_ts.go new file mode 100644 index 00000000000..1e45506148c --- /dev/null +++ b/server/api/min_resolved_ts.go @@ -0,0 +1,66 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "net/http" + + "github.com/tikv/pd/server" + "github.com/tikv/pd/server/storage/endpoint" + "github.com/unrolled/render" +) + +type minResolvedTSHandler struct { + svr *server.Server + rd *render.Render +} + +func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolvedTSHandler { + return &minResolvedTSHandler{ + svr: svr, + rd: rd, + } +} + +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +type listMinResolvedTS struct { + MinResolvedTSList []*endpoint.MinResolvedTSPoint `json:"list"` + MinResolvedTSForCluster uint64 `json:"min_resolved_ts"` +} + +// @Tags minresolvedts +// @Summary Get min resolved ts. +// @Produce json +// @Success 200 {array} listMinResolvedTS +// @Failure 500 {string} string "PD server failed to proceed the request." +// @Router /min-resolved-ts [get] +func (h *minResolvedTSHandler) List(w http.ResponseWriter, r *http.Request) { + storage := h.svr.GetStorage() + minResolvedTS, err := storage.LoadClusterMinResolvedTS() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + minResolvedTSList, err := storage.LoadAllMinResolvedTS() + if err != nil { + h.rd.JSON(w, http.StatusInternalServerError, err.Error()) + return + } + list := listMinResolvedTS{ + MinResolvedTSList: minResolvedTSList, + MinResolvedTSForCluster: minResolvedTS, + } + h.rd.JSON(w, http.StatusOK, list) +} diff --git a/server/api/min_resolved_ts_test.go b/server/api/min_resolved_ts_test.go new file mode 100644 index 00000000000..e523770792a --- /dev/null +++ b/server/api/min_resolved_ts_test.go @@ -0,0 +1,77 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package api + +import ( + "fmt" + + . "github.com/pingcap/check" + "github.com/pingcap/kvproto/pkg/metapb" + "github.com/tikv/pd/pkg/apiutil" + "github.com/tikv/pd/server" + "github.com/tikv/pd/server/storage/endpoint" +) + +var _ = Suite(&testMinResolvedTSSuite{}) + +type testMinResolvedTSSuite struct { + svr *server.Server + cleanup cleanUpFunc + urlPrefix string +} + +func (s *testMinResolvedTSSuite) SetUpSuite(c *C) { + s.svr, s.cleanup = mustNewServer(c) + mustWaitLeader(c, []*server.Server{s.svr}) + + addr := s.svr.GetAddr() + s.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) + + mustBootstrapCluster(c, s.svr) + mustPutStore(c, s.svr, 1, metapb.StoreState_Up, metapb.NodeState_Serving, nil) +} + +func (s *testMinResolvedTSSuite) TearDownSuite(c *C) { + s.cleanup() +} + +func (s *testMinResolvedTSSuite) TestMinResolvedTS(c *C) { + url := s.urlPrefix + "/min-resolved-ts" + storage := s.svr.GetStorage() + testData := []uint64{233333, 23333, 2333, 233, 1} + result := &listMinResolvedTS{ + MinResolvedTSList: make([]*endpoint.MinResolvedTSPoint, 0), + } + for i, minResolvedTS := range testData { + storeID := uint64(i + 1) + err := storage.SaveMinResolvedTS(storeID, minResolvedTS) + c.Assert(err, IsNil) + result.MinResolvedTSList = append(result.MinResolvedTSList, &endpoint.MinResolvedTSPoint{ + StoreID: storeID, + MinResolvedTS: minResolvedTS, + }) + } + ts, err := storage.LoadClusterMinResolvedTS() + c.Assert(err, IsNil) + result.MinResolvedTSForCluster = ts + + res, err := testDialClient.Get(url) + c.Assert(err, IsNil) + defer res.Body.Close() + listResp := &listMinResolvedTS{} + err = apiutil.ReadJSON(res.Body, listResp) + c.Assert(err, IsNil) + c.Assert(listResp, DeepEquals, result) +} diff --git a/server/api/router.go b/server/api/router.go index 8a923f81dd3..aaada96e600 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -351,6 +351,10 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(apiRouter, "GetGCSafePoint", "/gc/safepoint", serviceGCSafepointHandler.List, setMethods("GET"), setAuditBackend(localLog)) registerFunc(apiRouter, "DeleteGCSafePoint", "/gc/safepoint/{service_id}", serviceGCSafepointHandler.Delete, setMethods("DELETE"), setAuditBackend(localLog)) + // min resolved ts API + MinResolvedTSHandler := newMinResolvedTSHandler(svr, rd) + registerFunc(apiRouter, "GetMinResolvedTS", "/min-resolved-ts", MinResolvedTSHandler.List, setMethods("GET"), setAuditBackend(localLog)) + // unsafe admin operation API unsafeOperationHandler := newUnsafeOperationHandler(svr, rd) registerFunc(clusterRouter, "RemoveFailedStoresUnsafely", "/admin/unsafe/remove-failed-stores", diff --git a/server/api/service_gc_safepoint_test.go b/server/api/service_gc_safepoint_test.go index 32237499bce..c3bc931c843 100644 --- a/server/api/service_gc_safepoint_test.go +++ b/server/api/service_gc_safepoint_test.go @@ -49,7 +49,7 @@ func (s *testServiceGCSafepointSuite) TearDownSuite(c *C) { s.cleanup() } -func (s *testServiceGCSafepointSuite) TestRegionStats(c *C) { +func (s *testServiceGCSafepointSuite) TestServiceGCSafepoint(c *C) { sspURL := s.urlPrefix + "/gc/safepoint" storage := s.svr.GetStorage() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 83852a300c9..e2fc8c5fdc7 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -1067,12 +1067,18 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error { c.onStoreVersionChangeLocked() if err == nil { // clean up the residual information. - c.RemoveStoreLimit(storeID) + c.CleanTombstoneResidualInfo(storeID) c.hotStat.RemoveRollingStoreStats(storeID) } return err } +// CleanTombstoneResidualInfo clean up the residual information of tombstone store. +func (c *RaftCluster) CleanTombstoneResidualInfo(storeID uint64) { + c.RemoveStoreLimit(storeID) + c.RemoveMinResolvedTSStorage(storeID) +} + // PauseLeaderTransfer prevents the store from been selected as source or // target store of TransferLeader. func (c *RaftCluster) PauseLeaderTransfer(storeID uint64) error { @@ -1219,7 +1225,7 @@ func (c *RaftCluster) RemoveTombStoneRecords() error { errs.ZapError(err)) return err } - c.RemoveStoreLimit(store.GetID()) + c.CleanTombstoneResidualInfo(store.GetID()) log.Info("delete store succeeded", zap.Stringer("store", store.GetMeta())) } @@ -1650,6 +1656,16 @@ func (c *RaftCluster) RemoveStoreLimit(storeID uint64) { log.Error("persist store limit meet error", errs.ZapError(err)) } +// RemoveMinResolvedTSStorage remove min resolved ts storage for a given store ID. +func (c *RaftCluster) RemoveMinResolvedTSStorage(storeID uint64) error { + if c.storage != nil { + if err := c.storage.RemoveMinResolvedTS(storeID); err != nil { + return err + } + } + return nil +} + // SetStoreLimit sets a store limit for a given type and rate. func (c *RaftCluster) SetStoreLimit(storeID uint64, typ storelimit.Type, ratePerMin float64) error { old := c.opt.GetScheduleConfig().Clone() diff --git a/server/grpc_service.go b/server/grpc_service.go index a6df2bc1231..4a1ea8825e9 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1819,3 +1819,40 @@ func (s *GrpcServer) handleDamagedStore(stats *pdpb.StoreStats) error { // TODO: reimplement add scheduler logic to avoid repeating the introduction HTTP requests inside `server/api`. return s.GetHandler().AddEvictOrGrant(float64(stats.GetStoreId()), schedulers.EvictLeaderName) } + +// ReportMinResolvedTS implements gRPC PDServer. +func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.ReportMinResolvedTsRequest) (*pdpb.ReportMinResolvedTsResponse, error) { + forwardedHost := getForwardedHost(ctx) + if !s.isLocalRequest(forwardedHost) { + client, err := s.getDelegateClient(ctx, forwardedHost) + if err != nil { + return nil, err + } + ctx = grpcutil.ResetForwardContext(ctx) + return pdpb.NewPDClient(client).ReportMinResolvedTS(ctx, request) + } + + if err := s.validateRequest(request.GetHeader()); err != nil { + return nil, err + } + + rc := s.GetRaftCluster() + if rc == nil { + return &pdpb.ReportMinResolvedTsResponse{Header: s.notBootstrappedHeader()}, nil + } + + var storage endpoint.MinResolvedTSStorage = s.storage + storeID := request.StoreId + minResolvedTS := request.MinResolvedTs + + if err := storage.SaveMinResolvedTS(storeID, minResolvedTS); err != nil { + return nil, err + } + log.Debug("updated min resolved-ts", + zap.Uint64("store", storeID), + zap.Uint64("min resolved-ts", minResolvedTS)) + + return &pdpb.ReportMinResolvedTsResponse{ + Header: s.header(), + }, nil +} diff --git a/server/storage/endpoint/key_path.go b/server/storage/endpoint/key_path.go index 195c4dea2aa..4cd5763e3a4 100644 --- a/server/storage/endpoint/key_path.go +++ b/server/storage/endpoint/key_path.go @@ -30,6 +30,7 @@ const ( replicationPath = "replication_mode" customScheduleConfigPath = "scheduler_config" gcWorkerServiceSafePointID = "gc_worker" + minResolvedTS = "min_resolved_ts" ) // AppendToRootPath appends the given key to the rootPath. @@ -97,3 +98,13 @@ func GCSafePointServicePrefixPath() string { func gcSafePointServicePath(serviceID string) string { return path.Join(gcSafePointPath(), "service", serviceID) } + +// MinResolvedTSPath returns the min resolved ts with the given store ID. +func MinResolvedTSPath(storeID uint64) string { + return path.Join(minResolvedTS, fmt.Sprintf("%020d", storeID)) +} + +// MinResolvedTSPrefixPath returns the min resolved ts key path prefix. +func MinResolvedTSPrefixPath() string { + return minResolvedTS + "/" +} diff --git a/server/storage/endpoint/min_resolved_ts.go b/server/storage/endpoint/min_resolved_ts.go new file mode 100644 index 00000000000..fea649dd256 --- /dev/null +++ b/server/storage/endpoint/min_resolved_ts.go @@ -0,0 +1,126 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package endpoint + +import ( + "math" + "path" + "strconv" + + "go.etcd.io/etcd/clientv3" +) + +// MinResolvedTSPoint is the min resolved ts for a store +// NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. +type MinResolvedTSPoint struct { + StoreID uint64 `json:"store_id"` + MinResolvedTS uint64 `json:"min_resolved_ts"` +} + +// MinResolvedTSStorage defines the storage operations on the min resolved ts. +type MinResolvedTSStorage interface { + LoadMinResolvedTS(storeID uint64) (uint64, error) + SaveMinResolvedTS(storeID uint64, minResolvedTS uint64) error + LoadClusterMinResolvedTS() (uint64, error) + LoadAllMinResolvedTS() ([]*MinResolvedTSPoint, error) + RemoveMinResolvedTS(storeID uint64) error +} + +var _ MinResolvedTSStorage = (*StorageEndpoint)(nil) + +// LoadMinResolvedTS loads the min resolved ts with the given store ID from storage. +func (se *StorageEndpoint) LoadMinResolvedTS(storeID uint64) (uint64, error) { + value, err := se.Load(MinResolvedTSPath(storeID)) + if err != nil { + return 0, err + } + if value == "" { + return 0, nil + } + minResolvedTS, err := strconv.ParseUint(value, 16, 64) + if err != nil { + return 0, err + } + return minResolvedTS, nil +} + +// SaveMinResolvedTS saves the min resolved ts with the given store ID to storage. +func (se *StorageEndpoint) SaveMinResolvedTS(storeID uint64, minResolvedTS uint64) error { + value := strconv.FormatUint(minResolvedTS, 16) + return se.Save(MinResolvedTSPath(storeID), value) +} + +// LoadClusterMinResolvedTS returns the min resolved ts for the cluster +func (se *StorageEndpoint) LoadClusterMinResolvedTS() (uint64, error) { + prefix := MinResolvedTSPrefixPath() + prefixEnd := clientv3.GetPrefixRangeEnd(prefix) + keys, values, err := se.LoadRange(prefix, prefixEnd, 0) + if err != nil { + return math.MaxUint64, err + } + if len(keys) == 0 { + // There's no service safepoint. It may be a new cluster, or upgraded from an older version + return 0, nil + } + + min := uint64(math.MaxUint64) + for i := range keys { + var ts uint64 + if ts, err = strconv.ParseUint(values[i], 16, 64); err != nil { + return min, err + } + if ts < min { + min = ts + } + } + return min, nil +} + +// LoadAllMinResolvedTS returns min resolved ts of all stores. +func (se *StorageEndpoint) LoadAllMinResolvedTS() ([]*MinResolvedTSPoint, error) { + prefix := MinResolvedTSPrefixPath() + prefixEnd := clientv3.GetPrefixRangeEnd(prefix) + keys, values, err := se.LoadRange(prefix, prefixEnd, 0) + if err != nil { + return nil, err + } + if len(keys) == 0 { + return []*MinResolvedTSPoint{}, nil + } + + tss := make([]*MinResolvedTSPoint, 0, len(keys)) + for i, key := range keys { + var minResolvedTS, storeID uint64 + if minResolvedTS, err = strconv.ParseUint(values[i], 16, 64); err != nil { + return nil, err + } + if storeID, err = strconv.ParseUint(path.Base(key), 16, 64); err != nil { + return nil, err + } + ts := &MinResolvedTSPoint{ + StoreID: storeID, + MinResolvedTS: minResolvedTS, + } + tss = append(tss, ts) + } + + return tss, nil +} + +// RemoveMinResolvedTS removes min resolved ts for the store +func (se *StorageEndpoint) RemoveMinResolvedTS(storeID uint64) error { + key := MinResolvedTSPath(storeID) + return se.Remove(key) +} diff --git a/server/storage/storage.go b/server/storage/storage.go index cd63156f8c8..b92a9248672 100644 --- a/server/storage/storage.go +++ b/server/storage/storage.go @@ -37,6 +37,7 @@ type Storage interface { endpoint.RuleStorage endpoint.ReplicationStatusStorage endpoint.GCSafePointStorage + endpoint.MinResolvedTSStorage } // NewStorageWithMemoryBackend creates a new storage with memory backend. diff --git a/server/storage/storage_test.go b/server/storage/storage_test.go index 51870a62133..feb08f0a3a6 100644 --- a/server/storage/storage_test.go +++ b/server/storage/storage_test.go @@ -279,3 +279,31 @@ func (s *testStorageSuite) TestLoadRegionsExceedRangeLimit(c *C) { } c.Assert(failpoint.Disable("github.com/tikv/pd/server/storage/kv/withRangeLimit"), IsNil) } + +func (s *testStorageSuite) TestMinResolvedTSStorage(c *C) { + storage := NewStorageWithMemoryBackend() + testData := []uint64{math.MaxUint64, 233333, 23333, 2333, 233, 1} + + r, e := storage.LoadClusterMinResolvedTS() + c.Assert(r, Equals, uint64(0)) + c.Assert(e, IsNil) + for i, minResolvedTS := range testData { + storeID := uint64(i + 1) + err := storage.SaveMinResolvedTS(storeID, minResolvedTS) + c.Assert(err, IsNil) + minResolvedTS1, err := storage.LoadMinResolvedTS(storeID) + c.Assert(err, IsNil) + c.Assert(minResolvedTS, Equals, minResolvedTS1) + min, err := storage.LoadClusterMinResolvedTS() + c.Assert(err, IsNil) + c.Assert(min, Equals, minResolvedTS) + } + for i := range testData { + storeID := uint64(i + 1) + err := storage.RemoveMinResolvedTS(storeID) + c.Assert(err, IsNil) + } + r, e = storage.LoadClusterMinResolvedTS() + c.Assert(r, Equals, uint64(0)) + c.Assert(e, IsNil) +} diff --git a/tests/client/client_test.go b/tests/client/client_test.go index e7301372d47..41d18b987d0 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -1164,6 +1164,38 @@ func (s *testClientSuite) TestScatterRegion(c *C) { c.Succeed() } +func (s *testClientSuite) TestReportMinResolvedTS(c *C) { + storage := s.srv.GetStorage() + testData := []uint64{math.MaxUint64, 233333, 23333, 2333, 233, 1} + r, e := storage.LoadClusterMinResolvedTS() + c.Assert(r, Equals, uint64(0)) + c.Assert(e, IsNil) + for i, minResolvedTS := range testData { + storeID := uint64(i + 1) + req := &pdpb.ReportMinResolvedTsRequest{ + Header: newHeader(s.srv), + StoreId: storeID, + MinResolvedTs: minResolvedTS, + } + _, err := s.grpcSvr.ReportMinResolvedTS(context.Background(), req) + c.Assert(err, IsNil) + minResolvedTS1, err := storage.LoadMinResolvedTS(storeID) + c.Assert(err, IsNil) + c.Assert(minResolvedTS, Equals, minResolvedTS1) + min, err := storage.LoadClusterMinResolvedTS() + c.Assert(err, IsNil) + c.Assert(min, Equals, minResolvedTS) + } + for i := range testData { + storeID := uint64(i + 1) + err := storage.RemoveMinResolvedTS(storeID) + c.Assert(err, IsNil) + } + r, e = storage.LoadClusterMinResolvedTS() + c.Assert(r, Equals, uint64(0)) + c.Assert(e, IsNil) +} + type testConfigTTLSuite struct { ctx context.Context cancel context.CancelFunc diff --git a/tests/client/go.mod b/tests/client/go.mod index 2d7f51778a7..b258d794a76 100644 --- a/tests/client/go.mod +++ b/tests/client/go.mod @@ -7,7 +7,7 @@ require ( github.com/golang/protobuf v1.5.2 // indirect github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b + github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a github.com/tikv/pd v0.0.0-00010101000000-000000000000 github.com/tikv/pd/client v0.0.0-00010101000000-000000000000 go.etcd.io/etcd v0.5.0-alpha.5.0.20191023171146-3cf2f69b5738 diff --git a/tests/client/go.sum b/tests/client/go.sum index 0531d16e10b..57569d5ee6b 100644 --- a/tests/client/go.sum +++ b/tests/client/go.sum @@ -403,8 +403,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= -github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b h1:/OL63rEIcCEivpgTLCkhxVbO3RMxSuHtsKWSgDwS6oY= -github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20200511115504-543df19646ad/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20210625125904-98ed8e2eb1c7/go.mod h1:8AanEdAHATuRurdGxZXBz0At+9avep+ub7U1AGYLIMM= diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 7f4affc47f3..dcc27e65e9c 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -245,7 +245,7 @@ func resetStoreState(c *C, rc *cluster.RaftCluster, storeID uint64, state metapb if state == metapb.StoreState_Offline { rc.SetStoreLimit(storeID, storelimit.RemovePeer, storelimit.Unlimited) } else if state == metapb.StoreState_Tombstone { - rc.RemoveStoreLimit(storeID) + rc.CleanTombstoneResidualInfo(storeID) } } @@ -1115,3 +1115,46 @@ func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) { err = rc.HandleRegionHeartbeat(region) c.Assert(err, IsNil) } + +func (s *clusterTestSuite) TestMinResolvedTSWithTombstone(c *C) { + tc, err := tests.NewTestCluster(s.ctx, 1) + defer tc.Destroy() + c.Assert(err, IsNil) + + err = tc.RunInitialServers() + c.Assert(err, IsNil) + tc.WaitLeader() + leaderServer := tc.GetServer(tc.GetLeader()) + grpcPDClient := testutil.MustNewGrpcClient(c, leaderServer.GetAddr()) + clusterID := leaderServer.GetClusterID() + bootstrapCluster(c, clusterID, grpcPDClient) + rc := leaderServer.GetRaftCluster() + c.Assert(rc, NotNil) + + req := &pdpb.GetAllStoresRequest{ + Header: testutil.NewRequestHeader(clusterID), + } + resp, err := grpcPDClient.GetAllStores(context.Background(), req) + c.Assert(err, IsNil) + c.Assert(resp.Stores, HasLen, 1) + storeID := resp.Stores[0].GetId() + + min := uint64(233) + req2 := &pdpb.ReportMinResolvedTsRequest{ + Header: testutil.NewRequestHeader(clusterID), + StoreId: storeID, + MinResolvedTs: min, + } + _, err = grpcPDClient.ReportMinResolvedTS(context.Background(), req2) + c.Assert(err, IsNil) + ts0, err := rc.GetStorage().LoadMinResolvedTS(storeID) + c.Assert(err, IsNil) + c.Assert(ts0, Equals, min) + ts1, err := rc.GetStorage().LoadClusterMinResolvedTS() + c.Assert(err, IsNil) + c.Assert(ts1, Equals, min) + resetStoreState(c, rc, storeID, metapb.StoreState_Tombstone) + ts2, err := rc.GetStorage().LoadClusterMinResolvedTS() + c.Assert(err, IsNil) + c.Assert(ts2, Equals, uint64(0)) +} diff --git a/tools/pd-ctl/pdctl/command/min_resolved_ts.go b/tools/pd-ctl/pdctl/command/min_resolved_ts.go new file mode 100644 index 00000000000..dbf0c47b2de --- /dev/null +++ b/tools/pd-ctl/pdctl/command/min_resolved_ts.go @@ -0,0 +1,45 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package command + +import ( + "net/http" + + "github.com/spf13/cobra" +) + +var ( + minResolvedTSPrefix = "pd/api/v1/min-resolved-ts" +) + +// NewMinResolvedTSCommand return min resolved ts subcommand of rootCmd +func NewMinResolvedTSCommand() *cobra.Command { + l := &cobra.Command{ + Use: "min-resolved-ts", + Short: "show min resolved ts", + Run: ShowMinResolvedTS, + } + return l +} + +// ShowMinResolvedTS show min resolved ts +func ShowMinResolvedTS(cmd *cobra.Command, args []string) { + r, err := doRequest(cmd, minResolvedTSPrefix, http.MethodGet, http.Header{}) + if err != nil { + cmd.Printf("Failed to get min resolved ts: %s\n", err) + return + } + cmd.Println(r) +} diff --git a/tools/pd-ctl/pdctl/ctl.go b/tools/pd-ctl/pdctl/ctl.go index ef3c1467208..2399d26fed0 100644 --- a/tools/pd-ctl/pdctl/ctl.go +++ b/tools/pd-ctl/pdctl/ctl.go @@ -62,6 +62,7 @@ func GetRootCmd() *cobra.Command { command.NewLogCommand(), command.NewPluginCommand(), command.NewServiceGCSafepointCommand(), + command.NewMinResolvedTSCommand(), command.NewCompletionCommand(), command.NewUnsafeCommand(), ) diff --git a/tools/pd-tso-bench/go.sum b/tools/pd-tso-bench/go.sum index 789c13355e2..0cf5621abd8 100644 --- a/tools/pd-tso-bench/go.sum +++ b/tools/pd-tso-bench/go.sum @@ -110,6 +110,8 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b h1:/OL63rEIcCEivpgTLCkhxVbO3RMxSuHtsKWSgDwS6oY= github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA= +github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= From 09b7079ff70fb12aa0232a82254f47a1fa995df6 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Sat, 12 Mar 2022 04:18:25 +0800 Subject: [PATCH 02/10] address comments Signed-off-by: lhy1024 --- client/client.go | 23 ----- client/go.mod | 2 +- client/go.sum | 4 +- server/api/min_resolved_ts.go | 14 +-- server/api/min_resolved_ts_test.go | 19 +--- server/cluster/cluster.go | 76 +++++++++++++--- server/core/store.go | 26 ++++-- server/core/store_option.go | 7 ++ server/grpc_service.go | 5 +- server/storage/endpoint/key_path.go | 11 +-- server/storage/endpoint/min_resolved_ts.go | 86 ++---------------- server/storage/storage_test.go | 28 ------ tests/client/client_test.go | 32 ------- tests/client/go.sum | 1 + tests/server/cluster/cluster_test.go | 101 +++++++++++++++------ 15 files changed, 184 insertions(+), 251 deletions(-) diff --git a/client/client.go b/client/client.go index 021f2e8ea1b..405194b9444 100644 --- a/client/client.go +++ b/client/client.go @@ -124,8 +124,6 @@ type Client interface { StoreGlobalConfig(ctx context.Context, items []GlobalConfigItem) error // WatchGlobalConfig returns an stream with all global config and updates WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem, error) - // ReportMinResolvedTS reports the min resolved ts to pd. - ReportMinResolvedTS(ctx context.Context, storeID, minResolvedTS uint64) error // UpdateOption updates the client option. UpdateOption(option DynamicOption, value interface{}) error // Close closes the client. @@ -1867,24 +1865,3 @@ func (c *client) WatchGlobalConfig(ctx context.Context) (chan []GlobalConfigItem }() return globalConfigWatcherCh, err } - -func (c *client) ReportMinResolvedTS(ctx context.Context, storeID, minResolvedTS uint64) error { - if span := opentracing.SpanFromContext(ctx); span != nil { - span = opentracing.StartSpan("pdclient.ReportMinResolvedTS", opentracing.ChildOf(span.Context())) - defer span.Finish() - } - ctx, cancel := context.WithTimeout(ctx, c.option.timeout) - req := &pdpb.ReportMinResolvedTsRequest{ - Header: c.requestHeader(), - StoreId: storeID, - MinResolvedTs: minResolvedTS, - } - ctx = grpcutil.BuildForwardContext(ctx, c.GetLeaderAddr()) - _, err := c.getClient().ReportMinResolvedTS(ctx, req) - cancel() - - if err != nil { - return errors.WithStack(err) - } - return nil -} diff --git a/client/go.mod b/client/go.mod index fb1d5a0bae0..e3b9192bb30 100644 --- a/client/go.mod +++ b/client/go.mod @@ -7,7 +7,7 @@ require ( github.com/pingcap/check v0.0.0-20211026125417-57bd13f7b5f0 github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 - github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a + github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee github.com/prometheus/client_golang v1.11.0 go.uber.org/goleak v1.1.11 diff --git a/client/go.sum b/client/go.sum index 2f27cfbb563..c705a06c247 100644 --- a/client/go.sum +++ b/client/go.sum @@ -106,8 +106,8 @@ github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c h1:xpW9bvK+HuuTm github.com/pingcap/errors v0.11.5-0.20211224045212-9687c2b0f87c/go.mod h1:X2r9ueLEUZgtx2cIogM0v4Zj5uvvzhuuiu7Pn8HzMPg= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZFh4N3vQ5HEtld3S+Y+StULhWVvumU0= github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= -github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA= -github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b h1:/OL63rEIcCEivpgTLCkhxVbO3RMxSuHtsKWSgDwS6oY= +github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee h1:VO2t6IBpfvW34TdtD/G10VvnGqjLic1jzOuHjUb5VqM= github.com/pingcap/log v0.0.0-20211215031037-e024ba4eb0ee/go.mod h1:DWQW5jICDR7UJh4HtxXSM20Churx4CQL0fwL/SoOSA4= diff --git a/server/api/min_resolved_ts.go b/server/api/min_resolved_ts.go index 1e45506148c..a28e9c8df8a 100644 --- a/server/api/min_resolved_ts.go +++ b/server/api/min_resolved_ts.go @@ -18,7 +18,6 @@ import ( "net/http" "github.com/tikv/pd/server" - "github.com/tikv/pd/server/storage/endpoint" "github.com/unrolled/render" ) @@ -36,8 +35,7 @@ func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolved // NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. type listMinResolvedTS struct { - MinResolvedTSList []*endpoint.MinResolvedTSPoint `json:"list"` - MinResolvedTSForCluster uint64 `json:"min_resolved_ts"` + MinResolvedTS uint64 `json:"min_resolved_ts"` } // @Tags minresolvedts @@ -48,19 +46,13 @@ type listMinResolvedTS struct { // @Router /min-resolved-ts [get] func (h *minResolvedTSHandler) List(w http.ResponseWriter, r *http.Request) { storage := h.svr.GetStorage() - minResolvedTS, err := storage.LoadClusterMinResolvedTS() - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - minResolvedTSList, err := storage.LoadAllMinResolvedTS() + minResolvedTS, err := storage.LoadMinResolvedTS() if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } list := listMinResolvedTS{ - MinResolvedTSList: minResolvedTSList, - MinResolvedTSForCluster: minResolvedTS, + MinResolvedTS: minResolvedTS, } h.rd.JSON(w, http.StatusOK, list) } diff --git a/server/api/min_resolved_ts_test.go b/server/api/min_resolved_ts_test.go index e523770792a..595ea42b1b9 100644 --- a/server/api/min_resolved_ts_test.go +++ b/server/api/min_resolved_ts_test.go @@ -21,7 +21,6 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/apiutil" "github.com/tikv/pd/server" - "github.com/tikv/pd/server/storage/endpoint" ) var _ = Suite(&testMinResolvedTSSuite{}) @@ -50,23 +49,11 @@ func (s *testMinResolvedTSSuite) TearDownSuite(c *C) { func (s *testMinResolvedTSSuite) TestMinResolvedTS(c *C) { url := s.urlPrefix + "/min-resolved-ts" storage := s.svr.GetStorage() - testData := []uint64{233333, 23333, 2333, 233, 1} + min := uint64(233) + storage.SaveMinResolvedTS(min) result := &listMinResolvedTS{ - MinResolvedTSList: make([]*endpoint.MinResolvedTSPoint, 0), + MinResolvedTS: min, } - for i, minResolvedTS := range testData { - storeID := uint64(i + 1) - err := storage.SaveMinResolvedTS(storeID, minResolvedTS) - c.Assert(err, IsNil) - result.MinResolvedTSList = append(result.MinResolvedTSList, &endpoint.MinResolvedTSPoint{ - StoreID: storeID, - MinResolvedTS: minResolvedTS, - }) - } - ts, err := storage.LoadClusterMinResolvedTS() - c.Assert(err, IsNil) - result.MinResolvedTSForCluster = ts - res, err := testDialClient.Get(url) c.Assert(err, IsNil) defer res.Body.Close() diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index e2fc8c5fdc7..44fccfd76f0 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -17,6 +17,7 @@ package cluster import ( "context" "fmt" + "math" "net/http" "strconv" "sync" @@ -53,6 +54,7 @@ import ( ) var backgroundJobInterval = 10 * time.Second +var saveMinResolvedTSInterval = 1 * time.Second const ( clientTimeout = 3 * time.Second @@ -251,15 +253,17 @@ func (c *RaftCluster) Start(s Server) error { c.limiter = NewStoreLimiter(s.GetPersistOptions()) c.unsafeRecoveryController = newUnsafeRecoveryController(cluster) - c.wg.Add(5) + c.wg.Add(6) go c.runCoordinator() failpoint.Inject("highFrequencyClusterJobs", func() { backgroundJobInterval = 100 * time.Microsecond + saveMinResolvedTSInterval = 1 * time.Microsecond }) go c.runBackgroundJobs(backgroundJobInterval) go c.runStatsBackgroundJobs() go c.syncRegions() go c.runReplicationMode() + go c.runMinResolvedTSJob(saveMinResolvedTSInterval) c.running = true return nil @@ -1067,18 +1071,12 @@ func (c *RaftCluster) BuryStore(storeID uint64, forceBury bool) error { c.onStoreVersionChangeLocked() if err == nil { // clean up the residual information. - c.CleanTombstoneResidualInfo(storeID) + c.RemoveStoreLimit(storeID) c.hotStat.RemoveRollingStoreStats(storeID) } return err } -// CleanTombstoneResidualInfo clean up the residual information of tombstone store. -func (c *RaftCluster) CleanTombstoneResidualInfo(storeID uint64) { - c.RemoveStoreLimit(storeID) - c.RemoveMinResolvedTSStorage(storeID) -} - // PauseLeaderTransfer prevents the store from been selected as source or // target store of TransferLeader. func (c *RaftCluster) PauseLeaderTransfer(storeID uint64) error { @@ -1225,7 +1223,7 @@ func (c *RaftCluster) RemoveTombStoneRecords() error { errs.ZapError(err)) return err } - c.CleanTombstoneResidualInfo(store.GetID()) + c.RemoveStoreLimit(store.GetID()) log.Info("delete store succeeded", zap.Stringer("store", store.GetMeta())) } @@ -1656,14 +1654,62 @@ func (c *RaftCluster) RemoveStoreLimit(storeID uint64) { log.Error("persist store limit meet error", errs.ZapError(err)) } -// RemoveMinResolvedTSStorage remove min resolved ts storage for a given store ID. -func (c *RaftCluster) RemoveMinResolvedTSStorage(storeID uint64) error { - if c.storage != nil { - if err := c.storage.RemoveMinResolvedTS(storeID); err != nil { - return err +// GetMinResolvedTS returns the min resolved ts of all stores. +func (c *RaftCluster) GetMinResolvedTS() uint64 { + c.RLock() + defer c.RUnlock() + if !c.isInitialized() { + return math.MaxUint64 + } + min := uint64(math.MaxUint64) + for _, s := range c.GetStores() { + if !core.IsAvailableForMinResolvedTS(s) { + continue + } + if min > s.GetMinResolvedTS() { + min = s.GetMinResolvedTS() + } + } + return min +} + +// SetMinResolvedTS sets up a store with min resolved ts. +func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error { + c.Lock() + defer c.Unlock() + + store := c.GetStore(storeID) + if store == nil { + return errs.ErrStoreNotFound.FastGenByArgs(storeID) + } + + newStore := store.Clone( + core.SetMinResolvedTS(minResolvedTS), + ) + + return c.putStoreLocked(newStore) +} + +func (c *RaftCluster) runMinResolvedTSJob(saveInterval time.Duration) { + defer logutil.LogPanic() + defer c.wg.Done() + + ticker := time.NewTicker(saveInterval) + defer ticker.Stop() + for { + select { + case <-c.ctx.Done(): + log.Info("min resolved ts background jobs has been stopped") + return + case <-ticker.C: + minResolvedTS := c.GetMinResolvedTS() + if minResolvedTS != math.MaxUint64 { + c.Lock() + defer c.Unlock() + c.storage.SaveMinResolvedTS(minResolvedTS) + } } } - return nil } // SetStoreLimit sets a store limit for a given type and rate. diff --git a/server/core/store.go b/server/core/store.go index c30283cad36..b9e28dc52fe 100644 --- a/server/core/store.go +++ b/server/core/store.go @@ -60,16 +60,18 @@ type StoreInfo struct { leaderWeight float64 regionWeight float64 limiter map[storelimit.Type]*storelimit.StoreLimit + minResolvedTS uint64 } // NewStoreInfo creates StoreInfo with meta data. func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo { storeInfo := &StoreInfo{ - meta: store, - storeStats: newStoreStats(), - leaderWeight: 1.0, - regionWeight: 1.0, - limiter: make(map[storelimit.Type]*storelimit.StoreLimit), + meta: store, + storeStats: newStoreStats(), + leaderWeight: 1.0, + regionWeight: 1.0, + limiter: make(map[storelimit.Type]*storelimit.StoreLimit), + minResolvedTS: math.MaxUint64, } for _, opt := range opts { opt(storeInfo) @@ -94,6 +96,7 @@ func (s *StoreInfo) Clone(opts ...StoreCreateOption) *StoreInfo { leaderWeight: s.leaderWeight, regionWeight: s.regionWeight, limiter: s.limiter, + minResolvedTS: s.minResolvedTS, } for _, opt := range opts { @@ -118,6 +121,7 @@ func (s *StoreInfo) ShallowClone(opts ...StoreCreateOption) *StoreInfo { leaderWeight: s.leaderWeight, regionWeight: s.regionWeight, limiter: s.limiter, + minResolvedTS: s.minResolvedTS, } for _, opt := range opts { @@ -472,6 +476,11 @@ func (s *StoreInfo) GetUptime() time.Duration { return 0 } +// GetMinResolvedTS returns min resolved ts. +func (s *StoreInfo) GetMinResolvedTS() uint64 { + return s.minResolvedTS +} + var ( // If a store's last heartbeat is storeDisconnectDuration ago, the store will // be marked as disconnected state. The value should be greater than tikv's @@ -714,7 +723,7 @@ func (s *StoresInfo) UpdateStoreStatus(storeID uint64, leaderCount int, regionCo } } -// IsStoreContainLabel return if the store contains the given label. +// IsStoreContainLabel returns if the store contains the given label. func IsStoreContainLabel(store *metapb.Store, key, value string) bool { for _, l := range store.GetLabels() { if l.GetKey() == key && l.GetValue() == value { @@ -723,3 +732,8 @@ func IsStoreContainLabel(store *metapb.Store, key, value string) bool { } return false } + +// IsAvailableForMinResolvedTS returns if the store is available for min resolved ts. +func IsAvailableForMinResolvedTS(s *StoreInfo) bool { + return !s.IsRemoved() && !IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash) && s.GetLeaderCount() != 0 +} diff --git a/server/core/store_option.go b/server/core/store_option.go index 649dd2feb67..b445e9da735 100644 --- a/server/core/store_option.go +++ b/server/core/store_option.go @@ -217,6 +217,13 @@ func SetNewStoreStats(stats *pdpb.StoreStats) StoreCreateOption { } } +// SetMinResolvedTS sets min resolved ts for the store. +func SetMinResolvedTS(minResolvedTS uint64) StoreCreateOption { + return func(store *StoreInfo) { + store.minResolvedTS = minResolvedTS + } +} + // ResetStoreLimit resets the store limit for a store. func ResetStoreLimit(limitType storelimit.Type, ratePerSec ...float64) StoreCreateOption { return func(store *StoreInfo) { diff --git a/server/grpc_service.go b/server/grpc_service.go index 4a1ea8825e9..c2a09a68de3 100644 --- a/server/grpc_service.go +++ b/server/grpc_service.go @@ -1841,17 +1841,14 @@ func (s *GrpcServer) ReportMinResolvedTS(ctx context.Context, request *pdpb.Repo return &pdpb.ReportMinResolvedTsResponse{Header: s.notBootstrappedHeader()}, nil } - var storage endpoint.MinResolvedTSStorage = s.storage storeID := request.StoreId minResolvedTS := request.MinResolvedTs - - if err := storage.SaveMinResolvedTS(storeID, minResolvedTS); err != nil { + if err := rc.SetMinResolvedTS(storeID, minResolvedTS); err != nil { return nil, err } log.Debug("updated min resolved-ts", zap.Uint64("store", storeID), zap.Uint64("min resolved-ts", minResolvedTS)) - return &pdpb.ReportMinResolvedTsResponse{ Header: s.header(), }, nil diff --git a/server/storage/endpoint/key_path.go b/server/storage/endpoint/key_path.go index 4cd5763e3a4..8dc10d6c220 100644 --- a/server/storage/endpoint/key_path.go +++ b/server/storage/endpoint/key_path.go @@ -99,12 +99,7 @@ func gcSafePointServicePath(serviceID string) string { return path.Join(gcSafePointPath(), "service", serviceID) } -// MinResolvedTSPath returns the min resolved ts with the given store ID. -func MinResolvedTSPath(storeID uint64) string { - return path.Join(minResolvedTS, fmt.Sprintf("%020d", storeID)) -} - -// MinResolvedTSPrefixPath returns the min resolved ts key path prefix. -func MinResolvedTSPrefixPath() string { - return minResolvedTS + "/" +// MinResolvedTSPath returns the min resolved ts path +func MinResolvedTSPath() string { + return path.Join(minResolvedTS) } diff --git a/server/storage/endpoint/min_resolved_ts.go b/server/storage/endpoint/min_resolved_ts.go index fea649dd256..12cf21da40a 100644 --- a/server/storage/endpoint/min_resolved_ts.go +++ b/server/storage/endpoint/min_resolved_ts.go @@ -15,11 +15,7 @@ package endpoint import ( - "math" - "path" "strconv" - - "go.etcd.io/etcd/clientv3" ) // MinResolvedTSPoint is the min resolved ts for a store @@ -31,18 +27,15 @@ type MinResolvedTSPoint struct { // MinResolvedTSStorage defines the storage operations on the min resolved ts. type MinResolvedTSStorage interface { - LoadMinResolvedTS(storeID uint64) (uint64, error) - SaveMinResolvedTS(storeID uint64, minResolvedTS uint64) error - LoadClusterMinResolvedTS() (uint64, error) - LoadAllMinResolvedTS() ([]*MinResolvedTSPoint, error) - RemoveMinResolvedTS(storeID uint64) error + LoadMinResolvedTS() (uint64, error) + SaveMinResolvedTS(minResolvedTS uint64) error } var _ MinResolvedTSStorage = (*StorageEndpoint)(nil) -// LoadMinResolvedTS loads the min resolved ts with the given store ID from storage. -func (se *StorageEndpoint) LoadMinResolvedTS(storeID uint64) (uint64, error) { - value, err := se.Load(MinResolvedTSPath(storeID)) +// LoadMinResolvedTS loads the min resolved ts from storage. +func (se *StorageEndpoint) LoadMinResolvedTS() (uint64, error) { + value, err := se.Load(MinResolvedTSPath()) if err != nil { return 0, err } @@ -56,71 +49,8 @@ func (se *StorageEndpoint) LoadMinResolvedTS(storeID uint64) (uint64, error) { return minResolvedTS, nil } -// SaveMinResolvedTS saves the min resolved ts with the given store ID to storage. -func (se *StorageEndpoint) SaveMinResolvedTS(storeID uint64, minResolvedTS uint64) error { +// SaveMinResolvedTS saves the min resolved ts. +func (se *StorageEndpoint) SaveMinResolvedTS(minResolvedTS uint64) error { value := strconv.FormatUint(minResolvedTS, 16) - return se.Save(MinResolvedTSPath(storeID), value) -} - -// LoadClusterMinResolvedTS returns the min resolved ts for the cluster -func (se *StorageEndpoint) LoadClusterMinResolvedTS() (uint64, error) { - prefix := MinResolvedTSPrefixPath() - prefixEnd := clientv3.GetPrefixRangeEnd(prefix) - keys, values, err := se.LoadRange(prefix, prefixEnd, 0) - if err != nil { - return math.MaxUint64, err - } - if len(keys) == 0 { - // There's no service safepoint. It may be a new cluster, or upgraded from an older version - return 0, nil - } - - min := uint64(math.MaxUint64) - for i := range keys { - var ts uint64 - if ts, err = strconv.ParseUint(values[i], 16, 64); err != nil { - return min, err - } - if ts < min { - min = ts - } - } - return min, nil -} - -// LoadAllMinResolvedTS returns min resolved ts of all stores. -func (se *StorageEndpoint) LoadAllMinResolvedTS() ([]*MinResolvedTSPoint, error) { - prefix := MinResolvedTSPrefixPath() - prefixEnd := clientv3.GetPrefixRangeEnd(prefix) - keys, values, err := se.LoadRange(prefix, prefixEnd, 0) - if err != nil { - return nil, err - } - if len(keys) == 0 { - return []*MinResolvedTSPoint{}, nil - } - - tss := make([]*MinResolvedTSPoint, 0, len(keys)) - for i, key := range keys { - var minResolvedTS, storeID uint64 - if minResolvedTS, err = strconv.ParseUint(values[i], 16, 64); err != nil { - return nil, err - } - if storeID, err = strconv.ParseUint(path.Base(key), 16, 64); err != nil { - return nil, err - } - ts := &MinResolvedTSPoint{ - StoreID: storeID, - MinResolvedTS: minResolvedTS, - } - tss = append(tss, ts) - } - - return tss, nil -} - -// RemoveMinResolvedTS removes min resolved ts for the store -func (se *StorageEndpoint) RemoveMinResolvedTS(storeID uint64) error { - key := MinResolvedTSPath(storeID) - return se.Remove(key) + return se.Save(MinResolvedTSPath(), value) } diff --git a/server/storage/storage_test.go b/server/storage/storage_test.go index feb08f0a3a6..51870a62133 100644 --- a/server/storage/storage_test.go +++ b/server/storage/storage_test.go @@ -279,31 +279,3 @@ func (s *testStorageSuite) TestLoadRegionsExceedRangeLimit(c *C) { } c.Assert(failpoint.Disable("github.com/tikv/pd/server/storage/kv/withRangeLimit"), IsNil) } - -func (s *testStorageSuite) TestMinResolvedTSStorage(c *C) { - storage := NewStorageWithMemoryBackend() - testData := []uint64{math.MaxUint64, 233333, 23333, 2333, 233, 1} - - r, e := storage.LoadClusterMinResolvedTS() - c.Assert(r, Equals, uint64(0)) - c.Assert(e, IsNil) - for i, minResolvedTS := range testData { - storeID := uint64(i + 1) - err := storage.SaveMinResolvedTS(storeID, minResolvedTS) - c.Assert(err, IsNil) - minResolvedTS1, err := storage.LoadMinResolvedTS(storeID) - c.Assert(err, IsNil) - c.Assert(minResolvedTS, Equals, minResolvedTS1) - min, err := storage.LoadClusterMinResolvedTS() - c.Assert(err, IsNil) - c.Assert(min, Equals, minResolvedTS) - } - for i := range testData { - storeID := uint64(i + 1) - err := storage.RemoveMinResolvedTS(storeID) - c.Assert(err, IsNil) - } - r, e = storage.LoadClusterMinResolvedTS() - c.Assert(r, Equals, uint64(0)) - c.Assert(e, IsNil) -} diff --git a/tests/client/client_test.go b/tests/client/client_test.go index 41d18b987d0..e7301372d47 100644 --- a/tests/client/client_test.go +++ b/tests/client/client_test.go @@ -1164,38 +1164,6 @@ func (s *testClientSuite) TestScatterRegion(c *C) { c.Succeed() } -func (s *testClientSuite) TestReportMinResolvedTS(c *C) { - storage := s.srv.GetStorage() - testData := []uint64{math.MaxUint64, 233333, 23333, 2333, 233, 1} - r, e := storage.LoadClusterMinResolvedTS() - c.Assert(r, Equals, uint64(0)) - c.Assert(e, IsNil) - for i, minResolvedTS := range testData { - storeID := uint64(i + 1) - req := &pdpb.ReportMinResolvedTsRequest{ - Header: newHeader(s.srv), - StoreId: storeID, - MinResolvedTs: minResolvedTS, - } - _, err := s.grpcSvr.ReportMinResolvedTS(context.Background(), req) - c.Assert(err, IsNil) - minResolvedTS1, err := storage.LoadMinResolvedTS(storeID) - c.Assert(err, IsNil) - c.Assert(minResolvedTS, Equals, minResolvedTS1) - min, err := storage.LoadClusterMinResolvedTS() - c.Assert(err, IsNil) - c.Assert(min, Equals, minResolvedTS) - } - for i := range testData { - storeID := uint64(i + 1) - err := storage.RemoveMinResolvedTS(storeID) - c.Assert(err, IsNil) - } - r, e = storage.LoadClusterMinResolvedTS() - c.Assert(r, Equals, uint64(0)) - c.Assert(e, IsNil) -} - type testConfigTTLSuite struct { ctx context.Context cancel context.CancelFunc diff --git a/tests/client/go.sum b/tests/client/go.sum index 57569d5ee6b..be32c4b8e0c 100644 --- a/tests/client/go.sum +++ b/tests/client/go.sum @@ -403,6 +403,7 @@ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00 h1:C3N3itkduZXDZ github.com/pingcap/failpoint v0.0.0-20210918120811-547c13e3eb00/go.mod h1:4qGtCB0QK0wBzKtFEGDhxXnSnbQApw1gc9siScUl8ew= github.com/pingcap/kvproto v0.0.0-20191211054548-3c6b38ea5107/go.mod h1:WWLmULLO7l8IOcQG+t+ItJ3fEcrL5FxF0Wu+HrMy26w= github.com/pingcap/kvproto v0.0.0-20200411081810-b85805c9476c/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= +github.com/pingcap/kvproto v0.0.0-20220302110454-c696585a961b/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a h1:0ZnJ8JPtPVGG3qF1G9Kz0NYDEj8BraNEJeQlmwUF6BA= github.com/pingcap/kvproto v0.0.0-20220309094445-a78dc9fdb89a/go.mod h1:IOdRDPLyda8GX2hE/jO7gqaCV/PNFh8BZQCQZXfIOqI= github.com/pingcap/log v0.0.0-20191012051959-b742a5d432e9/go.mod h1:4rbK1p9ILyIfb6hU7OG2CiWSqMXnp3JMbiaVJ6mvoY8= diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index dcc27e65e9c..474154e86d8 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -17,6 +17,8 @@ package cluster_test import ( "context" "fmt" + "math" + "strconv" "sync" "testing" "time" @@ -245,7 +247,7 @@ func resetStoreState(c *C, rc *cluster.RaftCluster, storeID uint64, state metapb if state == metapb.StoreState_Offline { rc.SetStoreLimit(storeID, storelimit.RemovePeer, storelimit.Unlimited) } else if state == metapb.StoreState_Tombstone { - rc.CleanTombstoneResidualInfo(storeID) + rc.RemoveStoreLimit(storeID) } } @@ -1116,7 +1118,7 @@ func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) { c.Assert(err, IsNil) } -func (s *clusterTestSuite) TestMinResolvedTSWithTombstone(c *C) { +func (s *clusterTestSuite) TestMinResolvedTS(c *C) { tc, err := tests.NewTestCluster(s.ctx, 1) defer tc.Destroy() c.Assert(err, IsNil) @@ -1130,31 +1132,76 @@ func (s *clusterTestSuite) TestMinResolvedTSWithTombstone(c *C) { bootstrapCluster(c, clusterID, grpcPDClient) rc := leaderServer.GetRaftCluster() c.Assert(rc, NotNil) - - req := &pdpb.GetAllStoresRequest{ - Header: testutil.NewRequestHeader(clusterID), + c.Assert(failpoint.Enable("github.com/tikv/pd/server/highFrequencyClusterJobs", `return(true)`), IsNil) + addStoreWithMinResolvedTS := func(c *C, storeID uint64, isTiflash bool, minResolvedTS, expect uint64) { + store := &metapb.Store{ + Id: storeID, + Version: "v6.0.0", + Address: "127.0.0.1:" + strconv.Itoa(int(storeID)), + } + if isTiflash { + store.Labels = []*metapb.StoreLabel{{Key: "engine", Value: "tiflash"}} + } + _, err := putStore(grpcPDClient, clusterID, store) + c.Assert(err, IsNil) + req := &pdpb.ReportMinResolvedTsRequest{ + Header: testutil.NewRequestHeader(clusterID), + StoreId: storeID, + MinResolvedTs: minResolvedTS, + } + _, err = grpcPDClient.ReportMinResolvedTS(context.Background(), req) + c.Assert(err, IsNil) + time.Sleep(time.Millisecond * 10) + ts := rc.GetMinResolvedTS() + c.Assert(ts, Equals, expect) } - resp, err := grpcPDClient.GetAllStores(context.Background(), req) - c.Assert(err, IsNil) - c.Assert(resp.Stores, HasLen, 1) - storeID := resp.Stores[0].GetId() - - min := uint64(233) - req2 := &pdpb.ReportMinResolvedTsRequest{ - Header: testutil.NewRequestHeader(clusterID), - StoreId: storeID, - MinResolvedTs: min, + store1TS := uint64(233) + store3TS := store1TS - 10 + // case1: no init + // min resolved ts should be not available + store1 := uint64(1) + status, err := rc.LoadClusterStatus() + c.Assert(status.IsInitialized, IsFalse) + addStoreWithMinResolvedTS(c, store1, false, store1TS, math.MaxUint64) + // case2: add region + // min resolved ts should be available + for i := 0; i < 3; i++ { + region := &metapb.Region{ + Id: uint64(4 + i), + Peers: []*metapb.Peer{{Id: uint64(10 + i), StoreId: store1}}, + StartKey: []byte{byte(i)}, + EndKey: []byte{byte(i + 1)}, + } + rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0])) } - _, err = grpcPDClient.ReportMinResolvedTS(context.Background(), req2) - c.Assert(err, IsNil) - ts0, err := rc.GetStorage().LoadMinResolvedTS(storeID) - c.Assert(err, IsNil) - c.Assert(ts0, Equals, min) - ts1, err := rc.GetStorage().LoadClusterMinResolvedTS() - c.Assert(err, IsNil) - c.Assert(ts1, Equals, min) - resetStoreState(c, rc, storeID, metapb.StoreState_Tombstone) - ts2, err := rc.GetStorage().LoadClusterMinResolvedTS() - c.Assert(err, IsNil) - c.Assert(ts2, Equals, uint64(0)) + c.Assert(rc.GetStore(store1).GetLeaderCount(), Equals, 3) + ts := rc.GetMinResolvedTS() + c.Assert(ts, Equals, store1TS) + // case2: add tiflash store + // min resolved ts should no change + store2 := uint64(2) + addStoreWithMinResolvedTS(c, store2, true, 0, store1TS) + // case4: add new store with less ts but without leader + // min resolved ts should no change + store3 := uint64(3) + addStoreWithMinResolvedTS(c, store3, false, store3TS, store1TS) + // case5: transfer region leader to store 3 + // min resolved ts should change to store 3 + region := &metapb.Region{ + Id: uint64(20), + Peers: []*metapb.Peer{{Id: uint64(21), StoreId: store3}}, + StartKey: []byte{byte(20)}, + EndKey: []byte{byte(21)}, + } + rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0])) + c.Assert(rc.GetStore(store3).GetLeaderCount(), Equals, 1) + ts = rc.GetMinResolvedTS() + c.Assert(ts, Equals, store3TS) + // case6: set tombstone + // min resolved ts should change to store 1 + resetStoreState(c, rc, store3, metapb.StoreState_Tombstone) + time.Sleep(time.Millisecond * 10) + ts = rc.GetMinResolvedTS() + c.Assert(err, IsNil) + c.Assert(ts, Equals, store1TS) } From e65e1eb823aa5cab91d10a0ea3a8b3eb844117a3 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 14 Mar 2022 19:56:24 +0800 Subject: [PATCH 03/10] address comment Signed-off-by: lhy1024 --- server/api/config_test.go | 1 + server/api/min_resolved_ts_test.go | 6 ++++-- server/cluster/cluster.go | 10 ++++++---- server/config/config.go | 18 ++++++++++++------ server/config/persist_options.go | 5 +++++ server/storage/endpoint/min_resolved_ts.go | 1 - tests/pdctl/config/config_test.go | 6 ++++++ 7 files changed, 34 insertions(+), 13 deletions(-) diff --git a/server/api/config_test.go b/server/api/config_test.go index 31bdb8d4853..e8cd7573588 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -299,6 +299,7 @@ func (s *testConfigSuite) TestConfigPDServer(c *C) { c.Assert(sc.MetricStorage, Equals, "") c.Assert(sc.DashboardAddress, Equals, "auto") c.Assert(sc.FlowRoundByDigit, Equals, int(3)) + c.Assert(sc.SaveMinResolvedTSInterval, Equals, typeutil.NewDuration(0)) c.Assert(sc.MaxResetTSGap.Duration, Equals, 24*time.Hour) } diff --git a/server/api/min_resolved_ts_test.go b/server/api/min_resolved_ts_test.go index 595ea42b1b9..a5624fe41e4 100644 --- a/server/api/min_resolved_ts_test.go +++ b/server/api/min_resolved_ts_test.go @@ -18,6 +18,7 @@ import ( "fmt" . "github.com/pingcap/check" + "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/metapb" "github.com/tikv/pd/pkg/apiutil" "github.com/tikv/pd/server" @@ -32,6 +33,7 @@ type testMinResolvedTSSuite struct { } func (s *testMinResolvedTSSuite) SetUpSuite(c *C) { + c.Assert(failpoint.Enable("github.com/tikv/pd/server/highFrequencyClusterJobs", `return(true)`), IsNil) s.svr, s.cleanup = mustNewServer(c) mustWaitLeader(c, []*server.Server{s.svr}) @@ -51,13 +53,13 @@ func (s *testMinResolvedTSSuite) TestMinResolvedTS(c *C) { storage := s.svr.GetStorage() min := uint64(233) storage.SaveMinResolvedTS(min) - result := &listMinResolvedTS{ + result := &minResolvedTS{ MinResolvedTS: min, } res, err := testDialClient.Get(url) c.Assert(err, IsNil) defer res.Body.Close() - listResp := &listMinResolvedTS{} + listResp := &minResolvedTS{} err = apiutil.ReadJSON(res.Body, listResp) c.Assert(err, IsNil) c.Assert(listResp, DeepEquals, result) diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index 44fccfd76f0..ed6b023273c 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -54,7 +54,6 @@ import ( ) var backgroundJobInterval = 10 * time.Second -var saveMinResolvedTSInterval = 1 * time.Second const ( clientTimeout = 3 * time.Second @@ -252,6 +251,7 @@ func (c *RaftCluster) Start(s Server) error { c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager) c.limiter = NewStoreLimiter(s.GetPersistOptions()) c.unsafeRecoveryController = newUnsafeRecoveryController(cluster) + saveMinResolvedTSInterval := c.opt.GetSaveMinResolvedTSInterval() c.wg.Add(6) go c.runCoordinator() @@ -1691,9 +1691,11 @@ func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error { } func (c *RaftCluster) runMinResolvedTSJob(saveInterval time.Duration) { - defer logutil.LogPanic() defer c.wg.Done() - + if saveInterval == 0 { + return + } + defer logutil.LogPanic() ticker := time.NewTicker(saveInterval) defer ticker.Stop() for { @@ -1705,8 +1707,8 @@ func (c *RaftCluster) runMinResolvedTSJob(saveInterval time.Duration) { minResolvedTS := c.GetMinResolvedTS() if minResolvedTS != math.MaxUint64 { c.Lock() - defer c.Unlock() c.storage.SaveMinResolvedTS(minResolvedTS) + c.Unlock() } } } diff --git a/server/config/config.go b/server/config/config.go index a6f4a4288fb..29496421a36 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -228,12 +228,13 @@ const ( defaultLeaderPriorityCheckInterval = time.Minute - defaultUseRegionStorage = true - defaultTraceRegionFlow = true - defaultFlowRoundByDigit = 3 // KB - maxTraceFlowRoundByDigit = 5 // 0.1 MB - defaultMaxResetTSGap = 24 * time.Hour - defaultKeyType = "table" + defaultUseRegionStorage = true + defaultTraceRegionFlow = true + defaultFlowRoundByDigit = 3 // KB + maxTraceFlowRoundByDigit = 5 // 0.1 MB + defaultMaxResetTSGap = 24 * time.Hour + defaultSaveMinResolvedTSInterval = 0 + defaultKeyType = "table" defaultStrictlyMatchLabel = false defaultEnablePlacementRules = true @@ -1102,6 +1103,8 @@ type PDServerConfig struct { TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string,omitempty"` // FlowRoundByDigit used to discretization processing flow information. FlowRoundByDigit int `toml:"flow-round-by-digit" json:"flow-round-by-digit"` + // SaveMinResolvedTSInterval is the interval to save the min resolved ts. + SaveMinResolvedTSInterval typeutil.Duration `toml:"save-min-resolved-ts-interval" json:"save-min-resolved-ts-interval"` } func (c *PDServerConfig) adjust(meta *configMetaData) error { @@ -1124,6 +1127,9 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error { if !meta.IsDefined("flow-round-by-digit") { adjustInt(&c.FlowRoundByDigit, defaultFlowRoundByDigit) } + if !meta.IsDefined("save-min-resolved-ts-interval") { + adjustDuration(&c.SaveMinResolvedTSInterval, defaultSaveMinResolvedTSInterval) + } c.migrateConfigurationFromFile(meta) return c.Validate() } diff --git a/server/config/persist_options.go b/server/config/persist_options.go index a8672b6712e..70048fdfb73 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -653,6 +653,11 @@ func (o *PersistOptions) CheckLabelProperty(typ string, labels []*metapb.StoreLa return false } +// GetSaveMinResolvedTSInterval gets the interval for PD to save min resolved ts. +func (o *PersistOptions) GetSaveMinResolvedTSInterval() time.Duration { + return o.GetPDServerConfig().SaveMinResolvedTSInterval.Duration +} + const ttlConfigPrefix = "/config/ttl" // SetTTLData set temporary configuration diff --git a/server/storage/endpoint/min_resolved_ts.go b/server/storage/endpoint/min_resolved_ts.go index 12cf21da40a..e0f3b47107a 100644 --- a/server/storage/endpoint/min_resolved_ts.go +++ b/server/storage/endpoint/min_resolved_ts.go @@ -21,7 +21,6 @@ import ( // MinResolvedTSPoint is the min resolved ts for a store // NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. type MinResolvedTSPoint struct { - StoreID uint64 `json:"store_id"` MinResolvedTS uint64 `json:"min_resolved_ts"` } diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 799a9e47151..00897fa70e3 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -186,6 +186,12 @@ func (s *configTestSuite) TestConfig(c *C) { c.Assert(json.Unmarshal(output, &labelPropertyCfg), IsNil) c.Assert(labelPropertyCfg, DeepEquals, svr.GetLabelProperty()) + // config set save-min-resolved-ts-interval + args = []string{"-u", pdAddr, "config", "set", "save-min-resolved-ts-interval", "1s"} + _, err = pdctl.ExecuteCommand(cmd, args...) + c.Assert(err, IsNil) + c.Assert(svr.GetPDServerConfig().SaveMinResolvedTSInterval, Equals, typeutil.NewDuration(time.Second)) + // test config read and write testItems := []testItem{ {"leader-schedule-limit", uint64(64), func(scheduleConfig *config.ScheduleConfig) interface{} { From 4916c6addadee5ffcdc4b5f6f45414cc50d02ff6 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 14 Mar 2022 20:09:12 +0800 Subject: [PATCH 04/10] address comment Signed-off-by: lhy1024 --- server/api/min_resolved_ts.go | 15 +++++++-------- server/api/router.go | 4 ++-- 2 files changed, 9 insertions(+), 10 deletions(-) diff --git a/server/api/min_resolved_ts.go b/server/api/min_resolved_ts.go index a28e9c8df8a..d7ad4455c5e 100644 --- a/server/api/min_resolved_ts.go +++ b/server/api/min_resolved_ts.go @@ -34,25 +34,24 @@ func newMinResolvedTSHandler(svr *server.Server, rd *render.Render) *minResolved } // NOTE: This type is exported by HTTP API. Please pay more attention when modifying it. -type listMinResolvedTS struct { +type minResolvedTS struct { MinResolvedTS uint64 `json:"min_resolved_ts"` } // @Tags minresolvedts // @Summary Get min resolved ts. // @Produce json -// @Success 200 {array} listMinResolvedTS +// @Success 200 {array} minResolvedTS // @Failure 500 {string} string "PD server failed to proceed the request." // @Router /min-resolved-ts [get] -func (h *minResolvedTSHandler) List(w http.ResponseWriter, r *http.Request) { +func (h *minResolvedTSHandler) Get(w http.ResponseWriter, r *http.Request) { storage := h.svr.GetStorage() - minResolvedTS, err := storage.LoadMinResolvedTS() + value, err := storage.LoadMinResolvedTS() if err != nil { h.rd.JSON(w, http.StatusInternalServerError, err.Error()) return } - list := listMinResolvedTS{ - MinResolvedTS: minResolvedTS, - } - h.rd.JSON(w, http.StatusOK, list) + h.rd.JSON(w, http.StatusOK, minResolvedTS{ + MinResolvedTS: value, + }) } diff --git a/server/api/router.go b/server/api/router.go index aaada96e600..40d546af530 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -352,8 +352,8 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { registerFunc(apiRouter, "DeleteGCSafePoint", "/gc/safepoint/{service_id}", serviceGCSafepointHandler.Delete, setMethods("DELETE"), setAuditBackend(localLog)) // min resolved ts API - MinResolvedTSHandler := newMinResolvedTSHandler(svr, rd) - registerFunc(apiRouter, "GetMinResolvedTS", "/min-resolved-ts", MinResolvedTSHandler.List, setMethods("GET"), setAuditBackend(localLog)) + minResolvedTSHandler := newMinResolvedTSHandler(svr, rd) + registerFunc(apiRouter, "GetMinResolvedTS", "/min-resolved-ts", minResolvedTSHandler.Get, setMethods("GET")) // unsafe admin operation API unsafeOperationHandler := newUnsafeOperationHandler(svr, rd) From a6902cc64371555d9a22e966b8746403179624ab Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Mon, 14 Mar 2022 20:45:41 +0800 Subject: [PATCH 05/10] fix test Signed-off-by: lhy1024 --- server/api/router.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/api/router.go b/server/api/router.go index 54515bd8322..6e3d3249ec3 100644 --- a/server/api/router.go +++ b/server/api/router.go @@ -352,7 +352,7 @@ func createRouter(prefix string, svr *server.Server) *mux.Router { // min resolved ts API minResolvedTSHandler := newMinResolvedTSHandler(svr, rd) - registerFunc(apiRouter, "GetMinResolvedTS", "/min-resolved-ts", minResolvedTSHandler.Get, setMethods("GET")) + registerFunc(apiRouter, "/min-resolved-ts", minResolvedTSHandler.Get, setMethods("GET")) // unsafe admin operation API unsafeOperationHandler := newUnsafeOperationHandler(svr, rd) From 859def82d7deddc605437ff646d114822c737d04 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 15 Mar 2022 12:22:09 +0800 Subject: [PATCH 06/10] address comment Signed-off-by: lhy1024 --- server/storage/endpoint/min_resolved_ts.go | 11 ++--------- 1 file changed, 2 insertions(+), 9 deletions(-) diff --git a/server/storage/endpoint/min_resolved_ts.go b/server/storage/endpoint/min_resolved_ts.go index e0f3b47107a..49e60e38980 100644 --- a/server/storage/endpoint/min_resolved_ts.go +++ b/server/storage/endpoint/min_resolved_ts.go @@ -35,17 +35,10 @@ var _ MinResolvedTSStorage = (*StorageEndpoint)(nil) // LoadMinResolvedTS loads the min resolved ts from storage. func (se *StorageEndpoint) LoadMinResolvedTS() (uint64, error) { value, err := se.Load(MinResolvedTSPath()) - if err != nil { + if err != nil || value == "" { return 0, err } - if value == "" { - return 0, nil - } - minResolvedTS, err := strconv.ParseUint(value, 16, 64) - if err != nil { - return 0, err - } - return minResolvedTS, nil + return strconv.ParseUint(value, 16, 64) } // SaveMinResolvedTS saves the min resolved ts. From 0a684b0b04c2dea0fecd8523fee68bea8ef2f4dd Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 15 Mar 2022 12:37:58 +0800 Subject: [PATCH 07/10] wrap err Signed-off-by: lhy1024 --- server/storage/endpoint/min_resolved_ts.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/server/storage/endpoint/min_resolved_ts.go b/server/storage/endpoint/min_resolved_ts.go index 49e60e38980..a8dd5c48538 100644 --- a/server/storage/endpoint/min_resolved_ts.go +++ b/server/storage/endpoint/min_resolved_ts.go @@ -16,6 +16,8 @@ package endpoint import ( "strconv" + + "github.com/tikv/pd/pkg/errs" ) // MinResolvedTSPoint is the min resolved ts for a store @@ -38,7 +40,11 @@ func (se *StorageEndpoint) LoadMinResolvedTS() (uint64, error) { if err != nil || value == "" { return 0, err } - return strconv.ParseUint(value, 16, 64) + minResolvedTS, err := strconv.ParseUint(value, 16, 64) + if err != nil { + return 0, errs.ErrStrconvParseUint.Wrap(err).GenWithStackByArgs() + } + return minResolvedTS, nil } // SaveMinResolvedTS saves the min resolved ts. From 60894ee35d6dd623a61f20cabe3d089e8d12d234 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 15 Mar 2022 18:05:12 +0800 Subject: [PATCH 08/10] address comment Signed-off-by: lhy1024 --- server/storage/endpoint/key_path.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/storage/endpoint/key_path.go b/server/storage/endpoint/key_path.go index 8dc10d6c220..1f5e05601cf 100644 --- a/server/storage/endpoint/key_path.go +++ b/server/storage/endpoint/key_path.go @@ -101,5 +101,5 @@ func gcSafePointServicePath(serviceID string) string { // MinResolvedTSPath returns the min resolved ts path func MinResolvedTSPath() string { - return path.Join(minResolvedTS) + return path.Join(clusterPath, minResolvedTS) } From ea87c0c8f841e18b68f864007c461f8f99be4ba7 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Tue, 15 Mar 2022 20:26:14 +0800 Subject: [PATCH 09/10] address comment Signed-off-by: lhy1024 --- server/api/config_test.go | 2 +- server/api/min_resolved_ts.go | 2 +- server/cluster/cluster.go | 6 +++--- server/config/config.go | 22 +++++++++++----------- server/config/persist_options.go | 6 +++--- server/core/store.go | 2 ++ tests/pdctl/config/config_test.go | 6 +++--- 7 files changed, 24 insertions(+), 22 deletions(-) diff --git a/server/api/config_test.go b/server/api/config_test.go index e8cd7573588..efa31ba05c2 100644 --- a/server/api/config_test.go +++ b/server/api/config_test.go @@ -299,7 +299,7 @@ func (s *testConfigSuite) TestConfigPDServer(c *C) { c.Assert(sc.MetricStorage, Equals, "") c.Assert(sc.DashboardAddress, Equals, "auto") c.Assert(sc.FlowRoundByDigit, Equals, int(3)) - c.Assert(sc.SaveMinResolvedTSInterval, Equals, typeutil.NewDuration(0)) + c.Assert(sc.MinResolvedTSPersistenceInterval, Equals, typeutil.NewDuration(0)) c.Assert(sc.MaxResetTSGap.Duration, Equals, 24*time.Hour) } diff --git a/server/api/min_resolved_ts.go b/server/api/min_resolved_ts.go index d7ad4455c5e..33f4c68c4c9 100644 --- a/server/api/min_resolved_ts.go +++ b/server/api/min_resolved_ts.go @@ -39,7 +39,7 @@ type minResolvedTS struct { } // @Tags minresolvedts -// @Summary Get min resolved ts. +// @Summary Get cluster-level min resolved ts. // @Produce json // @Success 200 {array} minResolvedTS // @Failure 500 {string} string "PD server failed to proceed the request." diff --git a/server/cluster/cluster.go b/server/cluster/cluster.go index ed6b023273c..7ea33714831 100644 --- a/server/cluster/cluster.go +++ b/server/cluster/cluster.go @@ -251,19 +251,19 @@ func (c *RaftCluster) Start(s Server) error { c.regionStats = statistics.NewRegionStatistics(c.opt, c.ruleManager) c.limiter = NewStoreLimiter(s.GetPersistOptions()) c.unsafeRecoveryController = newUnsafeRecoveryController(cluster) - saveMinResolvedTSInterval := c.opt.GetSaveMinResolvedTSInterval() + minResolvedTSPersistenceInterval := c.opt.GetMinResolvedTSPersistenceInterval() c.wg.Add(6) go c.runCoordinator() failpoint.Inject("highFrequencyClusterJobs", func() { backgroundJobInterval = 100 * time.Microsecond - saveMinResolvedTSInterval = 1 * time.Microsecond + minResolvedTSPersistenceInterval = 1 * time.Microsecond }) go c.runBackgroundJobs(backgroundJobInterval) go c.runStatsBackgroundJobs() go c.syncRegions() go c.runReplicationMode() - go c.runMinResolvedTSJob(saveMinResolvedTSInterval) + go c.runMinResolvedTSJob(minResolvedTSPersistenceInterval) c.running = true return nil diff --git a/server/config/config.go b/server/config/config.go index 29496421a36..54313f0970f 100644 --- a/server/config/config.go +++ b/server/config/config.go @@ -228,13 +228,13 @@ const ( defaultLeaderPriorityCheckInterval = time.Minute - defaultUseRegionStorage = true - defaultTraceRegionFlow = true - defaultFlowRoundByDigit = 3 // KB - maxTraceFlowRoundByDigit = 5 // 0.1 MB - defaultMaxResetTSGap = 24 * time.Hour - defaultSaveMinResolvedTSInterval = 0 - defaultKeyType = "table" + defaultUseRegionStorage = true + defaultTraceRegionFlow = true + defaultFlowRoundByDigit = 3 // KB + maxTraceFlowRoundByDigit = 5 // 0.1 MB + defaultMaxResetTSGap = 24 * time.Hour + defaultMinResolvedTSPersistenceInterval = 0 + defaultKeyType = "table" defaultStrictlyMatchLabel = false defaultEnablePlacementRules = true @@ -1103,8 +1103,8 @@ type PDServerConfig struct { TraceRegionFlow bool `toml:"trace-region-flow" json:"trace-region-flow,string,omitempty"` // FlowRoundByDigit used to discretization processing flow information. FlowRoundByDigit int `toml:"flow-round-by-digit" json:"flow-round-by-digit"` - // SaveMinResolvedTSInterval is the interval to save the min resolved ts. - SaveMinResolvedTSInterval typeutil.Duration `toml:"save-min-resolved-ts-interval" json:"save-min-resolved-ts-interval"` + // MinResolvedTSPersistenceInterval is the interval to save the min resolved ts. + MinResolvedTSPersistenceInterval typeutil.Duration `toml:"min-resolved-ts-persistence-interval" json:"min-resolved-ts-persistence-interval"` } func (c *PDServerConfig) adjust(meta *configMetaData) error { @@ -1127,8 +1127,8 @@ func (c *PDServerConfig) adjust(meta *configMetaData) error { if !meta.IsDefined("flow-round-by-digit") { adjustInt(&c.FlowRoundByDigit, defaultFlowRoundByDigit) } - if !meta.IsDefined("save-min-resolved-ts-interval") { - adjustDuration(&c.SaveMinResolvedTSInterval, defaultSaveMinResolvedTSInterval) + if !meta.IsDefined("min-resolved-ts-persistence-interval") { + adjustDuration(&c.MinResolvedTSPersistenceInterval, defaultMinResolvedTSPersistenceInterval) } c.migrateConfigurationFromFile(meta) return c.Validate() diff --git a/server/config/persist_options.go b/server/config/persist_options.go index 70048fdfb73..d23ed7bf147 100644 --- a/server/config/persist_options.go +++ b/server/config/persist_options.go @@ -653,9 +653,9 @@ func (o *PersistOptions) CheckLabelProperty(typ string, labels []*metapb.StoreLa return false } -// GetSaveMinResolvedTSInterval gets the interval for PD to save min resolved ts. -func (o *PersistOptions) GetSaveMinResolvedTSInterval() time.Duration { - return o.GetPDServerConfig().SaveMinResolvedTSInterval.Duration +// GetMinResolvedTSPersistenceInterval gets the interval for PD to save min resolved ts. +func (o *PersistOptions) GetMinResolvedTSPersistenceInterval() time.Duration { + return o.GetPDServerConfig().MinResolvedTSPersistenceInterval.Duration } const ttlConfigPrefix = "/config/ttl" diff --git a/server/core/store.go b/server/core/store.go index b9e28dc52fe..cbfedb054ec 100644 --- a/server/core/store.go +++ b/server/core/store.go @@ -735,5 +735,7 @@ func IsStoreContainLabel(store *metapb.Store, key, value string) bool { // IsAvailableForMinResolvedTS returns if the store is available for min resolved ts. func IsAvailableForMinResolvedTS(s *StoreInfo) bool { + // If a store is tombstone or no leader, it is not meaningful for min resolved ts. + // And we will skip tiflash, because it does not report min resolved ts. return !s.IsRemoved() && !IsStoreContainLabel(s.GetMeta(), EngineKey, EngineTiFlash) && s.GetLeaderCount() != 0 } diff --git a/tests/pdctl/config/config_test.go b/tests/pdctl/config/config_test.go index 00897fa70e3..b39b95c254f 100644 --- a/tests/pdctl/config/config_test.go +++ b/tests/pdctl/config/config_test.go @@ -186,11 +186,11 @@ func (s *configTestSuite) TestConfig(c *C) { c.Assert(json.Unmarshal(output, &labelPropertyCfg), IsNil) c.Assert(labelPropertyCfg, DeepEquals, svr.GetLabelProperty()) - // config set save-min-resolved-ts-interval - args = []string{"-u", pdAddr, "config", "set", "save-min-resolved-ts-interval", "1s"} + // config set min-resolved-ts-persistence-interval + args = []string{"-u", pdAddr, "config", "set", "min-resolved-ts-persistence-interval", "1s"} _, err = pdctl.ExecuteCommand(cmd, args...) c.Assert(err, IsNil) - c.Assert(svr.GetPDServerConfig().SaveMinResolvedTSInterval, Equals, typeutil.NewDuration(time.Second)) + c.Assert(svr.GetPDServerConfig().MinResolvedTSPersistenceInterval, Equals, typeutil.NewDuration(time.Second)) // test config read and write testItems := []testItem{ From 5d6c96094c59d86fbf807f15d6fc695f788b1129 Mon Sep 17 00:00:00 2001 From: lhy1024 Date: Wed, 16 Mar 2022 19:11:20 +0800 Subject: [PATCH 10/10] add more test for store with leader but no report Signed-off-by: lhy1024 --- server/core/store.go | 2 +- tests/server/cluster/cluster_test.go | 56 ++++++++++++++++++---------- 2 files changed, 37 insertions(+), 21 deletions(-) diff --git a/server/core/store.go b/server/core/store.go index cbfedb054ec..b46d7ce0b54 100644 --- a/server/core/store.go +++ b/server/core/store.go @@ -71,7 +71,7 @@ func NewStoreInfo(store *metapb.Store, opts ...StoreCreateOption) *StoreInfo { leaderWeight: 1.0, regionWeight: 1.0, limiter: make(map[storelimit.Type]*storelimit.StoreLimit), - minResolvedTS: math.MaxUint64, + minResolvedTS: 0, } for _, opt := range opts { opt(storeInfo) diff --git a/tests/server/cluster/cluster_test.go b/tests/server/cluster/cluster_test.go index 474154e86d8..3f426ffb50f 100644 --- a/tests/server/cluster/cluster_test.go +++ b/tests/server/cluster/cluster_test.go @@ -37,6 +37,7 @@ import ( "github.com/tikv/pd/server/config" "github.com/tikv/pd/server/core" "github.com/tikv/pd/server/core/storelimit" + "github.com/tikv/pd/server/id" syncer "github.com/tikv/pd/server/region_syncer" "github.com/tikv/pd/server/schedule/operator" "github.com/tikv/pd/server/schedulers" @@ -1118,6 +1119,23 @@ func (s *clusterTestSuite) TestStaleTermHeartbeat(c *C) { c.Assert(err, IsNil) } +func (s *clusterTestSuite) putRegionWithLeader(c *C, rc *cluster.RaftCluster, id id.Allocator, storeID uint64) { + for i := 0; i < 3; i++ { + regionID, err := id.Alloc() + c.Assert(err, IsNil) + peerID, err := id.Alloc() + c.Assert(err, IsNil) + region := &metapb.Region{ + Id: regionID, + Peers: []*metapb.Peer{{Id: peerID, StoreId: storeID}}, + StartKey: []byte{byte(i)}, + EndKey: []byte{byte(i + 1)}, + } + rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0])) + } + c.Assert(rc.GetStore(storeID).GetLeaderCount(), Equals, 3) +} + func (s *clusterTestSuite) TestMinResolvedTS(c *C) { tc, err := tests.NewTestCluster(s.ctx, 1) defer tc.Destroy() @@ -1127,6 +1145,7 @@ func (s *clusterTestSuite) TestMinResolvedTS(c *C) { c.Assert(err, IsNil) tc.WaitLeader() leaderServer := tc.GetServer(tc.GetLeader()) + id := leaderServer.GetAllocator() grpcPDClient := testutil.MustNewGrpcClient(c, leaderServer.GetAddr()) clusterID := leaderServer.GetClusterID() bootstrapCluster(c, clusterID, grpcPDClient) @@ -1163,18 +1182,9 @@ func (s *clusterTestSuite) TestMinResolvedTS(c *C) { status, err := rc.LoadClusterStatus() c.Assert(status.IsInitialized, IsFalse) addStoreWithMinResolvedTS(c, store1, false, store1TS, math.MaxUint64) - // case2: add region + // case2: add leader to store1 // min resolved ts should be available - for i := 0; i < 3; i++ { - region := &metapb.Region{ - Id: uint64(4 + i), - Peers: []*metapb.Peer{{Id: uint64(10 + i), StoreId: store1}}, - StartKey: []byte{byte(i)}, - EndKey: []byte{byte(i + 1)}, - } - rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0])) - } - c.Assert(rc.GetStore(store1).GetLeaderCount(), Equals, 3) + s.putRegionWithLeader(c, rc, id, store1) ts := rc.GetMinResolvedTS() c.Assert(ts, Equals, store1TS) // case2: add tiflash store @@ -1185,16 +1195,9 @@ func (s *clusterTestSuite) TestMinResolvedTS(c *C) { // min resolved ts should no change store3 := uint64(3) addStoreWithMinResolvedTS(c, store3, false, store3TS, store1TS) - // case5: transfer region leader to store 3 + // case5: add leader to store 3, store 3 has less ts than store 1. // min resolved ts should change to store 3 - region := &metapb.Region{ - Id: uint64(20), - Peers: []*metapb.Peer{{Id: uint64(21), StoreId: store3}}, - StartKey: []byte{byte(20)}, - EndKey: []byte{byte(21)}, - } - rc.HandleRegionHeartbeat(core.NewRegionInfo(region, region.Peers[0])) - c.Assert(rc.GetStore(store3).GetLeaderCount(), Equals, 1) + s.putRegionWithLeader(c, rc, id, store3) ts = rc.GetMinResolvedTS() c.Assert(ts, Equals, store3TS) // case6: set tombstone @@ -1204,4 +1207,17 @@ func (s *clusterTestSuite) TestMinResolvedTS(c *C) { ts = rc.GetMinResolvedTS() c.Assert(err, IsNil) c.Assert(ts, Equals, store1TS) + // case7: add a store with leader but no report min resolved ts + // min resolved ts should be zero + store4 := uint64(4) + _, err = putStore(grpcPDClient, clusterID, &metapb.Store{ + Id: store4, + Version: "v6.0.0", + Address: "127.0.0.1:" + strconv.Itoa(int(store4)), + }) + c.Assert(err, IsNil) + s.putRegionWithLeader(c, rc, id, store4) + ts = rc.GetMinResolvedTS() + c.Assert(err, IsNil) + c.Assert(ts, Equals, uint64(0)) }