Skip to content

Commit

Permalink
executor: split TestIndexMergeReaderPanic to avoid case timeout (#43689)
Browse files Browse the repository at this point in the history
close #43688
  • Loading branch information
guo-shaoge authored May 10, 2023
1 parent ce1b6f9 commit f803438
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 70 deletions.
28 changes: 0 additions & 28 deletions executor/index_merge_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,16 +338,6 @@ func (e *IndexMergeReaderExecutor) startPartialIndexWorker(ctx context.Context,
util.WithRecovery(
func() {
failpoint.Inject("testIndexMergePanicPartialIndexWorker", nil)
failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) {
select {
case <-ctx.Done():
failpoint.Return()
case <-e.finished:
failpoint.Return()
case <-exitCh:
failpoint.Return()
}
})
worker := &partialIndexWorker{
stats: e.stats,
idxID: e.getPartitalPlanID(workID),
Expand Down Expand Up @@ -463,16 +453,6 @@ func (e *IndexMergeReaderExecutor) startPartialTableWorker(ctx context.Context,
util.WithRecovery(
func() {
failpoint.Inject("testIndexMergePanicPartialTableWorker", nil)
failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) {
select {
case <-ctx.Done():
failpoint.Return()
case <-e.finished:
failpoint.Return()
case <-exitCh:
failpoint.Return()
}
})
var err error
partialTableReader := &TableReaderExecutor{
baseExecutor: newBaseExecutor(e.ctx, ts.Schema(), e.getPartitalPlanID(workID)),
Expand Down Expand Up @@ -1624,14 +1604,6 @@ func (w *indexMergeTableScanWorker) pickAndExecTask(ctx context.Context, task **
// Make sure panic failpoint is after fetch task from workCh.
// Otherwise cannot send error to task.doneCh.
failpoint.Inject("testIndexMergePanicTableScanWorker", nil)
failpoint.Inject("mockSleepBeforeStartTableReader", func(_ failpoint.Value) {
select {
case <-ctx.Done():
failpoint.Return()
case <-w.finished:
failpoint.Return()
}
})
execStart := time.Now()
err := w.executeTask(ctx, *task)
if w.stats != nil {
Expand Down
2 changes: 1 addition & 1 deletion executor/test/indexmergereadtest/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ go_test(
],
flaky = True,
race = "on",
shard_count = 22,
shard_count = 29,
deps = [
"//config",
"//meta/autoid",
Expand Down
136 changes: 95 additions & 41 deletions executor/test/indexmergereadtest/index_merge_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -843,7 +843,7 @@ func TestIndexMergeProcessWorkerHang(t *testing.T) {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeProcessWorkerIntersectionHang"))
}

func TestIndexMergePanic(t *testing.T) {
func TestIndexMergePanic1(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)

Expand All @@ -855,55 +855,109 @@ func TestIndexMergePanic(t *testing.T) {
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly", "return(true)"))
tk.MustExec("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c1 < 100 or c2 < 100")
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/testIndexMergeResultChCloseEarly"))
}

setupPartitionTableHelper(tk)

var indexMergePanicRunSQL = func(t *testing.T, tk *testkit.TestKit, fp string) {
minV := 200
maxV := 1000
runSQL := func(fp string) {
var sql string
v1 := rand.Intn(maxV-minV) + minV
v2 := rand.Intn(maxV-minV) + minV
if !strings.Contains(fp, "Intersection") {
sql = fmt.Sprintf("select /*+ use_index_merge(t1) */ c1 from t1 where c1 < %d or c2 < %d;", v1, v2)
} else {
sql = fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c3 < %d and c2 < %d", v1, v2)
}
res := tk.MustQuery("explain " + sql).Rows()
require.Contains(t, res[1][0], "IndexMerge")
err := tk.QueryToErr(sql)
require.Contains(t, err.Error(), fp)
var sql string
v1 := rand.Intn(maxV-minV) + minV
v2 := rand.Intn(maxV-minV) + minV
if !strings.Contains(fp, "Intersection") {
sql = fmt.Sprintf("select /*+ use_index_merge(t1) */ c1 from t1 where c1 < %d or c2 < %d;", v1, v2)
} else {
sql = fmt.Sprintf("select /*+ use_index_merge(t1, primary, c2, c3) */ c1 from t1 where c3 < %d and c2 < %d", v1, v2)
}
res := tk.MustQuery("explain " + sql).Rows()
require.Contains(t, res[1][0], "IndexMerge")
err := tk.QueryToErr(sql)
require.Contains(t, err.Error(), fp)
}

packagePath := "github.com/pingcap/tidb/executor/"
panicFPPaths := []string{
packagePath + "testIndexMergePanicPartialIndexWorker",
packagePath + "testIndexMergePanicPartialTableWorker",
func TestIndexMergePanicPartialIndexWorker(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
setupPartitionTableHelper(tk)

packagePath + "testIndexMergePanicProcessWorkerUnion",
packagePath + "testIndexMergePanicProcessWorkerIntersection",
packagePath + "testIndexMergePanicPartitionTableIntersectionWorker",
fp := "github.com/pingcap/tidb/executor/testIndexMergePanicPartialIndexWorker"
for i := 0; i < 1000; i++ {
require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`panic("%s")`, fp)))
indexMergePanicRunSQL(t, tk, fp)
require.NoError(t, failpoint.Disable(fp))
}
}

packagePath + "testIndexMergePanicTableScanWorker",
func TestIndexMergePanicPartialTableWorker(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
setupPartitionTableHelper(tk)

fp := "github.com/pingcap/tidb/executor/testIndexMergePanicPartialTableWorker"
for i := 0; i < 1000; i++ {
require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`panic("%s")`, fp)))
indexMergePanicRunSQL(t, tk, fp)
require.NoError(t, failpoint.Disable(fp))
}
for _, fp := range panicFPPaths {
fmt.Println("handling failpoint: ", fp)
if !strings.Contains(fp, "testIndexMergePanicTableScanWorker") {
// When mockSleepBeforeStartTableReader is enabled, will not read real data. This is to avoid leaking goroutines in coprocessor.
// But should disable mockSleepBeforeStartTableReader for testIndexMergePanicTableScanWorker.
// Because finalTableScanWorker need task.doneCh to pass error, so need partialIndexWorker/partialTableWorker runs normally.
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/executor/mockSleepBeforeStartTableReader", "return(1000)"))
}
for i := 0; i < 1000; i++ {
require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`panic("%s")`, fp)))
runSQL(fp)
require.NoError(t, failpoint.Disable(fp))
}
if !strings.Contains(fp, "testIndexMergePanicTableScanWorker") {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/executor/mockSleepBeforeStartTableReader"))
}
}

func TestIndexMergePanicPartialProcessWorkerUnion(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
setupPartitionTableHelper(tk)

fp := "github.com/pingcap/tidb/executor/testIndexMergePanicProcessWorkerUnion"
for i := 0; i < 1000; i++ {
require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`panic("%s")`, fp)))
indexMergePanicRunSQL(t, tk, fp)
require.NoError(t, failpoint.Disable(fp))
}
}

func TestIndexMergePanicPartialProcessWorkerIntersection(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
setupPartitionTableHelper(tk)

fp := "github.com/pingcap/tidb/executor/testIndexMergePanicProcessWorkerIntersection"
for i := 0; i < 1000; i++ {
require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`panic("%s")`, fp)))
indexMergePanicRunSQL(t, tk, fp)
require.NoError(t, failpoint.Disable(fp))
}
}

func TestIndexMergePanicPartitionTableIntersectionWorker(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
setupPartitionTableHelper(tk)

fp := "github.com/pingcap/tidb/executor/testIndexMergePanicPartitionTableIntersectionWorker"
for i := 0; i < 1000; i++ {
require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`panic("%s")`, fp)))
indexMergePanicRunSQL(t, tk, fp)
require.NoError(t, failpoint.Disable(fp))
}
}

func TestIndexMergePanicTableScanWorker(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
setupPartitionTableHelper(tk)

fp := "github.com/pingcap/tidb/executor/testIndexMergePanicTableScanWorker"
for i := 0; i < 1000; i++ {
require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`panic("%s")`, fp)))
indexMergePanicRunSQL(t, tk, fp)
require.NoError(t, failpoint.Disable(fp))
}
}

func TestIndexMergeError(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
setupPartitionTableHelper(tk)

packagePath := "github.com/pingcap/tidb/executor/"
errFPPaths := []string{
packagePath + "testIndexMergeErrorPartialIndexWorker",
packagePath + "testIndexMergeErrorPartialTableWorker",
Expand All @@ -912,7 +966,7 @@ func TestIndexMergePanic(t *testing.T) {
fmt.Println("handling failpoint: ", fp)
require.NoError(t, failpoint.Enable(fp, fmt.Sprintf(`return("%s")`, fp)))
for i := 0; i < 100; i++ {
runSQL(fp)
indexMergePanicRunSQL(t, tk, fp)
}
require.NoError(t, failpoint.Disable(fp))
}
Expand Down

0 comments on commit f803438

Please sign in to comment.