From b5de819d00be8630e237ce4fd78d854bcb71e9c5 Mon Sep 17 00:00:00 2001 From: Shenghui Wu <793703860@qq.com> Date: Fri, 15 Apr 2022 12:48:35 +0800 Subject: [PATCH] util: fix memory.reArrangeFallback cpu usage (#30414) close pingcap/tidb#30353 --- executor/executor_test.go | 14 +++----------- executor/sort.go | 3 +++ session/session_test.go | 4 ++-- store/copr/coprocessor.go | 1 + util/chunk/row_container.go | 1 + util/memory/action.go | 21 ++++++++++++++++++++- util/memory/tracker.go | 8 +++++++- util/memory/tracker_test.go | 19 +++++++++++++++++++ 8 files changed, 56 insertions(+), 15 deletions(-) diff --git a/executor/executor_test.go b/executor/executor_test.go index fbee188352cee..1cf2ab5636da5 100644 --- a/executor/executor_test.go +++ b/executor/executor_test.go @@ -3294,17 +3294,9 @@ func TestOOMActionPriority(t *testing.T) { tk.MustExec("create table t4(a int)") tk.MustExec("insert into t4 values(1)") tk.MustQuery("select * from t0 join t1 join t2 join t3 join t4 order by t0.a").Check(testkit.Rows("1 1 1 1 1")) - action := tk.Session().GetSessionVars().StmtCtx.MemTracker.GetFallbackForTest() - // check the first 5 actions is rate limit. - for i := 0; i < 5; i++ { - require.Equal(t, int64(memory.DefRateLimitPriority), action.GetPriority()) - action = action.GetFallback() - } - for action.GetFallback() != nil { - require.Equal(t, int64(memory.DefSpillPriority), action.GetPriority()) - action = action.GetFallback() - } - require.Equal(t, int64(memory.DefLogPriority), action.GetPriority()) + action := tk.Session().GetSessionVars().StmtCtx.MemTracker.GetFallbackForTest(true) + // All actions are finished and removed. + require.Equal(t, action.GetPriority(), int64(memory.DefLogPriority)) } func TestTrackAggMemoryUsage(t *testing.T) { diff --git a/executor/sort.go b/executor/sort.go index 80d098dbd5145..d60f45afc0ca4 100644 --- a/executor/sort.go +++ b/executor/sort.go @@ -77,6 +77,9 @@ func (e *SortExec) Close() error { e.memTracker = nil e.diskTracker = nil e.multiWayMerge = nil + if e.spillAction != nil { + e.spillAction.SetFinished() + } e.spillAction = nil return e.children[0].Close() } diff --git a/session/session_test.go b/session/session_test.go index 596fd40be6e7e..13d19127c371b 100644 --- a/session/session_test.go +++ b/session/session_test.go @@ -3965,7 +3965,7 @@ func (s *testSessionSuite2) TestSetEnableRateLimitAction(c *C) { tk.MustExec("create table tmp123(id int)") tk.MustQuery("select * from tmp123;") haveRateLimitAction := false - action := tk.Se.GetSessionVars().StmtCtx.MemTracker.GetFallbackForTest() + action := tk.Se.GetSessionVars().StmtCtx.MemTracker.GetFallbackForTest(false) for ; action != nil; action = action.GetFallback() { if action.GetPriority() == memory.DefRateLimitPriority { haveRateLimitAction = true @@ -3985,7 +3985,7 @@ func (s *testSessionSuite2) TestSetEnableRateLimitAction(c *C) { result.Check(testkit.Rows("0")) haveRateLimitAction = false - action = tk.Se.GetSessionVars().StmtCtx.MemTracker.GetFallbackForTest() + action = tk.Se.GetSessionVars().StmtCtx.MemTracker.GetFallbackForTest(false) for ; action != nil; action = action.GetFallback() { if action.GetPriority() == memory.DefRateLimitPriority { haveRateLimitAction = true diff --git a/store/copr/coprocessor.go b/store/copr/coprocessor.go index dba00aa935c60..a6e08a1ccef0d 100644 --- a/store/copr/coprocessor.go +++ b/store/copr/coprocessor.go @@ -1313,6 +1313,7 @@ func (e *rateLimitAction) close() { e.conditionLock() defer e.conditionUnlock() e.cond.exceeded = false + e.SetFinished() } func (e *rateLimitAction) setEnabled(enabled bool) { diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index 731b84f6c8ca7..3407022f4718f 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -277,6 +277,7 @@ func (c *RowContainer) Close() (err error) { // Set status to spilledYet to avoid spilling. c.actionSpill.setStatus(spilledYet) c.actionSpill.cond.Broadcast() + c.actionSpill.SetFinished() } if c.alreadySpilled() { err = c.m.records.inDisk.Close() diff --git a/util/memory/action.go b/util/memory/action.go index b4c9fb18f4277..80b945e212c45 100644 --- a/util/memory/action.go +++ b/util/memory/action.go @@ -17,6 +17,7 @@ package memory import ( "fmt" "sync" + "sync/atomic" "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/util/dbterror" @@ -40,11 +41,16 @@ type ActionOnExceed interface { GetFallback() ActionOnExceed // GetPriority get the priority of the Action. GetPriority() int64 + // SetFinished sets the finished state of the Action. + SetFinished() + // IsFinished returns the finished state of the Action. + IsFinished() bool } // BaseOOMAction manages the fallback action for all Action. type BaseOOMAction struct { fallbackAction ActionOnExceed + finished int32 } // SetFallback sets a fallback action which will be triggered if itself has @@ -53,8 +59,21 @@ func (b *BaseOOMAction) SetFallback(a ActionOnExceed) { b.fallbackAction = a } -// GetFallback get the fallback action of the Action. +// SetFinished sets the finished state of the Action. +func (b *BaseOOMAction) SetFinished() { + atomic.StoreInt32(&b.finished, 1) +} + +// IsFinished returns the finished state of the Action. +func (b *BaseOOMAction) IsFinished() bool { + return atomic.LoadInt32(&b.finished) == 1 +} + +// GetFallback get the fallback action and remove finished fallback. func (b *BaseOOMAction) GetFallback() ActionOnExceed { + for b.fallbackAction != nil && b.fallbackAction.IsFinished() { + b.SetFallback(b.fallbackAction.GetFallback()) + } return b.fallbackAction } diff --git a/util/memory/tracker.go b/util/memory/tracker.go index d081cdf4efc15..bf76cbba37567 100644 --- a/util/memory/tracker.go +++ b/util/memory/tracker.go @@ -173,9 +173,12 @@ func (t *Tracker) FallbackOldAndSetNewActionForSoftLimit(a ActionOnExceed) { } // GetFallbackForTest get the oom action used by test. -func (t *Tracker) GetFallbackForTest() ActionOnExceed { +func (t *Tracker) GetFallbackForTest(ignoreFinishedAction bool) ActionOnExceed { t.actionMuForHardLimit.Lock() defer t.actionMuForHardLimit.Unlock() + if t.actionMuForHardLimit.actionOnExceed != nil && t.actionMuForHardLimit.actionOnExceed.IsFinished() && ignoreFinishedAction { + t.actionMuForHardLimit.actionOnExceed = t.actionMuForHardLimit.actionOnExceed.GetFallback() + } return t.actionMuForHardLimit.actionOnExceed } @@ -332,6 +335,9 @@ func (t *Tracker) Consume(bytes int64) { tryAction := func(mu *actionMu, tracker *Tracker) { mu.Lock() defer mu.Unlock() + for mu.actionOnExceed != nil && mu.actionOnExceed.IsFinished() { + mu.actionOnExceed = mu.actionOnExceed.GetFallback() + } if mu.actionOnExceed != nil { mu.actionOnExceed.Action(tracker) } diff --git a/util/memory/tracker_test.go b/util/memory/tracker_test.go index f0ce70b55f56c..3bb03b032bb98 100644 --- a/util/memory/tracker_test.go +++ b/util/memory/tracker_test.go @@ -116,6 +116,25 @@ func TestOOMAction(t *testing.T) { require.True(t, action1.called) require.True(t, action2.called) // SoftLimit fallback require.True(t, action3.called) // HardLimit + + // test fallback + action1 = &mockAction{} + action2 = &mockAction{} + action3 = &mockAction{} + action4 := &mockAction{} + action5 := &mockAction{} + tracker.SetActionOnExceed(action1) + tracker.FallbackOldAndSetNewAction(action2) + tracker.FallbackOldAndSetNewAction(action3) + tracker.FallbackOldAndSetNewAction(action4) + tracker.FallbackOldAndSetNewAction(action5) + require.Equal(t, action1, tracker.actionMuForHardLimit.actionOnExceed) + require.Equal(t, action2, tracker.actionMuForHardLimit.actionOnExceed.GetFallback()) + action2.SetFinished() + require.Equal(t, action3, tracker.actionMuForHardLimit.actionOnExceed.GetFallback()) + action3.SetFinished() + action4.SetFinished() + require.Equal(t, action5, tracker.actionMuForHardLimit.actionOnExceed.GetFallback()) } type mockAction struct {