Skip to content

Commit

Permalink
planner: simplify plan cache for fast point get (#53135)
Browse files Browse the repository at this point in the history
ref #50618
  • Loading branch information
qw4990 authored May 9, 2024
1 parent 9bf3500 commit 58469bb
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 34 deletions.
2 changes: 1 addition & 1 deletion pkg/executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ func (a *ExecStmt) PointGet(ctx context.Context) (*recordSet, error) {
a.PsStmt.PointGet.Executor = nil
} else {
// CachedPlan type is already checked in last step
pointGetPlan := a.PsStmt.PointGet.Plan.(*plannercore.PointGetPlan)
pointGetPlan := a.Plan.(*plannercore.PointGetPlan)
exec.Init(pointGetPlan)
a.PsStmt.PointGet.Executor = exec
executor = exec
Expand Down
20 changes: 5 additions & 15 deletions pkg/executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,12 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
sessVars := c.Ctx.GetSessionVars()
stmtCtx := sessVars.StmtCtx
// handle the execute statement
var (
pointGetPlanShortPathOK bool
preparedObj *plannercore.PlanCacheStmt
)
var preparedObj *plannercore.PlanCacheStmt

if execStmt, ok := stmtNode.(*ast.ExecuteStmt); ok {
if preparedObj, err = plannercore.GetPreparedStmt(execStmt, sessVars); err != nil {
return nil, err
}
pointGetPlanShortPathOK = plannercore.IsPointGetPlanShortPathOK(c.Ctx, is, preparedObj)
}
// Build the final physical plan.
finalPlan, names, err := planner.Optimize(ctx, c.Ctx, stmtNode, is)
Expand Down Expand Up @@ -130,16 +126,10 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
OutputNames: names,
}
// Use cached plan if possible.
if pointGetPlanShortPathOK {
if ep, ok := stmt.Plan.(*plannercore.Execute); ok {
if pointPlan, ok := ep.Plan.(*plannercore.PointGetPlan); ok {
stmtCtx.SetPlan(stmt.Plan)
stmtCtx.SetPlanDigest(preparedObj.NormalizedPlan, preparedObj.PlanDigest)
stmt.Plan = pointPlan
stmt.PsStmt = preparedObj
} else {
// invalid the previous cached point plan
preparedObj.PointGet.Plan = nil
if preparedObj != nil && plannercore.IsSafeToReusePointGetExecutor(c.Ctx, is, preparedObj) {
if exec, isExec := finalPlan.(*plannercore.Execute); isExec {
if pointPlan, isPointPlan := exec.Plan.(*plannercore.PointGetPlan); isPointPlan {
stmt.PsStmt, stmt.Plan = preparedObj, pointPlan // notify to re-use the cached plan
}
}
}
Expand Down
18 changes: 4 additions & 14 deletions pkg/planner/core/plan_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -787,27 +787,17 @@ func checkPreparedPriv(sctx sessionctx.Context, stmt *PlanCacheStmt, is infosche
return err
}

// IsPointGetPlanShortPathOK check if we can execute using plan cached in prepared structure
// Be careful with the short path, current precondition is ths cached plan satisfying
// IsPointGetWithPKOrUniqueKeyByAutoCommit
func IsPointGetPlanShortPathOK(sctx sessionctx.Context, is infoschema.InfoSchema, stmt *PlanCacheStmt) bool {
if stmt.PointGet.Plan == nil || staleread.IsStmtStaleness(sctx) {
// IsSafeToReusePointGetExecutor checks whether this is a PointGet Plan and safe to reuse its executor.
func IsSafeToReusePointGetExecutor(sctx sessionctx.Context, is infoschema.InfoSchema, stmt *PlanCacheStmt) bool {
if staleread.IsStmtStaleness(sctx) {
return false
}
// check auto commit
if !IsAutoCommitTxn(sctx.GetSessionVars()) {
return false
}
if stmt.SchemaVersion != is.SchemaMetaVersion() {
stmt.PointGet.Plan = nil
stmt.PointGet.ColumnInfos = nil
return false
}
// only support simple PointGet Plan now
switch stmt.PointGet.Plan.(type) {
case *PointGetPlan:
return true
default:
return false
}
return true
}
2 changes: 1 addition & 1 deletion pkg/sessiontxn/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ go_test(
"txn_rc_tso_optimize_test.go",
],
flaky = True,
shard_count = 24,
shard_count = 25,
deps = [
":sessiontxn",
"//pkg/domain",
Expand Down
43 changes: 40 additions & 3 deletions pkg/sessiontxn/txn_context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,12 +505,12 @@ func TestTxnContextForPrepareExecute(t *testing.T) {
doWithCheckPath(t, se, normalPathRecords, func() {
tk.MustExec("prepare s from 'select * from t1 where id=1'")
})
doWithCheckPath(t, se, normalPathRecords, func() {
doWithCheckPath(t, se, []string{"assertTxnManagerInCompile", "assertTxnManagerInShortPointGetPlan"}, func() {
tk.MustQuery("execute s").Check(testkit.Rows("1 10"))
})

// Test ExecutePreparedStmt
doWithCheckPath(t, se, normalPathRecords, func() {
doWithCheckPath(t, se, []string{"assertTxnManagerInCompile", "assertTxnManagerInShortPointGetPlan"}, func() {
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 10"))
Expand Down Expand Up @@ -549,6 +549,42 @@ func TestTxnContextForPrepareExecute(t *testing.T) {
tk.MustExec("rollback")
}

func TestStaleReadInPrepare(t *testing.T) {
store, _ := setupTxnContextTest(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
se := tk.Session()

tk.MustExec(`create table tt (id int primary key, v int)`)
tk.MustExec(`insert into tt values(1, 10)`)
tk.MustExec("do sleep(0.1)")
tk.MustExec("set @a=now(6)")
tk.MustExec("do sleep(0.1)")

st, _, _, err := se.PrepareStmt("select v from tt where id=1")
require.NoError(t, err)

tk.MustExec(`update tt set v=11 where id=1`)
rs, err := se.ExecutePreparedStmt(context.TODO(), st, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("11"))

tk.MustExec("set @@tx_read_ts=@a")
rs, err = se.ExecutePreparedStmt(context.TODO(), st, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("10"))

tk.MustExec("update tt set v=12 where id=1")
tk.MustExec("set @@tx_read_ts=''")
rs, err = se.ExecutePreparedStmt(context.TODO(), st, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("12"))
rs, err = se.ExecutePreparedStmt(context.TODO(), st, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("12"))
tk.MustQuery(`select @@last_plan_from_cache`).Check(testkit.Rows("1"))
}

func TestTxnContextForStaleReadInPrepare(t *testing.T) {
store, _ := setupTxnContextTest(t)
tk := testkit.NewTestKit(t, store)
Expand Down Expand Up @@ -632,7 +668,8 @@ func TestTxnContextForStaleReadInPrepare(t *testing.T) {
tk.MustExec("do sleep(0.1)")
tk.MustExec("update t1 set v=v+1 where id=1")
se.SetValue(sessiontxn.AssertTxnInfoSchemaKey, is2)
doWithCheckPath(t, se, normalPathRecords, func() {
doWithCheckPath(t, se, []string{"assertTxnManagerInCompile", "assertTxnManagerInShortPointGetPlan"}, func() {
// stale-read is not used since `tx_read_ts` is empty, so the plan cache should be used in this case.
rs, err := se.ExecutePreparedStmt(context.TODO(), stmtID1, nil)
require.NoError(t, err)
tk.ResultSetToResult(rs, fmt.Sprintf("%v", rs)).Check(testkit.Rows("1 12"))
Expand Down

0 comments on commit 58469bb

Please sign in to comment.