From b0567e92a3e562fb9f3acdf96c408d17892dd08c Mon Sep 17 00:00:00 2001
From: Hu#
Date: Mon, 13 Mar 2023 15:06:41 +0800
Subject: [PATCH] This is an automated cherry-pick of #6123

ref tikv/pd#6099

Signed-off-by: ti-chi-bot
---
 pkg/cache/ttl.go                           |   6 +
 pkg/gctuner/memory_limit_tuner.go          | 149 +++++
 pkg/mcs/discovery/register.go              | 120 ++++
 pkg/mcs/resource_manager/server/manager.go | 330 ++++++++++
 pkg/mcs/resource_manager/server/server.go  | 495 ++++++++++++++
 pkg/mcs/tso/server/grpc_service.go         | 354 ++++++++++
 pkg/mcs/tso/server/server.go               | 715 +++++++++++++++++++++
 pkg/systimemon/systimemon.go               |   2 +
 server/election/lease.go                   |   8 +
 server/region_syncer/client.go             |   7 +
 server/schedule/region_scatterer.go        |  22 +
 server/tso/allocator_manager.go            |  12 +
 server/tso/global_allocator.go             |   7 +
 13 files changed, 2227 insertions(+)
 create mode 100644 pkg/gctuner/memory_limit_tuner.go
 create mode 100644 pkg/mcs/discovery/register.go
 create mode 100644 pkg/mcs/resource_manager/server/manager.go
 create mode 100644 pkg/mcs/resource_manager/server/server.go
 create mode 100644 pkg/mcs/tso/server/grpc_service.go
 create mode 100644 pkg/mcs/tso/server/server.go

diff --git a/pkg/cache/ttl.go b/pkg/cache/ttl.go
index 11dd1248370b..741e5847c190 100644
--- a/pkg/cache/ttl.go
+++ b/pkg/cache/ttl.go
@@ -19,7 +19,12 @@ import (
 	"time"
 
 	"github.com/pingcap/log"
+<<<<<<< HEAD
 	"github.com/tikv/pd/pkg/syncutil"
+=======
+	"github.com/tikv/pd/pkg/utils/logutil"
+	"github.com/tikv/pd/pkg/utils/syncutil"
+>>>>>>> 5bdfa717f (*: add defer to logs the panic reason and stack (#6123))
 	"go.uber.org/zap"
 )
 
@@ -142,6 +147,7 @@ func (c *ttlCache) Clear() {
 }
 
 func (c *ttlCache) doGC() {
+	defer logutil.LogPanic()
 	ticker := time.NewTicker(c.gcInterval)
 	defer ticker.Stop()
 
diff --git a/pkg/gctuner/memory_limit_tuner.go b/pkg/gctuner/memory_limit_tuner.go
new file mode 100644
index 000000000000..77a8f9db4c62
--- /dev/null
+++ b/pkg/gctuner/memory_limit_tuner.go
@@ -0,0 +1,149 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package gctuner
+
+import (
+	"math"
+	"runtime/debug"
+	"time"
+
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/log"
+	util "github.com/tikv/pd/pkg/gogc"
+	"github.com/tikv/pd/pkg/memory"
+	"github.com/tikv/pd/pkg/utils/logutil"
+	atomicutil "go.uber.org/atomic"
+	"go.uber.org/zap"
+)
+
+// GlobalMemoryLimitTuner allows only one memory limit tuner in one process.
+var GlobalMemoryLimitTuner = &memoryLimitTuner{}
+
+// The Go runtime triggers GC when it hits the memory limit managed via runtime/debug.SetMemoryLimit,
+// so we can change the memory limit dynamically to avoid frequent GC when memory usage is greater than the limit.
+type memoryLimitTuner struct {
+	finalizer                    *finalizer
+	isTuning                     atomicutil.Bool
+	percentage                   atomicutil.Float64
+	waitingReset                 atomicutil.Bool
+	nextGCTriggeredByMemoryLimit atomicutil.Bool
+}
+
+// fallbackPercentage indicates the fallback memory limit percentage when tuning.
+const fallbackPercentage float64 = 1.1
+
+func setMemoryLimit(limit int64) int64 {
+	ret := debug.SetMemoryLimit(limit)
+	if limit >= 0 {
+		log.Info("debug.SetMemoryLimit", zap.Int64("limit", limit), zap.Int64("ret", ret))
+	}
+	return ret
+}
+
+// tuning checks the memory nextGC and judges whether this GC was triggered by the memory limit.
+// The Go runtime ensures that it will be called serially.
+func (t *memoryLimitTuner) tuning() {
+	if !t.isTuning.Load() {
+		return
+	}
+	r := memory.ForceReadMemStats()
+	gogc := util.GetGOGC()
+	ratio := float64(100+gogc) / 100
+	// This `if` checks whether the **last** GC was triggered by MemoryLimit as far as possible.
+	// If the **last** GC was triggered by MemoryLimit, we'll set MemoryLimit to MAXVALUE to return control back to GOGC
+	// to avoid frequent GC when memory usage fluctuates above and below MemoryLimit.
+	// The logic we use to judge whether the **last** GC was triggered by MemoryLimit is as follows:
+	// suppose `NextGC` = `HeapInUse * (100 + GOGC) / 100`,
+	// - If NextGC < MemoryLimit, the **next** GC will **not** be triggered by MemoryLimit, thus we do not care about
+	//   why the **last** GC was triggered. And MemoryLimit will not be reset this time.
+	// - Only if NextGC >= MemoryLimit, the **next** GC will be triggered by MemoryLimit. Thus, we need to reset
+	//   MemoryLimit after the **next** GC happens if needed.
+	if float64(r.HeapInuse)*ratio > float64(setMemoryLimit(-1)) {
+		if t.nextGCTriggeredByMemoryLimit.Load() && t.waitingReset.CompareAndSwap(false, true) {
+			go func() {
+				defer logutil.LogPanic()
+				memory.MemoryLimitGCLast.Store(time.Now())
+				memory.MemoryLimitGCTotal.Add(1)
+				setMemoryLimit(t.calcMemoryLimit(fallbackPercentage))
+				resetInterval := 1 * time.Minute // Wait 1 minute and set back, to avoid frequent GC
+				failpoint.Inject("testMemoryLimitTuner", func(val failpoint.Value) {
+					if val, ok := val.(bool); val && ok {
+						resetInterval = 1 * time.Second
+					}
+				})
+				time.Sleep(resetInterval)
+				setMemoryLimit(t.calcMemoryLimit(t.GetPercentage()))
+				for !t.waitingReset.CompareAndSwap(true, false) {
+					continue
+				}
+			}()
+			memory.TriggerMemoryLimitGC.Store(true)
+		}
+		t.nextGCTriggeredByMemoryLimit.Store(true)
+	} else {
+		t.nextGCTriggeredByMemoryLimit.Store(false)
+		memory.TriggerMemoryLimitGC.Store(false)
+	}
+}
+
+// Start starts the memory limit tuner.
+func (t *memoryLimitTuner) Start() {
+	log.Debug("memoryLimitTuner start")
+	t.finalizer = newFinalizer(t.tuning) // Start tuning
+}
+
+// Stop stops the memory limit tuner.
+func (t *memoryLimitTuner) Stop() {
+	t.finalizer.stop()
+	log.Info("memoryLimitTuner stop")
+}
+
+// SetPercentage sets the percentage for the memory limit tuner.
+func (t *memoryLimitTuner) SetPercentage(percentage float64) {
+	t.percentage.Store(percentage)
+}
+
+// GetPercentage gets the percentage from the memory limit tuner.
+func (t *memoryLimitTuner) GetPercentage() float64 {
+	return t.percentage.Load()
+}
+
+// UpdateMemoryLimit updates the memory limit.
+// This function should be called when `tidb_server_memory_limit` or `tidb_server_memory_limit_gc_trigger` is modified.
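For orientation, here is a minimal sketch (not part of the patch) of how a caller would drive the tuner through the function below. `memory.ServerMemoryLimit.Store` is assumed to be the setter matching the `Load` used in `calcMemoryLimit`, and the 16 GiB figure is an arbitrary example.

```go
package gctuner

import "github.com/tikv/pd/pkg/memory"

// Illustrative only: apply a 16 GiB server limit with a 70% GC trigger.
func exampleApplyLimit() {
	memory.ServerMemoryLimit.Store(16 << 30) // assumed setter counterpart of Load
	GlobalMemoryLimitTuner.SetPercentage(0.7)
	// UpdateMemoryLimit recomputes 16 GiB * 0.7 ≈ 11.2 GiB, hands it to
	// debug.SetMemoryLimit via setMemoryLimit, and marks tuning as active.
	GlobalMemoryLimitTuner.UpdateMemoryLimit()
}
```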
+func (t *memoryLimitTuner) UpdateMemoryLimit() { + var memoryLimit = t.calcMemoryLimit(t.GetPercentage()) + if memoryLimit == math.MaxInt64 { + t.isTuning.Store(false) + memoryLimit = initGOMemoryLimitValue + } else { + t.isTuning.Store(true) + } + setMemoryLimit(memoryLimit) +} + +func (*memoryLimitTuner) calcMemoryLimit(percentage float64) int64 { + memoryLimit := int64(float64(memory.ServerMemoryLimit.Load()) * percentage) // `tidb_server_memory_limit` * `tidb_server_memory_limit_gc_trigger` + if memoryLimit == 0 { + memoryLimit = math.MaxInt64 + } + return memoryLimit +} + +var initGOMemoryLimitValue int64 + +func init() { + initGOMemoryLimitValue = setMemoryLimit(-1) + GlobalMemoryLimitTuner.Start() +} diff --git a/pkg/mcs/discovery/register.go b/pkg/mcs/discovery/register.go new file mode 100644 index 000000000000..fd99f3fcca77 --- /dev/null +++ b/pkg/mcs/discovery/register.go @@ -0,0 +1,120 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package discovery + +import ( + "context" + "fmt" + "time" + + "github.com/pingcap/log" + "github.com/tikv/pd/pkg/utils/logutil" + "go.etcd.io/etcd/clientv3" + "go.uber.org/zap" +) + +// DefaultLeaseInSeconds is the default lease time in seconds. +const DefaultLeaseInSeconds = 3 + +// ServiceRegister is used to register the service to etcd. +type ServiceRegister struct { + ctx context.Context + cancel context.CancelFunc + cli *clientv3.Client + key string + value string + ttl int64 +} + +// NewServiceRegister creates a new ServiceRegister. +func NewServiceRegister(ctx context.Context, cli *clientv3.Client, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister { + cctx, cancel := context.WithCancel(ctx) + serviceKey := registryPath(serviceName, serviceAddr) + return &ServiceRegister{ + ctx: cctx, + cancel: cancel, + cli: cli, + key: serviceKey, + value: serializedValue, + ttl: ttl, + } +} + +// Register registers the service to etcd. 
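A hedged usage sketch for the method below; the address is hypothetical, and, as the servers later in this patch do, it is passed both as the instance address and as the serialized value.

```go
// Illustrative only; assumed to sit in package discovery.
func registerExample(ctx context.Context, cli *clientv3.Client) (*ServiceRegister, error) {
	sr := NewServiceRegister(ctx, cli,
		"tso",                   // service name, part of the registry path
		"http://127.0.0.1:3379", // hypothetical instance address
		"http://127.0.0.1:3379", // serialized value; this patch stores the address itself
		DefaultLeaseInSeconds)
	if err := sr.Register(); err != nil { // grant lease, put key, keep alive in background
		return nil, err
	}
	return sr, nil // call sr.Deregister() on shutdown to delete the key
}
```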
+func (sr *ServiceRegister) Register() error { + resp, err := sr.cli.Grant(sr.ctx, sr.ttl) + if err != nil { + sr.cancel() + return fmt.Errorf("grant lease failed: %v", err) + } + + if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil { + sr.cancel() + return fmt.Errorf("put the key %s failed: %v", sr.key, err) + } + + kresp, err := sr.cli.KeepAlive(sr.ctx, resp.ID) + if err != nil { + sr.cancel() + return fmt.Errorf("keepalive failed: %v", err) + } + go func() { + defer logutil.LogPanic() + for { + select { + case <-sr.ctx.Done(): + log.Info("exit register process", zap.String("key", sr.key)) + return + case _, ok := <-kresp: + if !ok { + log.Error("keep alive failed", zap.String("key", sr.key)) + // retry + t := time.NewTicker(time.Duration(sr.ttl) * time.Second / 2) + for { + select { + case <-sr.ctx.Done(): + log.Info("exit register process", zap.String("key", sr.key)) + return + default: + } + + <-t.C + resp, err := sr.cli.Grant(sr.ctx, sr.ttl) + if err != nil { + log.Error("grant lease failed", zap.String("key", sr.key), zap.Error(err)) + continue + } + + if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil { + log.Error("put the key failed", zap.String("key", sr.key), zap.Error(err)) + continue + } + } + } + } + } + }() + + return nil +} + +// Deregister deregisters the service from etcd. +func (sr *ServiceRegister) Deregister() error { + sr.cancel() + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(sr.ttl)*time.Second) + defer cancel() + _, err := sr.cli.Delete(ctx, sr.key) + return err +} diff --git a/pkg/mcs/resource_manager/server/manager.go b/pkg/mcs/resource_manager/server/manager.go new file mode 100644 index 000000000000..25439ad74e9b --- /dev/null +++ b/pkg/mcs/resource_manager/server/manager.go @@ -0,0 +1,330 @@ +// Copyright 2022 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "encoding/json" + "sort" + "sync" + "time" + + "github.com/gogo/protobuf/proto" + "github.com/pingcap/errors" + "github.com/pingcap/failpoint" + rmpb "github.com/pingcap/kvproto/pkg/resource_manager" + "github.com/pingcap/log" + bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/storage/endpoint" + "github.com/tikv/pd/pkg/storage/kv" + "github.com/tikv/pd/pkg/utils/logutil" + "go.uber.org/zap" +) + +const ( + defaultConsumptionChanSize = 1024 + metricsCleanupInterval = time.Minute + metricsCleanupTimeout = 20 * time.Minute +) + +// Manager is the manager of resource group. +type Manager struct { + sync.RWMutex + srv bs.Server + ruConfig *RequestUnitConfig + groups map[string]*ResourceGroup + storage endpoint.ResourceGroupStorage + // consumptionChan is used to send the consumption + // info to the background metrics flusher. 
+ consumptionDispatcher chan struct { + resourceGroupName string + *rmpb.Consumption + } + // record update time of each resource group + consumptionRecord map[string]time.Time +} + +// RUConfigProvider is used to get RU config from the given +// `bs.server` without modifying its interface. +type RUConfigProvider interface { + GetRequestUnitConfig() *RequestUnitConfig +} + +// NewManager returns a new manager base on the given server, +// which should implement the `RUConfigProvider` interface. +func NewManager[T RUConfigProvider](srv bs.Server) *Manager { + m := &Manager{ + ruConfig: srv.(T).GetRequestUnitConfig(), + groups: make(map[string]*ResourceGroup), + consumptionDispatcher: make(chan struct { + resourceGroupName string + *rmpb.Consumption + }, defaultConsumptionChanSize), + consumptionRecord: make(map[string]time.Time), + } + // The first initialization after the server is started. + srv.AddStartCallback(func() { + log.Info("resource group manager starts to initialize", zap.String("name", srv.Name())) + m.storage = endpoint.NewStorageEndpoint( + kv.NewEtcdKVBase(srv.GetClient(), "resource_group"), + nil, + ) + m.srv = srv + }) + // The second initialization after becoming serving. + srv.AddServiceReadyCallback(m.Init) + return m +} + +// GetBasicServer returns the basic server. +func (m *Manager) GetBasicServer() bs.Server { + return m.srv +} + +// Init initializes the resource group manager. +func (m *Manager) Init(ctx context.Context) { + // Store the RU model config into the storage. + m.storage.SaveRequestUnitConfig(m.ruConfig) + // Load resource group meta info from storage. + m.groups = make(map[string]*ResourceGroup) + handler := func(k, v string) { + group := &rmpb.ResourceGroup{} + if err := proto.Unmarshal([]byte(v), group); err != nil { + log.Error("err", zap.Error(err), zap.String("k", k), zap.String("v", v)) + panic(err) + } + m.groups[group.Name] = FromProtoResourceGroup(group) + } + m.storage.LoadResourceGroupSettings(handler) + // Load resource group states from storage. + tokenHandler := func(k, v string) { + tokens := &GroupStates{} + if err := json.Unmarshal([]byte(v), tokens); err != nil { + log.Error("err", zap.Error(err), zap.String("k", k), zap.String("v", v)) + panic(err) + } + if group, ok := m.groups[k]; ok { + group.SetStatesIntoResourceGroup(tokens) + } + } + m.storage.LoadResourceGroupStates(tokenHandler) + // Start the background metrics flusher. + go m.backgroundMetricsFlush(ctx) + go func() { + defer logutil.LogPanic() + m.persistLoop(ctx) + }() + log.Info("resource group manager finishes initialization") +} + +// AddResourceGroup puts a resource group. +func (m *Manager) AddResourceGroup(group *ResourceGroup) error { + m.RLock() + _, ok := m.groups[group.Name] + m.RUnlock() + if ok { + return errors.New("this group already exists") + } + err := group.CheckAndInit() + if err != nil { + return err + } + m.Lock() + defer m.Unlock() + if err := group.persistSettings(m.storage); err != nil { + return err + } + if err := group.persistStates(m.storage); err != nil { + return err + } + m.groups[group.Name] = group + return nil +} + +// ModifyResourceGroup modifies an existing resource group. 
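Viewed from a caller, the create/update flow over AddResourceGroup above and ModifyResourceGroup below looks roughly like this; only `Name` is populated because the other `rmpb.ResourceGroup` fields are not visible in this excerpt.

```go
// Illustrative only, assumed to run inside this package.
func exampleUpsert(m *Manager) error {
	// Create: wrap the proto message into the internal type, then add it.
	group := FromProtoResourceGroup(&rmpb.ResourceGroup{Name: "oltp"})
	if err := m.AddResourceGroup(group); err != nil {
		return err // e.g. the group already exists
	}
	// Update: patch the stored group's settings and persist them.
	return m.ModifyResourceGroup(&rmpb.ResourceGroup{Name: "oltp"})
}
```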
+func (m *Manager) ModifyResourceGroup(group *rmpb.ResourceGroup) error { + if group == nil || group.Name == "" { + return errors.New("invalid group name") + } + m.Lock() + curGroup, ok := m.groups[group.Name] + m.Unlock() + if !ok { + return errors.New("not exists the group") + } + + err := curGroup.PatchSettings(group) + if err != nil { + return err + } + return curGroup.persistSettings(m.storage) +} + +// DeleteResourceGroup deletes a resource group. +func (m *Manager) DeleteResourceGroup(name string) error { + if err := m.storage.DeleteResourceGroupSetting(name); err != nil { + return err + } + m.Lock() + delete(m.groups, name) + m.Unlock() + return nil +} + +// GetResourceGroup returns a copy of a resource group. +func (m *Manager) GetResourceGroup(name string) *ResourceGroup { + m.RLock() + defer m.RUnlock() + if group, ok := m.groups[name]; ok { + return group.Copy() + } + return nil +} + +// GetMutableResourceGroup returns a mutable resource group. +func (m *Manager) GetMutableResourceGroup(name string) *ResourceGroup { + m.RLock() + defer m.RUnlock() + if group, ok := m.groups[name]; ok { + return group + } + return nil +} + +// GetResourceGroupList returns copies of resource group list. +func (m *Manager) GetResourceGroupList() []*ResourceGroup { + m.RLock() + res := make([]*ResourceGroup, 0, len(m.groups)) + for _, group := range m.groups { + res = append(res, group.Copy()) + } + m.RUnlock() + sort.Slice(res, func(i, j int) bool { + return res[i].Name < res[j].Name + }) + return res +} + +func (m *Manager) persistLoop(ctx context.Context) { + ticker := time.NewTicker(time.Minute) + failpoint.Inject("fastPersist", func() { + ticker.Stop() + ticker = time.NewTicker(100 * time.Millisecond) + }) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + m.persistResourceGroupRunningState() + } + } +} + +func (m *Manager) persistResourceGroupRunningState() { + m.RLock() + keys := make([]string, 0, len(m.groups)) + for k := range m.groups { + keys = append(keys, k) + } + m.RUnlock() + for idx := 0; idx < len(keys); idx++ { + m.RLock() + group, ok := m.groups[keys[idx]] + m.RUnlock() + if ok { + group.persistStates(m.storage) + } + } +} + +// Receive the consumption and flush it to the metrics. +func (m *Manager) backgroundMetricsFlush(ctx context.Context) { + defer logutil.LogPanic() + ticker := time.NewTicker(metricsCleanupInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case consumptionInfo := <-m.consumptionDispatcher: + consumption := consumptionInfo.Consumption + if consumption == nil { + continue + } + var ( + name = consumptionInfo.resourceGroupName + rruMetrics = readRequestUnitCost.WithLabelValues(name) + wruMetrics = writeRequestUnitCost.WithLabelValues(name) + sqlLayerRuMetrics = sqlLayerRequestUnitCost.WithLabelValues(name) + readByteMetrics = readByteCost.WithLabelValues(name) + writeByteMetrics = writeByteCost.WithLabelValues(name) + kvCPUMetrics = kvCPUCost.WithLabelValues(name) + sqlCPUMetrics = sqlCPUCost.WithLabelValues(name) + readRequestCountMetrics = requestCount.WithLabelValues(name, readTypeLabel) + writeRequestCountMetrics = requestCount.WithLabelValues(name, writeTypeLabel) + ) + // RU info. + if consumption.RRU != 0 { + rruMetrics.Observe(consumption.RRU) + } + if consumption.WRU != 0 { + wruMetrics.Observe(consumption.WRU) + } + // Byte info. 
+			if consumption.ReadBytes != 0 {
+				readByteMetrics.Observe(consumption.ReadBytes)
+			}
+			if consumption.WriteBytes != 0 {
+				writeByteMetrics.Observe(consumption.WriteBytes)
+			}
+			// CPU time info.
+			if consumption.TotalCpuTimeMs > 0 {
+				if consumption.SqlLayerCpuTimeMs > 0 {
+					sqlLayerRuMetrics.Add(consumption.SqlLayerCpuTimeMs * m.ruConfig.CPUMsCost)
+					sqlCPUMetrics.Observe(consumption.SqlLayerCpuTimeMs)
+				}
+				kvCPUMetrics.Observe(consumption.TotalCpuTimeMs - consumption.SqlLayerCpuTimeMs)
+			}
+			// RPC count info.
+			if consumption.KvReadRpcCount != 0 {
+				readRequestCountMetrics.Add(consumption.KvReadRpcCount)
+			}
+			if consumption.KvWriteRpcCount != 0 {
+				writeRequestCountMetrics.Add(consumption.KvWriteRpcCount)
+			}
+
+			m.consumptionRecord[name] = time.Now()
+
+		case <-ticker.C:
+			// Clean up the metrics that have not been updated for a long time.
+			for name, lastTime := range m.consumptionRecord {
+				if time.Since(lastTime) > metricsCleanupTimeout {
+					readRequestUnitCost.DeleteLabelValues(name)
+					writeRequestUnitCost.DeleteLabelValues(name)
+					sqlLayerRequestUnitCost.DeleteLabelValues(name)
+					readByteCost.DeleteLabelValues(name)
+					writeByteCost.DeleteLabelValues(name)
+					kvCPUCost.DeleteLabelValues(name)
+					sqlCPUCost.DeleteLabelValues(name)
+					requestCount.DeleteLabelValues(name, readTypeLabel)
+					requestCount.DeleteLabelValues(name, writeTypeLabel)
+					delete(m.consumptionRecord, name)
+				}
+			}
+		}
+	}
+}
diff --git a/pkg/mcs/resource_manager/server/server.go b/pkg/mcs/resource_manager/server/server.go
new file mode 100644
index 000000000000..e526ab8ae2f0
--- /dev/null
+++ b/pkg/mcs/resource_manager/server/server.go
@@ -0,0 +1,495 @@
+// Copyright 2023 TiKV Project Authors.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package server
+
+import (
+	"context"
+	"crypto/tls"
+	"fmt"
+	"net"
+	"net/http"
+	"net/url"
+	"os"
+	"os/signal"
+	"path"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"syscall"
+	"time"
+
+	grpcprometheus "github.com/grpc-ecosystem/go-grpc-prometheus"
+	"github.com/pingcap/log"
+	"github.com/soheilhy/cmux"
+	"github.com/spf13/cobra"
+	bs "github.com/tikv/pd/pkg/basicserver"
+	"github.com/tikv/pd/pkg/errs"
+	"github.com/tikv/pd/pkg/mcs/discovery"
+	"github.com/tikv/pd/pkg/mcs/utils"
+	"github.com/tikv/pd/pkg/member"
+	"github.com/tikv/pd/pkg/utils/etcdutil"
+	"github.com/tikv/pd/pkg/utils/logutil"
+	"github.com/tikv/pd/pkg/utils/memberutil"
+	"github.com/tikv/pd/pkg/utils/metricutil"
+	"github.com/tikv/pd/pkg/versioninfo"
+	"go.etcd.io/etcd/clientv3"
+	"go.etcd.io/etcd/pkg/types"
+	"go.uber.org/zap"
+	"google.golang.org/grpc"
+)
+
+const (
+	// resourceManagerPrimaryPrefix defines the key prefix for keyspace group primary election.
+	// The entire key is in the format of "/ms/<cluster-id>/resource-manager/<group-id>/primary"
+	// in which <group-id> is 5 digits integer with leading zeros. For now we use 0 as the default cluster id.
+	resourceManagerPrimaryPrefix = "/ms/0/resource-manager"
+)
+
+// Server is the resource manager server, and it implements bs.Server.
+type Server struct {
+	// Server state. 
0 is not serving, 1 is serving. + isServing int64 + + ctx context.Context + serverLoopCtx context.Context + serverLoopCancel func() + serverLoopWg sync.WaitGroup + + cfg *Config + clusterID uint64 + name string + listenURL *url.URL + + // for the primary election of resource manager + participant *member.Participant + etcdClient *clientv3.Client + httpClient *http.Client + + muxListener net.Listener + service *Service + + // Callback functions for different stages + // startCallbacks will be called after the server is started. + startCallbacks []func() + // primaryCallbacks will be called after the server becomes leader. + primaryCallbacks []func(context.Context) + + serviceRegister *discovery.ServiceRegister +} + +// Name returns the unique etcd name for this server in etcd cluster. +func (s *Server) Name() string { + return s.name +} + +// Context returns the context. +func (s *Server) Context() context.Context { + return s.ctx +} + +// Run runs the Resource Manager server. +func (s *Server) Run() (err error) { + if err = s.initClient(); err != nil { + return err + } + if err = s.startServer(); err != nil { + return err + } + + s.startServerLoop() + + return nil +} + +func (s *Server) startServerLoop() { + s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx) + s.serverLoopWg.Add(1) + go s.primaryElectionLoop() +} + +func (s *Server) primaryElectionLoop() { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + for { + if s.IsClosed() { + log.Info("server is closed, exit resource manager primary election loop") + return + } + + primary, rev, checkAgain := s.participant.CheckLeader() + if checkAgain { + continue + } + if primary != nil { + log.Info("start to watch the primary/leader", zap.Stringer("resource-manager-primary", primary)) + // WatchLeader will keep looping and never return unless the primary/leader has changed. + s.participant.WatchLeader(s.serverLoopCtx, primary, rev) + log.Info("the resource manager primary/leader has changed, try to re-campaign a primary/leader") + } + + s.campaignLeader() + } +} + +func (s *Server) campaignLeader() { + log.Info("start to campaign the primary/leader", zap.String("campaign-resource-manager-primary-name", s.participant.Member().Name)) + if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil { + if err.Error() == errs.ErrEtcdTxnConflict.Error() { + log.Info("campaign resource manager primary/leader meets error due to txn conflict, another resource manager server may campaign successfully", + zap.String("campaign-resource-manager-primary-name", s.participant.Member().Name)) + } else { + log.Error("campaign resource manager primary/leader meets error due to etcd error", + zap.String("campaign-resource-manager-primary-name", s.participant.Member().Name), + errs.ZapError(err)) + } + return + } + + // Start keepalive the leadership and enable Resource Manager service. + ctx, cancel := context.WithCancel(s.serverLoopCtx) + var resetLeaderOnce sync.Once + defer resetLeaderOnce.Do(func() { + cancel() + s.participant.ResetLeader() + }) + + // maintain the leadership, after this, Resource Manager could be ready to provide service. 
+ s.participant.KeepLeader(ctx) + log.Info("campaign resource manager primary ok", zap.String("campaign-resource-manager-primary-name", s.participant.Member().Name)) + + log.Info("triggering the primary callback functions") + for _, cb := range s.primaryCallbacks { + cb(ctx) + } + + s.participant.EnableLeader() + log.Info("resource manager primary is ready to serve", zap.String("resource-manager-primary-name", s.participant.Member().Name)) + + leaderTicker := time.NewTicker(utils.LeaderTickInterval) + defer leaderTicker.Stop() + + for { + select { + case <-leaderTicker.C: + if !s.participant.IsLeader() { + log.Info("no longer a primary/leader because lease has expired, the resource manager primary/leader will step down") + return + } + case <-ctx.Done(): + // Server is closed and it should return nil. + log.Info("server is closed") + return + } + } +} + +// Close closes the server. +func (s *Server) Close() { + if !atomic.CompareAndSwapInt64(&s.isServing, 1, 0) { + // server is already closed + return + } + + log.Info("closing resource manager server ...") + s.serviceRegister.Deregister() + s.muxListener.Close() + s.serverLoopCancel() + s.serverLoopWg.Wait() + + if s.etcdClient != nil { + if err := s.etcdClient.Close(); err != nil { + log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) + } + } + + if s.httpClient != nil { + s.httpClient.CloseIdleConnections() + } + + log.Info("resource manager server is closed") +} + +// GetRequestUnitConfig returns the RU config. +func (s *Server) GetRequestUnitConfig() *RequestUnitConfig { + return &s.cfg.RequestUnit +} + +// GetClient returns builtin etcd client. +func (s *Server) GetClient() *clientv3.Client { + return s.etcdClient +} + +// GetHTTPClient returns builtin http client. +func (s *Server) GetHTTPClient() *http.Client { + return s.httpClient +} + +// AddStartCallback adds a callback in the startServer phase. +func (s *Server) AddStartCallback(callbacks ...func()) { + s.startCallbacks = append(s.startCallbacks, callbacks...) +} + +// IsServing returns whether the server is the leader, if there is embedded etcd, or the primary otherwise. +func (s *Server) IsServing() bool { + return s.participant.IsLeader() +} + +// IsClosed checks if the server loop is closed +func (s *Server) IsClosed() bool { + return atomic.LoadInt64(&s.isServing) == 0 +} + +// AddServiceReadyCallback adds callbacks when the server becomes the leader, if there is embedded etcd, or the primary otherwise. +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { + s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) +} + +func (s *Server) initClient() error { + tlsConfig, err := s.cfg.Security.ToTLSConfig() + if err != nil { + return err + } + u, err := types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ",")) + if err != nil { + return err + } + s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, []url.URL(u)) + return err +} + +func (s *Server) startGRPCServer(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + gs := grpc.NewServer() + s.service.RegisterGRPCService(gs) + err := gs.Serve(l) + log.Info("gRPC server stop serving") + + // Attempt graceful stop (waits for pending RPCs), but force a stop if + // it doesn't happen in a reasonable amount of time. 
+ done := make(chan struct{}) + go func() { + defer logutil.LogPanic() + log.Info("try to gracefully stop the server now") + gs.GracefulStop() + close(done) + }() + select { + case <-done: + case <-time.After(utils.DefaultGRPCGracefulStopTimeout): + log.Info("stopping grpc gracefully is taking longer than expected and force stopping now", zap.Duration("default", utils.DefaultGRPCGracefulStopTimeout)) + gs.Stop() + } + if s.IsClosed() { + log.Info("grpc server stopped") + } else { + log.Fatal("grpc server stopped unexpectedly", errs.ZapError(err)) + } +} + +func (s *Server) startHTTPServer(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + handler, _ := SetUpRestHandler(s.service) + hs := &http.Server{ + Handler: handler, + ReadTimeout: 5 * time.Minute, + ReadHeaderTimeout: 5 * time.Second, + } + err := hs.Serve(l) + log.Info("http server stop serving") + + ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultHTTPGracefulShutdownTimeout) + defer cancel() + if err := hs.Shutdown(ctx); err != nil { + log.Error("http server shutdown encountered problem", errs.ZapError(err)) + } else { + log.Info("all http(s) requests finished") + } + if s.IsClosed() { + log.Info("http server stopped") + } else { + log.Fatal("http server stopped unexpectedly", errs.ZapError(err)) + } +} + +func (s *Server) startGRPCAndHTTPServers(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + mux := cmux.New(l) + grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) + httpL := mux.Match(cmux.Any()) + + s.serverLoopWg.Add(2) + go s.startGRPCServer(grpcL) + go s.startHTTPServer(httpL) + + if err := mux.Serve(); err != nil { + if s.IsClosed() { + log.Info("mux stop serving", errs.ZapError(err)) + } else { + log.Fatal("mux stop serving unexpectedly", errs.ZapError(err)) + } + } +} + +// GetPrimary returns the primary member. 
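Isolated from the server plumbing, the cmux split used by startGRPCAndHTTPServers above works as follows; the listener address and both servers are placeholders.

```go
l, err := net.Listen("tcp", "127.0.0.1:0") // placeholder listener
if err != nil {
	log.Fatal("listen failed", errs.ZapError(err))
}
mux := cmux.New(l)
// gRPC is recognized by its HTTP/2 content-type header; everything else is HTTP.
grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc"))
httpL := mux.Match(cmux.Any())
go grpcServer.Serve(grpcL) // *grpc.Server, assumed built elsewhere
go httpServer.Serve(httpL) // *http.Server, assumed built elsewhere
_ = mux.Serve()            // blocks until the listener closes
```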
+func (s *Server) GetPrimary() bs.MemberProvider { + return s.participant.GetLeader() +} + +func (s *Server) startServer() (err error) { + if s.clusterID, err = etcdutil.GetClusterID(s.etcdClient, utils.ClusterIDPath); err != nil { + return err + } + log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID)) + // The independent Resource Manager service still reuses PD version info since PD and Resource Manager are just + // different service modes provided by the same pd-server binary + serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix())) + + uniqueName := s.cfg.ListenAddr + uniqueID := memberutil.GenerateUniqueID(uniqueName) + log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID)) + s.participant = member.NewParticipant(s.etcdClient, uniqueID) + s.participant.InitInfo(uniqueName, path.Join(resourceManagerPrimaryPrefix, fmt.Sprintf("%05d", 0)), "primary", "keyspace group primary election", s.cfg.ListenAddr) + s.participant.SetMemberDeployPath(s.participant.ID()) + s.participant.SetMemberBinaryVersion(s.participant.ID(), versioninfo.PDReleaseVersion) + s.participant.SetMemberGitHash(s.participant.ID(), versioninfo.PDGitHash) + + s.service = &Service{ + ctx: s.ctx, + manager: NewManager[*Server](s), + } + + tlsConfig, err := s.cfg.Security.ToTLSConfig() + if err != nil { + return err + } + s.listenURL, err = url.Parse(s.cfg.ListenAddr) + if err != nil { + return err + } + if tlsConfig != nil { + s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig) + } else { + s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host) + } + if err != nil { + return err + } + + s.serverLoopWg.Add(1) + go s.startGRPCAndHTTPServers(s.muxListener) + + // Run callbacks + log.Info("triggering the start callback functions") + for _, cb := range s.startCallbacks { + cb() + } + + // Server has started. + atomic.StoreInt64(&s.isServing, 1) + s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, "resource_manager", s.cfg.ListenAddr, s.cfg.ListenAddr, discovery.DefaultLeaseInSeconds) + s.serviceRegister.Register() + return nil +} + +// NewServer creates a new resource manager server. 
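The constructor below is driven as sketched here, mirroring CreateServerWrapper further down with signal handling omitted; `cfg` is assumed to be an already-parsed *Config.

```go
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
svr := NewServer(ctx, cfg)
if err := svr.Run(); err != nil { // init etcd/http clients, listeners, election loop
	log.Fatal("run server failed", errs.ZapError(err))
}
// ... wait for a shutdown signal ...
svr.Close() // deregister from etcd, stop listeners, drain server loops
```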
+func NewServer(ctx context.Context, cfg *Config) *Server { + return &Server{ + name: cfg.Name, + ctx: ctx, + cfg: cfg, + } +} + +// CreateServerWrapper encapsulates the configuration/log/metrics initialization and create the server +func CreateServerWrapper(cmd *cobra.Command, args []string) { + cmd.Flags().Parse(args) + cfg := NewConfig() + flagSet := cmd.Flags() + err := cfg.Parse(flagSet) + defer logutil.LogPanic() + + if err != nil { + cmd.Println(err) + return + } + + if printVersion, err := flagSet.GetBool("version"); err != nil { + cmd.Println(err) + return + } else if printVersion { + versioninfo.Print() + exit(0) + } + + // New zap logger + err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog) + if err == nil { + log.ReplaceGlobals(cfg.Logger, cfg.LogProps) + } else { + log.Fatal("initialize logger error", errs.ZapError(err)) + } + // Flushing any buffered log entries + defer log.Sync() + + versioninfo.Log("resource manager") + log.Info("resource manager config", zap.Reflect("config", cfg)) + + grpcprometheus.EnableHandlingTimeHistogram() + + metricutil.Push(&cfg.Metric) + + ctx, cancel := context.WithCancel(context.Background()) + svr := NewServer(ctx, cfg) + + sc := make(chan os.Signal, 1) + signal.Notify(sc, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGTERM, + syscall.SIGQUIT) + + var sig os.Signal + go func() { + sig = <-sc + cancel() + }() + + if err := svr.Run(); err != nil { + log.Fatal("run server failed", errs.ZapError(err)) + } + + <-ctx.Done() + log.Info("got signal to exit", zap.String("signal", sig.String())) + + svr.Close() + switch sig { + case syscall.SIGTERM: + exit(0) + default: + exit(1) + } +} + +func exit(code int) { + log.Sync() + os.Exit(code) +} diff --git a/pkg/mcs/tso/server/grpc_service.go b/pkg/mcs/tso/server/grpc_service.go new file mode 100644 index 000000000000..c650c4910ad3 --- /dev/null +++ b/pkg/mcs/tso/server/grpc_service.go @@ -0,0 +1,354 @@ +// Copyright 2023 TiKV Project Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "context" + "io" + "net/http" + "time" + + "github.com/pingcap/kvproto/pkg/pdpb" + "github.com/pingcap/kvproto/pkg/tsopb" + "github.com/pingcap/log" + "github.com/pkg/errors" + bs "github.com/tikv/pd/pkg/basicserver" + "github.com/tikv/pd/pkg/errs" + "github.com/tikv/pd/pkg/mcs/registry" + "github.com/tikv/pd/pkg/utils/apiutil" + "github.com/tikv/pd/pkg/utils/grpcutil" + "github.com/tikv/pd/pkg/utils/logutil" + "go.uber.org/zap" + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +const ( + // tso + maxMergeTSORequests = 10000 + defaultTSOProxyTimeout = 3 * time.Second +) + +// gRPC errors +var ( + ErrNotStarted = status.Errorf(codes.Unavailable, "server not started") +) + +var _ tsopb.TSOServer = (*Service)(nil) + +// SetUpRestHandler is a hook to sets up the REST service. 
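The hook declared below lets an API package install the real REST handler without an import cycle; until it is overridden, every request gets 501. A hypothetical override might look like this (route and handler are invented for illustration):

```go
func init() {
	SetUpRestHandler = func(srv *Service) (http.Handler, apiutil.APIServiceGroup) {
		mux := http.NewServeMux()
		mux.HandleFunc("/status", func(w http.ResponseWriter, _ *http.Request) {
			w.WriteHeader(http.StatusOK) // hypothetical route
		})
		return mux, apiutil.APIServiceGroup{}
	}
}
```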
+var SetUpRestHandler = func(srv *Service) (http.Handler, apiutil.APIServiceGroup) { + return dummyRestService{}, apiutil.APIServiceGroup{} +} + +type dummyRestService struct{} + +func (d dummyRestService) ServeHTTP(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusNotImplemented) + w.Write([]byte("not implemented")) +} + +// Service is the TSO grpc service. +type Service struct { + *Server +} + +// NewService creates a new TSO service. +func NewService(svr bs.Server) registry.RegistrableService { + server, ok := svr.(*Server) + if !ok { + log.Fatal("create tso server failed") + } + return &Service{ + Server: server, + } +} + +// RegisterGRPCService registers the service to gRPC server. +func (s *Service) RegisterGRPCService(g *grpc.Server) { + tsopb.RegisterTSOServer(g, s) +} + +// RegisterRESTHandler registers the service to REST server. +func (s *Service) RegisterRESTHandler(userDefineHandlers map[string]http.Handler) { + handler, group := SetUpRestHandler(s) + apiutil.RegisterUserDefinedHandlers(userDefineHandlers, &group, handler) +} + +// Tso returns a stream of timestamps +func (s *Service) Tso(stream tsopb.TSO_TsoServer) error { + var ( + doneCh chan struct{} + errCh chan error + ) + ctx, cancel := context.WithCancel(stream.Context()) + defer cancel() + for { + // Prevent unnecessary performance overhead of the channel. + if errCh != nil { + select { + case err := <-errCh: + return errors.WithStack(err) + default: + } + } + request, err := stream.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return errors.WithStack(err) + } + + streamCtx := stream.Context() + forwardedHost := grpcutil.GetForwardedHost(streamCtx) + if !s.IsLocalRequest(forwardedHost) { + if errCh == nil { + doneCh = make(chan struct{}) + defer close(doneCh) + errCh = make(chan error) + } + s.dispatchTSORequest(ctx, &tsoRequest{ + forwardedHost, + request, + stream, + }, forwardedHost, doneCh, errCh) + continue + } + + start := time.Now() + // TSO uses leader lease to determine validity. No need to check leader here. 
+ if s.IsClosed() { + return status.Errorf(codes.Unknown, "server not started") + } + if request.GetHeader().GetClusterId() != s.clusterID { + return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, request.GetHeader().GetClusterId()) + } + count := request.GetCount() + ts, err := s.tsoAllocatorManager.HandleTSORequest(request.GetDcLocation(), count) + if err != nil { + return status.Errorf(codes.Unknown, err.Error()) + } + tsoHandleDuration.Observe(time.Since(start).Seconds()) + response := &tsopb.TsoResponse{ + Header: s.header(), + Timestamp: &ts, + Count: count, + } + if err := stream.Send(response); err != nil { + return errors.WithStack(err) + } + } +} + +func (s *Service) header() *tsopb.ResponseHeader { + if s.clusterID == 0 { + return s.wrapErrorToHeader(tsopb.ErrorType_NOT_BOOTSTRAPPED, "cluster id is not ready") + } + return &tsopb.ResponseHeader{ClusterId: s.clusterID} +} + +func (s *Service) wrapErrorToHeader(errorType tsopb.ErrorType, message string) *tsopb.ResponseHeader { + return s.errorHeader(&tsopb.Error{ + Type: errorType, + Message: message, + }) +} + +func (s *Service) errorHeader(err *tsopb.Error) *tsopb.ResponseHeader { + return &tsopb.ResponseHeader{ + ClusterId: s.clusterID, + Error: err, + } +} + +type tsoRequest struct { + forwardedHost string + request *tsopb.TsoRequest + stream tsopb.TSO_TsoServer +} + +func (s *Service) dispatchTSORequest(ctx context.Context, request *tsoRequest, forwardedHost string, doneCh <-chan struct{}, errCh chan<- error) { + tsoRequestChInterface, loaded := s.tsoDispatcher.LoadOrStore(forwardedHost, make(chan *tsoRequest, maxMergeTSORequests)) + if !loaded { + tsDeadlineCh := make(chan deadline, 1) + go s.handleDispatcher(ctx, forwardedHost, tsoRequestChInterface.(chan *tsoRequest), tsDeadlineCh, doneCh, errCh) + go watchTSDeadline(ctx, tsDeadlineCh) + } + tsoRequestChInterface.(chan *tsoRequest) <- request +} + +func (s *Service) handleDispatcher(ctx context.Context, forwardedHost string, tsoRequestCh <-chan *tsoRequest, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) { + defer logutil.LogPanic() + dispatcherCtx, ctxCancel := context.WithCancel(ctx) + defer ctxCancel() + defer s.tsoDispatcher.Delete(forwardedHost) + + var ( + forwardStream tsopb.TSO_TsoClient + cancel context.CancelFunc + ) + client, err := s.GetDelegateClient(ctx, forwardedHost) + if err != nil { + goto errHandling + } + log.Info("create tso forward stream", zap.String("forwarded-host", forwardedHost)) + forwardStream, cancel, err = s.CreateTsoForwardStream(client) +errHandling: + if err != nil || forwardStream == nil { + log.Error("create tso forwarding stream error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCCreateStream, err)) + select { + case <-dispatcherCtx.Done(): + return + case _, ok := <-doneCh: + if !ok { + return + } + case errCh <- err: + close(errCh) + return + } + } + defer cancel() + + requests := make([]*tsoRequest, maxMergeTSORequests+1) + for { + select { + case first := <-tsoRequestCh: + pendingTSOReqCount := len(tsoRequestCh) + 1 + requests[0] = first + for i := 1; i < pendingTSOReqCount; i++ { + requests[i] = <-tsoRequestCh + } + done := make(chan struct{}) + dl := deadline{ + timer: time.After(defaultTSOProxyTimeout), + done: done, + cancel: cancel, + } + select { + case tsDeadlineCh <- dl: + case <-dispatcherCtx.Done(): + return + } + err = s.processTSORequests(forwardStream, requests[:pendingTSOReqCount]) + close(done) + if err != nil { + 
log.Error("proxy forward tso error", zap.String("forwarded-host", forwardedHost), errs.ZapError(errs.ErrGRPCSend, err))
+				select {
+				case <-dispatcherCtx.Done():
+					return
+				case _, ok := <-doneCh:
+					if !ok {
+						return
+					}
+				case errCh <- err:
+					close(errCh)
+					return
+				}
+			}
+		case <-dispatcherCtx.Done():
+			return
+		}
+	}
+}
+
+func (s *Service) processTSORequests(forwardStream tsopb.TSO_TsoClient, requests []*tsoRequest) error {
+	start := time.Now()
+	// Merge the requests
+	count := uint32(0)
+	for _, request := range requests {
+		count += request.request.GetCount()
+	}
+	req := &tsopb.TsoRequest{
+		Header: requests[0].request.GetHeader(),
+		Count:  count,
+		// TODO: support Local TSO proxy forwarding.
+		DcLocation: requests[0].request.GetDcLocation(),
+	}
+	// Send to the leader stream.
+	if err := forwardStream.Send(req); err != nil {
+		return err
+	}
+	resp, err := forwardStream.Recv()
+	if err != nil {
+		return err
+	}
+	tsoProxyHandleDuration.Observe(time.Since(start).Seconds())
+	tsoProxyBatchSize.Observe(float64(count))
+	// Split the response
+	physical, logical, suffixBits := resp.GetTimestamp().GetPhysical(), resp.GetTimestamp().GetLogical(), resp.GetTimestamp().GetSuffixBits()
+	// `logical` is the largest ts's logical part here, we need to do the subtracting before we finish each TSO request.
+	// This is different from the logic of client batch, for example, if we have a largest ts whose logical part is 10,
+	// count is 5, then the splitting results should be 5 and 10.
+	firstLogical := addLogical(logical, -int64(count), suffixBits)
+	return s.finishTSORequest(requests, physical, firstLogical, suffixBits)
+}
+
+// Because of the suffix, we need to shift the count before we add it to the logical part.
+func addLogical(logical, count int64, suffixBits uint32) int64 {
+	return logical + count<<suffixBits
+}

[...]

+const (
+	// tsoPrimaryPrefix defines the key prefix for keyspace group primary election.
+	// The entire key is in the format of "/ms/<cluster-id>/tso/<group-id>/primary" in which
+	// <group-id> is 5 digits integer with leading zeros. For now we use 0 as the default cluster id.
+	tsoPrimaryPrefix = "/ms/0/tso"
+)
+
+var _ bs.Server = (*Server)(nil)
+var _ tso.Member = (*member.Participant)(nil)
+
+// Server is the TSO server, and it implements bs.Server.
+type Server struct {
+	diagnosticspb.DiagnosticsServer
+
+	// Server state. 0 is not serving, 1 is serving.
+	isServing int64
+	// Server start timestamp
+	startTimestamp int64
+
+	ctx              context.Context
+	serverLoopCtx    context.Context
+	serverLoopCancel func()
+	serverLoopWg     sync.WaitGroup
+
+	handler *Handler
+
+	cfg                  *Config
+	clusterID            uint64
+	defaultGroupRootPath string
+	defaultGroupStorage  endpoint.TSOStorage
+	listenURL            *url.URL
+	backendUrls          []url.URL
+
+	// for the primary election in the TSO cluster
+	participant *member.Participant
+	// etcd client
+	etcdClient *clientv3.Client
+	// http client
+	httpClient *http.Client
+
+	muxListener         net.Listener
+	service             *Service
+	tsoAllocatorManager *tso.AllocatorManager
+	// Store as map[string]*grpc.ClientConn
+	clientConns sync.Map
+	// Store as map[string]chan *tsoRequest
+	tsoDispatcher sync.Map
+
+	// Callback functions for different stages
+	// startCallbacks will be called after the server is started.
+	startCallbacks []func()
+	// primaryCallbacks will be called after the server becomes the primary.
+	primaryCallbacks []func(context.Context)
+	serviceRegister  *discovery.ServiceRegister
+}
+
+// Implement the following methods defined in bs.Server
+
+// Name returns the unique Name for this server in the TSO cluster.
+func (s *Server) Name() string {
+	return s.cfg.Name
+}
+
+// Context returns the context of server.
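To make the suffix arithmetic in processTSORequests concrete, a worked example with suffixBits = 2 and a batch of count = 5 whose largest logical part is 40:

```go
// addLogical(logical, count, suffixBits) == logical + count<<suffixBits
fmt.Println(addLogical(40, -5, 2)) // 40 + (-5 << 2) = 20, the firstLogical of the batch
fmt.Println(addLogical(20, 5, 2))  // 20 + (5 << 2)  = 40, back to the largest logical
```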
+func (s *Server) Context() context.Context {
+	return s.ctx
+}
+
+// GetHandler returns the handler.
+func (s *Server) GetHandler() *Handler {
+	return s.handler
+}
+
+// GetBasicServer returns the basic server.
+func (s *Server) GetBasicServer() bs.Server {
+	return s
+}
+
+// Run runs the TSO server.
+func (s *Server) Run() error {
+	go systimemon.StartMonitor(s.ctx, time.Now, func() {
+		log.Error("system time jumps backward", errs.ZapError(errs.ErrIncorrectSystemTime))
+		timeJumpBackCounter.Inc()
+	})
+
+	if err := s.initClient(); err != nil {
+		return err
+	}
+	if err := s.startServer(); err != nil {
+		return err
+	}
+
+	s.startServerLoop()
+
+	return nil
+}
+
+func (s *Server) startServerLoop() {
+	s.serverLoopCtx, s.serverLoopCancel = context.WithCancel(s.ctx)
+	s.serverLoopWg.Add(2)
+	go s.primaryElectionLoop()
+	go s.tsoAllocatorLoop()
+}
+
+// tsoAllocatorLoop is used to run the TSO Allocator updating daemon.
+func (s *Server) tsoAllocatorLoop() {
+	defer logutil.LogPanic()
+	defer s.serverLoopWg.Done()
+
+	ctx, cancel := context.WithCancel(s.serverLoopCtx)
+	defer cancel()
+	s.tsoAllocatorManager.AllocatorDaemon(ctx)
+	log.Info("tso server is closed, exit allocator loop")
+}
+
+func (s *Server) primaryElectionLoop() {
+	defer logutil.LogPanic()
+	defer s.serverLoopWg.Done()
+
+	for {
+		if s.IsClosed() {
+			log.Info("server is closed, exit tso primary election loop")
+			return
+		}
+
+		primary, rev, checkAgain := s.participant.CheckLeader()
+		if checkAgain {
+			continue
+		}
+		if primary != nil {
+			// TODO: if enable-local-tso is true, check the cluster dc-location after the primary/leader is elected
+			// go s.tsoAllocatorManager.ClusterDCLocationChecker()
+
+			log.Info("start to watch the primary/leader", zap.Stringer("tso-primary", primary))
+			// WatchLeader will keep looping and never return unless the primary/leader has changed.
+			s.participant.WatchLeader(s.serverLoopCtx, primary, rev)
+			log.Info("the tso primary/leader has changed, try to re-campaign a primary/leader")
+		}
+
+		s.campaignLeader()
+	}
+}
+
+func (s *Server) campaignLeader() {
+	log.Info("start to campaign the primary/leader", zap.String("campaign-tso-primary-name", s.participant.Member().Name))
+	if err := s.participant.CampaignLeader(s.cfg.LeaderLease); err != nil {
+		if err.Error() == errs.ErrEtcdTxnConflict.Error() {
+			log.Info("campaign tso primary/leader meets error due to txn conflict, another tso server may campaign successfully",
+				zap.String("campaign-tso-primary-name", s.participant.Member().Name))
+		} else {
+			log.Error("campaign tso primary/leader meets error due to etcd error",
+				zap.String("campaign-tso-primary-name", s.participant.Member().Name),
+				errs.ZapError(err))
+		}
+		return
+	}
+
+	// Start to keep alive the leadership and enable the TSO service.
+	// TSO service is strictly enabled/disabled by the leader lease for 2 reasons:
+	//   1. lease based approach is not affected by thread pause, slow runtime schedule, etc.
+	//   2. load region could be slow. Based on lease we can recover TSO service faster.
+	ctx, cancel := context.WithCancel(s.serverLoopCtx)
+	var resetLeaderOnce sync.Once
+	defer resetLeaderOnce.Do(func() {
+		cancel()
+		s.participant.ResetLeader()
+	})
+
+	// maintain the leadership, after this, the TSO service can serve.
+ s.participant.KeepLeader(ctx) + log.Info("campaign tso primary ok", zap.String("campaign-tso-primary-name", s.participant.Member().Name)) + + allocator, err := s.tsoAllocatorManager.GetAllocator(tso.GlobalDCLocation) + if err != nil { + log.Error("failed to get the global tso allocator", errs.ZapError(err)) + return + } + log.Info("initializing the global tso allocator") + if err := allocator.Initialize(0); err != nil { + log.Error("failed to initialize the global tso allocator", errs.ZapError(err)) + return + } + defer func() { + s.tsoAllocatorManager.ResetAllocatorGroup(tso.GlobalDCLocation) + }() + + log.Info("triggering the primary callback functions") + for _, cb := range s.primaryCallbacks { + cb(ctx) + } + + s.participant.EnableLeader() + // TODO: if enable-local-tso is true, check the cluster dc-location after the primary/leader is elected + // go s.tsoAllocatorManager.ClusterDCLocationChecker() + log.Info("tso primary is ready to serve", zap.String("tso-primary-name", s.participant.Member().Name)) + + leaderTicker := time.NewTicker(utils.LeaderTickInterval) + defer leaderTicker.Stop() + + for { + select { + case <-leaderTicker.C: + if !s.participant.IsLeader() { + log.Info("no longer a primary/leader because lease has expired, the tso primary/leader will step down") + return + } + case <-ctx.Done(): + // Server is closed and it should return nil. + log.Info("server is closed") + return + } + } +} + +// Close closes the server. +func (s *Server) Close() { + if !atomic.CompareAndSwapInt64(&s.isServing, 1, 0) { + // server is already closed + return + } + + log.Info("closing tso server ...") + s.serviceRegister.Deregister() + // TODO: double check when muxListener is closed, grpc.Server.serve() and http.Server.serve() + // will also close with error cmux.ErrListenerClosed. + s.muxListener.Close() + s.serverLoopCancel() + s.serverLoopWg.Wait() + + if s.etcdClient != nil { + if err := s.etcdClient.Close(); err != nil { + log.Error("close etcd client meet error", errs.ZapError(errs.ErrCloseEtcdClient, err)) + } + } + + if s.httpClient != nil { + s.httpClient.CloseIdleConnections() + } + log.Info("tso server is closed") +} + +// GetClient returns builtin etcd client. +func (s *Server) GetClient() *clientv3.Client { + return s.etcdClient +} + +// GetHTTPClient returns builtin http client. +func (s *Server) GetHTTPClient() *http.Client { + return s.httpClient +} + +// AddStartCallback adds a callback in the startServer phase. +func (s *Server) AddStartCallback(callbacks ...func()) { + s.startCallbacks = append(s.startCallbacks, callbacks...) +} + +// IsServing implements basicserver. It returns whether the server is the leader +// if there is embedded etcd, or the primary otherwise. +func (s *Server) IsServing() bool { + return s.participant.IsLeader() && atomic.LoadInt64(&s.isServing) == 1 +} + +// GetPrimary returns the primary provider of this tso server. +func (s *Server) GetPrimary() bs.MemberProvider { + return s.participant.GetLeader() +} + +// AddServiceReadyCallback implements basicserver. It adds callbacks when the server becomes the primary. +func (s *Server) AddServiceReadyCallback(callbacks ...func(context.Context)) { + s.primaryCallbacks = append(s.primaryCallbacks, callbacks...) +} + +// Implement the other methods + +// ClusterID returns the cluster ID of this server. 
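For context on the forwarding helpers below, this is how the gRPC service above strings them together when proxying for another host (error handling trimmed):

```go
// Resolve a cached connection per forwarded host, then open a TSO stream on it.
func openForwardStream(ctx context.Context, s *Server, host string) (tsopb.TSO_TsoClient, context.CancelFunc, error) {
	client, err := s.GetDelegateClient(ctx, host) // cached *grpc.ClientConn
	if err != nil {
		return nil, nil, err
	}
	return s.CreateTsoForwardStream(client) // guarded by checkStream's 3s timeout
}
```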
+func (s *Server) ClusterID() uint64 { + return s.clusterID +} + +// IsClosed checks if the server loop is closed +func (s *Server) IsClosed() bool { + return atomic.LoadInt64(&s.isServing) == 0 +} + +// GetTSOAllocatorManager returns the manager of TSO Allocator. +func (s *Server) GetTSOAllocatorManager() *tso.AllocatorManager { + return s.tsoAllocatorManager +} + +// GetTSODispatcher gets the TSO Dispatcher +func (s *Server) GetTSODispatcher() *sync.Map { + return &s.tsoDispatcher +} + +// IsLocalRequest checks if the forwarded host is the current host +func (s *Server) IsLocalRequest(forwardedHost string) bool { + // TODO: Check if the forwarded host is the current host. + // The logic is depending on etcd service mode -- if the TSO service + // uses the embedded etcd, check against ClientUrls; otherwise check + // against the cluster membership. + return forwardedHost == "" +} + +// CreateTsoForwardStream creates the forward stream +func (s *Server) CreateTsoForwardStream(client *grpc.ClientConn) (tsopb.TSO_TsoClient, context.CancelFunc, error) { + done := make(chan struct{}) + ctx, cancel := context.WithCancel(s.ctx) + go checkStream(ctx, cancel, done) + forwardStream, err := tsopb.NewTSOClient(client).Tso(ctx) + done <- struct{}{} + return forwardStream, cancel, err +} + +// GetDelegateClient returns grpc client connection talking to the forwarded host +func (s *Server) GetDelegateClient(ctx context.Context, forwardedHost string) (*grpc.ClientConn, error) { + client, ok := s.clientConns.Load(forwardedHost) + if !ok { + tlsConfig, err := s.GetTLSConfig().ToTLSConfig() + if err != nil { + return nil, err + } + cc, err := grpcutil.GetClientConn(ctx, forwardedHost, tlsConfig) + if err != nil { + return nil, err + } + client = cc + s.clientConns.Store(forwardedHost, cc) + } + return client.(*grpc.ClientConn), nil +} + +// ValidateInternalRequest checks if server is closed, which is used to validate +// the gRPC communication between TSO servers internally. +// TODO: Check if the sender is from the global TSO allocator +func (s *Server) ValidateInternalRequest(_ *tsopb.RequestHeader, _ bool) error { + if s.IsClosed() { + return ErrNotStarted + } + return nil +} + +// ValidateRequest checks if the keyspace replica is the primary and clusterID is matched. +// TODO: Check if the keyspace replica is the primary +func (s *Server) ValidateRequest(header *tsopb.RequestHeader) error { + if s.IsClosed() { + return ErrNotStarted + } + if header.GetClusterId() != s.clusterID { + return status.Errorf(codes.FailedPrecondition, "mismatch cluster id, need %d but got %d", s.clusterID, header.GetClusterId()) + } + return nil +} + +// GetGlobalTS returns global tso. +func (s *Server) GetGlobalTS() (uint64, error) { + ts, err := s.tsoAllocatorManager.GetGlobalTSO() + if err != nil { + return 0, err + } + return tsoutil.GenerateTS(ts), nil +} + +// GetExternalTS returns external timestamp from the cache or the persistent storage. +// TODO: Implement GetExternalTS +func (s *Server) GetExternalTS() uint64 { + return 0 +} + +// SetExternalTS saves external timestamp to cache and the persistent storage. +// TODO: Implement SetExternalTS +func (s *Server) SetExternalTS(externalTS uint64) error { + return nil +} + +func checkStream(streamCtx context.Context, cancel context.CancelFunc, done chan struct{}) { + defer logutil.LogPanic() + select { + case <-done: + return + case <-time.After(3 * time.Second): + cancel() + case <-streamCtx.Done(): + } + <-done +} + +// GetListenURL gets the listen URL. 
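On the uint64 that GetGlobalTS above returns: PD composes a pdpb.Timestamp into a single integer with, to my understanding, 18 bits reserved for the logical counter; `tsoutil.GenerateTS` is assumed to perform this packing.

```go
physical, logical := int64(1678694801000), int64(42) // ms timestamp + counter
ts := uint64(physical)<<18 | uint64(logical)         // composed TSO
fmt.Println(int64(ts>>18) == physical)               // true: physical recovered
fmt.Println(int64(ts&((1<<18)-1)) == logical)        // true: logical recovered
```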
+func (s *Server) GetListenURL() *url.URL { + return s.listenURL +} + +// GetConfig gets the config. +func (s *Server) GetConfig() *Config { + return s.cfg +} + +// GetTLSConfig gets the security config. +func (s *Server) GetTLSConfig() *grpcutil.TLSConfig { + return &s.cfg.Security.TLSConfig +} + +func (s *Server) initClient() error { + tlsConfig, err := s.cfg.Security.ToTLSConfig() + if err != nil { + return err + } + s.backendUrls, err = types.NewURLs(strings.Split(s.cfg.BackendEndpoints, ",")) + if err != nil { + return err + } + s.etcdClient, s.httpClient, err = etcdutil.CreateClients(tlsConfig, s.backendUrls) + return err +} + +func (s *Server) startGRPCServer(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + gs := grpc.NewServer() + s.service.RegisterGRPCService(gs) + diagnosticspb.RegisterDiagnosticsServer(gs, s) + serverr := gs.Serve(l) + log.Info("grpc server stopped serving") + + // Attempt graceful stop (waits for pending RPCs), but force a stop if + // it doesn't happen in a reasonable amount of time. + done := make(chan struct{}) + go func() { + defer logutil.LogPanic() + log.Info("try to gracefully stop the server now") + gs.GracefulStop() + close(done) + }() + select { + case <-done: + case <-time.After(utils.DefaultGRPCGracefulStopTimeout): + log.Info("stopping grpc gracefully is taking longer than expected and force stopping now") + gs.Stop() + } + + if s.IsClosed() { + log.Info("grpc server stopped") + } else { + log.Fatal("grpc server stopped unexpectedly", errs.ZapError(serverr)) + } +} + +func (s *Server) startHTTPServer(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + handler, _ := SetUpRestHandler(s.service) + hs := &http.Server{ + Handler: handler, + ReadTimeout: 5 * time.Minute, + ReadHeaderTimeout: 5 * time.Second, + } + serverr := hs.Serve(l) + log.Info("http server stopped serving") + + ctx, cancel := context.WithTimeout(context.Background(), utils.DefaultHTTPGracefulShutdownTimeout) + defer cancel() + if err := hs.Shutdown(ctx); err != nil { + log.Error("http server shutdown encountered problem", errs.ZapError(err)) + } else { + log.Info("all http(s) requests finished") + } + if s.IsClosed() { + log.Info("http server stopped") + } else { + log.Fatal("http server stopped unexpectedly", errs.ZapError(serverr)) + } +} + +func (s *Server) startGRPCAndHTTPServers(l net.Listener) { + defer logutil.LogPanic() + defer s.serverLoopWg.Done() + + mux := cmux.New(l) + grpcL := mux.MatchWithWriters(cmux.HTTP2MatchHeaderFieldSendSettings("content-type", "application/grpc")) + httpL := mux.Match(cmux.Any()) + + s.serverLoopWg.Add(2) + go s.startGRPCServer(grpcL) + go s.startHTTPServer(httpL) + + if err := mux.Serve(); err != nil { + if s.IsClosed() { + log.Info("mux stop serving", errs.ZapError(err)) + } else { + log.Panic("mux stop serving unexpectedly", errs.ZapError(err)) + } + } +} + +func (s *Server) startServer() (err error) { + if s.clusterID, err = etcdutil.GetClusterID(s.etcdClient, utils.ClusterIDPath); err != nil { + return err + } + log.Info("init cluster id", zap.Uint64("cluster-id", s.clusterID)) + + // It may lose accuracy if use float64 to store uint64. So we store the cluster id in label. 
+	// It may lose accuracy if we use float64 to store a uint64, so we store the cluster id in a label.
+	metadataGauge.WithLabelValues(fmt.Sprintf("cluster%d", s.clusterID)).Set(0)
+	// The independent TSO service still reuses the PD version info, since PD and TSO are just
+	// different service modes provided by the same pd-server binary.
+	serverInfo.WithLabelValues(versioninfo.PDReleaseVersion, versioninfo.PDGitHash).Set(float64(time.Now().Unix()))
+	s.defaultGroupRootPath = path.Join(pdRootPath, strconv.FormatUint(s.clusterID, 10))
+
+	// TODO: Figure out how we should generate the unique id and name passed to Participant.
+	// For now, set the name to the listen address and generate the unique id from that name with sha256.
+	uniqueName := s.cfg.ListenAddr
+	uniqueID := memberutil.GenerateUniqueID(uniqueName)
+	log.Info("joining primary election", zap.String("participant-name", uniqueName), zap.Uint64("participant-id", uniqueID))
+
+	s.participant = member.NewParticipant(s.etcdClient, uniqueID)
+	s.participant.InitInfo(uniqueName, path.Join(tsoPrimaryPrefix, fmt.Sprintf("%05d", 0)), "primary", "keyspace group primary election", s.cfg.ListenAddr)
+	s.participant.SetMemberDeployPath(s.participant.ID())
+	s.participant.SetMemberBinaryVersion(s.participant.ID(), versioninfo.PDReleaseVersion)
+	s.participant.SetMemberGitHash(s.participant.ID(), versioninfo.PDGitHash)
+
+	s.defaultGroupStorage = endpoint.NewStorageEndpoint(kv.NewEtcdKVBase(s.GetClient(), s.defaultGroupRootPath), nil)
+	s.tsoAllocatorManager = tso.NewAllocatorManager(
+		s.participant, s.defaultGroupRootPath, s.defaultGroupStorage, s.cfg.IsLocalTSOEnabled(), s.cfg.GetTSOSaveInterval(), s.cfg.GetTSOUpdatePhysicalInterval(),
+		s.cfg.GetTLSConfig(), func() time.Duration { return s.cfg.MaxResetTSGap.Duration })
+	// Set up the Global TSO Allocator here; it will be initialized once this TSO participant successfully campaigns for leader.
+	s.tsoAllocatorManager.SetUpAllocator(s.ctx, tso.GlobalDCLocation, s.participant.GetLeadership())
+
+	s.service = &Service{Server: s}
+
+	tlsConfig, err := s.cfg.Security.ToTLSConfig()
+	if err != nil {
+		return err
+	}
+	s.listenURL, err = url.Parse(s.cfg.ListenAddr)
+	if err != nil {
+		return err
+	}
+	if tlsConfig != nil {
+		s.muxListener, err = tls.Listen(utils.TCPNetworkStr, s.listenURL.Host, tlsConfig)
+	} else {
+		s.muxListener, err = net.Listen(utils.TCPNetworkStr, s.listenURL.Host)
+	}
+	if err != nil {
+		return err
+	}
+
+	s.serverLoopWg.Add(1)
+	go s.startGRPCAndHTTPServers(s.muxListener)
+
+	// Run callbacks
+	log.Info("triggering the start callback functions")
+	for _, cb := range s.startCallbacks {
+		cb()
+	}
+
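memberutil.GenerateUniqueID is not shown in this patch; going by the TODO above, one plausible shape for it (an assumption, not the verified pd implementation) is to hash the name with sha256 and fold the first eight bytes into a uint64:

package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// generateUniqueID sketches how a stable uint64 id can be derived from a
// name: hash it with sha256 and take the first eight bytes as a big-endian
// integer. Illustrative only; pd's actual helper may differ in detail.
func generateUniqueID(name string) uint64 {
	sum := sha256.Sum256([]byte(name))
	return binary.BigEndian.Uint64(sum[:8])
}

func main() {
	// The same listen address always yields the same participant id.
	fmt.Println(generateUniqueID("127.0.0.1:3379"))
}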
+	// Server has started.
+	atomic.StoreInt64(&s.isServing, 1)
+	s.serviceRegister = discovery.NewServiceRegister(s.ctx, s.etcdClient, "tso", s.cfg.ListenAddr, s.cfg.ListenAddr, discovery.DefaultLeaseInSeconds)
+	s.serviceRegister.Register()
+	return nil
+}
+
+// CreateServer creates the Server
+func CreateServer(ctx context.Context, cfg *Config) *Server {
+	rand.New(rand.NewSource(time.Now().UnixNano()))
+	svr := &Server{
+		DiagnosticsServer: sysutil.NewDiagnosticsServer(cfg.Log.File.Filename),
+		startTimestamp:    time.Now().Unix(),
+		cfg:               cfg,
+		ctx:               ctx,
+	}
+	svr.handler = newHandler(svr)
+	return svr
+}
+
+// CreateServerWrapper encapsulates the configuration/log/metrics initialization and creates the server
+func CreateServerWrapper(cmd *cobra.Command, args []string) {
+	cmd.Flags().Parse(args)
+	cfg := NewConfig()
+	flagSet := cmd.Flags()
+	err := cfg.Parse(flagSet)
+	defer logutil.LogPanic()
+
+	if err != nil {
+		cmd.Println(err)
+		return
+	}
+
+	if printVersion, err := flagSet.GetBool("version"); err != nil {
+		cmd.Println(err)
+		return
+	} else if printVersion {
+		versioninfo.Print()
+		exit(0)
+	}
+
+	// New zap logger
+	err = logutil.SetupLogger(cfg.Log, &cfg.Logger, &cfg.LogProps, cfg.Security.RedactInfoLog)
+	if err == nil {
+		log.ReplaceGlobals(cfg.Logger, cfg.LogProps)
+	} else {
+		log.Fatal("initialize logger error", errs.ZapError(err))
+	}
+	// Flush any buffered log entries
+	defer log.Sync()
+
+	versioninfo.Log("TSO")
+	log.Info("TSO config", zap.Reflect("config", cfg))
+
+	grpcprometheus.EnableHandlingTimeHistogram()
+	metricutil.Push(&cfg.Metric)
+
+	ctx, cancel := context.WithCancel(context.Background())
+	svr := CreateServer(ctx, cfg)
+
+	sc := make(chan os.Signal, 1)
+	signal.Notify(sc,
+		syscall.SIGHUP,
+		syscall.SIGINT,
+		syscall.SIGTERM,
+		syscall.SIGQUIT)
+
+	var sig os.Signal
+	go func() {
+		sig = <-sc
+		cancel()
+	}()
+
+	if err := svr.Run(); err != nil {
+		log.Fatal("run server failed", errs.ZapError(err))
+	}
+
+	<-ctx.Done()
+	log.Info("got signal to exit", zap.String("signal", sig.String()))
+
+	svr.Close()
+	switch sig {
+	case syscall.SIGTERM:
+		exit(0)
+	default:
+		exit(1)
+	}
+}
+
+func exit(code int) {
+	log.Sync()
+	os.Exit(code)
+}
diff --git a/pkg/systimemon/systimemon.go b/pkg/systimemon/systimemon.go
index a3124312fa22..75fc5e68d8b1 100644
--- a/pkg/systimemon/systimemon.go
+++ b/pkg/systimemon/systimemon.go
@@ -20,11 +20,13 @@ import (
 
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/errs"
+	"github.com/tikv/pd/pkg/utils/logutil"
 	"go.uber.org/zap"
 )
 
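CreateServerWrapper ties the server lifetime to OS signals: one goroutine records the received signal and cancels the root context, and the exit code depends on whether the shutdown was an orderly SIGTERM. The wiring reduces to the following runnable sketch (the server itself stubbed out):

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	sc := make(chan os.Signal, 1)
	signal.Notify(sc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)

	var sig os.Signal
	go func() {
		sig = <-sc
		cancel() // unblocks the wait below; cancel orders the write to sig before the read
	}()

	// ... start the server here; it runs until the context is cancelled ...

	<-ctx.Done()
	fmt.Println("got signal to exit:", sig)

	// SIGTERM is the orderly-shutdown signal, so it maps to exit code 0;
	// anything else is treated as an abnormal exit.
	if sig == syscall.SIGTERM {
		os.Exit(0)
	}
	os.Exit(1)
}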
 // StartMonitor calls systimeErrHandler if the system time jumps backward.
 func StartMonitor(ctx context.Context, now func() time.Time, systimeErrHandler func()) {
+	defer logutil.LogPanic()
 	log.Info("start system time monitor")
 	tick := time.NewTicker(100 * time.Millisecond)
 	defer tick.Stop()
diff --git a/server/election/lease.go b/server/election/lease.go
index cb5d13ddb333..dc6f9a00edfc 100644
--- a/server/election/lease.go
+++ b/server/election/lease.go
@@ -21,8 +21,14 @@ import (
 
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/errs"
+<<<<<<< HEAD:server/election/lease.go
 	"github.com/tikv/pd/pkg/etcdutil"
 	"github.com/tikv/pd/pkg/typeutil"
+=======
+	"github.com/tikv/pd/pkg/utils/etcdutil"
+	"github.com/tikv/pd/pkg/utils/logutil"
+	"github.com/tikv/pd/pkg/utils/typeutil"
+>>>>>>> 5bdfa717f (*: add defer to logs the panic reason and stack (#6123)):pkg/election/lease.go
 	"go.etcd.io/etcd/clientv3"
 	"go.uber.org/zap"
 )
@@ -129,6 +135,7 @@ func (l *lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-c
 	ch := make(chan time.Time)
 
 	go func() {
+		defer logutil.LogPanic()
 		ticker := time.NewTicker(interval)
 		defer ticker.Stop()
 
@@ -137,6 +144,7 @@ func (l *lease) keepAliveWorker(ctx context.Context, interval time.Duration) <-c
 
 	for {
 		go func() {
+			defer logutil.LogPanic()
 			start := time.Now()
 			ctx1, cancel := context.WithTimeout(ctx, l.leaseTimeout)
 			defer cancel()
diff --git a/server/region_syncer/client.go b/server/region_syncer/client.go
index 3f2419ad7204..c29c3483d6cf 100644
--- a/server/region_syncer/client.go
+++ b/server/region_syncer/client.go
@@ -23,8 +23,14 @@ import (
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/errs"
+<<<<<<< HEAD
 	"github.com/tikv/pd/pkg/grpcutil"
 	"github.com/tikv/pd/server/core"
+=======
+	"github.com/tikv/pd/pkg/storage"
+	"github.com/tikv/pd/pkg/utils/grpcutil"
+	"github.com/tikv/pd/pkg/utils/logutil"
+>>>>>>> 5bdfa717f (*: add defer to logs the panic reason and stack (#6123))
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/backoff"
@@ -116,6 +122,7 @@ func (s *RegionSyncer) StartSyncWithLeader(addr string) {
 	ctx := s.mu.clientCtx
 
 	go func() {
+		defer logutil.LogPanic()
 		defer s.wg.Done()
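keepAliveWorker launches one goroutine per tick, each bounded by its own timeout, so a stuck keep-alive attempt can never stall the ticker loop. A self-contained sketch of that shape (the etcd call replaced by a sleep; names hypothetical):

package main

import (
	"context"
	"fmt"
	"time"
)

// keepAliveWorker fires one bounded attempt per tick: a slow attempt times
// out on its own context instead of delaying the next tick.
func keepAliveWorker(ctx context.Context, interval, timeout time.Duration) <-chan time.Time {
	ch := make(chan time.Time)
	go func() {
		ticker := time.NewTicker(interval)
		defer ticker.Stop()
		for {
			go func() {
				ctx1, cancel := context.WithTimeout(ctx, timeout)
				defer cancel()
				time.Sleep(10 * time.Millisecond) // stand-in for the keep-alive RPC
				select {
				case ch <- time.Now().Add(interval): // publish the new expiry
				case <-ctx1.Done(): // attempt timed out or worker stopped
				}
			}()
			select {
			case <-ctx.Done():
				return
			case <-ticker.C:
			}
		}
	}()
	return ch
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	ch := keepAliveWorker(ctx, 100*time.Millisecond, 50*time.Millisecond)
	for i := 0; i < 3; i++ {
		fmt.Println("lease extended until", (<-ch).Format(time.StampMilli))
	}
	cancel()
}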
 		// used to load regions from kv storage to cache storage.
 		bc := s.server.GetBasicCluster()
diff --git a/server/schedule/region_scatterer.go b/server/schedule/region_scatterer.go
index 5200370c96f3..bfbcf3d00c08 100644
--- a/server/schedule/region_scatterer.go
+++ b/server/schedule/region_scatterer.go
@@ -368,6 +368,28 @@ func (r *RegionScatterer) scatterRegion(region *core.RegionInfo, group string) *
 	return op
 }
 
+<<<<<<< HEAD:server/schedule/region_scatterer.go
+=======
+func allowLeader(fit *placement.RegionFit, peer *metapb.Peer) bool {
+	switch peer.GetRole() {
+	case metapb.PeerRole_Learner, metapb.PeerRole_DemotingVoter:
+		return false
+	}
+	if peer.IsWitness {
+		return false
+	}
+	peerFit := fit.GetRuleFit(peer.GetId())
+	if peerFit == nil || peerFit.Rule == nil || peerFit.Rule.IsWitness {
+		return false
+	}
+	switch peerFit.Rule.Role {
+	case placement.Voter, placement.Leader:
+		return true
+	}
+	return false
+}
+
+>>>>>>> 5bdfa717f (*: add defer to logs the panic reason and stack (#6123)):pkg/schedule/region_scatterer.go
 func isSameDistribution(region *core.RegionInfo, targetPeers map[uint64]*metapb.Peer, targetLeader uint64) bool {
 	peers := region.GetPeers()
 	for _, peer := range peers {
diff --git a/server/tso/allocator_manager.go b/server/tso/allocator_manager.go
index f0689ee47137..bb50e11d4ef1 100644
--- a/server/tso/allocator_manager.go
+++ b/server/tso/allocator_manager.go
@@ -31,11 +31,20 @@ import (
 	"github.com/tikv/pd/pkg/etcdutil"
 	"github.com/tikv/pd/pkg/grpcutil"
 	"github.com/tikv/pd/pkg/slice"
+<<<<<<< HEAD:server/tso/allocator_manager.go
 	"github.com/tikv/pd/pkg/syncutil"
 	"github.com/tikv/pd/server/config"
 	"github.com/tikv/pd/server/election"
 	"github.com/tikv/pd/server/member"
 	"github.com/tikv/pd/server/storage/kv"
+=======
+	"github.com/tikv/pd/pkg/storage/endpoint"
+	"github.com/tikv/pd/pkg/storage/kv"
+	"github.com/tikv/pd/pkg/utils/etcdutil"
+	"github.com/tikv/pd/pkg/utils/grpcutil"
+	"github.com/tikv/pd/pkg/utils/logutil"
+	"github.com/tikv/pd/pkg/utils/syncutil"
+>>>>>>> 5bdfa717f (*: add defer to logs the panic reason and stack (#6123)):pkg/tso/allocator_manager.go
 	"go.etcd.io/etcd/clientv3"
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
@@ -341,6 +350,7 @@ func (am *AllocatorManager) getLocalTSOAllocatorPath() string {
 
 // similar logic with leaderLoop in server/server.go
 func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) {
+	defer logutil.LogPanic()
 	defer log.Info("server is closed, return local tso allocator leader loop",
 		zap.String("dc-location", allocator.GetDCLocation()),
 		zap.String("local-tso-allocator-name", am.member.Member().Name))
@@ -592,6 +602,7 @@ func (am *AllocatorManager) allocatorUpdater() {
 
 // updateAllocator is used to update the allocator in the group.
 func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
+	defer logutil.LogPanic()
 	defer am.wg.Done()
 	select {
 	case <-ag.ctx.Done():
@@ -642,6 +653,7 @@ func (am *AllocatorManager) allocatorPatroller(serverCtx context.Context) {
 
 // ClusterDCLocationChecker collects all dc-locations of a cluster, computes some related info
 // and stores them into the DCLocationInfo, then finally writes them into am.mu.clusterDCLocations.
 func (am *AllocatorManager) ClusterDCLocationChecker() {
+	defer logutil.LogPanic()
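The common thread of this patch is `defer logutil.LogPanic()` at the top of every background goroutine. The helper itself is not part of the diff; a dependency-free approximation of what the commit title describes (log the panic reason and stack before the process dies, as pd's zap-based log.Fatal presumably does) might look like:

package main

import (
	"fmt"
	"os"
	"runtime/debug"
)

// logPanic recovers a panic in the current goroutine, records the reason and
// stack, and exits, so the failure reaches the process log in a structured
// way rather than only as the runtime's raw crash output. Sketch only; not
// pd's exact implementation.
func logPanic() {
	if e := recover(); e != nil {
		fmt.Fprintf(os.Stderr, "panic: %v\n%s", e, debug.Stack())
		os.Exit(1)
	}
}

func main() {
	done := make(chan struct{})
	go func() {
		defer close(done)
		defer logPanic() // runs first on unwind, before close(done)
		panic("boom")    // simulated failure in a background loop
	}()
	<-done
}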
 	// Wait for the PD leader to be elected.
 	if am.member.GetLeader() == nil {
 		return
diff --git a/server/tso/global_allocator.go b/server/tso/global_allocator.go
index 02d7b7438c82..ea25b2d88ae0 100644
--- a/server/tso/global_allocator.go
+++ b/server/tso/global_allocator.go
@@ -26,9 +26,15 @@ import (
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/errs"
 	"github.com/tikv/pd/pkg/slice"
+<<<<<<< HEAD:server/tso/global_allocator.go
 	"github.com/tikv/pd/pkg/tsoutil"
 	"github.com/tikv/pd/pkg/typeutil"
 	"github.com/tikv/pd/server/election"
+=======
+	"github.com/tikv/pd/pkg/utils/logutil"
+	"github.com/tikv/pd/pkg/utils/tsoutil"
+	"github.com/tikv/pd/pkg/utils/typeutil"
+>>>>>>> 5bdfa717f (*: add defer to logs the panic reason and stack (#6123)):pkg/tso/global_allocator.go
 	"go.uber.org/zap"
 	"google.golang.org/grpc"
 )
@@ -334,6 +340,7 @@ func (gta *GlobalTSOAllocator) SyncMaxTS(
 		// Send SyncMaxTSRequest to all allocator leaders concurrently.
 		wg.Add(1)
 		go func(ctx context.Context, conn *grpc.ClientConn, respCh chan<- *syncResp) {
+			defer logutil.LogPanic()
 			defer wg.Done()
 			syncMaxTSResp := &syncResp{}
 			syncCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
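The SyncMaxTS hunk guards a classic fan-out: one goroutine per allocator leader, a WaitGroup to join them, a per-call timeout, and a buffered response channel so no sender blocks. In isolation (the RPC stubbed with a sleep, names hypothetical):

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

type syncResp struct {
	addr string
	err  error
}

// fanOut mirrors the send loop above: each target gets its own goroutine and
// its own timeout, and results are funneled through a channel buffered to
// len(targets) so every sender completes even if the reader is slow.
func fanOut(ctx context.Context, targets []string, rpcTimeout time.Duration) []syncResp {
	respCh := make(chan syncResp, len(targets))
	var wg sync.WaitGroup
	for _, addr := range targets {
		wg.Add(1)
		go func(addr string) {
			defer wg.Done()
			syncCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
			defer cancel()
			select { // stand-in for the real gRPC call
			case <-time.After(10 * time.Millisecond):
				respCh <- syncResp{addr: addr}
			case <-syncCtx.Done():
				respCh <- syncResp{addr: addr, err: syncCtx.Err()}
			}
		}(addr)
	}
	wg.Wait()
	close(respCh)
	var resps []syncResp
	for r := range respCh {
		resps = append(resps, r)
	}
	return resps
}

func main() {
	for _, r := range fanOut(context.Background(), []string{"dc-1", "dc-2", "dc-3"}, time.Second) {
		fmt.Printf("%s: err=%v\n", r.addr, r.err)
	}
}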