From 656dd7b665a71f30772f05da741457c762fef804 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Sat, 28 Apr 2018 09:37:07 +0800 Subject: [PATCH 01/10] tikv/txn: support latch in transaction --- config/config.go | 48 +++++++++++++++++++------------------ config/config.toml.example | 4 ++++ executor/simple.go | 4 ++-- server/conn.go | 2 +- session/session.go | 2 +- sessionctx/context.go | 21 +++++++++++++--- store/tikv/2pc.go | 11 +++++++++ store/tikv/2pc_test.go | 2 +- store/tikv/kv.go | 16 +++++++++++-- store/tikv/lock_resolver.go | 2 +- store/tikv/test_util.go | 2 +- store/tikv/txn.go | 43 +++++++++++++++++++++++++-------- 12 files changed, 112 insertions(+), 45 deletions(-) diff --git a/config/config.go b/config/config.go index e3a3172d1e775..682bb09e4e1a0 100644 --- a/config/config.go +++ b/config/config.go @@ -40,18 +40,19 @@ var ( // Config contains configuration options. type Config struct { - Host string `toml:"host" json:"host"` - Port uint `toml:"port" json:"port"` - Store string `toml:"store" json:"store"` - Path string `toml:"path" json:"path"` - Socket string `toml:"socket" json:"socket"` - BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"` - Lease string `toml:"lease" json:"lease"` - RunDDL bool `toml:"run-ddl" json:"run-ddl"` - SplitTable bool `toml:"split-table" json:"split-table"` - TokenLimit uint `toml:"token-limit" json:"token-limit"` - OOMAction string `toml:"oom-action" json:"oom-action"` - EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"` + Host string `toml:"host" json:"host"` + Port uint `toml:"port" json:"port"` + Store string `toml:"store" json:"store"` + Path string `toml:"path" json:"path"` + Socket string `toml:"socket" json:"socket"` + BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"` + Lease string `toml:"lease" json:"lease"` + RunDDL bool `toml:"run-ddl" json:"run-ddl"` + SplitTable bool `toml:"split-table" json:"split-table"` + TokenLimit uint `toml:"token-limit" json:"token-limit"` + OOMAction string `toml:"oom-action" json:"oom-action"` + EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"` + EnableTxnLocalLatches bool `toml:"enable-txn-local-latches" json:"enable-txn-local-latches"` // Set sys variable lower-case-table-names, ref: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html. // TODO: We actually only support mode 2, which keeps the original case, but the comparison is case-insensitive. LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"` @@ -222,17 +223,18 @@ type TiKVClient struct { } var defaultConf = Config{ - Host: "0.0.0.0", - Port: 4000, - Store: "mocktikv", - Path: "/tmp/tidb", - RunDDL: true, - SplitTable: true, - Lease: "45s", - TokenLimit: 1000, - OOMAction: "log", - EnableStreaming: false, - LowerCaseTableNames: 2, + Host: "0.0.0.0", + Port: 4000, + Store: "mocktikv", + Path: "/tmp/tidb", + RunDDL: true, + SplitTable: true, + Lease: "45s", + TokenLimit: 1000, + OOMAction: "log", + EnableStreaming: false, + EnableTxnLocalLatches: false, + LowerCaseTableNames: 2, Log: Log{ Level: "info", Format: "text", diff --git a/config/config.toml.example b/config/config.toml.example index 2dec9cde5acbd..0ad312e9cbdcf 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -38,6 +38,10 @@ oom-action = "log" # Enable coprocessor streaming. enable-streaming = false +# Enable local latches for transactions. Enable it when +# there are lots of conflicts between transactions. +enable-txn-local-latches = false + # Set system variable 'lower_case_table_names' lower-case-table-names = 2 diff --git a/executor/simple.go b/executor/simple.go index a4d4387bccd81..51dd3b6196a87 100644 --- a/executor/simple.go +++ b/executor/simple.go @@ -206,7 +206,7 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error { } if len(failedUsers) > 0 { // Commit the transaction even if we returns error - err := e.ctx.Txn().Commit(sessionctx.SetConnID2Ctx(context.Background(), e.ctx)) + err := e.ctx.Txn().Commit(sessionctx.SetCommitCtx(context.Background(), e.ctx)) if err != nil { return errors.Trace(err) } @@ -238,7 +238,7 @@ func (e *SimpleExec) executeDropUser(s *ast.DropUserStmt) error { } if len(failedUsers) > 0 { // Commit the transaction even if we returns error - err := e.ctx.Txn().Commit(sessionctx.SetConnID2Ctx(context.Background(), e.ctx)) + err := e.ctx.Txn().Commit(sessionctx.SetCommitCtx(context.Background(), e.ctx)) if err != nil { return errors.Trace(err) } diff --git a/server/conn.go b/server/conn.go index df2cb26a7b25a..07f12970e47d1 100644 --- a/server/conn.go +++ b/server/conn.go @@ -806,7 +806,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor } return errors.Trace(err) } - return errors.Trace(txn.Commit(sessionctx.SetConnID2Ctx(ctx, loadDataInfo.Ctx))) + return errors.Trace(txn.Commit(sessionctx.SetCommitCtx(ctx, loadDataInfo.Ctx))) } // handleLoadStats does the additional work after processing the 'load stats' query. diff --git a/session/session.go b/session/session.go index 4772e37ec733f..2ced98c25ef5a 100644 --- a/session/session.go +++ b/session/session.go @@ -320,7 +320,7 @@ func (s *session) doCommit(ctx context.Context) error { schemaVer: s.sessionVars.TxnCtx.SchemaVersion, relatedTableIDs: tableIDs, }) - if err := s.txn.Commit(sessionctx.SetConnID2Ctx(ctx, s)); err != nil { + if err := s.txn.Commit(sessionctx.SetCommitCtx(ctx, s)); err != nil { return errors.Trace(err) } return nil diff --git a/sessionctx/context.go b/sessionctx/context.go index 93d1cbe5966c5..da46cb8b6d7eb 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -112,7 +112,22 @@ type contextKey string // ConnID is the key in context. const ConnID contextKey = "conn ID" -// SetConnID2Ctx sets the connection ID to context. -func SetConnID2Ctx(ctx context.Context, sessCtx Context) context.Context { - return context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID) +// Retryable is the key in context +const Retryable contextKey = "Retryable" + +// SetCommitCtx sets the variables for context before commit a transaction. +func SetCommitCtx(ctx context.Context, sessCtx Context) context.Context { + ctx = context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID) + retryAble := !sessCtx.GetSessionVars().TxnCtx.ForUpdate + return context.WithValue(ctx, Retryable, retryAble) +} + +// GetRetryable returns the value of GetRetryable from the ctx. +func GetRetryable(ctx context.Context) bool { + var retryAble bool + val := ctx.Value(Retryable) + if val != nil { + retryAble = val.(bool) + } + return retryAble } diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index ccd4d43608408..341e49b9ab5f2 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -563,6 +563,17 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error { // should be less than `gcRunInterval`. const maxTxnTimeUse = 590000 +func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error { + err := c.execute(ctx) + if err != nil { + c.writeFinishBinlog(binlog.BinlogType_Rollback, 0) + + } else { + c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS)) + } + return errors.Trace(err) +} + // execute executes the two-phase commit protocol. func (c *twoPhaseCommitter) execute(ctx context.Context) error { defer func() { diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 54a738d1c2989..adf2f538036b5 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -43,7 +43,7 @@ func (s *testCommitterSuite) SetUpTest(c *C) { client := mocktikv.NewRPCClient(s.cluster, mvccStore) pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)} spkv := NewMockSafePointKV() - store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false) + store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false, true) c.Assert(err, IsNil) s.store = store CommitMaxBackoff = 2000 diff --git a/store/tikv/kv.go b/store/tikv/kv.go index acce41f8b91cd..3645245b8972b 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/tidb/config" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" + "github.com/pingcap/tidb/store/tikv/latch" "github.com/pingcap/tidb/store/tikv/oracle" "github.com/pingcap/tidb/store/tikv/oracle/oracles" "github.com/pingcap/tidb/store/tikv/tikvrpc" @@ -71,6 +72,7 @@ func (d Driver) Open(path string) (kv.Storage, error) { defer mc.Unlock() security := config.GetGlobalConfig().Security + enableTxnLocalLatches := config.GetGlobalConfig().EnableTxnLocalLatches etcdAddrs, disableGC, err := parsePath(path) if err != nil { return nil, errors.Trace(err) @@ -105,7 +107,7 @@ func (d Driver) Open(path string) (kv.Storage, error) { return nil, errors.Trace(err) } - s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), !disableGC) + s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), !disableGC, enableTxnLocalLatches) if err != nil { return nil, errors.Trace(err) } @@ -127,6 +129,7 @@ type tikvStore struct { pdClient pd.Client regionCache *RegionCache lockResolver *LockResolver + txnLatches *latch.LatchesScheduler gcWorker GCHandler etcdAddrs []string tlsConfig *tls.Config @@ -165,7 +168,7 @@ func (s *tikvStore) CheckVisibility(startTime uint64) error { return nil } -func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client, enableGC bool) (*tikvStore, error) { +func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client, enableGC, enableTxnLocalLatches bool) (*tikvStore, error) { o, err := oracles.NewPdOracle(pdClient, time.Duration(oracleUpdateInterval)*time.Millisecond) if err != nil { return nil, errors.Trace(err) @@ -183,6 +186,11 @@ func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Clie closed: make(chan struct{}), } store.lockResolver = newLockResolver(store) + if enableTxnLocalLatches { + store.txnLatches = latch.NewScheduler(102400) + } else { + store.txnLatches = nil + } store.enableGC = enableGC go store.runSafePointChecker() @@ -274,6 +282,10 @@ func (s *tikvStore) Close() error { if err := s.client.Close(); err != nil { return errors.Trace(err) } + + if s.txnLatches != nil { + s.txnLatches.Close() + } return nil } diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 62710fd544338..9a8741e6f94a8 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -76,7 +76,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve return nil, errors.Trace(err) } - s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), false) + s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), false, false) if err != nil { return nil, errors.Trace(err) } diff --git a/store/tikv/test_util.go b/store/tikv/test_util.go index d461e732f84b8..1d60ba9af3c98 100644 --- a/store/tikv/test_util.go +++ b/store/tikv/test_util.go @@ -34,7 +34,7 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien // Make sure the uuid is unique. uid := uuid.NewV4().String() spkv := NewMockSafePointKV() - tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false) + tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false, true) tikvStore.mock = true return tikvStore, errors.Trace(err) } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index f2a211bd5cbc7..01a1bbbd27940 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -22,7 +22,6 @@ import ( "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/metrics" "github.com/pingcap/tidb/sessionctx" - binlog "github.com/pingcap/tipb/go-binlog" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) @@ -185,20 +184,44 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { connID = val.(uint64) } committer, err := newTwoPhaseCommitter(txn, connID) - if err != nil { + if err != nil || committer == nil { return errors.Trace(err) } - if committer == nil { - return nil + + defer func() { + if err == nil { + txn.commitTS = committer.commitTS + } + }() + // latches disabled + if txn.store.txnLatches == nil { + err = committer.executeAndWriteFinishBinlog(ctx) + return errors.Trace(err) } - err = committer.execute(ctx) - if err != nil { - committer.writeFinishBinlog(binlog.BinlogType_Rollback, 0) + + // latches enabled + // for transactions not retryable, commit directly. + if !sessionctx.GetRetryable(ctx) { + err = committer.executeAndWriteFinishBinlog(ctx) + txn.store.txnLatches.RefreshCommitTS(committer.keys, committer.startTS) return errors.Trace(err) } - committer.writeFinishBinlog(binlog.BinlogType_Commit, int64(committer.commitTS)) - txn.commitTS = committer.commitTS - return nil + + // for transactions which need to acquire latches + lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys) + defer func() { + commitTS := uint64(0) + if err == nil { + commitTS = committer.commitTS + } + txn.store.txnLatches.UnLock(lock, commitTS) + }() + if lock.IsStale() { + err = errors.Errorf("startTS %d is stale", txn.startTS) + return errors.Annotate(err, txnRetryableMark) + } + err = committer.executeAndWriteFinishBinlog(ctx) + return errors.Trace(err) } func (txn *tikvTxn) close() { From b51c483b776d93c22efe1f1a9450ee661ff1ca85 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Sat, 28 Apr 2018 11:46:43 +0800 Subject: [PATCH 02/10] address comments --- store/tikv/latch/scheduler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/store/tikv/latch/scheduler.go b/store/tikv/latch/scheduler.go index 050b3e2172794..41c432808d2c0 100644 --- a/store/tikv/latch/scheduler.go +++ b/store/tikv/latch/scheduler.go @@ -85,6 +85,7 @@ func (scheduler *LatchesScheduler) Lock(startTS uint64, keys [][]byte) *Lock { // UnLock unlocks a lock with commitTS. func (scheduler *LatchesScheduler) UnLock(lock *Lock, commitTS uint64) { + lock.commitTS = commitTS scheduler.RLock() defer scheduler.RUnlock() if !scheduler.closed { From ed8154baf4b396259eb23edf238b71c3becde8cc Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Sat, 28 Apr 2018 13:30:40 +0800 Subject: [PATCH 03/10] address comments --- store/tikv/latch/latch.go | 5 +++++ store/tikv/latch/scheduler.go | 3 +-- store/tikv/latch/scheduler_test.go | 3 ++- store/tikv/txn.go | 11 ++++------- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index 70f818c853daa..16477f47738b9 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -86,6 +86,11 @@ func (l *Lock) isLocked() bool { return !l.isStale && l.acquiredCount != len(l.requiredSlots) } +// SetCommitTS sets the lock's commitTS. +func (l *Lock) SetCommitTS(commitTS uint64) { + l.commitTS = commitTS +} + // Latches which are used for concurrency control. // Each latch is indexed by a slot's ID, hence the term latch and slot are used in interchangeable, // but conceptually a latch is a queue, and a slot is an index to the queue diff --git a/store/tikv/latch/scheduler.go b/store/tikv/latch/scheduler.go index 41c432808d2c0..e1d572e7cb7d7 100644 --- a/store/tikv/latch/scheduler.go +++ b/store/tikv/latch/scheduler.go @@ -84,8 +84,7 @@ func (scheduler *LatchesScheduler) Lock(startTS uint64, keys [][]byte) *Lock { } // UnLock unlocks a lock with commitTS. -func (scheduler *LatchesScheduler) UnLock(lock *Lock, commitTS uint64) { - lock.commitTS = commitTS +func (scheduler *LatchesScheduler) UnLock(lock *Lock) { scheduler.RLock() defer scheduler.RUnlock() if !scheduler.closed { diff --git a/store/tikv/latch/scheduler_test.go b/store/tikv/latch/scheduler_test.go index 80970f40dbff2..d57737fb9512d 100644 --- a/store/tikv/latch/scheduler_test.go +++ b/store/tikv/latch/scheduler_test.go @@ -41,10 +41,11 @@ func (s *testSchedulerSuite) TestWithConcurrency(c *C) { for _, txn := range txns { go func(txn [][]byte, wg *sync.WaitGroup) { lock := sched.Lock(getTso(), txn) - defer sched.UnLock(lock, getTso()) + defer sched.UnLock(lock) if lock.IsStale() { // Should restart the transaction or return error } else { + lock.SetCommitTS(getTso()) // Do 2pc } wg.Done() diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 01a1bbbd27940..daa58cc508361 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -209,18 +209,15 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { // for transactions which need to acquire latches lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys) - defer func() { - commitTS := uint64(0) - if err == nil { - commitTS = committer.commitTS - } - txn.store.txnLatches.UnLock(lock, commitTS) - }() + defer txn.store.txnLatches.UnLock(lock) if lock.IsStale() { err = errors.Errorf("startTS %d is stale", txn.startTS) return errors.Annotate(err, txnRetryableMark) } err = committer.executeAndWriteFinishBinlog(ctx) + if err == nil { + lock.SetCommitTS(committer.commitTS) + } return errors.Trace(err) } From 41dc21c73556b0b4330d27b171a9c74ff377df22 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Sat, 28 Apr 2018 14:19:17 +0800 Subject: [PATCH 04/10] address comments --- config/config.go | 59 ++++++++++++++++++++--------------- config/config.toml.example | 10 +++--- store/tikv/2pc_test.go | 3 +- store/tikv/kv.go | 18 ++++++----- store/tikv/latch/latch.go | 2 +- store/tikv/latch/scheduler.go | 2 +- store/tikv/lock_resolver.go | 2 +- store/tikv/test_util.go | 3 +- 8 files changed, 57 insertions(+), 42 deletions(-) diff --git a/config/config.go b/config/config.go index 682bb09e4e1a0..4d12ae1d7d25b 100644 --- a/config/config.go +++ b/config/config.go @@ -40,19 +40,19 @@ var ( // Config contains configuration options. type Config struct { - Host string `toml:"host" json:"host"` - Port uint `toml:"port" json:"port"` - Store string `toml:"store" json:"store"` - Path string `toml:"path" json:"path"` - Socket string `toml:"socket" json:"socket"` - BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"` - Lease string `toml:"lease" json:"lease"` - RunDDL bool `toml:"run-ddl" json:"run-ddl"` - SplitTable bool `toml:"split-table" json:"split-table"` - TokenLimit uint `toml:"token-limit" json:"token-limit"` - OOMAction string `toml:"oom-action" json:"oom-action"` - EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"` - EnableTxnLocalLatches bool `toml:"enable-txn-local-latches" json:"enable-txn-local-latches"` + Host string `toml:"host" json:"host"` + Port uint `toml:"port" json:"port"` + Store string `toml:"store" json:"store"` + Path string `toml:"path" json:"path"` + Socket string `toml:"socket" json:"socket"` + BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"` + Lease string `toml:"lease" json:"lease"` + RunDDL bool `toml:"run-ddl" json:"run-ddl"` + SplitTable bool `toml:"split-table" json:"split-table"` + TokenLimit uint `toml:"token-limit" json:"token-limit"` + OOMAction string `toml:"oom-action" json:"oom-action"` + EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"` + TxnLocalLatches TxnLocalLatches `toml:"txn-local-latches" json:"txn-local-latches"` // Set sys variable lower-case-table-names, ref: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html. // TODO: We actually only support mode 2, which keeps the original case, but the comparison is case-insensitive. LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"` @@ -170,6 +170,12 @@ type PlanCache struct { Shards uint `toml:"shards" json:"shards"` } +// TxnLocalLatches is the TxnLocalLatches section of the config. +type TxnLocalLatches struct { + Enabled bool `toml:"enabled" json:"enabled"` + Capacity uint `toml:"capacity" json:"capacity"` +} + // PreparedPlanCache is the PreparedPlanCache section of the config. type PreparedPlanCache struct { Enabled bool `toml:"enabled" json:"enabled"` @@ -223,18 +229,21 @@ type TiKVClient struct { } var defaultConf = Config{ - Host: "0.0.0.0", - Port: 4000, - Store: "mocktikv", - Path: "/tmp/tidb", - RunDDL: true, - SplitTable: true, - Lease: "45s", - TokenLimit: 1000, - OOMAction: "log", - EnableStreaming: false, - EnableTxnLocalLatches: false, - LowerCaseTableNames: 2, + Host: "0.0.0.0", + Port: 4000, + Store: "mocktikv", + Path: "/tmp/tidb", + RunDDL: true, + SplitTable: true, + Lease: "45s", + TokenLimit: 1000, + OOMAction: "log", + EnableStreaming: false, + TxnLocalLatches: TxnLocalLatches{ + Enabled: false, + Capacity: 1024000, + }, + LowerCaseTableNames: 2, Log: Log{ Level: "info", Format: "text", diff --git a/config/config.toml.example b/config/config.toml.example index 0ad312e9cbdcf..c46d5a93fe8b5 100644 --- a/config/config.toml.example +++ b/config/config.toml.example @@ -38,10 +38,6 @@ oom-action = "log" # Enable coprocessor streaming. enable-streaming = false -# Enable local latches for transactions. Enable it when -# there are lots of conflicts between transactions. -enable-txn-local-latches = false - # Set system variable 'lower_case_table_names' lower-case-table-names = 2 @@ -227,3 +223,9 @@ grpc-connection-count = 16 # max time for commit command, must be twice bigger than raft election timeout. commit-timeout = "41s" + +[txn-local-latches] +# Enable local latches for transactions. Enable it when +# there are lots of conflicts between transactions. +enabled = false +capacity = 1024000 diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index adf2f538036b5..27a9f8bbf7e33 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -43,8 +43,9 @@ func (s *testCommitterSuite) SetUpTest(c *C) { client := mocktikv.NewRPCClient(s.cluster, mvccStore) pdCli := &codecPDClient{mocktikv.NewPDClient(s.cluster)} spkv := NewMockSafePointKV() - store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false, true) + store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false) c.Assert(err, IsNil) + store.EnableTxnLocalLatches(1024) s.store = store CommitMaxBackoff = 2000 } diff --git a/store/tikv/kv.go b/store/tikv/kv.go index 3645245b8972b..d659f20dd3452 100644 --- a/store/tikv/kv.go +++ b/store/tikv/kv.go @@ -72,7 +72,7 @@ func (d Driver) Open(path string) (kv.Storage, error) { defer mc.Unlock() security := config.GetGlobalConfig().Security - enableTxnLocalLatches := config.GetGlobalConfig().EnableTxnLocalLatches + txnLocalLatches := config.GetGlobalConfig().TxnLocalLatches etcdAddrs, disableGC, err := parsePath(path) if err != nil { return nil, errors.Trace(err) @@ -107,10 +107,13 @@ func (d Driver) Open(path string) (kv.Storage, error) { return nil, errors.Trace(err) } - s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), !disableGC, enableTxnLocalLatches) + s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), !disableGC) if err != nil { return nil, errors.Trace(err) } + if txnLocalLatches.Enabled { + s.EnableTxnLocalLatches(txnLocalLatches.Capacity) + } s.etcdAddrs = etcdAddrs s.tlsConfig = tlsConfig @@ -168,7 +171,7 @@ func (s *tikvStore) CheckVisibility(startTime uint64) error { return nil } -func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client, enableGC, enableTxnLocalLatches bool) (*tikvStore, error) { +func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Client, enableGC bool) (*tikvStore, error) { o, err := oracles.NewPdOracle(pdClient, time.Duration(oracleUpdateInterval)*time.Millisecond) if err != nil { return nil, errors.Trace(err) @@ -186,11 +189,6 @@ func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Clie closed: make(chan struct{}), } store.lockResolver = newLockResolver(store) - if enableTxnLocalLatches { - store.txnLatches = latch.NewScheduler(102400) - } else { - store.txnLatches = nil - } store.enableGC = enableGC go store.runSafePointChecker() @@ -198,6 +196,10 @@ func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Clie return store, nil } +func (s *tikvStore) EnableTxnLocalLatches(size uint) { + s.txnLatches = latch.NewScheduler(size) +} + func (s *tikvStore) EtcdAddrs() []string { return s.etcdAddrs } diff --git a/store/tikv/latch/latch.go b/store/tikv/latch/latch.go index 16477f47738b9..a8ca718947490 100644 --- a/store/tikv/latch/latch.go +++ b/store/tikv/latch/latch.go @@ -103,7 +103,7 @@ type Latches struct { // NewLatches create a Latches with fixed length, // the size will be rounded up to the power of 2. -func NewLatches(size int) *Latches { +func NewLatches(size uint) *Latches { powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1))) slots := make([]latch, powerOfTwoSize) return &Latches{ diff --git a/store/tikv/latch/scheduler.go b/store/tikv/latch/scheduler.go index e1d572e7cb7d7..f26e0c740b531 100644 --- a/store/tikv/latch/scheduler.go +++ b/store/tikv/latch/scheduler.go @@ -28,7 +28,7 @@ type LatchesScheduler struct { } // NewScheduler create the LatchesScheduler. -func NewScheduler(size int) *LatchesScheduler { +func NewScheduler(size uint) *LatchesScheduler { latches := NewLatches(size) unlockCh := make(chan *Lock, lockChanSize) scheduler := &LatchesScheduler{ diff --git a/store/tikv/lock_resolver.go b/store/tikv/lock_resolver.go index 9a8741e6f94a8..62710fd544338 100644 --- a/store/tikv/lock_resolver.go +++ b/store/tikv/lock_resolver.go @@ -76,7 +76,7 @@ func NewLockResolver(etcdAddrs []string, security config.Security) (*LockResolve return nil, errors.Trace(err) } - s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), false, false) + s, err := newTikvStore(uuid, &codecPDClient{pdCli}, spkv, newRPCClient(security), false) if err != nil { return nil, errors.Trace(err) } diff --git a/store/tikv/test_util.go b/store/tikv/test_util.go index 1d60ba9af3c98..81de2f4fc0ee2 100644 --- a/store/tikv/test_util.go +++ b/store/tikv/test_util.go @@ -34,7 +34,8 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien // Make sure the uuid is unique. uid := uuid.NewV4().String() spkv := NewMockSafePointKV() - tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false, true) + tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false) + tikvStore.EnableTxnLocalLatches(1024) tikvStore.mock = true return tikvStore, errors.Trace(err) } From 25a893f7abd7ed6b95831d108670bf8b624eb76f Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Sat, 28 Apr 2018 20:41:47 +0800 Subject: [PATCH 05/10] address comments --- store/tikv/2pc.go | 1 - store/tikv/txn.go | 4 +++- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index 341e49b9ab5f2..f3f1e4a873d63 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -567,7 +567,6 @@ func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) err err := c.execute(ctx) if err != nil { c.writeFinishBinlog(binlog.BinlogType_Rollback, 0) - } else { c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS)) } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index daa58cc508361..040e02064e98c 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -203,7 +203,9 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { // for transactions not retryable, commit directly. if !sessionctx.GetRetryable(ctx) { err = committer.executeAndWriteFinishBinlog(ctx) - txn.store.txnLatches.RefreshCommitTS(committer.keys, committer.startTS) + if err == nil { + txn.store.txnLatches.RefreshCommitTS(committer.keys, committer.commitTS) + } return errors.Trace(err) } From cb902391bef38f84dfc5bee22224797adde2a26e Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Sat, 28 Apr 2018 21:09:45 +0800 Subject: [PATCH 06/10] address comments --- sessionctx/context.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sessionctx/context.go b/sessionctx/context.go index da46cb8b6d7eb..357c4baad5d08 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -122,12 +122,12 @@ func SetCommitCtx(ctx context.Context, sessCtx Context) context.Context { return context.WithValue(ctx, Retryable, retryAble) } -// GetRetryable returns the value of GetRetryable from the ctx. +// GetRetryable returns the value of Retryable from the ctx. func GetRetryable(ctx context.Context) bool { - var retryAble bool + var retryable bool val := ctx.Value(Retryable) if val != nil { - retryAble = val.(bool) + retryable = val.(bool) } - return retryAble + return retryable } From 6f3263b7709de4e1649db454d27a5e95dc9caf74 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Wed, 2 May 2018 15:27:53 +0800 Subject: [PATCH 07/10] kv/txn: set retryable in context for commit --- kv/txn.go | 8 +++++++- sessionctx/context.go | 11 +++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/kv/txn.go b/kv/txn.go index 5bea8f384ae94..1318998362ef1 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -24,6 +24,12 @@ import ( "golang.org/x/net/context" ) +// ContextKey is the type of context's key +type ContextKey string + +// Retryable is the key in context +const Retryable ContextKey = "Retryable" + // RunInNewTxn will run the f in a new transaction environment. func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) error { var ( @@ -54,7 +60,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e return errors.Trace(err) } - err = txn.Commit(context.Background()) + err = txn.Commit(context.WithValue(context.Background(), Retryable, retryable)) if err == nil { break } diff --git a/sessionctx/context.go b/sessionctx/context.go index 357c4baad5d08..fff2e3c46ce71 100644 --- a/sessionctx/context.go +++ b/sessionctx/context.go @@ -107,25 +107,20 @@ const ( LastExecuteDDL basicCtxType = 3 ) -type contextKey string - // ConnID is the key in context. -const ConnID contextKey = "conn ID" - -// Retryable is the key in context -const Retryable contextKey = "Retryable" +const ConnID kv.ContextKey = "conn ID" // SetCommitCtx sets the variables for context before commit a transaction. func SetCommitCtx(ctx context.Context, sessCtx Context) context.Context { ctx = context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID) retryAble := !sessCtx.GetSessionVars().TxnCtx.ForUpdate - return context.WithValue(ctx, Retryable, retryAble) + return context.WithValue(ctx, kv.Retryable, retryAble) } // GetRetryable returns the value of Retryable from the ctx. func GetRetryable(ctx context.Context) bool { var retryable bool - val := ctx.Value(Retryable) + val := ctx.Value(kv.Retryable) if val != nil { retryable = val.(bool) } From 7657fa4b46fe2411b90c5d72bf1c590214cd4af6 Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Thu, 3 May 2018 10:09:21 +0800 Subject: [PATCH 08/10] tikv/txn: add debug log --- store/tikv/2pc_test.go | 2 +- store/tikv/test_util.go | 2 +- store/tikv/txn.go | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) diff --git a/store/tikv/2pc_test.go b/store/tikv/2pc_test.go index 27a9f8bbf7e33..9dae0941e5fc3 100644 --- a/store/tikv/2pc_test.go +++ b/store/tikv/2pc_test.go @@ -45,7 +45,7 @@ func (s *testCommitterSuite) SetUpTest(c *C) { spkv := NewMockSafePointKV() store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false) c.Assert(err, IsNil) - store.EnableTxnLocalLatches(1024) + store.EnableTxnLocalLatches(1024000) s.store = store CommitMaxBackoff = 2000 } diff --git a/store/tikv/test_util.go b/store/tikv/test_util.go index 81de2f4fc0ee2..0e48ec6cabc97 100644 --- a/store/tikv/test_util.go +++ b/store/tikv/test_util.go @@ -35,7 +35,7 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien uid := uuid.NewV4().String() spkv := NewMockSafePointKV() tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false) - tikvStore.EnableTxnLocalLatches(1024) + tikvStore.EnableTxnLocalLatches(1024000) tikvStore.mock = true return tikvStore, errors.Trace(err) } diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 040e02064e98c..a2b0839b98368 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -196,6 +196,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { // latches disabled if txn.store.txnLatches == nil { err = committer.executeAndWriteFinishBinlog(ctx) + log.Debug("[kv]", connID, " txnLatches disabled, 2pc directly:", err) return errors.Trace(err) } @@ -206,6 +207,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { if err == nil { txn.store.txnLatches.RefreshCommitTS(committer.keys, committer.commitTS) } + log.Debug("[kv]", connID, " txnLatches enabled while txn not retryable, 2pc directly:", err) return errors.Trace(err) } @@ -220,6 +222,7 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { if err == nil { lock.SetCommitTS(committer.commitTS) } + log.Debug("[kv]", connID, " txnLatches enabled while txn retryable:", err) return errors.Trace(err) } From c6a0002545a8f16267bd1addd3853f2f86d449bb Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Fri, 4 May 2018 09:33:26 +0800 Subject: [PATCH 09/10] mvcc/reader: address comments --- store/tikv/2pc.go | 1 + store/tikv/txn.go | 6 ------ 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/store/tikv/2pc.go b/store/tikv/2pc.go index f3f1e4a873d63..12afbb35d8f77 100644 --- a/store/tikv/2pc.go +++ b/store/tikv/2pc.go @@ -568,6 +568,7 @@ func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) err if err != nil { c.writeFinishBinlog(binlog.BinlogType_Rollback, 0) } else { + c.txn.commitTS = c.commitTS c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS)) } return errors.Trace(err) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index a2b0839b98368..1987bee8b6401 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -187,12 +187,6 @@ func (txn *tikvTxn) Commit(ctx context.Context) error { if err != nil || committer == nil { return errors.Trace(err) } - - defer func() { - if err == nil { - txn.commitTS = committer.commitTS - } - }() // latches disabled if txn.store.txnLatches == nil { err = committer.executeAndWriteFinishBinlog(ctx) From 37695a63a5e661b24533477c34109b874f6ec23a Mon Sep 17 00:00:00 2001 From: wuxuelian Date: Fri, 4 May 2018 09:40:50 +0800 Subject: [PATCH 10/10] update comments --- store/tikv/latch/scheduler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/store/tikv/latch/scheduler.go b/store/tikv/latch/scheduler.go index f26e0c740b531..f3ffad7a77d9f 100644 --- a/store/tikv/latch/scheduler.go +++ b/store/tikv/latch/scheduler.go @@ -83,7 +83,7 @@ func (scheduler *LatchesScheduler) Lock(startTS uint64, keys [][]byte) *Lock { return lock } -// UnLock unlocks a lock with commitTS. +// UnLock unlocks a lock. func (scheduler *LatchesScheduler) UnLock(lock *Lock) { scheduler.RLock() defer scheduler.RUnlock()