Skip to content

Commit

Permalink
session: return an error when a non-transactional DML partially fails (
Browse files Browse the repository at this point in the history
  • Loading branch information
ekexium authored May 10, 2022
1 parent 1d2a0b9 commit 45415f4
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 42 deletions.
1 change: 1 addition & 0 deletions errno/errcode.go
Original file line number Diff line number Diff line change
Expand Up @@ -1021,6 +1021,7 @@ const (
ErrInconsistentIndexedValue = 8140
ErrAssertionFailed = 8141
ErrInstanceScope = 8142
ErrNonTransactionalJobFailure = 8143

// Error codes used by TiDB ddl package
ErrUnsupportedDDLOperation = 8200
Expand Down
1 change: 1 addition & 0 deletions errno/errname.go
Original file line number Diff line number Diff line change
Expand Up @@ -1017,6 +1017,7 @@ var MySQLErrName = map[uint16]*mysql.ErrMessage{
ErrInconsistentIndexedValue: mysql.Message("writing inconsistent data in table: %s, index: %s, col: %s, indexed-value:{%s} != record-value:{%s}", []int{3, 4}),
ErrAssertionFailed: mysql.Message("assertion failed: key: %s, assertion: %s, start_ts: %v, existing start ts: %v, existing commit ts: %v", []int{0}),
ErrInstanceScope: mysql.Message("modifying %s will require SET GLOBAL in a future version of TiDB", nil),
ErrNonTransactionalJobFailure: mysql.Message("non-transactional job failed, job id: %d, total jobs: %d. job range: [%s, %s], job sql: %s, err: %v", []int{2, 3, 4}),

ErrWarnOptimizerHintInvalidInteger: mysql.Message("integer value is out of range in '%s'", nil),
ErrWarnOptimizerHintUnsupportedHint: mysql.Message("Optimizer hint %s is not supported by TiDB and is ignored", nil),
Expand Down
5 changes: 5 additions & 0 deletions errors.toml
Original file line number Diff line number Diff line change
Expand Up @@ -2211,6 +2211,11 @@ error = '''
[%d] can not retry select for update statement
'''

["session:8143"]
error = '''
non-transactional job failed, job id: %d, total jobs: %d. job range: [%s, %s], job sql: %s, err: %v
'''

["structure:8217"]
error = '''
invalid encoded hash key flag
Expand Down
64 changes: 25 additions & 39 deletions session/nontransactional.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/domain"
"github.com/pingcap/tidb/errno"
"github.com/pingcap/tidb/executor"
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/parser"
Expand All @@ -37,12 +38,17 @@ import (
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/collate"
"github.com/pingcap/tidb/util/dbterror"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/mathutil"
"github.com/pingcap/tidb/util/memory"
"github.com/pingcap/tidb/util/sqlexec"
"go.uber.org/zap"
)

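// ErrNonTransactionalJobFailure is returned when a job of a non-transactional DML fails and errors are not ignored
// (tidb_nontransactional_ignore_error is off). The statement returns this error immediately and the remaining jobs are canceled.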
var ErrNonTransactionalJobFailure = dbterror.ClassSession.NewStd(errno.ErrNonTransactionalJobFailure)

// job: handle keys in [start, end]
type job struct {
start types.Datum
Expand Down Expand Up @@ -74,7 +80,7 @@ func HandleNonTransactionalDelete(ctx context.Context, stmt *ast.NonTransactiona
if err != nil {
return nil, err
}
if err := checkConstraint(ctx, stmt, se); err != nil {
if err := checkConstraint(stmt, se); err != nil {
return nil, err
}
metrics.NonTransactionalDeleteCount.Inc()
Expand Down Expand Up @@ -106,14 +112,14 @@ func HandleNonTransactionalDelete(ctx context.Context, stmt *ast.NonTransactiona
return buildExecuteResults(ctx, jobs, se.GetSessionVars().BatchSize.MaxChunkSize, se.GetSessionVars().EnableRedactLog)
}

func checkConstraint(ctx context.Context, stmt *ast.NonTransactionalDeleteStmt, se Session) error {
func checkConstraint(stmt *ast.NonTransactionalDeleteStmt, se Session) error {
sessVars := se.GetSessionVars()
if !(sessVars.IsAutocommit() && !sessVars.InTxn()) {
return errors.Errorf("non-transactional statement can only run in auto-commit mode. auto-commit:%v, inTxn:%v",
return errors.Errorf("non-transactional DML can only run in auto-commit mode. auto-commit:%v, inTxn:%v",
se.GetSessionVars().IsAutocommit(), se.GetSessionVars().InTxn())
}
if config.GetGlobalConfig().EnableBatchDML && sessVars.DMLBatchSize > 0 && (sessVars.BatchDelete || sessVars.BatchInsert) {
return errors.Errorf("can't run non-transactional statement with batch dml")
return errors.Errorf("can't run non-transactional DML with batch-dml")
}

if sessVars.ReadConsistency.IsWeak() {
Expand Down Expand Up @@ -199,6 +205,9 @@ func splitDeleteWorker(ctx context.Context, jobs []job, stmt *ast.NonTransaction
if i == 0 && jobs[i].err != nil {
return nil, errors.Annotate(jobs[i].err, "Early return: error occurred in the first job. All jobs are canceled")
}
if jobs[i].err != nil && !se.GetSessionVars().NonTransactionalIgnoreError {
return nil, ErrNonTransactionalJobFailure.GenWithStackByArgs(jobs[i].jobID, len(jobs), jobs[i].start.String(), jobs[i].end.String(), jobs[i].String(se.GetSessionVars().EnableRedactLog), jobs[i].err.Error())
}
}
return splitStmts, nil
}
Expand Down Expand Up @@ -271,7 +280,8 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
format.RestoreBracketAroundBinaryOperation|
format.RestoreStringWithoutCharset, &sb))
if err != nil {
job.err = errors.Annotate(err, "Failed to restore delete statement")
logutil.Logger(ctx).Error("Non-transactional delete, failed to restore the delete statement", zap.Error(err))
job.err = errors.New("Failed to restore the delete statement, probably because of unsupported type of the shard column")
return ""
}
deleteSQL := sb.String()
Expand All @@ -294,8 +304,10 @@ func doOneJob(ctx context.Context, job *job, totalJobCount int, options statemen
rs, err := se.ExecuteStmt(ctx, options.stmt.DeleteStmt)

// collect errors
failpoint.Inject("splitDeleteError", func(_ failpoint.Value) {
err = errors.New("injected split delete error")
failpoint.Inject("splitDeleteError", func(val failpoint.Value) {
if val.(bool) {
err = errors.New("injected split delete error")
}
})
if err != nil {
logutil.Logger(ctx).Error("Non-transactional delete SQL failed", zap.String("job", deleteSQLInLog), zap.Error(err), zap.Int("jobID", job.jobID), zap.Int("jobSize", job.jobSize))
Expand Down Expand Up @@ -580,44 +592,18 @@ func buildExecuteResults(ctx context.Context, jobs []job, maxChunkSize int, reda
MaxChunkSize: maxChunkSize,
}, nil
}
resultFields := []*ast.ResultField{
{
Column: &model.ColumnInfo{
FieldType: *types.NewFieldType(mysql.TypeString),
},
ColumnAsName: model.NewCIStr("job"),
},
{
Column: &model.ColumnInfo{
FieldType: *types.NewFieldType(mysql.TypeString),
},
ColumnAsName: model.NewCIStr("sql"),
},
{
Column: &model.ColumnInfo{
FieldType: *types.NewFieldType(mysql.TypeString),
},
ColumnAsName: model.NewCIStr("error"),
},
}

rows := make([][]interface{}, 0, len(failedJobs))
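// Reaching this point with failed jobs means tidb_nontransactional_ignore_error is on;
// otherwise the first failed job would already have been returned as an error.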
var sb strings.Builder
for _, job := range failedJobs {
row := make([]interface{}, 2)
row[0] = job.String(false)
row[1] = job.err.Error()
rows = append(rows, row)
sb.WriteString(fmt.Sprintf("%s, %s;\n", job.String(redactLog), job.err.Error()))
}

errStr := sb.String()
// log errors here in case the output is too long. There can be thousands of errors.
logutil.Logger(ctx).Warn("Non-transactional delete failed",
zap.Int("num_failed_jobs", len(failedJobs)), zap.String("failed_jobs", sb.String()))
logutil.Logger(ctx).Error("Non-transactional delete failed",
zap.Int("num_failed_jobs", len(failedJobs)), zap.String("failed_jobs", errStr))

return &sqlexec.SimpleRecordSet{
ResultFields: resultFields,
Rows: rows,
MaxChunkSize: maxChunkSize,
}, nil
return nil, errors.New(fmt.Sprintf("%d/%d jobs failed in the non-transactional DML: %s, ...(more in logs)",
len(failedJobs), len(jobs), errStr[:mathutil.Min(500, len(errStr)-1)]))
}
25 changes: 22 additions & 3 deletions session/nontransactional_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,29 @@ func TestNonTransactionalDeleteErrorMessage(t *testing.T) {
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert into t values ('%d', %d)", i, i*2))
}
failpoint.Enable("github.com/pingcap/tidb/session/splitDeleteError", `return`)
tk.MustExec("set @@tidb_nontransactional_ignore_error=1")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/splitDeleteError", `return(true)`))
defer failpoint.Disable("github.com/pingcap/tidb/session/splitDeleteError")
err := tk.ExecToErr("split on a limit 3 delete from t")
require.EqualError(t, err, "Early return: error occurred in the first job. All jobs are canceled: injected split delete error")

tk.MustExec("truncate t")
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert into t values ('%d', %d)", i, i*2))
}
tk.MustExec("set @@tidb_nontransactional_ignore_error=1")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/splitDeleteError", `1*return(false)->return(true)`))
err = tk.ExecToErr("split on a limit 3 delete from t")
require.ErrorContains(t, err, "33/34 jobs failed in the non-transactional DML: job id: 2, estimated size: 3, sql: DELETE FROM `test`.`t` WHERE `a` BETWEEN 3 AND 5, injected split delete error;\n")

tk.MustExec("truncate t")
for i := 0; i < 100; i++ {
tk.MustExec(fmt.Sprintf("insert into t values ('%d', %d)", i, i*2))
}
tk.MustExec("set @@tidb_nontransactional_ignore_error=0")
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/session/splitDeleteError", `1*return(false)->return(true)`))
err = tk.ExecToErr("split on a limit 3 delete from t")
require.EqualError(t, err, "[session:8143]non-transactional job failed, job id: 2, total jobs: 34. job range: [KindInt64 3, KindInt64 5], job sql: job id: 2, estimated size: 3, sql: DELETE FROM `test`.`t` WHERE `a` BETWEEN 3 AND 5, err: injected split delete error")
}

func TestNonTransactionalDeleteSplitOnTiDBRowID(t *testing.T) {
Expand Down Expand Up @@ -408,9 +427,9 @@ func TestNonTransactionalDeleteShardOnUnsupportedTypes(t *testing.T) {
defer clean()
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t(a set('e0', 'e1', 'e2'), b int, key(a))")
tk.MustExec("create table t(a set('e0', 'e1', 'e2'), b int, primary key(a) clustered, key(b))")
tk.MustExec("insert into t values ('e2,e0', 3)")
err := tk.ExecToErr("split on a limit 1 delete from t")
err := tk.ExecToErr("split limit 1 delete from t where a = 'e0,e2'")
require.Error(t, err)
tk.MustQuery("select count(*) from t").Check(testkit.Rows("1"))

Expand Down
3 changes: 3 additions & 0 deletions sessionctx/variable/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1031,6 +1031,9 @@ type SessionVars struct {
RcReadCheckTS bool
// RemoveOrderbyInSubquery indicates whether to remove ORDER BY in subquery.
RemoveOrderbyInSubquery bool
// NonTransactionalIgnoreError indicates whether to ignore errors in non-transactional DML statements.
// When set to false, the statement returns immediately when it meets the first error.
NonTransactionalIgnoreError bool

// MaxAllowedPacket indicates the maximum size of a packet for the MySQL protocol.
MaxAllowedPacket uint64
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/sysvar.go
Original file line number Diff line number Diff line change
Expand Up @@ -1438,6 +1438,12 @@ var defaultSysVars = []*SysVar{
s.MemQuotaQuery = TidbOptInt64(val, DefTiDBMemQuotaQuery)
return nil
}},
{Scope: ScopeGlobal | ScopeSession, Name: TiDBNonTransactionalIgnoreError, Value: BoolToOnOff(DefTiDBBatchDMLIgnoreError), Type: TypeBool,
SetSession: func(s *SessionVars, val string) error {
s.NonTransactionalIgnoreError = TiDBOptOn(val)
return nil
},
},
}

// FeedbackProbability points to the FeedbackProbability in statistics package.
Expand Down
6 changes: 6 additions & 0 deletions sessionctx/variable/tidb_vars.go
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,11 @@ const (

// TiDBQueryLogMaxLen is used to set the max length of the query in the log.
TiDBQueryLogMaxLen = "tidb_query_log_max_len"

// TiDBNonTransactionalIgnoreError controls whether errors are ignored in non-transactional DMLs.
// When set to false, a non-transactional DML returns as soon as it meets the first error.
// When set to true, a non-transactional DML finishes all batches even if errors are met in some batches.
TiDBNonTransactionalIgnoreError = "tidb_nontransactional_ignore_error"
)

// TiDB vars that have only global scope
Expand Down Expand Up @@ -841,6 +846,7 @@ const (
DefMaxAllowedPacket uint64 = 67108864
DefTiDBMemQuotaQuery = 1073741824 // 1GB
DefTiDBQueryLogMaxLen = 4096
DefTiDBBatchDMLIgnoreError = false
)

// Process global variables.
Expand Down

0 comments on commit 45415f4

Please sign in to comment.