Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

session: move more session vars to stmt context for retrying #8034

Merged
merged 20 commits into from
Dec 10, 2018
Merged
Show file tree
Hide file tree
Changes from 8 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ddl/ddl_worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ func testCheckJobCancelled(c *C, d *ddl, job *model.Job, state *model.SchemaStat
t := meta.NewMeta(txn)
historyJob, err := t.GetHistoryDDLJob(job.ID)
c.Assert(err, IsNil)
c.Assert(historyJob.IsCancelled() || historyJob.IsRollbackDone(), IsTrue, Commentf("histroy job %s", historyJob))
c.Assert(historyJob.IsCancelled() || historyJob.IsRollbackDone(), IsTrue, Commentf("history job %s", historyJob))
if state != nil {
c.Assert(historyJob.SchemaState, Equals, *state)
}
Expand Down
9 changes: 7 additions & 2 deletions executor/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1175,6 +1175,7 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.MemTracker = memory.NewTracker(s.Text(), vars.MemQuotaQuery)
sc.NowTs = time.Time{}
sc.SysTs = time.Time{}
sc.PreparedParams = []interface{}{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe put PreparedParams in SessionVars can avoid re-allocating memory before every execution.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not set it to nil

switch config.GetGlobalConfig().OOMAction {
case config.OOMActionCancel:
sc.MemTracker.SetActionOnExceed(&memory.PanicOnExceed{})
Expand Down Expand Up @@ -1250,7 +1251,6 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
sc.IgnoreTruncate = true
sc.IgnoreZeroInDate = true
}
vars.PreparedParams = vars.PreparedParams[:0]
if !vars.InRestrictedSQL {
if priority := mysql.PriorityEnum(atomic.LoadInt32(&variable.ForcePriority)); priority != mysql.NoPriority {
sc.Priority = priority
Expand All @@ -1260,7 +1260,12 @@ func ResetContextOfStmt(ctx sessionctx.Context, s ast.StmtNode) (err error) {
vars.PrevLastInsertID = vars.LastInsertID
vars.LastInsertID = 0
}
vars.ResetPrevAffectedRows()
sc.PrevAffectedRows = 0
if vars.StmtCtx.InUpdateOrDeleteStmt || vars.StmtCtx.InInsertStmt {
sc.PrevAffectedRows = int64(vars.StmtCtx.AffectedRows())
} else if vars.StmtCtx.InSelectStmt {
sc.PrevAffectedRows = -1
}
err = vars.SetSystemVar("warning_count", fmt.Sprintf("%d", vars.StmtCtx.NumWarnings(false)))
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ func (e *SimpleExec) executeBegin(s *ast.BeginStmt) error {
// If BEGIN is the first statement in TxnCtx, we can reuse the existing transaction, without the
// need to call NewTxn, which commits the existing transaction and begins a new one.
txnCtx := e.ctx.GetSessionVars().TxnCtx
if txnCtx.Histroy != nil {
if txnCtx.History != nil {
err := e.ctx.NewTxn()
if err != nil {
return errors.Trace(err)
Expand Down
2 changes: 1 addition & 1 deletion expression/builtin_info.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,6 @@ func (b *builtinRowCountSig) Clone() builtinFunc {
// evalInt evals ROW_COUNT().
// See https://dev.mysql.com/doc/refman/5.7/en/information-functions.html#function_row-count.
func (b *builtinRowCountSig) evalInt(_ chunk.Row) (res int64, isNull bool, err error) {
res = int64(b.ctx.GetSessionVars().PrevAffectedRows)
res = int64(b.ctx.GetSessionVars().StmtCtx.PrevAffectedRows)
return res, false, nil
}
2 changes: 1 addition & 1 deletion expression/builtin_info_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *testEvaluatorSuite) TestRowCount(c *C) {
defer testleak.AfterTest(c)()
ctx := mock.NewContext()
sessionVars := ctx.GetSessionVars()
sessionVars.PrevAffectedRows = 10
sessionVars.StmtCtx.PrevAffectedRows = 10

f, err := funcs[ast.RowCount].getFunction(ctx, nil)
c.Assert(err, IsNil)
Expand Down
2 changes: 1 addition & 1 deletion expression/builtin_other.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,7 +790,7 @@ func (b *builtinGetParamStringSig) evalString(row chunk.Row) (string, bool, erro
if isNull || err != nil {
return "", isNull, errors.Trace(err)
}
v := sessionVars.PreparedParams[idx]
v := sessionVars.StmtCtx.PreparedParams[idx].(types.Datum)
tiancaiamao marked this conversation as resolved.
Show resolved Hide resolved

str, err := v.ToString()
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion planner/core/common_plans.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ func (e *Execute) OptimizePreparedPlan(ctx sessionctx.Context, is infoschema.Inf
return errors.Trace(err)
}
prepared.Params[i].(*driver.ParamMarkerExpr).Datum = val
vars.PreparedParams = append(vars.PreparedParams, val)
vars.StmtCtx.PreparedParams = append(vars.StmtCtx.PreparedParams, val)
}
if prepared.SchemaVersion != is.SchemaMetaVersion() {
// If the schema version has changed we need to preprocess it again,
Expand Down
4 changes: 2 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,8 @@ func (s *session) retry(ctx context.Context, maxCnt uint) error {
if st.IsReadOnly() {
continue
}
s.sessionVars.StmtCtx = sr.stmtCtx
s.sessionVars.StmtCtx.ResetForRetry()
schemaVersion, err = st.RebuildPlan()
if err != nil {
return errors.Trace(err)
Expand All @@ -489,8 +491,6 @@ func (s *session) retry(ctx context.Context, maxCnt uint) error {
} else {
log.Warnf("con:%d schema_ver:%d retry_cnt:%d query_num:%d", connID, schemaVersion, retryCnt, i)
}
s.sessionVars.StmtCtx = sr.stmtCtx
s.sessionVars.StmtCtx.ResetForRetry()
jackysp marked this conversation as resolved.
Show resolved Hide resolved
_, err = st.Exec(ctx)
if err != nil {
s.StmtRollback()
Expand Down
18 changes: 18 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,24 @@ func (s *testSessionSuite) TestDelete(c *C) {
tk1.MustQuery("select * from t;").Check(testkit.Rows("1"))
}

func (s *testSessionSuite) TestResetCtx(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)
tk1 := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("create table t (i int);")
tk.MustExec("insert into t values (1);")
tk.MustExec("begin;")
tk.MustExec("insert into t values (10);")
tk.MustExec("update t set i = i + row_count();")
tk.MustQuery("select * from t;").Check(testkit.Rows("2", "11"))

tk1.MustExec("update t set i = 0 where i = 1;")
jackysp marked this conversation as resolved.
Show resolved Hide resolved
tk1.MustQuery("select * from t;").Check(testkit.Rows("0"))

tk.MustExec("commit;")
tk.MustQuery("select * from t;").Check(testkit.Rows("1", "11"))
}

func (s *testSessionSuite) TestUnique(c *C) {
// test for https://github.com/pingcap/tidb/pull/461

Expand Down
4 changes: 2 additions & 2 deletions session/tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,12 +183,12 @@ func runStmt(ctx context.Context, sctx sessionctx.Context, s sqlexec.Statement)

// GetHistory get all stmtHistory in current txn. Exported only for test.
func GetHistory(ctx sessionctx.Context) *StmtHistory {
hist, ok := ctx.GetSessionVars().TxnCtx.Histroy.(*StmtHistory)
hist, ok := ctx.GetSessionVars().TxnCtx.History.(*StmtHistory)
if ok {
return hist
}
hist = new(StmtHistory)
ctx.GetSessionVars().TxnCtx.Histroy = hist
ctx.GetSessionVars().TxnCtx.History = hist
return hist
}

Expand Down
4 changes: 4 additions & 0 deletions sessionctx/stmtctx/stmtctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ type StatementContext struct {
histogramsNotLoad bool
execDetails execdetails.ExecDetails
}
// PrevAffectedRows is the affected-rows value(DDL is 0, DML is the number of affected rows).
PrevAffectedRows int64
// params for prepared statements
PreparedParams []interface{}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why change []types.Datum to []interface{} ?


// Copied from SessionVars.TimeZone.
TimeZone *time.Location
Expand Down
24 changes: 10 additions & 14 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ type TransactionContext struct {
DirtyDB interface{}
Binlog interface{}
InfoSchema interface{}
Histroy interface{}
History interface{}
SchemaVersion int64
StartTS uint64
Shard *int64
Expand Down Expand Up @@ -173,8 +173,6 @@ type SessionVars struct {
PreparedStmtNameToID map[string]uint32
// preparedStmtID is id of prepared statement.
preparedStmtID uint32
// params for prepared statements
PreparedParams []types.Datum

// retry information
RetryInfo *RetryInfo
Expand All @@ -199,8 +197,6 @@ type SessionVars struct {
PrevLastInsertID uint64 // PrevLastInsertID is the last insert ID of previous statement.
LastInsertID uint64 // LastInsertID is the auto-generated ID in the current statement.
InsertID uint64 // InsertID is the given insert ID of an auto_increment column.
// PrevAffectedRows is the affected-rows value(DDL is 0, DML is the number of affected rows).
PrevAffectedRows int64

// ClientCapability is client's capability.
ClientCapability uint32
Expand Down Expand Up @@ -328,7 +324,6 @@ func NewSessionVars() *SessionVars {
systems: make(map[string]string),
PreparedStmts: make(map[uint32]*ast.Prepared),
PreparedStmtNameToID: make(map[string]uint32),
PreparedParams: make([]types.Datum, 0, 10),
TxnCtx: &TransactionContext{},
KVVars: kv.NewVariables(),
RetryInfo: &RetryInfo{},
Expand Down Expand Up @@ -463,27 +458,28 @@ func (s *SessionVars) Location() *time.Location {

// ResetPrevAffectedRows reset the prev-affected-rows variable.
func (s *SessionVars) ResetPrevAffectedRows() {
s.PrevAffectedRows = 0
s.StmtCtx.PrevAffectedRows = 0
if s.StmtCtx != nil {
if s.StmtCtx.InUpdateOrDeleteStmt || s.StmtCtx.InInsertStmt {
s.PrevAffectedRows = int64(s.StmtCtx.AffectedRows())
s.StmtCtx.PrevAffectedRows = int64(s.StmtCtx.AffectedRows())
} else if s.StmtCtx.InSelectStmt {
s.PrevAffectedRows = -1
s.StmtCtx.PrevAffectedRows = -1
}
}
}

// GetExecuteArgumentsInfo gets the argument list as a string of execute statement.
func (s *SessionVars) GetExecuteArgumentsInfo() string {
if len(s.PreparedParams) == 0 {
if len(s.StmtCtx.PreparedParams) == 0 {
return ""
}
args := make([]string, 0, len(s.PreparedParams))
for _, v := range s.PreparedParams {
if v.IsNull() {
args := make([]string, 0, len(s.StmtCtx.PreparedParams))
for _, v := range s.StmtCtx.PreparedParams {
d := v.(types.Datum)
if d.IsNull() {
args = append(args, "<nil>")
} else {
str, err := v.ToString()
str, err := d.ToString()
if err != nil {
terror.Log(err)
}
Expand Down