Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

*: support global memory control for tidb #37794

Merged
merged 33 commits into from
Sep 26, 2022
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
80f544f
temp
wshwsh12 Sep 2, 2022
92f8385
support server-memory-limit-sess-min-size
wshwsh12 Sep 13, 2022
a0e7727
support config
wshwsh12 Sep 13, 2022
9576c18
Merge remote-tracking branch 'upstream/master' into server-memory-quota
wshwsh12 Sep 13, 2022
bee18e2
polish config && add some tests
wshwsh12 Sep 13, 2022
6f468c9
support wait sql cancelled and GC
wshwsh12 Sep 13, 2022
87673bf
Merge branch 'master' into server-memory-quota
wshwsh12 Sep 14, 2022
a5a9053
fix
wshwsh12 Sep 14, 2022
ea05d04
Merge branch 'master' into server-memory-quota
wshwsh12 Sep 14, 2022
b198b25
fix
wshwsh12 Sep 14, 2022
46a1b10
Merge remote-tracking branch 'upstream/master' into server-memory-quota
wshwsh12 Sep 15, 2022
54ba480
add ut
wshwsh12 Sep 15, 2022
80c6f9e
fix
wshwsh12 Sep 15, 2022
3a19a02
Merge remote-tracking branch 'upstream/master' into server-memory-quota
wshwsh12 Sep 15, 2022
e818081
fix
wshwsh12 Sep 20, 2022
a730889
Merge remote-tracking branch 'upstream/master' into server-memory-quota
wshwsh12 Sep 20, 2022
74c21fa
fix
wshwsh12 Sep 20, 2022
2840262
fix
wshwsh12 Sep 20, 2022
1600d1d
lint
wshwsh12 Sep 20, 2022
757463d
Merge branch 'master' into server-memory-quota
wshwsh12 Sep 20, 2022
795c192
address comments
wshwsh12 Sep 21, 2022
75d0191
address comments
wshwsh12 Sep 21, 2022
9826ae0
address comments
wshwsh12 Sep 21, 2022
667f838
remove useless check
wshwsh12 Sep 21, 2022
d2001fb
fix bug
wshwsh12 Sep 21, 2022
3114a47
lint
wshwsh12 Sep 21, 2022
1e46db7
address comments
wshwsh12 Sep 21, 2022
cf5c4f7
address comments
wshwsh12 Sep 21, 2022
089c6ac
Merge remote-tracking branch 'upstream/master' into server-memory-quota
wshwsh12 Sep 22, 2022
29bf319
Merge remote-tracking branch 'upstream/master' into server-memory-quota
wshwsh12 Sep 25, 2022
8fea589
add comment for time.sleep
wshwsh12 Sep 26, 2022
57c1ada
Merge branch 'master' into server-memory-quota
ti-chi-bot Sep 26, 2022
9da15d2
Merge branch 'master' into server-memory-quota
ti-chi-bot Sep 26, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1942,6 +1942,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
} else {
sc.InitMemTracker(memory.LabelForSQLText, vars.MemQuotaQuery)
sc.MemTracker.AttachToGlobalTracker(GlobalMemoryUsageTracker)
sc.MemTracker.IsSession, sc.MemTracker.SessionID = true, vars.ConnectionID
}

sc.InitDiskTracker(memory.LabelForSQLText, -1)
Expand Down
63 changes: 63 additions & 0 deletions executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6001,3 +6001,66 @@ func TestBinaryStrNumericOperator(t *testing.T) {
// there should be no warning.
tk.MustQuery("show warnings").Check(testkit.Rows())
}

func TestGlobalMemoryControl(t *testing.T) {
store, dom := testkit.CreateMockStoreAndDomain(t)

tk0 := testkit.NewTestKit(t, store)
tk0.MustExec("set global tidb_mem_oom_action = 'cancel'")
tk0.MustExec("set global tidb_server_memory_quota = 512 << 20")
tk0.MustExec("set global tidb_server_memory_limit_sess_min_size = 128")

tk1 := testkit.NewTestKit(t, store)
tracker1 := tk1.Session().GetSessionVars().StmtCtx.MemTracker

tk2 := testkit.NewTestKit(t, store)
tracker2 := tk2.Session().GetSessionVars().StmtCtx.MemTracker

tk3 := testkit.NewTestKit(t, store)
tracker3 := tk3.Session().GetSessionVars().StmtCtx.MemTracker

sm := &testkit.MockSessionManager{
PS: []*util.ProcessInfo{tk1.Session().ShowProcess(), tk2.Session().ShowProcess(), tk3.Session().ShowProcess()},
}
dom.ExpensiveQueryHandle().SetSessionManager(sm)
go dom.ExpensiveQueryHandle().Run()

tracker1.Consume(100 << 20) // 100 MB
tracker2.Consume(200 << 20) // 200 MB
tracker3.Consume(300 << 20) // 300 MB

test := make([]int, 128<<20) // Keep 1GB HeapInUse
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is virtual memory instead of real allocation before you visit the data.
OS will handle the logical -> physical memory address mapping and allocate the physical pages.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This only simulates the situation of heapinuse growth.
In normally, TiDB will use the memory immediately after allocation.

time.Sleep(500 * time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's the purpose of this Sleep?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check goroutine checks the memory usage every 100ms. The Sleep() make sure that Top1Tracker can be Canceled.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's better to add some comment for it.
Sleep slows down the test, and 0.5s is long enough for UT


// Kill Top1
require.False(t, tracker1.IsKilled.Load())
require.False(t, tracker2.IsKilled.Load())
require.True(t, tracker3.IsKilled.Load())
require.Equal(t, memory.MemUsageTop1Tracker.Load(), tracker3)
util.WithRecovery( // Next Consume() will panic and cancel the SQL
func() {
tracker3.Consume(1)
}, func(r interface{}) {
require.True(t, strings.Contains(r.(string), "Out Of Memory Quota!"))
})
tracker2.Consume(300 << 20) // Sum 500MB, Not Panic, Waiting t3 cancel finish.
time.Sleep(500 * time.Millisecond)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It takes time to cancel?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait for cancel

require.False(t, tracker2.IsKilled.Load())
// Kill Finished
tracker3.Consume(-(300 << 20))
// Simulated SQL is Canceled and the time is updated
sm.PSMu.Lock()
ps := *sm.PS[2]
ps.Time = time.Now()
sm.PS[2] = &ps
sm.PSMu.Unlock()
time.Sleep(500 * time.Millisecond)
// Kill the Next SQL
util.WithRecovery( // Next Consume() will panic and cancel the SQL
func() {
tracker2.Consume(1)
}, func(r interface{}) {
require.True(t, strings.Contains(r.(string), "Out Of Memory Quota!"))
})
require.Equal(t, test[0], 0) // Keep 1GB HeapInUse
}
48 changes: 48 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -697,6 +697,54 @@ var defaultSysVars = []*SysVar{
return nil
},
},
{Scope: ScopeGlobal, Name: TiDBServerMemoryQuota, Value: strconv.FormatUint(DefTiDBServerMemoryQuota, 10), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxUint64,
GetGlobal: func(s *SessionVars) (string, error) {
return memory.ServerMemoryQuota.String(), nil
},
Validation: func(s *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
intVal, err := strconv.ParseUint(normalizedValue, 10, 64)
if err != nil {
return "", err
}
if intVal > 0 && intVal < (512<<20) { // 512 MB
s.StmtCtx.AppendWarning(ErrTruncatedWrongValue.GenWithStackByArgs(TiDBServerMemoryQuota, originalValue))
intVal = 512 << 20
}
return strconv.FormatUint(intVal, 10), nil
},
SetGlobal: func(s *SessionVars, val string) error {
intVal, err := strconv.ParseUint(val, 10, 64)
if err != nil {
return err
}
memory.ServerMemoryQuota.Store(intVal)
return nil
},
},
{Scope: ScopeGlobal, Name: TiDBServerMemoryLimitSessMinSize, Value: strconv.FormatUint(DefTiDBServerMemoryLimitSessMinSize, 10), Type: TypeUnsigned, MinValue: 0, MaxValue: math.MaxUint64,
GetGlobal: func(s *SessionVars) (string, error) {
return memory.ServerMemoryLimitSessMinSize.String(), nil
},
Validation: func(s *SessionVars, normalizedValue string, originalValue string, scope ScopeFlag) (string, error) {
intVal, err := strconv.ParseUint(normalizedValue, 10, 64)
if err != nil {
return "", err
}
if intVal > 0 && intVal < 128 { // 128 Bytes
s.StmtCtx.AppendWarning(ErrTruncatedWrongValue.GenWithStackByArgs(TiDBServerMemoryLimitSessMinSize, originalValue))
intVal = 128
}
return strconv.FormatUint(intVal, 10), nil
},
SetGlobal: func(s *SessionVars, val string) error {
intVal, err := strconv.ParseUint(val, 10, 64)
if err != nil {
return err
}
memory.ServerMemoryLimitSessMinSize.Store(intVal)
return nil
},
},
{Scope: ScopeGlobal, Name: TiDBEnableColumnTracking, Value: BoolToOnOff(DefTiDBEnableColumnTracking), Type: TypeBool, GetGlobal: func(s *SessionVars) (string, error) {
return BoolToOnOff(EnableColumnTracking.Load()), nil
}, SetGlobal: func(s *SessionVars, val string) error {
Expand Down
78 changes: 78 additions & 0 deletions sessionctx/variable/sysvar_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package variable
import (
"encoding/json"
"fmt"
"math"
"strconv"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -742,3 +743,80 @@ func TestSetTIDBDiskQuota(t *testing.T) {
require.NoError(t, err)
require.Equal(t, strconv.FormatInt(pb, 10), val)
}

func TestTiDBServerMemoryQuota(t *testing.T) {
vars := NewSessionVars()
mock := NewMockGlobalAccessor4Tests()
mock.SessionVars = vars
vars.GlobalVarsAccessor = mock
var (
mb uint64 = 1 << 20
err error
val string
)
// Test tidb_server_memory_quota
serverMemoryQuota := GetSysVar(TiDBServerMemoryQuota)
// Check default value
require.Equal(t, serverMemoryQuota.Value, strconv.FormatUint(DefTiDBServerMemoryQuota, 10))

// MinValue is 512 MB
err = mock.SetGlobalSysVar(TiDBServerMemoryQuota, strconv.FormatUint(100*mb, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryQuota)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(512*mb, 10), val)

// Test Close
err = mock.SetGlobalSysVar(TiDBServerMemoryQuota, strconv.FormatUint(0, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryQuota)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(0, 10), val)

// Test MaxValue
err = mock.SetGlobalSysVar(TiDBServerMemoryQuota, strconv.FormatUint(math.MaxUint64, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryQuota)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(math.MaxUint64, 10), val)

// Test Normal Value
err = mock.SetGlobalSysVar(TiDBServerMemoryQuota, strconv.FormatUint(1024*mb, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryQuota)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(1024*mb, 10), val)

// Test tidb_server_memory_limit_sess_min_size
serverMemoryLimitSessMinSize := GetSysVar(TiDBServerMemoryLimitSessMinSize)
// Check default value
require.Equal(t, serverMemoryLimitSessMinSize.Value, strconv.FormatUint(DefTiDBServerMemoryLimitSessMinSize, 10))

// MinValue is 128 Bytes
err = mock.SetGlobalSysVar(TiDBServerMemoryLimitSessMinSize, strconv.FormatUint(100, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryLimitSessMinSize)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(128, 10), val)

// Test Close
err = mock.SetGlobalSysVar(TiDBServerMemoryLimitSessMinSize, strconv.FormatUint(0, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryLimitSessMinSize)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(0, 10), val)

// Test MaxValue
err = mock.SetGlobalSysVar(TiDBServerMemoryLimitSessMinSize, strconv.FormatUint(math.MaxUint64, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryLimitSessMinSize)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(math.MaxUint64, 10), val)

// Test Normal Value
err = mock.SetGlobalSysVar(TiDBServerMemoryLimitSessMinSize, strconv.FormatUint(200*mb, 10))
require.NoError(t, err)
val, err = mock.GetGlobalSysVar(TiDBServerMemoryLimitSessMinSize)
require.NoError(t, err)
require.Equal(t, strconv.FormatUint(200*mb, 10), val)
}
11 changes: 11 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx/variable/featuretag/concurrencyddl"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/paging"
"go.uber.org/atomic"
)
Expand Down Expand Up @@ -822,6 +824,10 @@ const (
TiDBDDLEnableFastReorg = "tidb_ddl_enable_fast_reorg"
// TiDBDDLDiskQuota used to set disk quota for lightning add index.
TiDBDDLDiskQuota = "tidb_ddl_disk_quota"
// TiDBServerMemoryQuota indicates the memory limit of the tidb-server instance.
TiDBServerMemoryQuota = "tidb_server_memory_quota"
// TiDBServerMemoryLimitSessMinSize indicates the minimum memory usage of the session that can be controlled.
TiDBServerMemoryLimitSessMinSize = "tidb_server_memory_limit_sess_min_size"
)

// TiDB intentional limits
Expand Down Expand Up @@ -1054,6 +1060,7 @@ const (
DefTiDBForeignKeyChecks = false
DefTiDBOptRangeMaxSize = 0
DefTiDBCostModelVer = 1
DefTiDBServerMemoryLimitSessMinSize = 128 << 20
)

// Process global variables.
Expand Down Expand Up @@ -1103,6 +1110,10 @@ var (
DDLDiskQuota = atomic.NewUint64(DefTiDBDDLDiskQuota)
// EnableForeignKey indicates whether to enable foreign key feature.
EnableForeignKey = atomic.NewBool(false)

// DefTiDBServerMemoryQuota indicates the default value of TiDBServerMemoryQuota(TotalMem * 80%).
// It should be a const and shouldn't be modified after tidb is started.
DefTiDBServerMemoryQuota = mathutil.Max(memory.GetMemTotalIgnoreErr()/10*8, 512<<20)
)

var (
Expand Down
5 changes: 5 additions & 0 deletions testkit/mocksessionmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
// MockSessionManager is a mocked session manager which is used for test.
type MockSessionManager struct {
PS []*util.ProcessInfo
PSMu sync.RWMutex
SerID uint64
TxnInfo []*txninfo.TxnInfo
conn map[uint64]session.Session
Expand All @@ -39,6 +40,8 @@ func (msm *MockSessionManager) ShowTxnList() []*txninfo.TxnInfo {

// ShowProcessList implements the SessionManager.ShowProcessList interface.
func (msm *MockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {
msm.PSMu.RLock()
defer msm.PSMu.RUnlock()
ret := make(map[uint64]*util.ProcessInfo)
for _, item := range msm.PS {
ret[item.ID] = item
Expand All @@ -48,6 +51,8 @@ func (msm *MockSessionManager) ShowProcessList() map[uint64]*util.ProcessInfo {

// GetProcessInfo implements the SessionManager.GetProcessInfo interface.
func (msm *MockSessionManager) GetProcessInfo(id uint64) (*util.ProcessInfo, bool) {
msm.PSMu.RLock()
defer msm.PSMu.RUnlock()
for _, item := range msm.PS {
if item.ID == id {
return item, true
Expand Down
3 changes: 3 additions & 0 deletions util/expensivequery/expensivequery.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)
Expand Down Expand Up @@ -57,6 +58,7 @@ func (eqh *Handle) Run() {
defer ticker.Stop()
sm := eqh.sm.Load().(util.SessionManager)
record := &memoryUsageAlarm{}
serverMemoryQuota := &serverMemoryQuota{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to use an individual goroutine to monitor the global memory status? The expensive_query worker handles too many things

for {
select {
case <-ticker.C:
Expand Down Expand Up @@ -91,6 +93,7 @@ func (eqh *Handle) Run() {
if record.err == nil {
record.alarm4ExcessiveMemUsage(sm)
}
serverMemoryQuota.CheckQuotaAndKill(memory.ServerMemoryQuota.Load(), sm)
case <-eqh.exitCh:
return
}
Expand Down
58 changes: 58 additions & 0 deletions util/expensivequery/server_memory_quota.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package expensivequery

import (
"runtime"
"time"

"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/memory"
)

type serverMemoryQuota struct {
sqlStartTime time.Time
sessionID uint64
}

func (s *serverMemoryQuota) CheckQuotaAndKill(bt uint64, sm util.SessionManager) {
if s.sessionID != 0 {
if info, ok := sm.GetProcessInfo(s.sessionID); ok {
if info.Time == s.sqlStartTime {
return // Wait killing finished.
}
}
s.sessionID = 0
s.sqlStartTime = time.Time{}
//nolint: all_revive,revive
runtime.GC()
}

if bt == 0 {
return
}
instanceStats := &runtime.MemStats{}
runtime.ReadMemStats(instanceStats)
if instanceStats.HeapInuse > bt {
t := memory.MemUsageTop1Tracker.Load()
if t != nil && uint64(t.BytesConsumed()) > memory.ServerMemoryLimitSessMinSize.Load() {
if info, ok := sm.GetProcessInfo(t.SessionID); ok {
s.sessionID = t.SessionID
s.sqlStartTime = info.Time
t.IsKilled.Store(true)
}
}
}
}
8 changes: 8 additions & 0 deletions util/memory/meminfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,14 @@ var MemTotal func() (uint64, error)
// MemUsed returns the total used amount of RAM on this system
var MemUsed func() (uint64, error)

// GetMemTotalIgnoreErr returns the total amount of RAM on this system/container. If error occurs, return 0.
func GetMemTotalIgnoreErr() uint64 {
if memTotal, err := MemTotal(); err == nil {
return memTotal
}
return 0
}

// MemTotalNormal returns the total amount of RAM on this system in non-container environment.
func MemTotalNormal() (uint64, error) {
total, t := memLimit.get()
Expand Down
Loading