Skip to content

Commit

Permalink
Merge branch 'master' into issue-30922
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored Dec 22, 2021
2 parents dbbb28c + 9063d3b commit a3b267b
Show file tree
Hide file tree
Showing 39 changed files with 1,300 additions and 181 deletions.
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ dev: checklist check explaintest devgotest gogenerate br_unit_test test_part_par
# Install the check tools.
check-setup:tools/bin/revive tools/bin/goword

check: fmt unconvert lint tidy testSuite check-static vet errdoc
check: fmt check-parallel unconvert lint tidy testSuite check-static vet errdoc

fmt:
@echo "gofmt (simplify)"
Expand Down Expand Up @@ -75,6 +75,13 @@ testSuite:
@echo "testSuite"
./tools/check/check_testSuite.sh

check-parallel:
# Make sure no tests are run in parallel to prevent possible unstable tests.
# See https://github.com/pingcap/tidb/pull/30692.
@! find . -name "*_test.go" -not -path "./vendor/*" -print0 | \
xargs -0 grep -F -n "t.Parallel()" || \
! echo "Error: all the go tests should be run in serial."

clean: failpoint-disable
$(GO) clean -i ./...

Expand Down
4 changes: 0 additions & 4 deletions br/pkg/task/backup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
)

func TestParseTSString(t *testing.T) {
t.Parallel()

var (
ts uint64
err error
Expand All @@ -33,8 +31,6 @@ func TestParseTSString(t *testing.T) {
}

func TestParseCompressionType(t *testing.T) {
t.Parallel()

var (
ct backup.CompressionType
err error
Expand Down
5 changes: 0 additions & 5 deletions br/pkg/task/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ func (f fakeValue) Type() string {
}

func TestUrlNoQuery(t *testing.T) {
t.Parallel()
flag := &pflag.Flag{
Name: flagStorage,
Value: fakeValue("s3://some/what?secret=a123456789&key=987654321"),
Expand All @@ -40,7 +39,6 @@ func TestUrlNoQuery(t *testing.T) {
}

func TestTiDBConfigUnchanged(t *testing.T) {
t.Parallel()
cfg := config.GetGlobalConfig()
restoreConfig := enableTiDBConfig()
require.NotEqual(t, config.GetGlobalConfig(), cfg)
Expand All @@ -49,7 +47,6 @@ func TestTiDBConfigUnchanged(t *testing.T) {
}

func TestStripingPDURL(t *testing.T) {
t.Parallel()
nor1, err := normalizePDURL("https://pd:5432", true)
require.NoError(t, err)
require.Equal(t, "pd:5432", nor1)
Expand All @@ -68,7 +65,6 @@ func TestStripingPDURL(t *testing.T) {
}

func TestCheckCipherKeyMatch(t *testing.T) {
t.Parallel()
cases := []struct {
CipherType encryptionpb.EncryptionMethod
CipherKey string
Expand Down Expand Up @@ -125,7 +121,6 @@ func TestCheckCipherKeyMatch(t *testing.T) {
}

func TestCheckCipherKey(t *testing.T) {
t.Parallel()
cases := []struct {
cipherKey string
keyFile string
Expand Down
2 changes: 0 additions & 2 deletions br/pkg/task/restore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ import (
)

func TestRestoreConfigAdjust(t *testing.T) {
t.Parallel()

cfg := &RestoreConfig{}
cfg.adjustRestoreConfig()

Expand Down
4 changes: 0 additions & 4 deletions br/pkg/utils/backoff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,6 @@ func TestPdBackoffWithRetryableError(t *testing.T) {
}

func TestNewImportSSTBackofferWithSucess(t *testing.T) {
t.Parallel()

var counter int
backoffer := utils.NewImportSSTBackoffer()
err := utils.WithRetry(context.Background(), func() error {
Expand All @@ -142,8 +140,6 @@ func TestNewImportSSTBackofferWithSucess(t *testing.T) {
}

func TestNewDownloadSSTBackofferWithCancel(t *testing.T) {
t.Parallel()

var counter int
backoffer := utils.NewDownloadSSTBackoffer()
err := utils.WithRetry(context.Background(), func() error {
Expand Down
4 changes: 3 additions & 1 deletion ddl/db_change_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,7 +411,9 @@ func (t *testExecInfo) compileSQL(idx int) (err error) {
compiler := executor.Compiler{Ctx: c.session}
se := c.session
ctx := context.TODO()
se.PrepareTxnCtx(ctx)
if err = se.PrepareTxnCtx(ctx); err != nil {
return err
}
sctx := se.(sessionctx.Context)
if err = executor.ResetContextOfStmt(sctx, c.rawStmt); err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion ddl/placement/bundle_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func (s *testBundleSuite) TestString(c *C) {
c.Assert(err, IsNil)
bundle.Rules = append(rules1, rules2...)

c.Assert(bundle.String(), Equals, `{"group_id":"TiDB_DDL_1","group_index":0,"group_override":false,"rules":[{"group_id":"","id":"","start_key":"","end_key":"","role":"voter","count":3,"label_constraints":[{"key":"zone","op":"in","values":["sh"]}],"location_labels":["region","zone","rack","host"],"isolation_level":"region"},{"group_id":"","id":"","start_key":"","end_key":"","role":"voter","count":4,"label_constraints":[{"key":"zone","op":"notIn","values":["sh"]},{"key":"zone","op":"in","values":["bj"]}],"location_labels":["region","zone","rack","host"],"isolation_level":"region"}]}`)
c.Assert(bundle.String(), Equals, `{"group_id":"TiDB_DDL_1","group_index":0,"group_override":false,"rules":[{"group_id":"","id":"","start_key":"","end_key":"","role":"voter","count":3,"label_constraints":[{"key":"zone","op":"in","values":["sh"]}],"location_labels":["region","zone","rack","host"]},{"group_id":"","id":"","start_key":"","end_key":"","role":"voter","count":4,"label_constraints":[{"key":"zone","op":"notIn","values":["sh"]},{"key":"zone","op":"in","values":["bj"]}],"location_labels":["region","zone","rack","host"]}]}`)

c.Assert(failpoint.Enable("github.com/pingcap/tidb/ddl/placement/MockMarshalFailure", `return(true)`), IsNil)
defer func() {
Expand Down
1 change: 0 additions & 1 deletion ddl/placement/rule.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ func NewRule(role PeerRoleType, replicas uint64, cnst Constraints) *Rule {
Count: int(replicas),
Constraints: cnst,
LocationLabels: []string{"region", "zone", "rack", "host"},
IsolationLevel: "region",
}
}

Expand Down
26 changes: 26 additions & 0 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/pingcap/tidb/plugin"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/execdetails"
Expand Down Expand Up @@ -263,6 +264,12 @@ func (a *ExecStmt) PointGet(ctx context.Context, is infoschema.InfoSchema) (*rec
a.PsStmt.Executor = newExecutor
}
pointExecutor := a.PsStmt.Executor.(*PointGetExecutor)

failpoint.Inject("assertTxnManagerInShortPointGetPlan", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInShortPointGetPlan", true)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, is)
})

if err = pointExecutor.Open(ctx); err != nil {
terror.Call(pointExecutor.Close)
return nil, err
Expand Down Expand Up @@ -298,6 +305,16 @@ func (a *ExecStmt) RebuildPlan(ctx context.Context) (int64, error) {
if err := plannercore.Preprocess(a.Ctx, a.StmtNode, plannercore.InTxnRetry, plannercore.WithPreprocessorReturn(ret)); err != nil {
return 0, err
}

failpoint.Inject("assertTxnManagerInRebuildPlan", func() {
if is, ok := a.Ctx.Value(sessiontxn.AssertTxnInfoSchemaAfterRetryKey).(infoschema.InfoSchema); ok {
a.Ctx.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is)
a.Ctx.SetValue(sessiontxn.AssertTxnInfoSchemaAfterRetryKey, nil)
}
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerInRebuildPlan", true)
sessiontxn.AssertTxnManagerInfoSchema(a.Ctx, ret.InfoSchema)
})

a.InfoSchema = ret.InfoSchema
a.SnapshotTS = ret.LastSnapshotTS
a.IsStaleness = ret.IsStaleness
Expand Down Expand Up @@ -755,6 +772,10 @@ func (a *ExecStmt) handlePessimisticLockError(ctx context.Context, err error) (E
a.Ctx.GetSessionVars().StmtCtx.ResetForRetry()
a.Ctx.GetSessionVars().RetryInfo.ResetOffset()

failpoint.Inject("assertTxnManagerAfterPessimisticLockErrorRetry", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterPessimisticLockErrorRetry", true)
})

if err = e.Open(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -809,6 +830,11 @@ func (a *ExecStmt) buildExecutor() (Executor, error) {
return nil, errors.Trace(b.err)
}

failpoint.Inject("assertTxnManagerAfterBuildExecutor", func() {
sessiontxn.RecordAssert(a.Ctx, "assertTxnManagerAfterBuildExecutor", true)
sessiontxn.AssertTxnManagerInfoSchema(b.ctx, b.is)
})

// ExecuteExec is not a real Executor, we only use it to build another Executor from a prepared statement.
if executorExec, ok := e.(*ExecuteExec); ok {
err := executorExec.Build(b)
Expand Down
13 changes: 12 additions & 1 deletion executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
)

var (
Expand Down Expand Up @@ -57,11 +58,21 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (*ExecStm

ret := &plannercore.PreprocessorReturn{}
pe := &plannercore.PreprocessExecuteISUpdate{ExecuteInfoSchemaUpdate: planner.GetExecuteForUpdateReadIS, Node: stmtNode}
err := plannercore.Preprocess(c.Ctx, stmtNode, plannercore.WithPreprocessorReturn(ret), plannercore.WithExecuteInfoSchemaUpdate(pe))
err := plannercore.Preprocess(c.Ctx,
stmtNode,
plannercore.WithPreprocessorReturn(ret),
plannercore.WithExecuteInfoSchemaUpdate(pe),
plannercore.InitTxnContextProvider,
)
if err != nil {
return nil, err
}

failpoint.Inject("assertTxnManagerInCompile", func() {
sessiontxn.RecordAssert(c.Ctx, "assertTxnManagerInCompile", true)
sessiontxn.AssertTxnManagerInfoSchema(c.Ctx, ret.InfoSchema)
})

finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, ret.InfoSchema)
if err != nil {
return nil, err
Expand Down
3 changes: 1 addition & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/parser/terror"
"github.com/pingcap/tidb/planner"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/privilege"
"github.com/pingcap/tidb/sessionctx"
Expand Down Expand Up @@ -1718,7 +1717,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.MemTracker.SetActionOnExceed(action)
}
if execStmt, ok := s.(*ast.ExecuteStmt); ok {
prepareStmt, err := planner.GetPreparedStmt(execStmt, vars)
prepareStmt, err := plannercore.GetPreparedStmt(execStmt, vars)
if err != nil {
return err
}
Expand Down
7 changes: 7 additions & 0 deletions executor/prepared.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"time"

"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/log"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/infoschema"
Expand All @@ -31,6 +32,7 @@ import (
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util"
Expand Down Expand Up @@ -340,6 +342,11 @@ func CompileExecutePreparedStmt(ctx context.Context, sctx sessionctx.Context,
return nil, false, false, err
}

failpoint.Inject("assertTxnManagerInCompile", func() {
sessiontxn.RecordAssert(sctx, "assertTxnManagerInCompile", true)
sessiontxn.AssertTxnManagerInfoSchema(sctx, is)
})

stmt := &ExecStmt{
GoCtx: ctx,
InfoSchema: is,
Expand Down
3 changes: 2 additions & 1 deletion executor/seqtest/prepared_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,8 @@ func TestPrepared(t *testing.T) {
require.Equal(t, query, stmt.OriginText())

// Check that rebuild plan works.
tk.Session().PrepareTxnCtx(ctx)
err = tk.Session().PrepareTxnCtx(ctx)
require.NoError(t, err)
_, err = stmt.RebuildPlan(ctx)
require.NoError(t, err)
rs, err = stmt.Exec(ctx)
Expand Down
14 changes: 14 additions & 0 deletions executor/write_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,20 @@ func TestReplace(t *testing.T) {
tk.MustExec("drop table t1, t2")
}

func TestReplaceWithCICollation(t *testing.T) {
collate.SetNewCollationEnabledForTest(true)
defer collate.SetNewCollationEnabledForTest(false)
store, clean := testkit.CreateMockStore(t)
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")

tk.MustExec("create table t (a varchar(20) charset utf8mb4 collate utf8mb4_general_ci primary key);")
tk.MustExec("replace into t(a) values (_binary'A '),(_binary'A');")
tk.MustQuery("select a from t use index(primary);").Check(testkit.Rows("A"))
tk.MustQuery("select a from t ignore index(primary);").Check(testkit.Rows("A"))
}

func TestGeneratedColumnForInsert(t *testing.T) {
store, clean := testkit.CreateMockStore(t)
defer clean()
Expand Down
20 changes: 20 additions & 0 deletions planner/core/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"sync/atomic"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser"
"github.com/pingcap/tidb/parser/ast"
Expand Down Expand Up @@ -213,3 +214,22 @@ type CachedPrepareStmt struct {
ForUpdateRead bool
SnapshotTSEvaluator func(sessionctx.Context) (uint64, error)
}

// GetPreparedStmt extract the prepared statement from the execute statement.
func GetPreparedStmt(stmt *ast.ExecuteStmt, vars *variable.SessionVars) (*CachedPrepareStmt, error) {
var ok bool
execID := stmt.ExecID
if stmt.Name != "" {
if execID, ok = vars.PreparedStmtNameToID[stmt.Name]; !ok {
return nil, ErrStmtNotFound
}
}
if preparedPointer, ok := vars.PreparedStmts[execID]; ok {
preparedObj, ok := preparedPointer.(*CachedPrepareStmt)
if !ok {
return nil, errors.Errorf("invalid CachedPrepareStmt type")
}
return preparedObj, nil
}
return nil, ErrStmtNotFound
}
Loading

0 comments on commit a3b267b

Please sign in to comment.