Skip to content

Commit

Permalink
kv: reduce the distsql concurrency if the runaway query action is coo…
Browse files Browse the repository at this point in the history
…l down

Signed-off-by: nolouch <[email protected]>
  • Loading branch information
nolouch committed Aug 8, 2024
1 parent 4cfc182 commit b68a63a
Show file tree
Hide file tree
Showing 5 changed files with 143 additions and 7 deletions.
28 changes: 21 additions & 7 deletions pkg/domain/resourcegroup/runaway.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,20 @@ func (rm *RunawayManager) GetWatchList() []*QuarantineRecord {
return ret
}

// CheckAction reports the runaway action currently in effect for the query.
//
// It returns the resource group's configured action when the query has been
// marked runaway by the group's rule, the watch action when the query has been
// marked by a watch-list match, and RunawayAction_NoneAction otherwise. It is
// safe to call on a nil receiver.
func (r *RunawayChecker) CheckAction() rmpb.RunawayAction {
	if r == nil {
		return rmpb.RunawayAction_NoneAction
	}
	// The rule-based action lives in the group's runaway settings, so it can
	// only be reported when settings are present.
	if r.setting != nil && r.markedByRule.Load() {
		return r.setting.Action
	}
	// A watch match does not depend on the group's settings: a checker may be
	// marked by watch while setting is nil, and its action must still be
	// reported so callers can react to it (e.g. cool down).
	if r.markedByWatch.Load() {
		return r.watchAction
	}
	return rmpb.RunawayAction_NoneAction
}

// AddWatch is used to add watch items from system table.
func (rm *RunawayManager) AddWatch(record *QuarantineRecord) {
ttl := time.Until(record.EndTime)
Expand Down Expand Up @@ -473,7 +487,7 @@ type RunawayChecker struct {
setting *rmpb.RunawaySettings

markedByRule atomic.Bool
markedByWatch bool
markedByWatch atomic.Bool
watchAction rmpb.RunawayAction
}

Expand All @@ -486,7 +500,7 @@ func newRunawayChecker(manager *RunawayManager, resourceGroupName string, settin
planDigest: planDigest,
setting: setting,
markedByRule: atomic.Bool{},
markedByWatch: false,
markedByWatch: atomic.Bool{},
}
if setting != nil {
c.deadline = startTime.Add(time.Duration(setting.Rule.ExecElapsedTimeMs) * time.Millisecond)
Expand All @@ -505,7 +519,7 @@ func (r *RunawayChecker) BeforeExecutor() error {
if action == rmpb.RunawayAction_NoneAction && r.setting != nil {
action = r.setting.Action
}
r.markedByWatch = true
r.markedByWatch.Store(true)
now := time.Now()
r.watchAction = action
r.markRunaway(RunawayMatchTypeWatch, action, &now)
Expand All @@ -527,13 +541,13 @@ func (r *RunawayChecker) BeforeExecutor() error {

// BeforeCopRequest checks runaway and modifies the request if necessary before sending coprocessor request.
func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
if r.setting == nil && !r.markedByWatch {
if r.setting == nil && !r.markedByWatch.Load() {
return nil
}
marked := r.markedByRule.Load()
if !marked {
// note: now we don't check whether query is in watch list again.
if r.markedByWatch {
if r.markedByWatch.Load() {
if r.watchAction == rmpb.RunawayAction_CoolDown {
req.ResourceControlContext.OverridePriority = 1 // set priority to lowest
}
Expand All @@ -553,7 +567,7 @@ func (r *RunawayChecker) BeforeCopRequest(req *tikvrpc.Request) error {
// execution time exceeds the threshold, mark the query as runaway
if r.markedByRule.CompareAndSwap(false, true) {
r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now)
if !r.markedByWatch {
if !r.markedByWatch.Load() {
r.markQuarantine(&now)
}
}
Expand Down Expand Up @@ -581,7 +595,7 @@ func (r *RunawayChecker) CheckCopRespError(err error) error {
now := time.Now()
if r.deadline.Before(now) && r.markedByRule.CompareAndSwap(false, true) {
r.markRunaway(RunawayMatchTypeIdentify, r.setting.Action, &now)
if !r.markedByWatch {
if !r.markedByWatch.Load() {
r.markQuarantine(&now)
}
return exeerrors.ErrResourceGroupQueryRunawayInterrupted
Expand Down
1 change: 1 addition & 0 deletions pkg/store/copr/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ go_library(
"@com_github_pingcap_kvproto//pkg/kvrpcpb",
"@com_github_pingcap_kvproto//pkg/metapb",
"@com_github_pingcap_kvproto//pkg/mpp",
"@com_github_pingcap_kvproto//pkg/resource_manager",
"@com_github_pingcap_log//:log",
"@com_github_pingcap_tipb//go-tipb",
"@com_github_tikv_client_go_v2//config",
Expand Down
6 changes: 6 additions & 0 deletions pkg/store/copr/copr_test/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,22 @@ go_test(
"main_test.go",
],
flaky = True,
shard_count = 3,
deps = [
"//pkg/config",
"//pkg/domain/resourcegroup",
"//pkg/kv",
"//pkg/store/copr",
"//pkg/store/mockstore",
"//pkg/testkit/testmain",
"//pkg/testkit/testsetup",
"@com_github_pingcap_kvproto//pkg/meta_storagepb",
"@com_github_pingcap_kvproto//pkg/resource_manager",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//testutils",
"@com_github_tikv_client_go_v2//tikv",
"@com_github_tikv_pd_client//:client",
"@com_github_tikv_pd_client//resource_group/controller",
"@org_uber_go_goleak//:goleak",
],
)
105 changes: 105 additions & 0 deletions pkg/store/copr/copr_test/coprocessor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,23 @@
package copr_test

import (
"bytes"
"context"
"encoding/json"
"errors"
"testing"
"time"

"github.com/pingcap/kvproto/pkg/meta_storagepb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/tidb/pkg/domain/resourcegroup"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/store/copr"
"github.com/pingcap/tidb/pkg/store/mockstore"
"github.com/stretchr/testify/require"
"github.com/tikv/client-go/v2/testutils"
pd "github.com/tikv/pd/client"
rmclient "github.com/tikv/pd/client/resource_group/controller"
)

func TestBuildCopIteratorWithRowCountHint(t *testing.T) {
Expand Down Expand Up @@ -182,3 +191,99 @@ func TestBuildCopIteratorWithBatchStoreCopr(t *testing.T) {
require.Equal(t, len(tasks[0].ToPBBatchTasks()), 1)
require.Equal(t, len(tasks[1].ToPBBatchTasks()), 0)
}

// mockResourceGroupProvider is a test double for rmclient.ResourceGroupProvider.
// It embeds the interface and overrides only Get and GetResourceGroup.
type mockResourceGroupProvider struct {
rmclient.ResourceGroupProvider
// cfg is the controller config that Get serializes to JSON and returns.
cfg rmclient.Config
}

// Get returns the JSON-encoded controller config stored in p.cfg as a
// single-entry response. Only the controller config path prefix key is
// supported; any other key yields an error, as does a failure to encode the
// config.
func (p *mockResourceGroupProvider) Get(ctx context.Context, key []byte, opts ...pd.OpOption) (*meta_storagepb.GetResponse, error) {
	if !bytes.Equal(pd.ControllerConfigPathPrefixBytes, key) {
		return nil, errors.New("unsupported configPath")
	}
	// Surface encoding failures instead of silently returning an empty value.
	payload, err := json.Marshal(&p.cfg)
	if err != nil {
		return nil, err
	}
	return &meta_storagepb.GetResponse{
		Count: 1,
		Kvs: []*meta_storagepb.KeyValue{
			{
				Key:   key,
				Value: payload,
			},
		},
	}, nil
}

// GetResourceGroup serves a single hard-coded resource group named "rg1": an
// RU-mode group with a 2000/2000 token bucket and a DryRun runaway rule that
// triggers after 1000ms of execution. Any other name results in a "not found"
// error.
func (p *mockResourceGroupProvider) GetResourceGroup(ctx context.Context, name string, opts ...pd.GetResourceGroupOption) (*rmpb.ResourceGroup, error) {
	const knownGroup = "rg1"
	if name != knownGroup {
		return nil, errors.New("not found")
	}

	ruSettings := &rmpb.GroupRequestUnitSettings{
		RU: &rmpb.TokenBucket{
			Settings: &rmpb.TokenLimitSettings{
				FillRate:   2000,
				BurstLimit: 2000,
			},
		},
	}
	runawaySettings := &rmpb.RunawaySettings{
		Rule: &rmpb.RunawayRule{
			ExecElapsedTimeMs: 1000,
		},
		Action: rmpb.RunawayAction_DryRun,
	}
	return &rmpb.ResourceGroup{
		Name:            knownGroup,
		Mode:            rmpb.GroupMode_RUMode,
		RUSettings:      ruSettings,
		RunawaySettings: runawaySettings,
	}, nil
}

// TestBuildCopIteratorWithRunawayChecker verifies that a request whose runaway
// checker reports the CoolDown action gets a cop iterator with concurrency 1
// and small-task concurrency 0, regardless of the requested concurrency.
func TestBuildCopIteratorWithRunawayChecker(t *testing.T) {
	// nil --- 'g' --- 'n' --- 't' --- nil
	// <- 0 -> <- 1 -> <- 2 -> <- 3 ->
	store, err := mockstore.NewMockStore(
		mockstore.WithClusterInspector(func(c testutils.Cluster) {
			mockstore.BootstrapWithMultiRegions(c, []byte("g"), []byte("n"), []byte("t"))
		}),
	)
	require.NoError(t, err)
	// Wrap in a closure: arguments of a deferred call are evaluated at the
	// defer statement, so deferring require.NoError(t, store.Close()) directly
	// would close the store immediately instead of at the end of the test.
	defer func() {
		require.NoError(t, store.Close())
	}()
	copClient := store.GetClient().(*copr.CopClient)
	ctx := context.Background()
	killed := uint32(0)
	vars := kv.NewVariables(&killed)
	opt := &kv.ClientSendOption{}
	mockProvider := &mockResourceGroupProvider{
		cfg: *rmclient.DefaultConfig(),
	}

	ranges := copr.BuildKeyRanges("a", "c", "d", "e", "h", "x", "y", "z")
	resourceCtl, err := rmclient.NewResourceGroupController(context.Background(), 1, mockProvider, nil)
	require.NoError(t, err)
	manager := resourcegroup.NewRunawayManager(resourceCtl, "mock://test")
	defer manager.Stop()

	sql := "select * from t"
	group1 := "rg1"
	checker := manager.DeriveChecker(group1, sql, "", "", time.Now())
	// Register an exact-match watch on the SQL text with the CoolDown action.
	manager.AddWatch(&resourcegroup.QuarantineRecord{
		ID:                1,
		ResourceGroupName: group1,
		Watch:             rmpb.RunawayWatchType_Exact,
		WatchText:         sql,
		Action:            rmpb.RunawayAction_CoolDown,
	})
	req := &kv.Request{
		Tp:                kv.ReqTypeDAG,
		KeyRanges:         kv.NewNonParitionedKeyRangesWithHint(ranges, []int{1, 1, 3, 3}),
		Concurrency:       15,
		RunawayChecker:    checker,
		ResourceGroupName: group1,
	}
	// BeforeExecutor is called for its side effect of matching the watch list
	// and marking the checker; its return value is not asserted by this test.
	checker.BeforeExecutor()
	it, errRes := copClient.BuildCopIterator(ctx, req, vars, opt)
	require.Nil(t, errRes)
	concurrency, smallTaskConcurrency := it.GetConcurrency()
	require.Equal(t, 1, concurrency)
	require.Equal(t, 0, smallTaskConcurrency)
}
10 changes: 10 additions & 0 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
rmpb "github.com/pingcap/kvproto/pkg/resource_manager"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain/infosync"
"github.com/pingcap/tidb/pkg/domain/resourcegroup"
Expand Down Expand Up @@ -219,6 +220,15 @@ func (c *CopClient) BuildCopIterator(ctx context.Context, req *kv.Request, vars
it.concurrency = 1
}

// If the runaway checker has put the request into cool down, reduce the concurrency so that the SQL runs slowly.
if len(req.ResourceGroupName) > 0 && req.ResourceGroupName != resourcegroup.DefaultResourceGroupName && req.RunawayChecker != nil {
action := req.RunawayChecker.CheckAction()
if action == rmpb.RunawayAction_CoolDown {
it.concurrency = 1
it.smallTaskConcurrency = 0
}
}

if it.req.KeepOrder {
if it.smallTaskConcurrency > 20 {
it.smallTaskConcurrency = 20
Expand Down

0 comments on commit b68a63a

Please sign in to comment.