diff --git a/executor/admin.go b/executor/admin.go index 5e3243f7182ec..20483347d371c 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -361,7 +361,7 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows) distinctFlags[i] = distinct } - values, err := txn.GetSnapshot().BatchGet(e.batchKeys) + values, err := kv.BatchGetValues(txn, e.batchKeys) if err != nil { return errors.Trace(err) } @@ -485,7 +485,7 @@ func (e *CleanupIndexExec) batchGetRecord(txn kv.Transaction) (map[string][]byte for handle := range e.idxValues { e.batchKeys = append(e.batchKeys, e.table.RecordKey(handle)) } - values, err := txn.GetSnapshot().BatchGet(e.batchKeys) + values, err := kv.BatchGetValues(txn, e.batchKeys) if err != nil { return nil, errors.Trace(err) } diff --git a/executor/write.go b/executor/write.go index 1f8cb2f8da01f..019b469704e37 100644 --- a/executor/write.go +++ b/executor/write.go @@ -965,7 +965,7 @@ func batchMarkDupRows(ctx sessionctx.Context, t table.Table, rows [][]types.Datu batchKeys = append(batchKeys, k.key) } } - values, err := ctx.Txn().GetSnapshot().BatchGet(batchKeys) + values, err := kv.BatchGetValues(ctx.Txn(), batchKeys) if err != nil { return nil, errors.Trace(err) } diff --git a/executor/write_test.go b/executor/write_test.go index 147b252fd99b4..e3fdef504140b 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -409,6 +409,40 @@ func (s *testSuite) TestInsertIgnore(c *C) { c.Assert(err, IsNil) r = tk.MustQuery("SHOW WARNINGS") r.Check(testkit.Rows("Warning 1062 Duplicate entry '1' for key 'PRIMARY'")) + + testSQL = `drop table if exists test; +create table test (i int primary key, j int unique); +begin; +insert into test values (1,1); +insert ignore into test values (2,1); +commit;` + tk.MustExec(testSQL) + testSQL = `select * from test;` + r = tk.MustQuery(testSQL) + r.Check(testkit.Rows("1 1")) + + testSQL = `delete from test; +insert into test values (1, 1); +begin; +delete from test where i = 1; +insert ignore into test values (2, 1); +commit;` + tk.MustExec(testSQL) + testSQL = `select * from test;` + r = tk.MustQuery(testSQL) + r.Check(testkit.Rows("2 1")) + + testSQL = `delete from test; +insert into test values (1, 1); +begin; +update test set i = 2, j = 2 where i = 1; +insert ignore into test values (1, 3); +insert ignore into test values (2, 4); +commit;` + tk.MustExec(testSQL) + testSQL = `select * from test order by i;` + r = tk.MustQuery(testSQL) + r.Check(testkit.Rows("1 3", "2 2")) } func (s *testSuite) TestReplace(c *C) { diff --git a/kv/txn.go b/kv/txn.go index 4c35f65746bdb..5bea8f384ae94 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -88,3 +88,37 @@ func BackOff(attempts uint) int { time.Sleep(sleep) return int(sleep) } + +// BatchGetValues gets values in batch. +// The values from buffer in transaction and the values from the storage node are merged together. +func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) { + if txn.IsReadOnly() { + return txn.GetSnapshot().BatchGet(keys) + } + bufferValues := make([][]byte, len(keys)) + shrinkKeys := make([]Key, 0, len(keys)) + for i, key := range keys { + val, err := txn.GetMemBuffer().Get(key) + if IsErrNotFound(err) { + shrinkKeys = append(shrinkKeys, key) + continue + } + if err != nil { + return nil, errors.Trace(err) + } + if len(val) != 0 { + bufferValues[i] = val + } + } + storageValues, err := txn.GetSnapshot().BatchGet(shrinkKeys) + if err != nil { + return nil, errors.Trace(err) + } + for i, key := range keys { + if bufferValues[i] == nil { + continue + } + storageValues[string(key)] = bufferValues[i] + } + return storageValues, nil +}