Skip to content

Commit

Permalink
txn: unify the management of transaction activation by TxnManager. (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
SpadeA-Tang authored Jun 30, 2022
1 parent 4fc8693 commit 11f39cd
Show file tree
Hide file tree
Showing 20 changed files with 292 additions and 139 deletions.
5 changes: 5 additions & 0 deletions domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,11 @@ func canSkipSchemaCheckerDDL(tp model.ActionType) bool {

// InfoSchema gets the latest information schema from domain.
func (do *Domain) InfoSchema() infoschema.InfoSchema {
if do.infoCache == nil {
// Return nil is for test purpose where domain is not well initialized in session context.
// In real implementation, the code will not reach here.
return nil
}
return do.infoCache.GetLatest()
}

Expand Down
9 changes: 9 additions & 0 deletions executor/stale_txn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1439,3 +1439,12 @@ func TestIssue31954(t *testing.T) {
tk.MustQuery("select (select v from t1 as of timestamp @a where id=1) as v").
Check(testkit.Rows("10"))
}

func TestIssue35686(t *testing.T) {
store, _, clean := testkit.CreateMockStoreAndDomain(t)
defer clean()

tk := testkit.NewTestKit(t, store)
// This query should not panic
tk.MustQuery("select * from information_schema.ddl_jobs as of timestamp now()")
}
5 changes: 5 additions & 0 deletions infoschema/infoschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/mock"
)

// InfoSchema is the interface used to retrieve the schema information.
Expand Down Expand Up @@ -353,6 +355,9 @@ func init() {
util.GetSequenceByName = func(is interface{}, schema, sequence model.CIStr) (util.SequenceTable, error) {
return GetSequenceByName(is.(InfoSchema), schema, sequence)
}
mock.MockInfoschema = func(tbList []*model.TableInfo) sessionctx.InfoschemaMetaVersion {
return MockInfoSchema(tbList)
}
}

// HasAutoIncrementColumn checks whether the table has auto_increment columns, if so, return true and the column name.
Expand Down
8 changes: 4 additions & 4 deletions kv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -227,16 +227,16 @@ type Transaction interface {
// If a key doesn't exist, there shouldn't be any corresponding entry in the result map.
BatchGet(ctx context.Context, keys []Key) (map[string][]byte, error)
IsPessimistic() bool
// CacheIndexName caches the index name.
// CacheTableInfo caches the index name.
// PresumeKeyNotExists will use this to help decode error message.
CacheTableInfo(id int64, info *model.TableInfo)
// GetIndexName returns the cached index name.
// GetTableInfo returns the cached index name.
// If there is no such index already inserted through CacheIndexName, it will return UNKNOWN.
GetTableInfo(id int64) *model.TableInfo

// set allowed options of current operation in each TiKV disk usage level.
// SetDiskFullOpt set allowed options of current operation in each TiKV disk usage level.
SetDiskFullOpt(level kvrpcpb.DiskFullOpt)
// clear allowed flag
// ClearDiskFullOpt clear allowed flag
ClearDiskFullOpt()

// GetMemDBCheckpoint gets the transaction's memDB checkpoint.
Expand Down
10 changes: 7 additions & 3 deletions session/schema_amender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,18 @@ package session
import (
"bytes"
"context"
"fmt"
"strconv"
"testing"

"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/model"
"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/sessiontxn"
"github.com/pingcap/tidb/store/mockstore"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
Expand Down Expand Up @@ -254,6 +257,8 @@ func TestAmendCollectAndGenMutations(t *testing.T) {
store: store,
sessionVars: variable.NewSessionVars(),
}
se.mu.values = make(map[fmt.Stringer]interface{})
domain.BindDomain(se, &domain.Domain{})
startStates := []model.SchemaState{model.StateNone, model.StateDeleteOnly, model.StateWriteOnly, model.StateWriteReorganization}
for _, startState := range startStates {
endStatMap := ConstOpAddIndex[startState]
Expand Down Expand Up @@ -404,10 +409,9 @@ func TestAmendCollectAndGenMutations(t *testing.T) {

logutil.BgLogger().Info("[TEST]finish to write old txn data")
// Write data for this new transaction, its memory buffer will be used by schema amender.
txn, err := se.store.Begin()
err = sessiontxn.NewTxn(ctx, se)
require.NoError(t, err)
se.txn.changeInvalidToValid(txn)
txn, err = se.Txn(true)
txn, err := se.Txn(false)
require.NoError(t, err)
var checkKeys []kv.Key
for i, key := range mutations.GetKeys() {
Expand Down
91 changes: 7 additions & 84 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -2467,85 +2467,8 @@ func (s *session) Txn(active bool) (kv.Transaction, error) {
if !active {
return &s.txn, nil
}
if !s.txn.validOrPending() {
return &s.txn, errors.AddStack(kv.ErrInvalidTxn)
}
if s.txn.pending() {
defer func(begin time.Time) {
s.sessionVars.DurationWaitTS = time.Since(begin)
}(time.Now())
// Transaction is lazy initialized.
// PrepareTxnCtx is called to get a tso future, makes s.txn a pending txn,
// If Txn() is called later, wait for the future to get a valid txn.
if err := s.txn.changePendingToValid(s.currentCtx); err != nil {
logutil.BgLogger().Error("active transaction fail",
zap.Error(err))
s.txn.cleanup()
s.sessionVars.TxnCtx.StartTS = 0
return &s.txn, err
}
s.sessionVars.TxnCtx.StartTS = s.txn.StartTS()
if s.sessionVars.TxnCtx.IsPessimistic {
s.txn.SetOption(kv.Pessimistic, true)
}
if !s.sessionVars.IsAutocommit() && s.sessionVars.SnapshotTS == 0 {
s.sessionVars.SetInTxn(true)
}
s.sessionVars.TxnCtx.CouldRetry = s.isTxnRetryable()
s.txn.SetVars(s.sessionVars.KVVars)
readReplicaType := s.sessionVars.GetReplicaRead()
if readReplicaType.IsFollowerRead() {
s.txn.SetOption(kv.ReplicaRead, readReplicaType)
}
s.txn.SetOption(kv.SnapInterceptor, s.getSnapshotInterceptor())
if s.GetSessionVars().StmtCtx.WeakConsistency {
s.txn.SetOption(kv.IsolationLevel, kv.RC)
}
setTxnAssertionLevel(&s.txn, s.sessionVars.AssertionLevel)
}
return &s.txn, nil
}

// isTxnRetryable (if returns true) means the transaction could retry.
// If the transaction is in pessimistic mode, do not retry.
// If the session is already in transaction, enable retry or internal SQL could retry.
// If not, the transaction could always retry, because it should be auto committed transaction.
// Anyway the retry limit is 0, the transaction could not retry.
func (s *session) isTxnRetryable() bool {
sessVars := s.sessionVars

// The pessimistic transaction no need to retry.
if sessVars.TxnCtx.IsPessimistic {
return false
}

// If retry limit is 0, the transaction could not retry.
if sessVars.RetryLimit == 0 {
return false
}

// When `@@tidb_snapshot` is set, it is a ready-only statement and will not cause the errors that should retry a transaction in optimistic mode.
if sessVars.SnapshotTS != 0 {
return false
}

// If the session is not InTxn, it is an auto-committed transaction.
// The auto-committed transaction could always retry.
if !sessVars.InTxn() {
return true
}

// The internal transaction could always retry.
if sessVars.InRestrictedSQL {
return true
}

// If the retry is enabled, the transaction could retry.
if !sessVars.DisableTxnAutoRetry {
return true
}

return false
_, err := sessiontxn.GetTxnManager(s).ActivateTxn()
return &s.txn, err
}

func (s *session) NewTxn(ctx context.Context) error {
Expand All @@ -2563,7 +2486,7 @@ func (s *session) NewTxn(ctx context.Context) error {
}
setTxnAssertionLevel(txn, s.sessionVars.AssertionLevel)
s.txn.changeInvalidToValid(txn)
is := temptable.AttachLocalTemporaryTableInfoSchema(s, domain.GetDomain(s).InfoSchema())
is := s.GetDomainInfoSchema()
s.sessionVars.TxnCtx = &variable.TransactionContext{
TxnCtxNoNeedToRestore: variable.TxnCtxNoNeedToRestore{
InfoSchema: is,
Expand Down Expand Up @@ -3209,11 +3132,11 @@ func (s *session) PrepareTSFuture(ctx context.Context, future oracle.Future, sco
return nil
}

func (s *session) GetPreparedTSFuture() oracle.Future {
if future := s.txn.txnFuture; future != nil {
return future.future
func (s *session) GetPreparedTxnFuture() sessionctx.TxnFuture {
if !s.txn.validOrPending() {
return nil
}
return nil
return &s.txn
}

// RefreshTxnCtx implements context.RefreshTxnCtx interface.
Expand Down
24 changes: 24 additions & 0 deletions session/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,30 @@ func (txn *LazyTxn) KeysNeedToLock() ([]kv.Key, error) {
return keys, nil
}

// Wait converts pending txn to valid
func (txn *LazyTxn) Wait(ctx context.Context, sctx sessionctx.Context) (kv.Transaction, error) {
if !txn.validOrPending() {
return txn, errors.AddStack(kv.ErrInvalidTxn)
}
if txn.pending() {
defer func(begin time.Time) {
sctx.GetSessionVars().DurationWaitTS = time.Since(begin)
}(time.Now())

// Transaction is lazy initialized.
// PrepareTxnCtx is called to get a tso future, makes s.txn a pending txn,
// If Txn() is called later, wait for the future to get a valid txn.
if err := txn.changePendingToValid(ctx); err != nil {
logutil.BgLogger().Error("active transaction fail",
zap.Error(err))
txn.cleanup()
sctx.GetSessionVars().TxnCtx.StartTS = 0
return txn, err
}
}
return txn, nil
}

func keyNeedToLock(k, v []byte, flags kv.KeyFlags) bool {
isTableKey := bytes.HasPrefix(k, tablecodec.TablePrefix())
if !isTableKey {
Expand Down
9 changes: 9 additions & 0 deletions session/txnmanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessiontxn"
Expand Down Expand Up @@ -142,6 +143,14 @@ func (m *txnManager) OnStmtErrorForNextAction(point sessiontxn.StmtErrorHandlePo
return m.ctxProvider.OnStmtErrorForNextAction(point, err)
}

// ActivateTxn decides to activate txn according to the parameter `active`
func (m *txnManager) ActivateTxn() (kv.Transaction, error) {
if m.ctxProvider == nil {
return nil, errors.AddStack(kv.ErrInvalidTxn)
}
return m.ctxProvider.ActivateTxn()
}

// OnStmtRetry is the hook that should be called when a statement retry
func (m *txnManager) OnStmtRetry(ctx context.Context) error {
if m.ctxProvider == nil {
Expand Down
11 changes: 9 additions & 2 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ type Context interface {
HasLockedTables() bool
// PrepareTSFuture uses to prepare timestamp by future.
PrepareTSFuture(ctx context.Context, future oracle.Future, scope string) error
// GetPreparedTSFuture returns the prepared ts future
GetPreparedTSFuture() oracle.Future
// GetPreparedTxnFuture returns the prepared ts future
GetPreparedTxnFuture() TxnFuture
// StoreIndexUsage stores the index usage information.
StoreIndexUsage(tblID int64, idxID int64, rowsSelected int64)
// GetTxnWriteThroughputSLI returns the TxnWriteThroughputSLI.
Expand All @@ -176,6 +176,13 @@ type Context interface {
ReleaseAllAdvisoryLocks() int
}

// TxnFuture is an interface where implementations have a kv.Transaction field and after
// calling Wait of the TxnFuture, the kv.Transaction will become valid.
type TxnFuture interface {
// Wait converts pending txn to valid
Wait(ctx context.Context, sctx Context) (kv.Transaction, error)
}

type basicCtxType int

func (t basicCtxType) String() string {
Expand Down
5 changes: 5 additions & 0 deletions sessiontxn/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"context"

"github.com/pingcap/tidb/infoschema"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/parser/ast"
"github.com/pingcap/tidb/sessionctx"
)
Expand Down Expand Up @@ -129,6 +130,8 @@ type TxnContextProvider interface {
OnStmtErrorForNextAction(point StmtErrorHandlePoint, err error) (StmtErrorAction, error)
// OnStmtRetry is the hook that should be called when a statement is retried internally.
OnStmtRetry(ctx context.Context) error
// ActivateTxn activates the transaction.
ActivateTxn() (kv.Transaction, error)
}

// TxnManager is an interface providing txn context management in session
Expand Down Expand Up @@ -158,6 +161,8 @@ type TxnManager interface {
OnStmtErrorForNextAction(point StmtErrorHandlePoint, err error) (StmtErrorAction, error)
// OnStmtRetry is the hook that should be called when a statement retry
OnStmtRetry(ctx context.Context) error
// ActivateTxn activates the transaction.
ActivateTxn() (kv.Transaction, error)
// GetCurrentStmt returns the current statement node
GetCurrentStmt() ast.StmtNode
}
Expand Down
Loading

0 comments on commit 11f39cd

Please sign in to comment.