Skip to content

Commit

Permalink
*: Add support for MAX_EXECUTION_TIME. (#10541) (#10940)
Browse files Browse the repository at this point in the history
  • Loading branch information
tiancaiamao authored and jackysp committed Jun 26, 2019
1 parent 0db535a commit 7214234
Show file tree
Hide file tree
Showing 16 changed files with 271 additions and 36 deletions.
44 changes: 28 additions & 16 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import (
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/statistics"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/expensivequery"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
Expand All @@ -52,22 +53,23 @@ import (
// Domain represents a storage space. Different domains can use the same database name.
// Multiple domains can be used in parallel without synchronization.
type Domain struct {
store kv.Storage
infoHandle *infoschema.Handle
privHandle *privileges.Handle
statsHandle unsafe.Pointer
statsLease time.Duration
statsUpdating sync2.AtomicInt32
ddl ddl.DDL
info *InfoSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
wg sync.WaitGroup
gvc GlobalVariableCache
slowQuery *topNSlowQueries
store kv.Storage
infoHandle *infoschema.Handle
privHandle *privileges.Handle
statsHandle unsafe.Pointer
statsLease time.Duration
statsUpdating sync2.AtomicInt32
ddl ddl.DDL
info *InfoSyncer
m sync.Mutex
SchemaValidator SchemaValidator
sysSessionPool *sessionPool
exit chan struct{}
etcdClient *clientv3.Client
wg sync.WaitGroup
gvc GlobalVariableCache
slowQuery *topNSlowQueries
expensiveQueryHandle *expensivequery.Handle

MockReloadFailed MockFailure // It mocks reload failed.
}
Expand Down Expand Up @@ -980,6 +982,16 @@ func (do *Domain) autoAnalyzeWorker(owner owner.Manager) {
}
}

// ExpensiveQueryHandle returns the expensive query handle.
func (do *Domain) ExpensiveQueryHandle() *expensivequery.Handle {
return do.expensiveQueryHandle
}

// InitExpensiveQueryHandle init the expensive query handler.
func (do *Domain) InitExpensiveQueryHandle() {
do.expensiveQueryHandle = expensivequery.NewExpensiveQueryHandle(do.exit)
}

const privilegeKey = "/tidb/privilege"

// NotifyUpdatePrivilege updates privilege key in etcd, TiDB client that watches
Expand Down
19 changes: 17 additions & 2 deletions executor/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ import (

// processinfoSetter is the interface use to set current running process info.
type processinfoSetter interface {
SetProcessInfo(string, time.Time, byte)
SetProcessInfo(string, time.Time, byte, uint64)
}

// recordSet wraps an executor, implements sqlexec.RecordSet interface
Expand Down Expand Up @@ -234,8 +234,9 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
sql = ss.SecureText()
}
}
maxExecutionTime := getMaxExecutionTime(sctx, a.StmtNode)
// Update processinfo, ShowProcess() will use it.
pi.SetProcessInfo(sql, time.Now(), cmd)
pi.SetProcessInfo(sql, time.Now(), cmd, maxExecutionTime)
a.Ctx.GetSessionVars().StmtCtx.StmtType = GetStmtLabel(a.StmtNode)
}

Expand Down Expand Up @@ -264,6 +265,20 @@ func (a *ExecStmt) Exec(ctx context.Context) (sqlexec.RecordSet, error) {
}, nil
}

// getMaxExecutionTime get the max execution timeout value.
func getMaxExecutionTime(sctx sessionctx.Context, stmtNode ast.StmtNode) uint64 {
ret := sctx.GetSessionVars().MaxExecutionTime
if sel, ok := stmtNode.(*ast.SelectStmt); ok {
for _, hint := range sel.TableHints {
if hint.HintName.L == variable.MaxExecutionTime {
ret = hint.MaxExecutionTime
break
}
}
}
return ret
}

func (a *ExecStmt) handleNoDelayExecutor(ctx context.Context, sctx sessionctx.Context, e Executor) (sqlexec.RecordSet, error) {
// Check if "tidb_snapshot" is set for the write executors.
// In history read mode, we can not do write operations.
Expand Down
21 changes: 18 additions & 3 deletions server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import (

"github.com/opentracing/opentracing-go"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
Expand Down Expand Up @@ -219,6 +220,11 @@ func (cc *clientConn) readPacket() ([]byte, error) {
}

func (cc *clientConn) writePacket(data []byte) error {
failpoint.Inject("FakeClientConn", func() {
if cc.pkt == nil {
failpoint.Return(nil)
}
})
return cc.pkt.writePacket(data)
}

Expand Down Expand Up @@ -619,7 +625,11 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
cc.lastCmd = hack.String(data)
token := cc.server.getToken()
defer func() {
cc.ctx.SetProcessInfo("", t, mysql.ComSleep)
// if handleChangeUser failed, cc.ctx may be nil
if cc.ctx != nil {
cc.ctx.SetProcessInfo("", t, mysql.ComSleep, 0)
}

cc.server.releaseToken(token)
span.Finish()
}()
Expand All @@ -633,9 +643,9 @@ func (cc *clientConn) dispatch(ctx context.Context, data []byte) error {
switch cmd {
case mysql.ComPing, mysql.ComStmtClose, mysql.ComStmtSendLongData, mysql.ComStmtReset,
mysql.ComSetOption, mysql.ComChangeUser:
cc.ctx.SetProcessInfo("", t, cmd)
cc.ctx.SetProcessInfo("", t, cmd, 0)
case mysql.ComInitDB:
cc.ctx.SetProcessInfo("use "+hack.String(data), t, cmd)
cc.ctx.SetProcessInfo("use "+hack.String(data), t, cmd, 0)
}

switch cmd {
Expand Down Expand Up @@ -697,6 +707,11 @@ func (cc *clientConn) useDB(ctx context.Context, db string) (err error) {
}

func (cc *clientConn) flush() error {
failpoint.Inject("FakeClientConn", func() {
if cc.pkt == nil {
failpoint.Return(nil)
}
})
return cc.pkt.flush()
}

Expand Down
2 changes: 1 addition & 1 deletion server/conn_stmt.go
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ func (cc *clientConn) handleStmtFetch(ctx context.Context, data []byte) (err err
if prepared, ok := cc.ctx.GetStatement(int(stmtID)).(*TiDBStatement); ok {
sql = prepared.sql
}
cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute)
cc.ctx.SetProcessInfo(sql, time.Now(), mysql.ComStmtExecute, 0)
rs := stmt.GetResultSet()
if rs == nil {
return mysql.NewErr(mysql.ErrUnknownStmtHandler,
Expand Down
72 changes: 72 additions & 0 deletions server/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,14 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"

. "github.com/pingcap/check"
"github.com/pingcap/failpoint"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/tidb/session"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/util/arena"
)

type ConnTestSuite struct{}
Expand Down Expand Up @@ -163,3 +168,70 @@ func mapBelong(m1, m2 map[string]string) bool {
}
return true
}

func (ts ConnTestSuite) TestConnExecutionTimeout(c *C) {
//There is no underlying netCon, use failpoint to avoid panic
c.Assert(failpoint.Enable("github.com/pingcap/tidb/server/FakeClientConn", "return(1)"), IsNil)

c.Parallel()
store, err := mockstore.NewMockTikvStore()
c.Assert(err, IsNil)
defer store.Close()
dom, err := session.BootstrapSession(store)
c.Assert(err, IsNil)
defer dom.Close()
se, err := session.CreateSession4Test(store)
c.Assert(err, IsNil)

connID := 1
se.SetConnectionID(uint64(connID))
tc := &TiDBContext{
session: se,
stmts: make(map[int]*TiDBStatement),
}
cc := &clientConn{
connectionID: uint32(connID),
server: &Server{
capability: defaultCapability,
},
ctx: tc,
alloc: arena.NewAllocator(32 * 1024),
}
srv := &Server{
clients: map[uint32]*clientConn{
uint32(connID): cc,
},
}
handle := dom.ExpensiveQueryHandle().SetSessionManager(srv)
go handle.Run()

_, err = se.Execute(context.Background(), "use test;")
c.Assert(err, IsNil)
_, err = se.Execute(context.Background(), "CREATE TABLE testTable2 (id bigint PRIMARY KEY, age int)")
c.Assert(err, IsNil)
for i := 0; i < 10; i++ {
str := fmt.Sprintf("insert into testTable2 values(%d, %d)", i, i%80)
_, err = se.Execute(context.Background(), str)
c.Assert(err, IsNil)
}

_, err = se.Execute(context.Background(), "select SLEEP(1);")
c.Assert(err, IsNil)

_, err = se.Execute(context.Background(), "set @@max_execution_time = 500;")
c.Assert(err, IsNil)

err = cc.handleQuery(context.Background(), "select * FROM testTable2 WHERE SLEEP(1);")
c.Assert(err, NotNil)

_, err = se.Execute(context.Background(), "set @@max_execution_time = 0;")
c.Assert(err, IsNil)

err = cc.handleQuery(context.Background(), "select * FROM testTable2 WHERE SLEEP(1);")
c.Assert(err, IsNil)

err = cc.handleQuery(context.Background(), "select /*+ MAX_EXECUTION_TIME(100)*/ * FROM testTable2 WHERE SLEEP(1);")
c.Assert(err, NotNil)

c.Assert(failpoint.Disable("github.com/pingcap/tidb/server/FakeClientConn"), IsNil)
}
2 changes: 1 addition & 1 deletion server/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ type QueryCtx interface {
// SetValue saves a value associated with this context for key.
SetValue(key fmt.Stringer, value interface{})

SetProcessInfo(sql string, t time.Time, command byte)
SetProcessInfo(sql string, t time.Time, command byte, maxExecutionTime uint64)

// CommitTxn commits the transaction operations.
CommitTxn(ctx context.Context) error
Expand Down
4 changes: 2 additions & 2 deletions server/driver_tidb.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ func (tc *TiDBContext) CommitTxn(ctx context.Context) error {
}

// SetProcessInfo implements QueryCtx SetProcessInfo method.
func (tc *TiDBContext) SetProcessInfo(sql string, t time.Time, command byte) {
tc.session.SetProcessInfo(sql, t, command)
func (tc *TiDBContext) SetProcessInfo(sql string, t time.Time, command byte, maxExecutionTime uint64) {
tc.session.SetProcessInfo(sql, t, command, maxExecutionTime)
}

// RollbackTxn implements QueryCtx RollbackTxn method.
Expand Down
3 changes: 1 addition & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ type Server struct {
tlsConfig *tls.Config
driver IDriver
listener net.Listener
rwlock *sync.RWMutex
rwlock sync.RWMutex
concurrentLimiter *TokenLimiter
clients map[uint32]*clientConn
capability uint32
Expand Down Expand Up @@ -164,7 +164,6 @@ func NewServer(cfg *config.Config, driver IDriver) (*Server, error) {
cfg: cfg,
driver: driver,
concurrentLimiter: NewTokenLimiter(cfg.TokenLimit),
rwlock: &sync.RWMutex{},
clients: make(map[uint32]*clientConn),
stopListenerCh: make(chan struct{}, 1),
}
Expand Down
15 changes: 13 additions & 2 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type Session interface {
SetClientCapability(uint32) // Set client capability flags.
SetConnectionID(uint64)
SetCommandValue(byte)
SetProcessInfo(string, time.Time, byte)
SetProcessInfo(string, time.Time, byte, uint64)
SetTLSState(*tls.ConnectionState)
SetCollation(coID int) error
SetSessionManager(util.SessionManager)
Expand Down Expand Up @@ -661,6 +661,10 @@ func createSessionFunc(store kv.Storage) pools.Factory {
if err != nil {
return nil, errors.Trace(err)
}
err = variable.SetSessionSystemVar(se.sessionVars, variable.MaxExecutionTime, types.NewUintDatum(0))
if err != nil {
return nil, errors.Trace(err)
}
se.sessionVars.CommonGlobalLoaded = true
se.sessionVars.InRestrictedSQL = true
return se, nil
Expand All @@ -677,6 +681,10 @@ func createSessionWithDomainFunc(store kv.Storage) func(*domain.Domain) (pools.R
if err != nil {
return nil, errors.Trace(err)
}
err = variable.SetSessionSystemVar(se.sessionVars, variable.MaxExecutionTime, types.NewUintDatum(0))
if err != nil {
return nil, errors.Trace(err)
}
se.sessionVars.CommonGlobalLoaded = true
se.sessionVars.InRestrictedSQL = true
return se, nil
Expand Down Expand Up @@ -783,14 +791,16 @@ func (s *session) ParseSQL(ctx context.Context, sql, charset, collation string)
return s.parser.Parse(sql, charset, collation)
}

func (s *session) SetProcessInfo(sql string, t time.Time, command byte) {
func (s *session) SetProcessInfo(sql string, t time.Time, command byte, maxExecutionTime uint64) {
pi := util.ProcessInfo{
ID: s.sessionVars.ConnectionID,
DB: s.sessionVars.CurrentDB,
Command: mysql.Command2Str[command],
Time: t,
State: s.Status(),
Info: sql,

MaxExecutionTime: maxExecutionTime,
}
if s.sessionVars.User != nil {
pi.User = s.sessionVars.User.Username
Expand Down Expand Up @@ -1232,6 +1242,7 @@ func BootstrapSession(store kv.Storage) (*domain.Domain, error) {

timeutil.SetSystemTZ(tz)
dom := domain.GetDomain(se)
dom.InitExpensiveQueryHandle()

if !config.GetGlobalConfig().Security.SkipGrantTable {
err = dom.LoadPrivilegeLoop(se)
Expand Down
26 changes: 26 additions & 0 deletions session/session_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2432,3 +2432,29 @@ func (s *testSessionSuite) TestTxnGoString(c *C) {
tk.MustExec("rollback")
c.Assert(fmt.Sprintf("%#v", txn), Equals, "Txn{state=invalid}")
}

func (s *testSessionSuite) TestMaxExeucteTime(c *C) {
tk := testkit.NewTestKitWithInit(c, s.store)

tk.MustExec("create table MaxExecTime( id int,name varchar(128),age int);")
tk.MustExec("begin")
tk.MustExec("insert into MaxExecTime (id,name,age) values (1,'john',18),(2,'lary',19),(3,'lily',18);")

tk.MustQuery("select @@MAX_EXECUTION_TIME;").Check(testkit.Rows("0"))
tk.MustQuery("select @@global.MAX_EXECUTION_TIME;").Check(testkit.Rows("0"))
tk.MustQuery("select /*+ MAX_EXECUTION_TIME(1000) */ * FROM MaxExecTime;")

tk.MustExec("set @@global.MAX_EXECUTION_TIME = 300;")
tk.MustQuery("select * FROM MaxExecTime;")

tk.MustExec("set @@MAX_EXECUTION_TIME = 150;")
tk.MustQuery("select * FROM MaxExecTime;")

tk.MustQuery("select @@global.MAX_EXECUTION_TIME;").Check(testkit.Rows("300"))
tk.MustQuery("select @@MAX_EXECUTION_TIME;").Check(testkit.Rows("150"))

tk.MustExec("set @@global.MAX_EXECUTION_TIME = 0;")
tk.MustExec("set @@MAX_EXECUTION_TIME = 0;")
tk.MustExec("commit")
tk.MustExec("drop table if exists MaxExecTime;")
}
Loading

0 comments on commit 7214234

Please sign in to comment.