From b78268fa31438804bd28dc51d8911e4f591169c8 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 3 Apr 2018 18:28:20 +0800 Subject: [PATCH 1/5] merge batch get results --- executor/admin.go | 4 ++-- executor/write.go | 2 +- executor/write_test.go | 23 +++++++++++++++++++++++ kv/mock.go | 28 ++++++++++++++++++++++++++++ 4 files changed, 54 insertions(+), 3 deletions(-) diff --git a/executor/admin.go b/executor/admin.go index 7d6e0953289b8..07df2e4f46560 100644 --- a/executor/admin.go +++ b/executor/admin.go @@ -361,7 +361,7 @@ func (e *RecoverIndexExec) batchMarkDup(txn kv.Transaction, rows []recoverRows) distinctFlags[i] = distinct } - values, err := txn.GetSnapshot().BatchGet(e.batchKeys) + values, err := kv.BatchGetValues(txn, e.batchKeys) if err != nil { return errors.Trace(err) } @@ -485,7 +485,7 @@ func (e *CleanupIndexExec) batchGetRecord(txn kv.Transaction) (map[string][]byte for handle := range e.idxValues { e.batchKeys = append(e.batchKeys, e.table.RecordKey(handle)) } - values, err := txn.GetSnapshot().BatchGet(e.batchKeys) + values, err := kv.BatchGetValues(txn, e.batchKeys) if err != nil { return nil, errors.Trace(err) } diff --git a/executor/write.go b/executor/write.go index 1143e931b9f65..ca89503ae51f2 100644 --- a/executor/write.go +++ b/executor/write.go @@ -965,7 +965,7 @@ func batchMarkDupRows(ctx sessionctx.Context, t table.Table, rows [][]types.Datu batchKeys = append(batchKeys, k.key) } } - values, err := ctx.Txn().GetSnapshot().BatchGet(batchKeys) + values, err := kv.BatchGetValues(ctx.Txn(), batchKeys) if err != nil { return nil, errors.Trace(err) } diff --git a/executor/write_test.go b/executor/write_test.go index 147b252fd99b4..0d537a3c73561 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -409,6 +409,29 @@ func (s *testSuite) TestInsertIgnore(c *C) { c.Assert(err, IsNil) r = tk.MustQuery("SHOW WARNINGS") r.Check(testkit.Rows("Warning 1062 Duplicate entry '1' for key 'PRIMARY'")) + + testSQL = `drop table if exists test; +create table test (i int primary key, j int unique); +begin; +insert into test values (1,1); +insert ignore into test values (2,1); +commit;` + tk.MustExec(testSQL) + testSQL = `select * from test;` + r = tk.MustQuery(testSQL) + r.Check(testkit.Rows("1 1")) + + testSQL = `drop table if exists test; +create table test (i int primary key, j int unique); +insert into test values (1, 1); +begin; +delete from test where i = 1; +insert ignore into test values (2, 1); +commit;` + tk.MustExec(testSQL) + testSQL = `select * from test;` + r = tk.MustQuery(testSQL) + r.Check(testkit.Rows("2 1")) } func (s *testSuite) TestReplace(c *C) { diff --git a/kv/mock.go b/kv/mock.go index 25719ed500e64..4636d37b1044b 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -121,6 +121,34 @@ func NewMockTxn() Transaction { } } +// BatchGetValues gets values in batch. +// The values from buffer in transaction and the values from the storage node are merged together. +func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) { + bufferValues := make(map[string][]byte, len(keys)) + for _, key := range keys { + val, err := txn.GetMemBuffer().Get(key) + if IsErrNotFound(err) { + continue + } + if err != nil { + return nil, errors.Trace(err) + } + bufferValues[string(key)] = val + } + storageValues, err := txn.GetSnapshot().BatchGet(keys) + if err != nil { + return nil, errors.Trace(err) + } + for key, val := range bufferValues { + if _, ok := storageValues[string(key)]; ok && len(val) == 0 { + delete(storageValues, string(key)) + } else { + storageValues[string(key)] = val + } + } + return storageValues, nil +} + // mockStorage is used to start a must commit-failed txn. type mockStorage struct { } From beb287c3ea7dc0139b0b25c46c762905bc8bb816 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 3 Apr 2018 18:44:42 +0800 Subject: [PATCH 2/5] add test case for update --- executor/write_test.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/executor/write_test.go b/executor/write_test.go index 0d537a3c73561..e3fdef504140b 100644 --- a/executor/write_test.go +++ b/executor/write_test.go @@ -421,8 +421,7 @@ commit;` r = tk.MustQuery(testSQL) r.Check(testkit.Rows("1 1")) - testSQL = `drop table if exists test; -create table test (i int primary key, j int unique); + testSQL = `delete from test; insert into test values (1, 1); begin; delete from test where i = 1; @@ -432,6 +431,18 @@ commit;` testSQL = `select * from test;` r = tk.MustQuery(testSQL) r.Check(testkit.Rows("2 1")) + + testSQL = `delete from test; +insert into test values (1, 1); +begin; +update test set i = 2, j = 2 where i = 1; +insert ignore into test values (1, 3); +insert ignore into test values (2, 4); +commit;` + tk.MustExec(testSQL) + testSQL = `select * from test order by i;` + r = tk.MustQuery(testSQL) + r.Check(testkit.Rows("1 3", "2 2")) } func (s *testSuite) TestReplace(c *C) { From c099a72f98d232333c6a2160332f22646680ef1f Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 3 Apr 2018 19:12:21 +0800 Subject: [PATCH 3/5] move the code --- kv/mock.go | 28 ---------------------------- kv/txn.go | 28 ++++++++++++++++++++++++++++ 2 files changed, 28 insertions(+), 28 deletions(-) diff --git a/kv/mock.go b/kv/mock.go index 4636d37b1044b..25719ed500e64 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -121,34 +121,6 @@ func NewMockTxn() Transaction { } } -// BatchGetValues gets values in batch. -// The values from buffer in transaction and the values from the storage node are merged together. -func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) { - bufferValues := make(map[string][]byte, len(keys)) - for _, key := range keys { - val, err := txn.GetMemBuffer().Get(key) - if IsErrNotFound(err) { - continue - } - if err != nil { - return nil, errors.Trace(err) - } - bufferValues[string(key)] = val - } - storageValues, err := txn.GetSnapshot().BatchGet(keys) - if err != nil { - return nil, errors.Trace(err) - } - for key, val := range bufferValues { - if _, ok := storageValues[string(key)]; ok && len(val) == 0 { - delete(storageValues, string(key)) - } else { - storageValues[string(key)] = val - } - } - return storageValues, nil -} - // mockStorage is used to start a must commit-failed txn. type mockStorage struct { } diff --git a/kv/txn.go b/kv/txn.go index 4c35f65746bdb..fb5ecb61124d3 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -88,3 +88,31 @@ func BackOff(attempts uint) int { time.Sleep(sleep) return int(sleep) } + +// BatchGetValues gets values in batch. +// The values from buffer in transaction and the values from the storage node are merged together. +func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) { + bufferValues := make(map[string][]byte, len(keys)) + for _, key := range keys { + val, err := txn.GetMemBuffer().Get(key) + if IsErrNotFound(err) { + continue + } + if err != nil { + return nil, errors.Trace(err) + } + bufferValues[string(key)] = val + } + storageValues, err := txn.GetSnapshot().BatchGet(keys) + if err != nil { + return nil, errors.Trace(err) + } + for key, val := range bufferValues { + if _, ok := storageValues[string(key)]; ok && len(val) == 0 { + delete(storageValues, string(key)) + } else { + storageValues[string(key)] = val + } + } + return storageValues, nil +} From 68b08e3d98f2b38c552e0ca9d1a1fcf2439fea57 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 3 Apr 2018 19:30:51 +0800 Subject: [PATCH 4/5] optimize a little --- kv/txn.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/kv/txn.go b/kv/txn.go index fb5ecb61124d3..4db872f77caf2 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -93,26 +93,26 @@ func BackOff(attempts uint) int { // The values from buffer in transaction and the values from the storage node are merged together. func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) { bufferValues := make(map[string][]byte, len(keys)) + shrinkKeys := make([]Key, 0, len(keys)) for _, key := range keys { val, err := txn.GetMemBuffer().Get(key) if IsErrNotFound(err) { + shrinkKeys = append(shrinkKeys, key) continue } if err != nil { return nil, errors.Trace(err) } - bufferValues[string(key)] = val + if len(val) != 0 { + bufferValues[string(key)] = val + } } - storageValues, err := txn.GetSnapshot().BatchGet(keys) + storageValues, err := txn.GetSnapshot().BatchGet(shrinkKeys) if err != nil { return nil, errors.Trace(err) } for key, val := range bufferValues { - if _, ok := storageValues[string(key)]; ok && len(val) == 0 { - delete(storageValues, string(key)) - } else { - storageValues[string(key)] = val - } + storageValues[string(key)] = val } return storageValues, nil } From b8b3c4e5be75417ebfdaffe0885dd37dfabfffe6 Mon Sep 17 00:00:00 2001 From: Yu Shuaipeng Date: Tue, 3 Apr 2018 21:54:24 +0800 Subject: [PATCH 5/5] address comments --- kv/txn.go | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/kv/txn.go b/kv/txn.go index 4db872f77caf2..5bea8f384ae94 100644 --- a/kv/txn.go +++ b/kv/txn.go @@ -92,9 +92,12 @@ func BackOff(attempts uint) int { // BatchGetValues gets values in batch. // The values from buffer in transaction and the values from the storage node are merged together. func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) { - bufferValues := make(map[string][]byte, len(keys)) + if txn.IsReadOnly() { + return txn.GetSnapshot().BatchGet(keys) + } + bufferValues := make([][]byte, len(keys)) shrinkKeys := make([]Key, 0, len(keys)) - for _, key := range keys { + for i, key := range keys { val, err := txn.GetMemBuffer().Get(key) if IsErrNotFound(err) { shrinkKeys = append(shrinkKeys, key) @@ -104,15 +107,18 @@ func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) { return nil, errors.Trace(err) } if len(val) != 0 { - bufferValues[string(key)] = val + bufferValues[i] = val } } storageValues, err := txn.GetSnapshot().BatchGet(shrinkKeys) if err != nil { return nil, errors.Trace(err) } - for key, val := range bufferValues { - storageValues[string(key)] = val + for i, key := range keys { + if bufferValues[i] == nil { + continue + } + storageValues[string(key)] = bufferValues[i] } return storageValues, nil }