Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: make plan replayer capture support prepared stmt #40167

Merged
merged 11 commits into from
Jan 3, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
support prepared stmt
  • Loading branch information
Yisaer committed Dec 26, 2022
commit 4c47df3a13a4786f008dee37b63029cf7b4faa04
7 changes: 6 additions & 1 deletion executor/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -1724,7 +1724,12 @@ func (b *executorBuilder) buildTableDual(v *plannercore.PhysicalTableDual) Execu

// `getSnapshotTS` returns for-update-ts if in insert/update/delete/lock statement otherwise the isolation read ts
// Please notice that in RC isolation, the above two ts are the same
func (b *executorBuilder) getSnapshotTS() (uint64, error) {
func (b *executorBuilder) getSnapshotTS() (ts uint64, err error) {
defer func() {
if err == nil {
b.ctx.GetSessionVars().StmtCtx.StmtSnapshotTS = ts
}
}()
if b.forDataReaderBuilder {
return b.dataReaderTS, nil
}
Expand Down
85 changes: 0 additions & 85 deletions executor/compiler.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,7 @@ import (
"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/parser/mysql"
Expand All @@ -34,7 +32,6 @@ import (
"github.com/pingcap/tidb/sessiontxn/staleread"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/replayer"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -157,91 +154,9 @@ func (c *Compiler) Compile(ctx context.Context, stmtNode ast.StmtNode) (_ *ExecS
}
}
}
if c.Ctx.GetSessionVars().IsPlanReplayerCaptureEnabled() && !c.Ctx.GetSessionVars().InRestrictedSQL {
if _, ok := stmtNode.(*ast.SelectStmt); ok {
startTS, err := sessiontxn.GetTxnManager(c.Ctx).GetStmtReadTS()
if err != nil {
return nil, err
}
if c.Ctx.GetSessionVars().EnablePlanReplayedContinuesCapture {
checkPlanReplayerContinuesCapture(c.Ctx, stmtNode, startTS)
} else {
checkPlanReplayerCaptureTask(c.Ctx, stmtNode, startTS)
}
}
}

return stmt, nil
}

func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
tasks := handle.GetTasks()
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx)
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
for _, task := range tasks {
if task.SQLDigest == sqlDigest.String() {
if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
return
}
}
}
}

func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := getPlanDigest(sctx.GetSessionVars().StmtCtx)
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key)
if existed {
return
}
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
}

func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode,
startTS uint64, isContinuesCapture bool) {
stmtCtx := sctx.GetSessionVars().StmtCtx
handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
dumpTask := &domain.PlanReplayerDumpTask{
PlanReplayerTaskKey: key,
StartTS: startTS,
EncodePlan: GetEncodedPlan,
TblStats: stmtCtx.TableStats,
SessionBindings: handle.GetAllBindRecord(),
SessionVars: sctx.GetSessionVars(),
ExecStmts: []ast.StmtNode{stmtNode},
Analyze: false,
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
}

// needLowerPriority checks whether it's needed to lower the execution priority
// of a query.
// If the estimated output row count of any operator in the physical plan tree
Expand Down
16 changes: 15 additions & 1 deletion executor/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,26 @@ func TestPlanReplayer(t *testing.T) {
}

func TestPlanReplayerCapture(t *testing.T) {
store := testkit.CreateMockStore(t)
store, dom := testkit.CreateMockStoreAndDomain(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("plan replayer capture '123' '123';")
tk.MustQuery("select sql_digest, plan_digest from mysql.plan_replayer_task;").Check(testkit.Rows("123 123"))
tk.MustGetErrMsg("plan replayer capture '123' '123';", "plan replayer capture task already exists")
tk.MustExec("delete from mysql.plan_replayer_task")
tk.MustExec("create table t(id int)")
tk.MustExec("prepare stmt from 'update t set id = ? where id = ? + 1';")
tk.MustExec("SET @number = 5;")
tk.MustExec("execute stmt using @number,@number")
_, sqlDigest := tk.Session().GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := tk.Session().GetSessionVars().StmtCtx.GetPlanDigest()
tk.MustExec("SET @@tidb_enable_plan_replayer_capture = ON;")
tk.MustExec(fmt.Sprintf("plan replayer capture '%v' '%v'", sqlDigest.String(), planDigest.String()))
err := dom.GetPlanReplayerHandle().CollectPlanReplayerTask()
require.NoError(t, err)
tk.MustExec("execute stmt using @number,@number")
task := dom.GetPlanReplayerHandle().DrainTask()
require.NotNil(t, task)
}

func TestPlanReplayerContinuesCapture(t *testing.T) {
Expand Down
5 changes: 5 additions & 0 deletions executor/plan_replayer.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,11 @@ func (e *PlanReplayerExec) registerCaptureTask(ctx context.Context) error {
zap.Error(err))
return err
}
err = domain.GetDomain(e.ctx).GetPlanReplayerHandle().CollectPlanReplayerTask()
if err != nil {
logutil.BgLogger().Warn("collect task failed", zap.Error(err))
}
logutil.BgLogger().Info("collect plan replayer task success")
e.endFlag = true
return nil
}
Expand Down
83 changes: 83 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ import (
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/logutil/consistency"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/replayer"
"github.com/pingcap/tidb/util/sem"
"github.com/pingcap/tidb/util/sli"
"github.com/pingcap/tidb/util/sqlexec"
Expand Down Expand Up @@ -2375,6 +2376,20 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.
se: se,
}, err
}
if !se.GetSessionVars().InRestrictedSQL {
sql, sqlDigest := se.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := se.GetSessionVars().StmtCtx.GetPlanDigest()
fmt.Println(fmt.Sprintf("sql=%v,sqldigest=%v,plandigest=%v", sql, sqlDigest.String(), planDigest.String()))
}
if !se.GetSessionVars().InRestrictedSQL && se.GetSessionVars().IsPlanReplayerCaptureEnabled() {
stmtNode := s.GetStmtNode()
startTS := se.GetSessionVars().StmtCtx.StmtSnapshotTS
if se.GetSessionVars().EnablePlanReplayedContinuesCapture {
checkPlanReplayerContinuesCapture(se, stmtNode, startTS)
} else {
checkPlanReplayerCaptureTask(se, stmtNode, startTS)
}
}

err = finishStmt(ctx, se, err, s)
if se.hasQuerySpecial() {
Expand All @@ -2389,6 +2404,74 @@ func runStmt(ctx context.Context, se *session, s sqlexec.Statement) (rs sqlexec.
return nil, err
}

func checkPlanReplayerCaptureTask(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
tasks := handle.GetTasks()
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
for _, task := range tasks {
if task.SQLDigest == sqlDigest.String() {
if task.PlanDigest == "*" || task.PlanDigest == planDigest.String() {
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, false)
return
}
}
}
}

func checkPlanReplayerContinuesCapture(sctx sessionctx.Context, stmtNode ast.StmtNode, startTS uint64) {
dom := domain.GetDomain(sctx)
if dom == nil {
return
}
handle := dom.GetPlanReplayerHandle()
if handle == nil {
return
}
_, sqlDigest := sctx.GetSessionVars().StmtCtx.SQLDigest()
_, planDigest := sctx.GetSessionVars().StmtCtx.GetPlanDigest()
key := replayer.PlanReplayerTaskKey{
SQLDigest: sqlDigest.String(),
PlanDigest: planDigest.String(),
}
existed := sctx.GetSessionVars().CheckPlanReplayerFinishedTaskKey(key)
if existed {
return
}
sendPlanReplayerDumpTask(key, sctx, stmtNode, startTS, true)
sctx.GetSessionVars().AddPlanReplayerFinishedTaskKey(key)
}

func sendPlanReplayerDumpTask(key replayer.PlanReplayerTaskKey, sctx sessionctx.Context, stmtNode ast.StmtNode,
startTS uint64, isContinuesCapture bool) {
stmtCtx := sctx.GetSessionVars().StmtCtx
handle := sctx.Value(bindinfo.SessionBindInfoKeyType).(*bindinfo.SessionHandle)
dumpTask := &domain.PlanReplayerDumpTask{
PlanReplayerTaskKey: key,
StartTS: startTS,
EncodePlan: executor.GetEncodedPlan,
TblStats: stmtCtx.TableStats,
SessionBindings: handle.GetAllBindRecord(),
SessionVars: sctx.GetSessionVars(),
ExecStmts: []ast.StmtNode{stmtNode},
Analyze: false,
IsCapture: true,
IsContinuesCapture: isContinuesCapture,
}
domain.GetDomain(sctx).GetPlanReplayerHandle().SendTask(dumpTask)
}

// ExecStmtVarKeyType is a dummy type to avoid naming collision in context.
type ExecStmtVarKeyType int

Expand Down
3 changes: 3 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -384,6 +384,9 @@ type StatementContext struct {
TableStats map[int64]interface{}
// useChunkAlloc indicates whether statement use chunk alloc
useChunkAlloc bool

// StmtSnapshotTS indicates the snapshot ts for the stmt
StmtSnapshotTS uint64
}

// StmtHints are SessionVars related sql hints.
Expand Down