Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv/txn: support local latch in transaction #6418

Merged
merged 22 commits into from
May 7, 2018
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 25 additions & 23 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,19 @@ var (

// Config contains configuration options.
type Config struct {
Host string `toml:"host" json:"host"`
Port uint `toml:"port" json:"port"`
Store string `toml:"store" json:"store"`
Path string `toml:"path" json:"path"`
Socket string `toml:"socket" json:"socket"`
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
Lease string `toml:"lease" json:"lease"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
OOMAction string `toml:"oom-action" json:"oom-action"`
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
Host string `toml:"host" json:"host"`
Port uint `toml:"port" json:"port"`
Store string `toml:"store" json:"store"`
Path string `toml:"path" json:"path"`
Socket string `toml:"socket" json:"socket"`
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
Lease string `toml:"lease" json:"lease"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
OOMAction string `toml:"oom-action" json:"oom-action"`
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
EnableTxnLocalLatches bool `toml:"enable-txn-local-latches" json:"enable-txn-local-latches"`
// Set sys variable lower-case-table-names, ref: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html.
// TODO: We actually only support mode 2, which keeps the original case, but the comparison is case-insensitive.
LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"`
Expand Down Expand Up @@ -222,17 +223,18 @@ type TiKVClient struct {
}

var defaultConf = Config{
Host: "0.0.0.0",
Port: 4000,
Store: "mocktikv",
Path: "/tmp/tidb",
RunDDL: true,
SplitTable: true,
Lease: "45s",
TokenLimit: 1000,
OOMAction: "log",
EnableStreaming: false,
LowerCaseTableNames: 2,
Host: "0.0.0.0",
Port: 4000,
Store: "mocktikv",
Path: "/tmp/tidb",
RunDDL: true,
SplitTable: true,
Lease: "45s",
TokenLimit: 1000,
OOMAction: "log",
EnableStreaming: false,
EnableTxnLocalLatches: false,
LowerCaseTableNames: 2,
Log: Log{
Level: "info",
Format: "text",
Expand Down
4 changes: 4 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ oom-action = "log"
# Enable coprocessor streaming.
enable-streaming = false

# Enable local latches for transactions. Enable it when
# there are lots of conflicts between transactions.
enable-txn-local-latches = false

# Set system variable 'lower_case_table_names'
lower-case-table-names = 2

Expand Down
4 changes: 2 additions & 2 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error {
}
if len(failedUsers) > 0 {
// Commit the transaction even if we returns error
err := e.ctx.Txn().Commit(sessionctx.SetConnID2Ctx(context.Background(), e.ctx))
err := e.ctx.Txn().Commit(sessionctx.SetCommitCtx(context.Background(), e.ctx))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -238,7 +238,7 @@ func (e *SimpleExec) executeDropUser(s *ast.DropUserStmt) error {
}
if len(failedUsers) > 0 {
// Commit the transaction even if we returns error
err := e.ctx.Txn().Commit(sessionctx.SetConnID2Ctx(context.Background(), e.ctx))
err := e.ctx.Txn().Commit(sessionctx.SetCommitCtx(context.Background(), e.ctx))
if err != nil {
return errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
}
return errors.Trace(err)
}
return errors.Trace(txn.Commit(sessionctx.SetConnID2Ctx(ctx, loadDataInfo.Ctx)))
return errors.Trace(txn.Commit(sessionctx.SetCommitCtx(ctx, loadDataInfo.Ctx)))
}

// handleLoadStats does the additional work after processing the 'load stats' query.
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (s *session) doCommit(ctx context.Context) error {
schemaVer: s.sessionVars.TxnCtx.SchemaVersion,
relatedTableIDs: tableIDs,
})
if err := s.txn.Commit(sessionctx.SetConnID2Ctx(ctx, s)); err != nil {
if err := s.txn.Commit(sessionctx.SetCommitCtx(ctx, s)); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
21 changes: 18 additions & 3 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,22 @@ type contextKey string
// ConnID is the key in context.
const ConnID contextKey = "conn ID"

// SetConnID2Ctx sets the connection ID to context.
func SetConnID2Ctx(ctx context.Context, sessCtx Context) context.Context {
return context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID)
// Retryable is the key in context
const Retryable contextKey = "Retryable"

// SetCommitCtx sets the variables for context before commit a transaction.
func SetCommitCtx(ctx context.Context, sessCtx Context) context.Context {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass such important information through context.Context is not reliable.
If the call just pass a context.Background, those information will loss.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestion?

ctx = context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID)
retryAble := !sessCtx.GetSessionVars().TxnCtx.ForUpdate
return context.WithValue(ctx, Retryable, retryAble)
}

// GetRetryable returns the value of GetRetryable from the ctx.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value of retryable.

func GetRetryable(ctx context.Context) bool {
var retryAble bool
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is no need to capitalize the a.

val := ctx.Value(Retryable)
if val != nil {
retryAble = val.(bool)
}
return retryAble
}
11 changes: 11 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,17 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error {
// should be less than `gcRunInterval`.
const maxTxnTimeUse = 590000

func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error {
err := c.execute(ctx)
if err != nil {
c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

redundant blank line.

} else {
c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
}
return errors.Trace(err)
}

// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
defer func() {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (s *testCommitterSuite) SetUpTest(c *C) {
client := mocktikv.NewRPCClient(s.cluster, mvccStore)
pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)}
spkv := NewMockSafePointKV()
store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false)
store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false, true)
c.Assert(err, IsNil)
s.store = store
CommitMaxBackoff = 2000
Expand Down
16 changes: 14 additions & 2 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/latch"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/oracle/oracles"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand Down Expand Up @@ -71,6 +72,7 @@ func (d Driver) Open(path string) (kv.Storage, error) {
defer mc.Unlock()

security := config.GetGlobalConfig().Security
enableTxnLocalLatches := config.GetGlobalConfig().EnableTxnLocalLatches
etcdAddrs, disableGC, err := parsePath(path)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -105,7 +107,7 @@ func (d Driver) Open(path string) (kv.Storage, error) {
return nil, errors.Trace(err)
}

s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), !disableGC)
s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), !disableGC, enableTxnLocalLatches)
if err != nil {
return nil, errors.Trace(err)
}
Expand All @@ -127,6 +129,7 @@ type tikvStore struct {
pdClient pd.Client
regionCache *RegionCache
lockResolver *LockResolver
txnLatches *latch.LatchesScheduler
gcWorker GCHandler
etcdAddrs []string
tlsConfig *tls.Config
Expand Down Expand Up @@ -165,7 +168,7 @@ func (s *tikvStore) CheckVisibility(startTime uint64) error {
return nil
}

func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client, enableGC bool) (*tikvStore, error) {
func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client, enableGC, enableTxnLocalLatches bool) (*tikvStore, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are too many arguments to new TiKVStore.
Can we remove enableGC and enableTxnLocalLatches, and set them in a method when we want the non-default value?

o, err := oracles.NewPdOracle(pdClient, time.Duration(oracleUpdateInterval)*time.Millisecond)
if err != nil {
return nil, errors.Trace(err)
Expand All @@ -183,6 +186,11 @@ func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Clie
closed: make(chan struct{}),
}
store.lockResolver = newLockResolver(store)
if enableTxnLocalLatches {
store.txnLatches = latch.NewScheduler(102400)
} else {
store.txnLatches = nil
}
store.enableGC = enableGC

go store.runSafePointChecker()
Expand Down Expand Up @@ -274,6 +282,10 @@ func (s *tikvStore) Close() error {
if err := s.client.Close(); err != nil {
return errors.Trace(err)
}

if s.txnLatches != nil {
s.txnLatches.Close()
}
return nil
}

Expand Down
1 change: 1 addition & 0 deletions store/tikv/latch/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ func (scheduler *LatchesScheduler) Lock(startTS uint64, keys [][]byte) *Lock {

// UnLock unlocks a lock with commitTS.
func (scheduler *LatchesScheduler) UnLock(lock *Lock, commitTS uint64) {
lock.commitTS = commitTS
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can remove the argument, and set commitTS before UnLock.

scheduler.RLock()
defer scheduler.RUnlock()
if !scheduler.closed {
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/lock_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve
return nil, errors.Trace(err)
}

s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), false)
s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), false, false)
if err != nil {
return nil, errors.Trace(err)
}
Expand Down
2 changes: 1 addition & 1 deletion store/tikv/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien
// Make sure the uuid is unique.
uid := uuid.NewV4().String()
spkv := NewMockSafePointKV()
tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false)
tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false, true)
tikvStore.mock = true
return tikvStore, errors.Trace(err)
}
43 changes: 33 additions & 10 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -185,20 +184,44 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
connID = val.(uint64)
}
committer, err := newTwoPhaseCommitter(txn, connID)
if err != nil {
if err != nil || committer == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will commiter == nil ? and if it's really nil, the error trace would be oddly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When no row needs to update in this transaction.

return errors.Trace(err)
}
if committer == nil {
return nil

defer func() {
if err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this can be moved into executeAndWriteFinishBinlog, so defer is unnecessary

txn.commitTS = committer.commitTS
}
}()
// latches disabled
if txn.store.txnLatches == nil {
err = committer.executeAndWriteFinishBinlog(ctx)
return errors.Trace(err)
}
err = committer.execute(ctx)
if err != nil {
committer.writeFinishBinlog(binlog.BinlogType_Rollback, 0)

// latches enabled
// for transactions not retryable, commit directly.
if !sessionctx.GetRetryable(ctx) {
err = committer.executeAndWriteFinishBinlog(ctx)
txn.store.txnLatches.RefreshCommitTS(committer.keys, committer.startTS)
Copy link
Contributor

@alivxxx alivxxx Apr 28, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why use the startTS to refresh? Can we refresh if an error occurred when commit?

return errors.Trace(err)
}
committer.writeFinishBinlog(binlog.BinlogType_Commit, int64(committer.commitTS))
txn.commitTS = committer.commitTS
return nil

// for transactions which need to acquire latches
lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys)
defer func() {
commitTS := uint64(0)
if err == nil {
commitTS = committer.commitTS
}
txn.store.txnLatches.UnLock(lock, commitTS)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The commitTS is not used in UnLock.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we put commitTS into lock?

}()
if lock.IsStale() {
err = errors.Errorf("startTS %d is stale", txn.startTS)
return errors.Annotate(err, txnRetryableMark)
}
err = committer.executeAndWriteFinishBinlog(ctx)
return errors.Trace(err)
}

func (txn *tikvTxn) close() {
Expand Down