Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tikv/txn: support local latch in transaction #6418

Merged
merged 22 commits into from
May 7, 2018
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 33 additions & 22 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,18 +40,19 @@ var (

// Config contains configuration options.
type Config struct {
Host string `toml:"host" json:"host"`
Port uint `toml:"port" json:"port"`
Store string `toml:"store" json:"store"`
Path string `toml:"path" json:"path"`
Socket string `toml:"socket" json:"socket"`
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
Lease string `toml:"lease" json:"lease"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
OOMAction string `toml:"oom-action" json:"oom-action"`
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
Host string `toml:"host" json:"host"`
Port uint `toml:"port" json:"port"`
Store string `toml:"store" json:"store"`
Path string `toml:"path" json:"path"`
Socket string `toml:"socket" json:"socket"`
BinlogSocket string `toml:"binlog-socket" json:"binlog-socket"`
Lease string `toml:"lease" json:"lease"`
RunDDL bool `toml:"run-ddl" json:"run-ddl"`
SplitTable bool `toml:"split-table" json:"split-table"`
TokenLimit uint `toml:"token-limit" json:"token-limit"`
OOMAction string `toml:"oom-action" json:"oom-action"`
EnableStreaming bool `toml:"enable-streaming" json:"enable-streaming"`
TxnLocalLatches TxnLocalLatches `toml:"txn-local-latches" json:"txn-local-latches"`
// Set sys variable lower-case-table-names, ref: https://dev.mysql.com/doc/refman/5.7/en/identifier-case-sensitivity.html.
// TODO: We actually only support mode 2, which keeps the original case, but the comparison is case-insensitive.
LowerCaseTableNames int `toml:"lower-case-table-names" json:"lower-case-table-names"`
Expand Down Expand Up @@ -169,6 +170,12 @@ type PlanCache struct {
Shards uint `toml:"shards" json:"shards"`
}

// TxnLocalLatches is the TxnLocalLatches section of the config.
type TxnLocalLatches struct {
Enabled bool `toml:"enabled" json:"enabled"`
Capacity uint `toml:"capacity" json:"capacity"`
}

// PreparedPlanCache is the PreparedPlanCache section of the config.
type PreparedPlanCache struct {
Enabled bool `toml:"enabled" json:"enabled"`
Expand Down Expand Up @@ -222,16 +229,20 @@ type TiKVClient struct {
}

var defaultConf = Config{
Host: "0.0.0.0",
Port: 4000,
Store: "mocktikv",
Path: "/tmp/tidb",
RunDDL: true,
SplitTable: true,
Lease: "45s",
TokenLimit: 1000,
OOMAction: "log",
EnableStreaming: false,
Host: "0.0.0.0",
Port: 4000,
Store: "mocktikv",
Path: "/tmp/tidb",
RunDDL: true,
SplitTable: true,
Lease: "45s",
TokenLimit: 1000,
OOMAction: "log",
EnableStreaming: false,
TxnLocalLatches: TxnLocalLatches{
Enabled: false,
Capacity: 1024000,
},
LowerCaseTableNames: 2,
Log: Log{
Level: "info",
Expand Down
6 changes: 6 additions & 0 deletions config/config.toml.example
Original file line number Diff line number Diff line change
Expand Up @@ -223,3 +223,9 @@ grpc-connection-count = 16

# max time for commit command, must be twice bigger than raft election timeout.
commit-timeout = "41s"

[txn-local-latches]
# Enable local latches for transactions. Enable it when
# there are lots of conflicts between transactions.
enabled = false
capacity = 1024000
4 changes: 2 additions & 2 deletions executor/simple.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ func (e *SimpleExec) executeAlterUser(s *ast.AlterUserStmt) error {
}
if len(failedUsers) > 0 {
// Commit the transaction even if we returns error
err := e.ctx.Txn().Commit(sessionctx.SetConnID2Ctx(context.Background(), e.ctx))
err := e.ctx.Txn().Commit(sessionctx.SetCommitCtx(context.Background(), e.ctx))
if err != nil {
return errors.Trace(err)
}
Expand Down Expand Up @@ -238,7 +238,7 @@ func (e *SimpleExec) executeDropUser(s *ast.DropUserStmt) error {
}
if len(failedUsers) > 0 {
// Commit the transaction even if we returns error
err := e.ctx.Txn().Commit(sessionctx.SetConnID2Ctx(context.Background(), e.ctx))
err := e.ctx.Txn().Commit(sessionctx.SetCommitCtx(context.Background(), e.ctx))
if err != nil {
return errors.Trace(err)
}
Expand Down
8 changes: 7 additions & 1 deletion kv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ import (
"golang.org/x/net/context"
)

// ContextKey is the type of context's key
type ContextKey string

// Retryable is the key in context
const Retryable ContextKey = "Retryable"

// RunInNewTxn will run the f in a new transaction environment.
func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) error {
var (
Expand Down Expand Up @@ -54,7 +60,7 @@ func RunInNewTxn(store Storage, retryable bool, f func(txn Transaction) error) e
return errors.Trace(err)
}

err = txn.Commit(context.Background())
err = txn.Commit(context.WithValue(context.Background(), Retryable, retryable))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really dislike passing those value through context, even txn.SetOption(retryable) is better than this.
If you insist, I'll refactor the code here after it's merged.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've tried your suggestion and it's a little complex. I think to refactor the code in another PR is
ok for me.

if err == nil {
break
}
Expand Down
2 changes: 1 addition & 1 deletion server/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ func (cc *clientConn) handleLoadData(ctx context.Context, loadDataInfo *executor
}
return errors.Trace(err)
}
return errors.Trace(txn.Commit(sessionctx.SetConnID2Ctx(ctx, loadDataInfo.Ctx)))
return errors.Trace(txn.Commit(sessionctx.SetCommitCtx(ctx, loadDataInfo.Ctx)))
}

// handleLoadStats does the additional work after processing the 'load stats' query.
Expand Down
2 changes: 1 addition & 1 deletion session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (s *session) doCommit(ctx context.Context) error {
schemaVer: s.sessionVars.TxnCtx.SchemaVersion,
relatedTableIDs: tableIDs,
})
if err := s.txn.Commit(sessionctx.SetConnID2Ctx(ctx, s)); err != nil {
if err := s.txn.Commit(sessionctx.SetCommitCtx(ctx, s)); err != nil {
return errors.Trace(err)
}
return nil
Expand Down
22 changes: 16 additions & 6 deletions sessionctx/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,22 @@ const (
LastExecuteDDL basicCtxType = 3
)

type contextKey string

// ConnID is the key in context.
const ConnID contextKey = "conn ID"
const ConnID kv.ContextKey = "conn ID"

// SetCommitCtx sets the variables for context before commit a transaction.
func SetCommitCtx(ctx context.Context, sessCtx Context) context.Context {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pass such important information through context.Context is not reliable.
If the call just pass a context.Background, those information will loss.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any suggestion?

ctx = context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID)
retryAble := !sessCtx.GetSessionVars().TxnCtx.ForUpdate
return context.WithValue(ctx, kv.Retryable, retryAble)
}

// SetConnID2Ctx sets the connection ID to context.
func SetConnID2Ctx(ctx context.Context, sessCtx Context) context.Context {
return context.WithValue(ctx, ConnID, sessCtx.GetSessionVars().ConnectionID)
// GetRetryable returns the value of Retryable from the ctx.
func GetRetryable(ctx context.Context) bool {
var retryable bool
val := ctx.Value(kv.Retryable)
if val != nil {
retryable = val.(bool)
}
return retryable
}
10 changes: 10 additions & 0 deletions store/tikv/2pc.go
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,16 @@ func (c *twoPhaseCommitter) cleanupKeys(bo *Backoffer, keys [][]byte) error {
// should be less than `gcRunInterval`.
const maxTxnTimeUse = 590000

func (c *twoPhaseCommitter) executeAndWriteFinishBinlog(ctx context.Context) error {
err := c.execute(ctx)
if err != nil {
c.writeFinishBinlog(binlog.BinlogType_Rollback, 0)
} else {
c.writeFinishBinlog(binlog.BinlogType_Commit, int64(c.commitTS))
}
return errors.Trace(err)
}

// execute executes the two-phase commit protocol.
func (c *twoPhaseCommitter) execute(ctx context.Context) error {
defer func() {
Expand Down
1 change: 1 addition & 0 deletions store/tikv/2pc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ func (s *testCommitterSuite) SetUpTest(c *C) {
spkv := NewMockSafePointKV()
store, err := newTikvStore("mocktikv-store", pdCli, spkv, client, false)
c.Assert(err, IsNil)
store.EnableTxnLocalLatches(1024000)
s.store = store
CommitMaxBackoff = 2000
}
Expand Down
14 changes: 14 additions & 0 deletions store/tikv/kv.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/store/tikv/latch"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/store/tikv/oracle/oracles"
"github.com/pingcap/tidb/store/tikv/tikvrpc"
Expand Down Expand Up @@ -71,6 +72,7 @@ func (d Driver) Open(path string) (kv.Storage, error) {
defer mc.Unlock()

security := config.GetGlobalConfig().Security
txnLocalLatches := config.GetGlobalConfig().TxnLocalLatches
etcdAddrs, disableGC, err := parsePath(path)
if err != nil {
return nil, errors.Trace(err)
Expand Down Expand Up @@ -109,6 +111,9 @@ func (d Driver) Open(path string) (kv.Storage, error) {
if err != nil {
return nil, errors.Trace(err)
}
if txnLocalLatches.Enabled {
s.EnableTxnLocalLatches(txnLocalLatches.Capacity)
}
s.etcdAddrs = etcdAddrs
s.tlsConfig = tlsConfig

Expand All @@ -127,6 +132,7 @@ type tikvStore struct {
pdClient pd.Client
regionCache *RegionCache
lockResolver *LockResolver
txnLatches *latch.LatchesScheduler
gcWorker GCHandler
etcdAddrs []string
tlsConfig *tls.Config
Expand Down Expand Up @@ -190,6 +196,10 @@ func newTikvStore(uuid string, pdClient pd.Client, spkv SafePointKV, client Clie
return store, nil
}

func (s *tikvStore) EnableTxnLocalLatches(size uint) {
s.txnLatches = latch.NewScheduler(size)
}

func (s *tikvStore) EtcdAddrs() []string {
return s.etcdAddrs
}
Expand Down Expand Up @@ -274,6 +284,10 @@ func (s *tikvStore) Close() error {
if err := s.client.Close(); err != nil {
return errors.Trace(err)
}

if s.txnLatches != nil {
s.txnLatches.Close()
}
return nil
}

Expand Down
7 changes: 6 additions & 1 deletion store/tikv/latch/latch.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@ func (l *Lock) isLocked() bool {
return !l.isStale && l.acquiredCount != len(l.requiredSlots)
}

// SetCommitTS sets the lock's commitTS.
func (l *Lock) SetCommitTS(commitTS uint64) {
l.commitTS = commitTS
}

// Latches which are used for concurrency control.
// Each latch is indexed by a slot's ID, hence the term latch and slot are used in interchangeable,
// but conceptually a latch is a queue, and a slot is an index to the queue
Expand All @@ -98,7 +103,7 @@ type Latches struct {

// NewLatches create a Latches with fixed length,
// the size will be rounded up to the power of 2.
func NewLatches(size int) *Latches {
func NewLatches(size uint) *Latches {
powerOfTwoSize := 1 << uint32(bits.Len32(uint32(size-1)))
slots := make([]latch, powerOfTwoSize)
return &Latches{
Expand Down
4 changes: 2 additions & 2 deletions store/tikv/latch/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type LatchesScheduler struct {
}

// NewScheduler create the LatchesScheduler.
func NewScheduler(size int) *LatchesScheduler {
func NewScheduler(size uint) *LatchesScheduler {
latches := NewLatches(size)
unlockCh := make(chan *Lock, lockChanSize)
scheduler := &LatchesScheduler{
Expand Down Expand Up @@ -84,7 +84,7 @@ func (scheduler *LatchesScheduler) Lock(startTS uint64, keys [][]byte) *Lock {
}

// UnLock unlocks a lock with commitTS.
func (scheduler *LatchesScheduler) UnLock(lock *Lock, commitTS uint64) {
func (scheduler *LatchesScheduler) UnLock(lock *Lock) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO, this API design is worse than before, because the user may forget to call lock.SetCommitTS after UnLock, while previous API make a stronger contract.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Both are ok for me. It's suggested by @coocood , maybe I could update it when you reach an agreement?

scheduler.RLock()
defer scheduler.RUnlock()
if !scheduler.closed {
Expand Down
3 changes: 2 additions & 1 deletion store/tikv/latch/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ func (s *testSchedulerSuite) TestWithConcurrency(c *C) {
for _, txn := range txns {
go func(txn [][]byte, wg *sync.WaitGroup) {
lock := sched.Lock(getTso(), txn)
defer sched.UnLock(lock, getTso())
defer sched.UnLock(lock)
if lock.IsStale() {
// Should restart the transaction or return error
} else {
lock.SetCommitTS(getTso())
// Do 2pc
}
wg.Done()
Expand Down
1 change: 1 addition & 0 deletions store/tikv/test_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ func NewTestTiKVStore(client Client, pdClient pd.Client, clientHijack func(Clien
uid := uuid.NewV4().String()
spkv := NewMockSafePointKV()
tikvStore, err := newTikvStore(uid, pdCli, spkv, client, false)
tikvStore.EnableTxnLocalLatches(1024000)
tikvStore.mock = true
return tikvStore, errors.Trace(err)
}
45 changes: 35 additions & 10 deletions store/tikv/txn.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"golang.org/x/net/context"
)
Expand Down Expand Up @@ -185,20 +184,46 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
connID = val.(uint64)
}
committer, err := newTwoPhaseCommitter(txn, connID)
if err != nil {
if err != nil || committer == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will commiter == nil ? and if it's really nil, the error trace would be oddly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When no row needs to update in this transaction.

return errors.Trace(err)
}
if committer == nil {
return nil

defer func() {
if err == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess this can be moved into executeAndWriteFinishBinlog, so defer is unnecessary

txn.commitTS = committer.commitTS
}
}()
// latches disabled
if txn.store.txnLatches == nil {
err = committer.executeAndWriteFinishBinlog(ctx)
log.Debug("[kv]", connID, " txnLatches disabled, 2pc directly:", err)
return errors.Trace(err)
}
err = committer.execute(ctx)
if err != nil {
committer.writeFinishBinlog(binlog.BinlogType_Rollback, 0)

// latches enabled
// for transactions not retryable, commit directly.
if !sessionctx.GetRetryable(ctx) {
err = committer.executeAndWriteFinishBinlog(ctx)
if err == nil {
txn.store.txnLatches.RefreshCommitTS(committer.keys, committer.commitTS)
}
log.Debug("[kv]", connID, " txnLatches enabled while txn not retryable, 2pc directly:", err)
return errors.Trace(err)
}
committer.writeFinishBinlog(binlog.BinlogType_Commit, int64(committer.commitTS))
txn.commitTS = committer.commitTS
return nil

// for transactions which need to acquire latches
lock := txn.store.txnLatches.Lock(committer.startTS, committer.keys)
defer txn.store.txnLatches.UnLock(lock)
if lock.IsStale() {
err = errors.Errorf("startTS %d is stale", txn.startTS)
return errors.Annotate(err, txnRetryableMark)
}
err = committer.executeAndWriteFinishBinlog(ctx)
if err == nil {
lock.SetCommitTS(committer.commitTS)
}
log.Debug("[kv]", connID, " txnLatches enabled while txn retryable:", err)
return errors.Trace(err)
}

func (txn *tikvTxn) close() {
Expand Down