From c46b894f00b22a883db7cd153272c54ae65269c0 Mon Sep 17 00:00:00 2001 From: Zejun Li Date: Tue, 18 Aug 2020 19:50:57 +0800 Subject: [PATCH] executor: use before statement snapshot for UnionScan --- executor/mem_reader.go | 2 +- executor/union_scan_test.go | 25 +++++++++++++++++++++++++ kv/kv.go | 2 ++ kv/mock.go | 4 ++++ session/txn.go | 5 +++++ store/tikv/txn.go | 4 ++++ 6 files changed, 41 insertions(+), 1 deletion(-) diff --git a/executor/mem_reader.go b/executor/mem_reader.go index 0a696ffa694bd..c9752427b0cea 100644 --- a/executor/mem_reader.go +++ b/executor/mem_reader.go @@ -308,7 +308,7 @@ func iterTxnMemBuffer(ctx sessionctx.Context, kvRanges []kv.KeyRange, fn process return err } for _, rg := range kvRanges { - iter, err := txn.GetMemBuffer().Iter(rg.StartKey, rg.EndKey) + iter, err := txn.GetMemBufferSnapshot().Iter(rg.StartKey, rg.EndKey) if err != nil { return err } diff --git a/executor/union_scan_test.go b/executor/union_scan_test.go index 6cf82482d4baa..059e44bff0926 100644 --- a/executor/union_scan_test.go +++ b/executor/union_scan_test.go @@ -328,3 +328,28 @@ func (s *testSuite7) TestForUpdateUntouchedIndex(c *C) { tk.MustExec("commit") tk.MustExec("admin check table t") } + +func (s *testSuite7) TestUpdateScanningHandles(c *C) { + tk := testkit.NewTestKit(c, s.store) + tk.MustExec("use test") + tk.MustExec("drop table if exists t;") + tk.MustExec("create table t(a int primary key, b int);") + tk.MustExec("begin") + for i := 2; i < 100000; i++ { + tk.MustExec("insert into t values (?, ?)", i, i) + } + tk.MustExec("commit;") + + tk.MustExec("set tidb_distsql_scan_concurrency = 1;") + tk.MustExec("set tidb_index_lookup_join_concurrency = 1;") + tk.MustExec("set tidb_projection_concurrency=1;") + tk.MustExec("set tidb_init_chunk_size=1;") + tk.MustExec("set tidb_max_chunk_size=32;") + + tk.MustExec("begin") + tk.MustExec("insert into t values (1, 1);") + tk.MustExec("update /*+ INL_JOIN(t1) */ t t1, (select a, b from t) t2 set t1.b = t2.b where t1.a = t2.a + 1000;") + result := tk.MustQuery("select a, a-b from t where a > 1000 and a - b != 1000;") + c.Assert(result.Rows(), HasLen, 0) + tk.MustExec("rollback;") +} diff --git a/kv/kv.go b/kv/kv.go index 07af8ec6903d5..3b814a8733096 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -202,6 +202,8 @@ type Transaction interface { Valid() bool // GetMemBuffer return the MemBuffer binding to this transaction. GetMemBuffer() MemBuffer + // GetMemBufferSnapshot is used to return a snapshot of MemBuffer without any statement modify. + GetMemBufferSnapshot() MemBuffer // GetSnapshot returns the Snapshot binding to this transaction. GetSnapshot() Snapshot // SetVars sets variables to the transaction. diff --git a/kv/mock.go b/kv/mock.go index ad4457ecdcc60..7b65b525deb8f 100644 --- a/kv/mock.go +++ b/kv/mock.go @@ -101,6 +101,10 @@ func (t *mockTxn) GetMemBuffer() MemBuffer { return nil } +func (t *mockTxn) GetMemBufferSnapshot() MemBuffer { + return nil +} + func (t *mockTxn) GetSnapshot() Snapshot { return nil } diff --git a/session/txn.go b/session/txn.go index e068d921e6da8..7486525bdc182 100644 --- a/session/txn.go +++ b/session/txn.go @@ -315,6 +315,11 @@ func (st *TxnState) GetMemBuffer() kv.MemBuffer { return kv.NewBufferStoreFrom(st.Transaction.GetMemBuffer(), st.stmtBuf) } +// GetMemBufferSnapshot overrides the Transaction interface. +func (st *TxnState) GetMemBufferSnapshot() kv.MemBuffer { + return st.Transaction.GetMemBuffer() +} + // BatchGet overrides the Transaction interface. func (st *TxnState) BatchGet(ctx context.Context, keys []kv.Key) (map[string][]byte, error) { bufferValues := make([][]byte, len(keys)) diff --git a/store/tikv/txn.go b/store/tikv/txn.go index 70fa2494ae378..0b6f301270b6b 100644 --- a/store/tikv/txn.go +++ b/store/tikv/txn.go @@ -538,6 +538,10 @@ func (txn *tikvTxn) GetMemBuffer() kv.MemBuffer { return txn.us.GetMemBuffer() } +func (txn *tikvTxn) GetMemBufferSnapshot() kv.MemBuffer { + panic("unsupported operation") +} + func (txn *tikvTxn) GetSnapshot() kv.Snapshot { return txn.snapshot }