-
Notifications
You must be signed in to change notification settings - Fork 5.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
table: Add option DupKeyCheckLazy
to check duplicated key lazily
#55246
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -22,7 +22,6 @@ import ( | |||||||||||||||||
"time" | ||||||||||||||||||
|
||||||||||||||||||
"github.com/pingcap/errors" | ||||||||||||||||||
"github.com/pingcap/tidb/pkg/errctx" | ||||||||||||||||||
"github.com/pingcap/tidb/pkg/errno" | ||||||||||||||||||
"github.com/pingcap/tidb/pkg/executor/internal/exec" | ||||||||||||||||||
"github.com/pingcap/tidb/pkg/expression" | ||||||||||||||||||
|
@@ -31,6 +30,7 @@ import ( | |||||||||||||||||
"github.com/pingcap/tidb/pkg/parser/model" | ||||||||||||||||||
"github.com/pingcap/tidb/pkg/parser/mysql" | ||||||||||||||||||
"github.com/pingcap/tidb/pkg/parser/terror" | ||||||||||||||||||
"github.com/pingcap/tidb/pkg/sessionctx/variable" | ||||||||||||||||||
"github.com/pingcap/tidb/pkg/table" | ||||||||||||||||||
"github.com/pingcap/tidb/pkg/table/tables" | ||||||||||||||||||
"github.com/pingcap/tidb/pkg/tablecodec" | ||||||||||||||||||
|
@@ -47,6 +47,7 @@ import ( | |||||||||||||||||
type InsertExec struct { | ||||||||||||||||||
*InsertValues | ||||||||||||||||||
OnDuplicate []*expression.Assignment | ||||||||||||||||||
IgnoreErr bool | ||||||||||||||||||
evalBuffer4Dup chunk.MutRow | ||||||||||||||||||
curInsertVals chunk.MutRow | ||||||||||||||||||
row4Update []types.Datum | ||||||||||||||||||
|
@@ -66,7 +67,6 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { | |||||||||||||||||
// If tidb_batch_insert is ON and not in a transaction, we could use BatchInsert mode. | ||||||||||||||||||
sessVars := e.Ctx().GetSessionVars() | ||||||||||||||||||
defer sessVars.CleanBuffers() | ||||||||||||||||||
ignoreErr := sessVars.StmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError | ||||||||||||||||||
|
||||||||||||||||||
txn, err := e.Ctx().Txn(true) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
|
@@ -92,13 +92,14 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { | |||||||||||||||||
if err != nil { | ||||||||||||||||||
return err | ||||||||||||||||||
} | ||||||||||||||||||
} else if ignoreErr { | ||||||||||||||||||
} else if e.IgnoreErr { | ||||||||||||||||||
err := e.batchCheckAndInsert(ctx, rows, e.addRecord, false) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
return err | ||||||||||||||||||
} | ||||||||||||||||||
} else { | ||||||||||||||||||
start := time.Now() | ||||||||||||||||||
dupKeyCheck := optimizeDupKeyCheckForNormalInsert(sessVars, txn) | ||||||||||||||||||
for i, row := range rows { | ||||||||||||||||||
var err error | ||||||||||||||||||
sizeHintStep := int(sessVars.ShardAllocateStep) | ||||||||||||||||||
|
@@ -108,9 +109,9 @@ func (e *InsertExec) exec(ctx context.Context, rows [][]types.Datum) error { | |||||||||||||||||
if sizeHint > remain { | ||||||||||||||||||
sizeHint = remain | ||||||||||||||||||
} | ||||||||||||||||||
err = e.addRecordWithAutoIDHint(ctx, row, sizeHint, table.DupKeyCheckDefault) | ||||||||||||||||||
err = e.addRecordWithAutoIDHint(ctx, row, sizeHint, dupKeyCheck) | ||||||||||||||||||
} else { | ||||||||||||||||||
err = e.addRecord(ctx, row, table.DupKeyCheckDefault) | ||||||||||||||||||
err = e.addRecord(ctx, row, dupKeyCheck) | ||||||||||||||||||
} | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
return err | ||||||||||||||||||
|
@@ -193,7 +194,7 @@ func (e *InsertValues) prefetchDataCache(ctx context.Context, txn kv.Transaction | |||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// updateDupRow updates a duplicate row to a new row. | ||||||||||||||||||
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment) error { | ||||||||||||||||||
func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Transaction, row toBeCheckedRow, handle kv.Handle, _ []*expression.Assignment, dupKeyCheck table.DupKeyCheckMode) error { | ||||||||||||||||||
oldRow, err := getOldRow(ctx, e.Ctx(), txn, row.t, handle, e.GenExprs) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
return err | ||||||||||||||||||
|
@@ -204,7 +205,7 @@ func (e *InsertExec) updateDupRow(ctx context.Context, idxInBatch int, txn kv.Tr | |||||||||||||||||
extraCols = e.Ctx().GetSessionVars().CurrInsertBatchExtraCols[idxInBatch] | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch) | ||||||||||||||||||
err = e.doDupRowUpdate(ctx, handle, oldRow, row.row, extraCols, e.OnDuplicate, idxInBatch, dupKeyCheck) | ||||||||||||||||||
if kv.ErrKeyExists.Equal(err) || table.ErrCheckConstraintViolated.Equal(err) { | ||||||||||||||||||
ec := e.Ctx().GetSessionVars().StmtCtx.ErrCtx() | ||||||||||||||||||
return ec.HandleErrorWithAlias(kv.ErrKeyExists, err, err) | ||||||||||||||||||
|
@@ -236,14 +237,42 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D | |||||||||||||||||
e.stats.Prefetch += time.Since(prefetchStart) | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// If the current row has some conflicts, the operation will be changed to update. | ||||||||||||||||||
// If this happens, there still may be another index that has a conflict, | ||||||||||||||||||
// so we need to determine DupKeyCheckMode here. | ||||||||||||||||||
updateDupKeyCheck := table.DupKeyCheckInPlace | ||||||||||||||||||
if (txn.IsPessimistic() && !e.IgnoreErr) || txn.IsPipelined() { | ||||||||||||||||||
// - If `txn.Pipelined()`, it means current is using `@@tidb_dml_type="bulk"` to insert rows. | ||||||||||||||||||
// `DupKeyCheckLazy` should be used in "bulk" mode to avoid request storage and improve the performance. | ||||||||||||||||||
// - If `txn.IsPessimistic()`, we can use `DupKeyCheckLazy` to postpone the storage constraints check | ||||||||||||||||||
// to subsequence stages such as lock. | ||||||||||||||||||
lcwangchao marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||
// However, if the current statement is `INSERT IGNORE ... ON DUPLICATE KEY ...`, | ||||||||||||||||||
// `DupKeyCheckInPlace` should be used. | ||||||||||||||||||
// It is because the executor should get the dup-key error immediately and ignore it. | ||||||||||||||||||
// - If the current txn is optimistic, `DupKeyCheckInPlace` is always used | ||||||||||||||||||
// even if `tidb_constraint_check_in_place` is `OFF`. | ||||||||||||||||||
// This is because `tidb_constraint_check_in_place` is only designed for insert cases, see comments in issue: | ||||||||||||||||||
// https://github.com/pingcap/tidb/issues/54492#issuecomment-2229941881 | ||||||||||||||||||
// Though it is still in an insert statement, but it seems some old tests still think it should | ||||||||||||||||||
// check constraints in place, see test: | ||||||||||||||||||
// https://github.com/pingcap/tidb/blob/3117d3fae50bbb5dabcde7b9589f92bfbbda5dc6/pkg/executor/test/writetest/write_test.go#L419-L426 | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There are same corner cases here, just check the test case: tidb/pkg/executor/test/writetest/write_test.go Lines 419 to 426 in 3117d3f
And some comments: There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Should the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
|
||||||||||||||||||
updateDupKeyCheck = table.DupKeyCheckLazy | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// Do not use `updateDupKeyCheck` for `AddRecord` because it is not optimized for insert. | ||||||||||||||||||
// It seems that we can just use `DupKeyCheckSkip` here because all constraints are checked. | ||||||||||||||||||
// But we still use `optimizeDupKeyCheckForNormalInsert` to make the refactor same behavior with the original code. | ||||||||||||||||||
// TODO: just use `DupKeyCheckSkip` here. | ||||||||||||||||||
addRecordDupKeyCheck := optimizeDupKeyCheckForNormalInsert(e.Ctx().GetSessionVars(), txn) | ||||||||||||||||||
|
||||||||||||||||||
for i, r := range toBeCheckedRows { | ||||||||||||||||||
if r.handleKey != nil { | ||||||||||||||||||
handle, err := tablecodec.DecodeRowKey(r.handleKey.newKey) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
return err | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate) | ||||||||||||||||||
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, updateDupKeyCheck) | ||||||||||||||||||
if err == nil { | ||||||||||||||||||
continue | ||||||||||||||||||
} | ||||||||||||||||||
|
@@ -260,7 +289,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D | |||||||||||||||||
if handle == nil { | ||||||||||||||||||
continue | ||||||||||||||||||
} | ||||||||||||||||||
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate) | ||||||||||||||||||
err = e.updateDupRow(ctx, i, txn, r, handle, e.OnDuplicate, updateDupKeyCheck) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
if kv.IsErrNotFound(err) { | ||||||||||||||||||
// Data index inconsistent? A unique key provide the handle information, but the | ||||||||||||||||||
|
@@ -282,7 +311,7 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D | |||||||||||||||||
// and key-values should be filled back to dupOldRowValues for the further row check, | ||||||||||||||||||
// due to there may be duplicate keys inside the insert statement. | ||||||||||||||||||
if newRows[i] != nil { | ||||||||||||||||||
err := e.addRecord(ctx, newRows[i], table.DupKeyCheckDefault) | ||||||||||||||||||
err := e.addRecord(ctx, newRows[i], addRecordDupKeyCheck) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
return err | ||||||||||||||||||
} | ||||||||||||||||||
|
@@ -294,6 +323,25 @@ func (e *InsertExec) batchUpdateDupRows(ctx context.Context, newRows [][]types.D | |||||||||||||||||
return nil | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// optimizeDupKeyCheckForNormalInsert trys to optimize the DupKeyCheckMode for an insert statement according to the | ||||||||||||||||||
// transaction and system variables. | ||||||||||||||||||
// If the DupKeyCheckMode of the current statement can be optimized, it will return `DupKeyCheckLazy` to avoid the | ||||||||||||||||||
// redundant requests to TiKV, otherwise, `DupKeyCheckInPlace` will be returned. | ||||||||||||||||||
// This method only works for "normal" insert statements, that means the options like "IGNORE" and "ON DUPLICATE KEY" | ||||||||||||||||||
// in a statement are not considerate, and callers should handle the above cases by themselves. | ||||||||||||||||||
func optimizeDupKeyCheckForNormalInsert(vars *variable.SessionVars, txn kv.Transaction) table.DupKeyCheckMode { | ||||||||||||||||||
if !vars.ConstraintCheckInPlace || txn.IsPessimistic() || txn.IsPipelined() { | ||||||||||||||||||
// We can just check duplicated key lazily without keys in storage for the below cases: | ||||||||||||||||||
// - `txn.Pipelined()` is true. | ||||||||||||||||||
// It means the user is using `@@tidb_dml_type="bulk"` to insert rows in bulk mode. | ||||||||||||||||||
// DupKeyCheckLazy should be used to improve the performance. | ||||||||||||||||||
// - The current transaction is pessimistic. The duplicate key check can be postponed to the lock stage. | ||||||||||||||||||
// - The current transaction is optimistic but `tidb_constraint_check_in_place` is set to false. | ||||||||||||||||||
return table.DupKeyCheckLazy | ||||||||||||||||||
} | ||||||||||||||||||
return table.DupKeyCheckInPlace | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
// Next implements the Executor Next interface. | ||||||||||||||||||
func (e *InsertExec) Next(ctx context.Context, req *chunk.Chunk) error { | ||||||||||||||||||
req.Reset() | ||||||||||||||||||
|
@@ -379,7 +427,7 @@ func (e *InsertExec) initEvalBuffer4Dup() { | |||||||||||||||||
|
||||||||||||||||||
// doDupRowUpdate updates the duplicate row. | ||||||||||||||||||
func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRow []types.Datum, newRow []types.Datum, | ||||||||||||||||||
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int) error { | ||||||||||||||||||
extraCols []types.Datum, cols []*expression.Assignment, idxInBatch int, dupKeyMode table.DupKeyCheckMode) error { | ||||||||||||||||||
assignFlag := make([]bool, len(e.Table.WritableCols())) | ||||||||||||||||||
// See http://dev.mysql.com/doc/refman/5.7/en/miscellaneous-functions.html#function_values | ||||||||||||||||||
e.curInsertVals.SetDatums(newRow...) | ||||||||||||||||||
|
@@ -426,7 +474,7 @@ func (e *InsertExec) doDupRowUpdate(ctx context.Context, handle kv.Handle, oldRo | |||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
newData := e.row4Update[:len(oldRow)] | ||||||||||||||||||
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades) | ||||||||||||||||||
_, err := updateRecord(ctx, e.Ctx(), handle, oldRow, newData, assignFlag, e.Table, true, e.memTracker, e.fkChecks, e.fkCascades, dupKeyMode) | ||||||||||||||||||
if err != nil { | ||||||||||||||||||
return err | ||||||||||||||||||
} | ||||||||||||||||||
|
@@ -440,7 +488,7 @@ func (e *InsertExec) setMessage() { | |||||||||||||||||
if e.SelectExec != nil || numRecords > 1 { | ||||||||||||||||||
numWarnings := stmtCtx.WarningCount() | ||||||||||||||||||
var numDuplicates uint64 | ||||||||||||||||||
if stmtCtx.ErrGroupLevel(errctx.ErrGroupDupKey) != errctx.LevelError { | ||||||||||||||||||
if e.IgnoreErr { | ||||||||||||||||||
// if ignoreErr | ||||||||||||||||||
numDuplicates = numRecords - stmtCtx.CopiedRows() | ||||||||||||||||||
} else { | ||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it necessary to wrap L240-L260 to a txn util function like
optimizeDupKeyCheckForNormalInsert
? And duplications from L284 to L297 in update.go could be avoided.