forked from tikv/pd
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
This is an automated cherry-pick of tikv#6123
ref tikv#6099 Signed-off-by: ti-chi-bot <[email protected]>
- Loading branch information
1 parent
a6e77c1
commit b0567e9
Showing
13 changed files
with
2,227 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,149 @@ | ||
// Copyright 2023 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package gctuner | ||
|
||
import ( | ||
"math" | ||
"runtime/debug" | ||
"time" | ||
|
||
"github.com/pingcap/failpoint" | ||
"github.com/pingcap/log" | ||
util "github.com/tikv/pd/pkg/gogc" | ||
"github.com/tikv/pd/pkg/memory" | ||
"github.com/tikv/pd/pkg/utils/logutil" | ||
atomicutil "go.uber.org/atomic" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// GlobalMemoryLimitTuner only allow one memory limit tuner in one process | ||
var GlobalMemoryLimitTuner = &memoryLimitTuner{} | ||
|
||
// Go runtime trigger GC when hit memory limit which managed via runtime/debug.SetMemoryLimit. | ||
// So we can change memory limit dynamically to avoid frequent GC when memory usage is greater than the limit. | ||
type memoryLimitTuner struct { | ||
finalizer *finalizer | ||
isTuning atomicutil.Bool | ||
percentage atomicutil.Float64 | ||
waitingReset atomicutil.Bool | ||
nextGCTriggeredByMemoryLimit atomicutil.Bool | ||
} | ||
|
||
// fallbackPercentage is the factor applied to the server memory limit while
// the tuner temporarily raises the runtime memory limit during tuning.
const fallbackPercentage = float64(1.1)
|
||
func setMemoryLimit(limit int64) int64 { | ||
ret := debug.SetMemoryLimit(limit) | ||
if limit >= 0 { | ||
log.Info("debug.SetMemoryLimit", zap.Int64("limit", limit), zap.Int64("ret", ret)) | ||
} | ||
return ret | ||
} | ||
|
||
// tuning check the memory nextGC and judge whether this GC is trigger by memory limit. | ||
// Go runtime ensure that it will be called serially. | ||
func (t *memoryLimitTuner) tuning() { | ||
if !t.isTuning.Load() { | ||
return | ||
} | ||
r := memory.ForceReadMemStats() | ||
gogc := util.GetGOGC() | ||
ratio := float64(100+gogc) / 100 | ||
// This `if` checks whether the **last** GC was triggered by MemoryLimit as far as possible. | ||
// If the **last** GC was triggered by MemoryLimit, we'll set MemoryLimit to MAXVALUE to return control back to GOGC | ||
// to avoid frequent GC when memory usage fluctuates above and below MemoryLimit. | ||
// The logic we judge whether the **last** GC was triggered by MemoryLimit is as follows: | ||
// suppose `NextGC` = `HeapInUse * (100 + GOGC) / 100)`, | ||
// - If NextGC < MemoryLimit, the **next** GC will **not** be triggered by MemoryLimit thus we do not care about | ||
// why the **last** GC is triggered. And MemoryLimit will not be reset this time. | ||
// - Only if NextGC >= MemoryLimit , the **next** GC will be triggered by MemoryLimit. Thus, we need to reset | ||
// MemoryLimit after the **next** GC happens if needed. | ||
if float64(r.HeapInuse)*ratio > float64(setMemoryLimit(-1)) { | ||
if t.nextGCTriggeredByMemoryLimit.Load() && t.waitingReset.CompareAndSwap(false, true) { | ||
go func() { | ||
defer logutil.LogPanic() | ||
memory.MemoryLimitGCLast.Store(time.Now()) | ||
memory.MemoryLimitGCTotal.Add(1) | ||
setMemoryLimit(t.calcMemoryLimit(fallbackPercentage)) | ||
resetInterval := 1 * time.Minute // Wait 1 minute and set back, to avoid frequent GC | ||
failpoint.Inject("testMemoryLimitTuner", func(val failpoint.Value) { | ||
if val, ok := val.(bool); val && ok { | ||
resetInterval = 1 * time.Second | ||
} | ||
}) | ||
time.Sleep(resetInterval) | ||
setMemoryLimit(t.calcMemoryLimit(t.GetPercentage())) | ||
for !t.waitingReset.CompareAndSwap(true, false) { | ||
continue | ||
} | ||
}() | ||
memory.TriggerMemoryLimitGC.Store(true) | ||
} | ||
t.nextGCTriggeredByMemoryLimit.Store(true) | ||
} else { | ||
t.nextGCTriggeredByMemoryLimit.Store(false) | ||
memory.TriggerMemoryLimitGC.Store(false) | ||
} | ||
} | ||
|
||
// Start starts the memory limit tuner. | ||
func (t *memoryLimitTuner) Start() { | ||
log.Debug("memoryLimitTuner start") | ||
t.finalizer = newFinalizer(t.tuning) // Start tuning | ||
} | ||
|
||
// Stop stops the memory limit tuner. | ||
func (t *memoryLimitTuner) Stop() { | ||
t.finalizer.stop() | ||
log.Info("memoryLimitTuner stop") | ||
} | ||
|
||
// SetPercentage set the percentage for memory limit tuner. | ||
func (t *memoryLimitTuner) SetPercentage(percentage float64) { | ||
t.percentage.Store(percentage) | ||
} | ||
|
||
// GetPercentage get the percentage from memory limit tuner. | ||
func (t *memoryLimitTuner) GetPercentage() float64 { | ||
return t.percentage.Load() | ||
} | ||
|
||
// UpdateMemoryLimit updates the memory limit. | ||
// This function should be called when `tidb_server_memory_limit` or `tidb_server_memory_limit_gc_trigger` is modified. | ||
func (t *memoryLimitTuner) UpdateMemoryLimit() { | ||
var memoryLimit = t.calcMemoryLimit(t.GetPercentage()) | ||
if memoryLimit == math.MaxInt64 { | ||
t.isTuning.Store(false) | ||
memoryLimit = initGOMemoryLimitValue | ||
} else { | ||
t.isTuning.Store(true) | ||
} | ||
setMemoryLimit(memoryLimit) | ||
} | ||
|
||
func (*memoryLimitTuner) calcMemoryLimit(percentage float64) int64 { | ||
memoryLimit := int64(float64(memory.ServerMemoryLimit.Load()) * percentage) // `tidb_server_memory_limit` * `tidb_server_memory_limit_gc_trigger` | ||
if memoryLimit == 0 { | ||
memoryLimit = math.MaxInt64 | ||
} | ||
return memoryLimit | ||
} | ||
|
||
var initGOMemoryLimitValue int64 | ||
|
||
func init() { | ||
initGOMemoryLimitValue = setMemoryLimit(-1) | ||
GlobalMemoryLimitTuner.Start() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,120 @@ | ||
// Copyright 2023 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package discovery | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/pingcap/log" | ||
"github.com/tikv/pd/pkg/utils/logutil" | ||
"go.etcd.io/etcd/clientv3" | ||
"go.uber.org/zap" | ||
) | ||
|
||
// DefaultLeaseInSeconds is the default lease TTL, in seconds, for a service
// registration.
const DefaultLeaseInSeconds = 3
|
||
// ServiceRegister is used to register the service to etcd. | ||
type ServiceRegister struct { | ||
ctx context.Context | ||
cancel context.CancelFunc | ||
cli *clientv3.Client | ||
key string | ||
value string | ||
ttl int64 | ||
} | ||
|
||
// NewServiceRegister creates a new ServiceRegister. | ||
func NewServiceRegister(ctx context.Context, cli *clientv3.Client, serviceName, serviceAddr, serializedValue string, ttl int64) *ServiceRegister { | ||
cctx, cancel := context.WithCancel(ctx) | ||
serviceKey := registryPath(serviceName, serviceAddr) | ||
return &ServiceRegister{ | ||
ctx: cctx, | ||
cancel: cancel, | ||
cli: cli, | ||
key: serviceKey, | ||
value: serializedValue, | ||
ttl: ttl, | ||
} | ||
} | ||
|
||
// Register registers the service to etcd. | ||
func (sr *ServiceRegister) Register() error { | ||
resp, err := sr.cli.Grant(sr.ctx, sr.ttl) | ||
if err != nil { | ||
sr.cancel() | ||
return fmt.Errorf("grant lease failed: %v", err) | ||
} | ||
|
||
if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil { | ||
sr.cancel() | ||
return fmt.Errorf("put the key %s failed: %v", sr.key, err) | ||
} | ||
|
||
kresp, err := sr.cli.KeepAlive(sr.ctx, resp.ID) | ||
if err != nil { | ||
sr.cancel() | ||
return fmt.Errorf("keepalive failed: %v", err) | ||
} | ||
go func() { | ||
defer logutil.LogPanic() | ||
for { | ||
select { | ||
case <-sr.ctx.Done(): | ||
log.Info("exit register process", zap.String("key", sr.key)) | ||
return | ||
case _, ok := <-kresp: | ||
if !ok { | ||
log.Error("keep alive failed", zap.String("key", sr.key)) | ||
// retry | ||
t := time.NewTicker(time.Duration(sr.ttl) * time.Second / 2) | ||
for { | ||
select { | ||
case <-sr.ctx.Done(): | ||
log.Info("exit register process", zap.String("key", sr.key)) | ||
return | ||
default: | ||
} | ||
|
||
<-t.C | ||
resp, err := sr.cli.Grant(sr.ctx, sr.ttl) | ||
if err != nil { | ||
log.Error("grant lease failed", zap.String("key", sr.key), zap.Error(err)) | ||
continue | ||
} | ||
|
||
if _, err := sr.cli.Put(sr.ctx, sr.key, sr.value, clientv3.WithLease(resp.ID)); err != nil { | ||
log.Error("put the key failed", zap.String("key", sr.key), zap.Error(err)) | ||
continue | ||
} | ||
} | ||
} | ||
} | ||
} | ||
}() | ||
|
||
return nil | ||
} | ||
|
||
// Deregister deregisters the service from etcd. | ||
func (sr *ServiceRegister) Deregister() error { | ||
sr.cancel() | ||
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(sr.ttl)*time.Second) | ||
defer cancel() | ||
_, err := sr.cli.Delete(ctx, sr.key) | ||
return err | ||
} |
Oops, something went wrong.