Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor: support tidb memory debug mode #35322

Merged
merged 27 commits into from
Jul 13, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
4d356ab
memory debug mode
wshwsh12 Apr 27, 2022
c2fa0e2
debug debug mode
wshwsh12 Jun 1, 2022
9b1dc4f
log all tracker
wshwsh12 Jun 6, 2022
223c8fc
format
wshwsh12 Jun 6, 2022
8a3d5a0
Merge remote-tracking branch 'upstream/master' into tidb_memory_debug…
wshwsh12 Jun 13, 2022
cd4399b
fmt
wshwsh12 Jun 13, 2022
ff6b1ef
Merge remote-tracking branch 'upstream/master' into tidb_memory_debug…
wshwsh12 Jul 7, 2022
2765c9b
Merge remote-tracking branch 'upstream/master' into tidb_memory_debug…
wshwsh12 Jul 8, 2022
3ca8663
address comments
wshwsh12 Jul 8, 2022
249055c
fix test
wshwsh12 Jul 8, 2022
0acd1dd
Merge remote-tracking branch 'upstream/master' into tidb_memory_debug…
wshwsh12 Jul 11, 2022
0326579
add comments
wshwsh12 Jul 11, 2022
e36f194
error
wshwsh12 Jul 11, 2022
6363022
address comments
wshwsh12 Jul 11, 2022
9c8adfc
Merge branch 'master' into tidb_memory_debug_mode
wshwsh12 Jul 11, 2022
2891647
address comments
wshwsh12 Jul 12, 2022
3ebb617
test default value
wshwsh12 Jul 12, 2022
38b7569
fix
wshwsh12 Jul 12, 2022
f26bce9
polish
XuHuaiyu Jul 12, 2022
153f29a
Merge remote-tracking branch 'xhy/tidb_memory_debug_mode' into tidb_m…
wshwsh12 Jul 12, 2022
0dee399
fmt
wshwsh12 Jul 12, 2022
4ca5878
fix
wshwsh12 Jul 12, 2022
6b16b72
polish
wshwsh12 Jul 12, 2022
3ebd6af
Merge branch 'master' into tidb_memory_debug_mode
wshwsh12 Jul 12, 2022
7c40a38
Merge branch 'master' into tidb_memory_debug_mode
wshwsh12 Jul 13, 2022
9b9c949
Merge branch 'master' into tidb_memory_debug_mode
ti-chi-bot Jul 13, 2022
b20f342
Merge branch 'master' into tidb_memory_debug_mode
ti-chi-bot Jul 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 152 additions & 0 deletions executor/explain.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,22 @@ package executor

import (
"context"
"os"
"path/filepath"
"runtime"
rpprof "runtime/pprof"
"strconv"
"sync"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
)

// ExplainExec represents an explain executor.
Expand Down Expand Up @@ -89,6 +100,24 @@ func (e *ExplainExec) executeAnalyzeExec(ctx context.Context) (err error) {
}
}
}()
if minHeapInUse, alarmRatio := e.ctx.GetSessionVars().MemoryDebugModeMinHeapInUse, e.ctx.GetSessionVars().MemoryDebugModeAlarmRatio; minHeapInUse != 0 && alarmRatio != 0 {
memoryDebugModeCtx, cancel := context.WithCancel(ctx)
waitGroup := sync.WaitGroup{}
waitGroup.Add(1)
defer func() {
// Notify and wait debug goroutine exit.
cancel()
waitGroup.Wait()
}()
go (&memoryDebugModeHandler{
ctx: memoryDebugModeCtx,
minHeapInUse: mathutil.Abs(minHeapInUse),
alarmRatio: alarmRatio,
autoGC: minHeapInUse > 0,
memTracker: e.ctx.GetSessionVars().StmtCtx.MemTracker,
wg: &waitGroup,
}).run()
}
e.executed = true
chk := newFirstChunk(e.analyzeExec)
for {
Expand Down Expand Up @@ -123,3 +152,126 @@ func (e *ExplainExec) getAnalyzeExecToExecutedNoDelay() Executor {
}
return nil
}

// memoryDebugModeHandler carries the state of the background goroutine started
// for memory debug mode. Its run method periodically compares the process heap
// usage with the memory accounted for by memTracker and logs the result.
type memoryDebugModeHandler struct {
// ctx signals shutdown: run returns once ctx is done.
ctx context.Context
// minHeapInUse is the heap-in-use threshold; a warning is only emitted while
// the sampled heap usage is above this value.
minHeapInUse int64
// alarmRatio is applied as a percentage: a warning is emitted when
// trackedMem/100*(100+alarmRatio) is below the sampled heap usage.
alarmRatio int64
// autoGC makes every sample call runtime.GC() first and, on a warning, also
// logs the child trackers that consumed more than minHeapInUse/5.
autoGC bool
// wg is marked Done when run exits so the starter can wait for the goroutine.
wg *sync.WaitGroup
// memTracker supplies the tracked memory figure via BytesConsumed.
memTracker *memory.Tracker

// infoField is a scratch slice that genInfo truncates and refills on every
// call; slices returned by genInfo share its backing array.
infoField []zap.Field
}

// fetchCurrentMemoryUsage samples the two figures that memory debug mode
// compares. When gc is true a garbage collection is forced first. heapInUse is
// runtime.MemStats.HeapInuse for the whole process, and trackedMem is the number
// of bytes currently reported by h.memTracker.
func (h *memoryDebugModeHandler) fetchCurrentMemoryUsage(gc bool) (heapInUse, trackedMem uint64) {
	if gc {
		runtime.GC()
	}
	var stats runtime.MemStats
	runtime.ReadMemStats(&stats)
	return stats.HeapInuse, uint64(h.memTracker.BytesConsumed())
}

// genInfo rebuilds the log fields for one memory debug mode report.
//
// The fields are, in order: "sql" (holding status), "heap in use" and
// "tracked memory" (both rendered with memory.FormatBytes). When needProfile is
// true a heap profile is written through getHeapProfile and its file name is
// appended as "heap profile"; the field is appended even if the dump failed, in
// which case err carries the failure.
//
// The returned slice is h.infoField itself, so it is only valid until the next
// call to genInfo.
func (h *memoryDebugModeHandler) genInfo(status string, needProfile bool, heapInUse, trackedMem int64) (fields []zap.Field, err error) {
	h.infoField = append(h.infoField[:0],
		zap.String("sql", status),
		zap.String("heap in use", memory.FormatBytes(heapInUse)),
		zap.String("tracked memory", memory.FormatBytes(trackedMem)),
	)
	if needProfile {
		var profilePath string
		profilePath, err = getHeapProfile()
		h.infoField = append(h.infoField, zap.String("heap profile", profilePath))
	}
	return h.infoField, err
}

// run is the body of the memory debug mode goroutine. It samples memory usage
// every 5 seconds until h.ctx is done or writing a heap profile fails.
//
// On every tick it reads the heap-in-use and tracked-memory figures (forcing a
// GC first when h.autoGC is set). Every sixth tick it logs a "running" report.
// Whenever heap-in-use exceeds h.minHeapInUse and tracked memory scaled by
// (100+h.alarmRatio)/100 is still below heap-in-use, it logs a "warning" report
// with a heap profile; in autoGC mode it additionally logs every child tracker
// that consumed more than h.minHeapInUse/5.
//
// On exit it logs either a "finished" report (with a final heap profile) or a
// "debug_mode_error" report together with the error that stopped the loop, and
// then marks h.wg as done.
func (h *memoryDebugModeHandler) run() {
	var err error
	var fields []zap.Field
	// Deferred calls run last-in-first-out, so the waiter is released only
	// after the exit report below has been logged.
	defer h.wg.Done()
	defer func() {
		heapInUse, trackedMem := h.fetchCurrentMemoryUsage(true)
		if err != nil {
			// Use fresh names here: re-declaring err would hide the error that
			// stopped the loop and make the exit log report nil instead.
			// genInfo only produces an error when asked for a profile, so its
			// error result is safe to drop on this path.
			errFields, _ := h.genInfo("debug_mode_error", false, int64(heapInUse), int64(trackedMem))
			logutil.BgLogger().Error("Memory Debug Mode", errFields...)
			logutil.BgLogger().Error("Memory Debug Mode Exit", zap.Error(err))
			return
		}
		exitFields, exitErr := h.genInfo("finished", true, int64(heapInUse), int64(trackedMem))
		logutil.BgLogger().Info("Memory Debug Mode", exitFields...)
		if exitErr != nil {
			logutil.BgLogger().Error("Memory Debug Mode Exit", zap.Error(exitErr))
		}
	}()

	logutil.BgLogger().Info("Memory Debug Mode",
		zap.String("sql", "started"),
		zap.Bool("autoGC", h.autoGC),
		zap.String("minHeapInUse", memory.FormatBytes(h.minHeapInUse)),
		zap.Int64("alarmRatio", h.alarmRatio),
	)
	ticker, loop := time.NewTicker(5*time.Second), 0
	// Release the ticker's resources once the goroutine exits.
	defer ticker.Stop()
	for {
		select {
		case <-h.ctx.Done():
			return
		case <-ticker.C:
			heapInUse, trackedMem := h.fetchCurrentMemoryUsage(h.autoGC)
			loop++
			// Periodic progress report on every sixth sample.
			if loop%6 == 0 {
				fields, err = h.genInfo("running", false, int64(heapInUse), int64(trackedMem))
				logutil.BgLogger().Info("Memory Debug Mode", fields...)
				if err != nil {
					return
				}
			}

			// Nothing to report while the heap is below the threshold or the
			// tracked memory (plus the tolerated bias) still covers it.
			if heapInUse <= uint64(h.minHeapInUse) || trackedMem/100*uint64(100+h.alarmRatio) >= heapInUse {
				continue
			}
			fields, err = h.genInfo("warning", true, int64(heapInUse), int64(trackedMem))
			logutil.BgLogger().Warn("Memory Debug Mode", fields...)
			if err != nil {
				return
			}
			if !h.autoGC {
				continue
			}
			// In autoGC mode also list the individual trackers that account for
			// more than a fifth of the threshold.
			ts := h.memTracker.SearchTrackerConsumedMoreThanNBytes(h.minHeapInUse / 5)
			logs := make([]zap.Field, 0, len(ts))
			for _, t := range ts {
				logs = append(logs, zap.String("Executor_"+strconv.Itoa(t.Label()), memory.FormatBytes(t.BytesConsumed())))
			}
			logutil.BgLogger().Warn("Memory Debug Mode, Log all trackers that consumes more than threshold * 20%", logs...)
		}
	}
}

// getHeapProfile writes the current "heap" pprof profile to a new file and
// returns the path of that file.
//
// The file is created under the "record" sub-directory of the configured
// TempStoragePath and is named "heapGC" followed by the current time in RFC 3339
// format. On any failure (creating, writing or closing the file) an empty file
// name and the error are returned.
func getHeapProfile() (fileName string, err error) {
	tempDir := filepath.Join(config.GetGlobalConfig().TempStoragePath, "record")
	timeString := time.Now().Format(time.RFC3339)
	fileName = filepath.Join(tempDir, "heapGC"+timeString)
	f, err := os.Create(fileName)
	if err != nil {
		return "", err
	}
	if err = rpprof.Lookup("heap").WriteTo(f, 0); err != nil {
		// Do not leak the descriptor when the dump fails. The write error is the
		// one worth reporting, so a secondary close error is deliberately dropped.
		_ = f.Close()
		return "", err
	}
	if err = f.Close(); err != nil {
		return "", err
	}
	return fileName, nil
}
7 changes: 7 additions & 0 deletions planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -1238,6 +1238,13 @@ func (e *Explain) RenderResult() error {
if e.Rows == nil || e.Analyze {
flat := FlattenPhysicalPlan(e.TargetPlan, true)
e.explainFlatPlanInRowFormat(flat)
if e.Analyze &&
e.SCtx().GetSessionVars().MemoryDebugModeMinHeapInUse != 0 &&
e.SCtx().GetSessionVars().MemoryDebugModeAlarmRatio > 0 {
row := e.Rows[0]
tracker := e.SCtx().GetSessionVars().StmtCtx.MemTracker
row[7] = row[7] + "(Total: " + tracker.FormatBytes(tracker.MaxConsumed()) + ")"
}
}
case types.ExplainFormatDOT:
if physicalPlan, ok := e.TargetPlan.(PhysicalPlan); ok {
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1165,6 +1165,12 @@ type SessionVars struct {

// RequestSourceType is the type of inner request.
RequestSourceType string

// MemoryDebugModeMinHeapInUse indicates the minimum heapInUse threshold that triggers the memoryDebugMode.
MemoryDebugModeMinHeapInUse int64
// MemoryDebugModeAlarmRatio indicates the allowable bias ratio, in percent, of the memory tracking accuracy check.
// When `(memory tracked by tidb) * (1 + MemoryDebugModeAlarmRatio/100) < actual heapInUse`, an alarm log will be recorded.
MemoryDebugModeAlarmRatio int64
}

// InitStatementContext initializes a StatementContext, the object is reused to reduce allocation.
Expand Down
8 changes: 8 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1667,6 +1667,14 @@ var defaultSysVars = []*SysVar{
metrics.ToggleSimplifiedMode(TiDBOptOn(s))
return nil
}},
{Scope: ScopeSession, Name: TiDBMemoryDebugModeMinHeapInUse, Value: strconv.Itoa(0), Type: TypeInt, MinValue: math.MinInt64, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.MemoryDebugModeMinHeapInUse = TidbOptInt64(val, 0)
return nil
}},
{Scope: ScopeSession, Name: TiDBMemoryDebugModeAlarmRatio, Value: strconv.Itoa(0), Type: TypeInt, MinValue: 0, MaxValue: math.MaxInt64, SetSession: func(s *SessionVars, val string) error {
s.MemoryDebugModeAlarmRatio = TidbOptInt64(val, 0)
return nil
}},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
10 changes: 10 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1059,3 +1059,13 @@ func TestTiDBCommitterConcurrency(t *testing.T) {
require.Equal(t, val, fmt.Sprintf("%d", expected))
require.NoError(t, err)
}

// TestDefaultMemoryDebugModeValue checks that both memory debug mode system
// variables read back as "0" on a freshly created SessionVars.
func TestDefaultMemoryDebugModeValue(t *testing.T) {
	vars := NewSessionVars()
	for _, name := range []string{
		TiDBMemoryDebugModeMinHeapInUse,
		TiDBMemoryDebugModeAlarmRatio,
	} {
		val, err := GetSessionOrGlobalSystemVar(vars, name)
		require.NoError(t, err)
		require.Equal(t, val, "0")
	}
}
10 changes: 10 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -684,6 +684,16 @@ const (

// TiDBSimplifiedMetrics controls whether to unregister some unused metrics.
TiDBSimplifiedMetrics = "tidb_simplified_metrics"

// TiDBMemoryDebugModeMinHeapInUse is used to set tidb memory debug mode trigger threshold.
// When set to 0, the function is disabled.
// When set to a negative integer, memory debug mode is used to detect frequent allocation and release of memory.
// We do not actively trigger GC, and check every 5s whether `tracked memory * (1 + bias ratio) > heap in use`.
// When set to a positive integer, memory debug mode is used to detect inaccurate memory tracking.
// We trigger runtime.GC() every 5s, and then check whether `tracked memory * (1 + bias ratio) > heap in use`.
TiDBMemoryDebugModeMinHeapInUse = "tidb_memory_debug_mode_min_heap_inuse"
// TiDBMemoryDebugModeAlarmRatio is used to set the bias ratio (in percent) of tidb memory debug mode. Memory bias less than this ratio is treated as noise.
TiDBMemoryDebugModeAlarmRatio = "tidb_memory_debug_mode_alarm_ratio"
)

// TiDB vars that have only global scope
Expand Down
14 changes: 14 additions & 0 deletions util/memory/tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,20 @@ func (t *Tracker) SearchTrackerWithoutLock(label int) *Tracker {
return nil
}

// SearchTrackerConsumedMoreThanNBytes returns every tracker stored in
// t.mu.children whose BytesConsumed is strictly greater than limit.
// t.mu is held for the duration of the scan. The result is nil when no child
// exceeds the limit.
func (t *Tracker) SearchTrackerConsumedMoreThanNBytes(limit int64) (res []*Tracker) {
	t.mu.Lock()
	defer t.mu.Unlock()
	for _, siblings := range t.mu.children {
		for i := range siblings {
			if child := siblings[i]; child.BytesConsumed() > limit {
				res = append(res, child)
			}
		}
	}
	return res
}

// String returns the string representation of this Tracker tree.
func (t *Tracker) String() string {
buffer := bytes.NewBufferString("\n")
Expand Down