Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

executor,store/copr: fix index merge, distsql request's key ranges should be sorted #36633

Merged
merged 13 commits into from
Jul 29, 2022
8 changes: 8 additions & 0 deletions executor/distsql.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -304,6 +305,9 @@ func (e *IndexReaderExecutor) open(ctx context.Context, kvRanges []kv.KeyRange)

e.memTracker = memory.NewTracker(e.id, -1)
e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)
slices.SortFunc(kvRanges, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
var builder distsql.RequestBuilder
builder.SetKeyRanges(kvRanges).
SetDAGRequest(e.dagPB).
Expand Down Expand Up @@ -612,6 +616,10 @@ func (e *IndexLookUpExecutor) startIndexWorker(ctx context.Context, workCh chan<
}

// init kvReq, result and worker for this partition
// The key ranges should be ordered.
slices.SortFunc(kvRange, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
kvReq, err := builder.SetKeyRanges(kvRange).Build()
if err != nil {
worker.syncErr(err)
Expand Down
47 changes: 47 additions & 0 deletions executor/distsql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"testing"
"time"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
Expand Down Expand Up @@ -581,3 +582,49 @@ func TestAdaptiveClosestRead(t *testing.T) {
// 2 IndexScan with cost 19/56, 2 TableReader with cost 32.5/65.
checkMetrics("select/* +USE_INDEX_MERGE(t) */ id from t use index(`idx_v_s1`) use index(idx_s2) where (s1 < 3 and v > 0) or s2 = 3;", 3, 1)
}

func TestCoprocessorPagingReqKeyRangeSorted(t *testing.T) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The added test can only cover (literally) the added code.
It can't prevent regression in the future when code changes.

store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)

require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/store/copr/checkKeyRangeSortedForPaging", "return"))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/store/copr/checkKeyRangeSortedForPaging"))
}()

tk.MustExec("use test")
tk.MustExec("CREATE TABLE `UK_COLLATION19523` (" +
"`COL1` binary(1) DEFAULT NULL," +
"`COL2` varchar(20) COLLATE utf8_general_ci DEFAULT NULL," +
"`COL4` datetime DEFAULT NULL," +
"`COL3` bigint(20) DEFAULT NULL," +
"`COL5` float DEFAULT NULL," +
"UNIQUE KEY `U_COL1` (`COL1`)" +
") ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_general_ci")

tk.MustExec("prepare stmt from 'SELECT/*+ HASH_JOIN(t1, t2) */ * FROM UK_COLLATION19523 t1 JOIN UK_COLLATION19523 t2 ON t1.col1 > t2.col1 WHERE t1.col1 IN (?, ?, ?) AND t2.col1 < ?;';")
tk.MustExec("set @a=0x4F, @b=0xF8, @c=NULL, @d=0xBF;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=0x00, @b=0xD2, @c=9179987834981541375, @d=0xF8;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")

tk.MustExec("CREATE TABLE `IDT_COLLATION26873` (" +
"`COL1` varbinary(20) DEFAULT NULL," +
"`COL2` varchar(20) COLLATE utf8_general_ci DEFAULT NULL," +
"`COL4` datetime DEFAULT NULL," +
"`COL3` bigint(20) DEFAULT NULL," +
"`COL5` float DEFAULT NULL," +
"KEY `U_COL1` (`COL1`))")
tk.MustExec("prepare stmt from 'SELECT/*+ INL_JOIN(t1, t2) */ t2.* FROM IDT_COLLATION26873 t1 LEFT JOIN IDT_COLLATION26873 t2 ON t1.col1 = t2.col1 WHERE t1.col1 < ? AND t1.col1 IN (?, ?, ?);';")
tk.MustExec("set @a=NULL, @b=NULL, @c=NULL, @d=NULL;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=0xE3253A6AC72A3A168EAF0E34A4779A947872CCCD, @b=0xD67BB26504EE152C2C356D7F6CAD897F03462963, @c=NULL, @d=0xDE735FEB375A4CF33479A39CA925470BFB229DB4;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=2606738829406840179, @b=1468233589368287363, @c=5174008984061521089, @d=7727946571160309462;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @b=NULL, @c=6864108002939154648, @d=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
tk.MustExec("set @a=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @b=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @c=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE, @d=0xFCABFE6198B6323EE8A46247EDD33830453B1BDE;")
tk.MustExec("execute stmt using @a,@b,@c,@d;")
}
5 changes: 5 additions & 0 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/pingcap/tidb/util/ranger"
"github.com/pingcap/tipb/go-tipb"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var (
Expand Down Expand Up @@ -324,6 +325,10 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
}

// init kvReq and worker for this partition
// The key ranges should be ordered.
slices.SortFunc(keyRange, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
kvReq, err := builder.SetKeyRanges(keyRange).Build()
if err != nil {
worker.syncErr(e.resultCh, err)
Expand Down
14 changes: 14 additions & 0 deletions store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package copr

import (
"bytes"
"context"
"fmt"
"strconv"
Expand Down Expand Up @@ -51,6 +52,7 @@ import (
"github.com/tikv/client-go/v2/txnkv/txnsnapshot"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
"golang.org/x/exp/slices"
)

var coprCacheCounterEvict = tidbmetrics.DistSQLCoprCacheCounter.WithLabelValues("evict")
Expand Down Expand Up @@ -92,6 +94,18 @@ func (c *CopClient) Send(ctx context.Context, req *kv.Request, variables interfa
// coprocessor request but type is not DAG
req.Paging = false
}

failpoint.Inject("checkKeyRangeSortedForPaging", func(_ failpoint.Value) {
if req.Paging {
isSorted := slices.IsSortedFunc(req.KeyRanges, func(i, j kv.KeyRange) bool {
return bytes.Compare(i.StartKey, j.StartKey) < 0
})
if !isSorted {
logutil.BgLogger().Fatal("distsql request key range not sorted!")
}
}
})

ctx = context.WithValue(ctx, tikv.TxnStartKey(), req.StartTs)
ctx = context.WithValue(ctx, util.RequestSourceKey, req.RequestSource)
bo := backoff.NewBackofferWithVars(ctx, copBuildTaskMaxBackoff, vars)
Expand Down