From ce8ea9b804ddfa5c2808b1971e1c19510ffb819d Mon Sep 17 00:00:00 2001 From: you06 Date: Fri, 19 Mar 2021 17:25:36 +0800 Subject: [PATCH] cherry pick #22381 to release-5.0 Signed-off-by: ti-srebot --- executor/prepared.go | 1 + planner/core/cache.go | 1 + planner/core/common_plans.go | 10 +++++- planner/core/optimizer.go | 4 +++ planner/core/planbuilder.go | 5 +++ planner/optimize.go | 14 ++++++++ session/pessimistic_test.go | 66 ++++++++++++++++++++++++++++++++++++ session/session.go | 16 +++++++-- 8 files changed, 114 insertions(+), 3 deletions(-) diff --git a/executor/prepared.go b/executor/prepared.go index cef541a757c91..970638dd68cc0 100644 --- a/executor/prepared.go +++ b/executor/prepared.go @@ -220,6 +220,7 @@ func (e *PrepareExec) Next(ctx context.Context, req *chunk.Chunk) error { VisitInfos: destBuilder.GetVisitInfo(), NormalizedSQL: normalized, SQLDigest: digest, + ForUpdateRead: destBuilder.GetIsForUpdateRead(), } return vars.AddPreparedStmt(e.ID, preparedObj) } diff --git a/planner/core/cache.go b/planner/core/cache.go index 1e87a984331b0..80cd27c930890 100644 --- a/planner/core/cache.go +++ b/planner/core/cache.go @@ -195,4 +195,5 @@ type CachedPrepareStmt struct { NormalizedPlan string SQLDigest string PlanDigest string + ForUpdateRead bool } diff --git a/planner/core/common_plans.go b/planner/core/common_plans.go index 8b856d10dffa6..8b77efad81337 100644 --- a/planner/core/common_plans.go +++ b/planner/core/common_plans.go @@ -275,7 +275,8 @@ func (e *Execute) setFoundInPlanCache(sctx sessionctx.Context, opt bool) error { } func (e *Execute) getPhysicalPlan(ctx context.Context, sctx sessionctx.Context, is infoschema.InfoSchema, preparedStmt *CachedPrepareStmt) error { - stmtCtx := sctx.GetSessionVars().StmtCtx + sessVars := sctx.GetSessionVars() + stmtCtx := sessVars.StmtCtx prepared := preparedStmt.PreparedAst stmtCtx.UseCache = prepared.UseCache var cacheKey kvcache.Key @@ -375,6 +376,12 @@ REBUILD: e.Plan = p _, isTableDual := p.(*PhysicalTableDual) if !isTableDual && prepared.UseCache && !stmtCtx.OptimDependOnMutableConst { + // rebuild key to exclude kv.TiFlash when stmt is not read only + if _, isolationReadContainTiFlash := sessVars.IsolationReadEngines[kv.TiFlash]; isolationReadContainTiFlash && !IsReadOnly(stmt, sessVars) { + delete(sessVars.IsolationReadEngines, kv.TiFlash) + cacheKey = NewPSTMTPlanCacheKey(sctx.GetSessionVars(), e.ExecID, prepared.SchemaVersion) + sessVars.IsolationReadEngines[kv.TiFlash] = struct{}{} + } cached := NewPSTMTPlanCacheValue(p, names, stmtCtx.TblInfo2UnionScan, tps) preparedStmt.NormalizedPlan, preparedStmt.PlanDigest = NormalizePlan(p) stmtCtx.SetPlanDigest(preparedStmt.NormalizedPlan, preparedStmt.PlanDigest) @@ -1323,5 +1330,6 @@ func IsPointUpdateByAutoCommit(ctx sessionctx.Context, p Plan) (bool, error) { if _, isFastSel := updPlan.SelectPlan.(*PointGetPlan); isFastSel { return true, nil } + return false, nil } diff --git a/planner/core/optimizer.go b/planner/core/optimizer.go index a81b0f3896d18..67ef1639ef398 100644 --- a/planner/core/optimizer.go +++ b/planner/core/optimizer.go @@ -27,6 +27,7 @@ import ( "github.com/pingcap/tidb/planner/property" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" + "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/types" utilhint "github.com/pingcap/tidb/util/hint" "github.com/pingcap/tidb/util/set" @@ -39,6 +40,9 @@ var OptimizeAstNode func(ctx context.Context, sctx sessionctx.Context, node ast. // AllowCartesianProduct means whether tidb allows cartesian join without equal conditions. var AllowCartesianProduct = atomic.NewBool(true) +// IsReadOnly check whether the ast.Node is a read only statement. +var IsReadOnly func(node ast.Node, vars *variable.SessionVars) bool + const ( flagGcSubstitute uint64 = 1 << iota flagPrunColumns diff --git a/planner/core/planbuilder.go b/planner/core/planbuilder.go index 0a7b73beb482e..4e133876eee35 100644 --- a/planner/core/planbuilder.go +++ b/planner/core/planbuilder.go @@ -524,6 +524,11 @@ func (b *PlanBuilder) GetVisitInfo() []visitInfo { return b.visitInfo } +// GetIsForUpdateRead gets if the PlanBuilder use forUpdateRead +func (b *PlanBuilder) GetIsForUpdateRead() bool { + return b.isForUpdateRead +} + // GetDBTableInfo gets the accessed dbs and tables info. func (b *PlanBuilder) GetDBTableInfo() []stmtctx.TableEntry { var tables []stmtctx.TableEntry diff --git a/planner/optimize.go b/planner/optimize.go index b8bac7a8c2cd8..8d8868f2cd38f 100644 --- a/planner/optimize.go +++ b/planner/optimize.go @@ -30,6 +30,7 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/planner/cascades" + "github.com/pingcap/tidb/planner/core" plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/privilege" "github.com/pingcap/tidb/sessionctx" @@ -236,6 +237,18 @@ func optimize(ctx context.Context, sctx sessionctx.Context, node ast.Node, is in } sctx.GetSessionVars().RewritePhaseInfo.DurationRewrite = time.Since(beginRewrite) + if execPlan, ok := p.(*plannercore.Execute); ok { + execID := execPlan.ExecID + if execPlan.Name != "" { + execID = sctx.GetSessionVars().PreparedStmtNameToID[execPlan.Name] + } + if preparedPointer, ok := sctx.GetSessionVars().PreparedStmts[execID]; ok { + if preparedObj, ok := preparedPointer.(*core.CachedPrepareStmt); ok && preparedObj.ForUpdateRead { + is = domain.GetDomain(sctx).InfoSchema() + } + } + } + sctx.GetSessionVars().StmtCtx.Tables = builder.GetDBTableInfo() activeRoles := sctx.GetSessionVars().ActiveRoles // Check privilege. Maybe it's better to move this to the Preprocess, but @@ -556,4 +569,5 @@ func setFoundInBinding(sctx sessionctx.Context, opt bool) error { func init() { plannercore.OptimizeAstNode = Optimize + plannercore.IsReadOnly = IsReadOnly } diff --git a/session/pessimistic_test.go b/session/pessimistic_test.go index a49b01c348a3a..90146de91594d 100644 --- a/session/pessimistic_test.go +++ b/session/pessimistic_test.go @@ -28,11 +28,13 @@ import ( "github.com/pingcap/parser/terror" "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" + plannercore "github.com/pingcap/tidb/planner/core" "github.com/pingcap/tidb/session" "github.com/pingcap/tidb/sessionctx/variable" "github.com/pingcap/tidb/store/tikv" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/tablecodec" + "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/util/codec" "github.com/pingcap/tidb/util/testkit" ) @@ -2459,6 +2461,70 @@ func (s *testPessimisticSuite) TestIssue21498(c *C) { } } +func (s *testPessimisticSuite) TestPlanCacheSchemaChange(c *C) { + orgEnable := plannercore.PreparedPlanCacheEnabled() + defer func() { + plannercore.SetPreparedPlanCache(orgEnable) + }() + plannercore.SetPreparedPlanCache(true) + + tk := testkit.NewTestKitWithInit(c, s.store) + tk2 := testkit.NewTestKitWithInit(c, s.store) + tk3 := testkit.NewTestKitWithInit(c, s.store) + ctx := context.Background() + + tk.MustExec("use test") + tk2.MustExec("use test") + tk3.MustExec("use test") + + tk.MustExec("drop table if exists t") + tk.MustExec("create table t (id int primary key, v int, unique index iv (v), vv int)") + tk.MustExec("insert into t values(1, 1, 1), (2, 2, 2), (4, 4, 4)") + + tk.MustExec("set tidb_enable_amend_pessimistic_txn = 1") + tk2.MustExec("set tidb_enable_amend_pessimistic_txn = 1") + + //generate plan cache + tk.MustExec("prepare update_stmt from 'update t set vv = vv + 1 where v = ?'") + tk.MustExec("set @v = 1") + tk.MustExec("execute update_stmt using @v") + + stmtID, _, _, err := tk2.Se.PrepareStmt("update t set vv = vv + 1 where v = ?") + c.Assert(err, IsNil) + _, err = tk2.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(1)}) + c.Assert(err, IsNil) + + tk.MustExec("begin pessimistic") + tk2.MustExec("begin pessimistic") + + tk3.MustExec("alter table t drop index iv") + tk3.MustExec("update t set v = 3 where v = 2") + tk3.MustExec("update t set v = 5 where v = 4") + + tk.MustExec("set @v = 2") + tk.MustExec("execute update_stmt using @v") + tk.CheckExecResult(0, 0) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + tk.MustExec("set @v = 3") + tk.MustExec("execute update_stmt using @v") + tk.CheckExecResult(1, 0) + tk.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + _, err = tk2.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(4)}) + c.Assert(err, IsNil) + tk2.CheckExecResult(0, 0) + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("0")) + _, err = tk2.Se.ExecutePreparedStmt(ctx, stmtID, []types.Datum{types.NewDatum(5)}) + c.Assert(err, IsNil) + tk2.CheckExecResult(1, 0) + tk2.MustQuery("select @@last_plan_from_cache").Check(testkit.Rows("1")) + + tk.MustExec("commit") + tk2.MustExec("commit") + + tk.MustQuery("select * from t").Check(testkit.Rows("1 1 3", "2 3 3", "4 5 5")) +} + func (s *testPessimisticSuite) TestAsyncCommitCalTSFail(c *C) { atomic.StoreUint64(&tikv.ManagedLockTTL, 5000) defer func() { diff --git a/session/session.go b/session/session.go index 4763407ca5886..e9256464fb01b 100644 --- a/session/session.go +++ b/session/session.go @@ -1618,7 +1618,12 @@ func (s *session) cachedPlanExec(ctx context.Context, stmtID uint32, prepareStmt *plannercore.CachedPrepareStmt, args []types.Datum) (sqlexec.RecordSet, error) { prepared := prepareStmt.PreparedAst // compile ExecStmt - is := infoschema.GetInfoSchema(s) + var is infoschema.InfoSchema + if prepareStmt.ForUpdateRead { + is = domain.GetDomain(s).InfoSchema() + } else { + is = infoschema.GetInfoSchema(s) + } execAst := &ast.ExecuteStmt{ExecID: stmtID} if err := executor.ResetContextOfStmt(s, execAst); err != nil { return nil, err @@ -1659,9 +1664,16 @@ func (s *session) cachedPlanExec(ctx context.Context, s.PrepareTSFuture(ctx) stmtCtx.Priority = kv.PriorityHigh resultSet, err = runStmt(ctx, s, stmt) + case nil: + // cache is invalid + if prepareStmt.ForUpdateRead { + s.PrepareTSFuture(ctx) + } + resultSet, err = runStmt(ctx, s, stmt) default: + err = errors.Errorf("invalid cached plan type %T", prepared.CachedPlan) prepared.CachedPlan = nil - return nil, errors.Errorf("invalid cached plan type") + return nil, err } return resultSet, err }