From ce849c3e19b0b69404848b0eee05356dcd81277b Mon Sep 17 00:00:00 2001
From: Zejun Li
Date: Fri, 4 Sep 2020 16:50:36 +0800
Subject: [PATCH] *: remove `DirtyDB` and `DirtyTable` to reduce memory usage (#19042)

---
 executor/builder.go            |   6 --
 executor/mem_reader.go         |   8 +--
 executor/union_scan.go         |  91 ++++++---------------------
 executor/union_scan_test.go    |  25 ++++++++
 kv/kv.go                       |  13 ++++
 kv/memdb.go                    |  16 +++++
 kv/memdb_arena.go              |  12 ++++
 kv/memdb_snapshot.go           | 111 +++++++++++++++++++++++++++++++++
 session/txn.go                 |  58 +++--------------
 sessionctx/context.go          |   2 -
 sessionctx/variable/session.go |   2 -
 table/tables/tables.go         |   4 --
 12 files changed, 204 insertions(+), 144 deletions(-)
 create mode 100644 kv/memdb_snapshot.go

diff --git a/executor/builder.go b/executor/builder.go
index abf035542d267..81da93eed9489 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -976,8 +976,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
 		// goroutines write empty DirtyTable to DirtyDB for this table concurrently. Although the DirtyDB looks
 		// safe for data race in all the cases, the map of golang will throw panic when it's accessed in parallel.
 		// So we lock it when getting dirty table.
-		physicalTableID := getPhysicalTableID(x.table)
-		us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)
 		us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
 		us.columns = x.columns
 		us.table = x.table
@@ -992,8 +990,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
 				}
 			}
 		}
-		physicalTableID := getPhysicalTableID(x.table)
-		us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)
 		us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
 		us.columns = x.columns
 		us.table = x.table
@@ -1007,8 +1003,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
 				}
 			}
 		}
-		physicalTableID := getPhysicalTableID(x.table)
-		us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)
 		us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
 		us.columns = x.columns
 		us.table = x.table
diff --git a/executor/mem_reader.go b/executor/mem_reader.go
index 78388d86738e2..56c2ccb309f14 100644
--- a/executor/mem_reader.go
+++ b/executor/mem_reader.go
@@ -59,7 +59,6 @@ func buildMemIndexReader(us *UnionScanExec, idxReader *IndexReaderExecutor) *mem
 		kvRanges:        kvRanges,
 		desc:            us.desc,
 		conditions:      us.conditions,
-		addedRows:       make([][]types.Datum, 0, us.dirty.addedRows.Len()),
 		retFieldTypes:   retTypes(us),
 		outputOffset:    outputOffset,
 		belowHandleCols: us.belowHandleCols,
@@ -194,7 +193,6 @@ func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *mem
 		kvRanges:      tblReader.kvRanges,
 		desc:          us.desc,
 		conditions:    us.conditions,
-		addedRows:     make([][]types.Datum, 0, us.dirty.addedRows.Len()),
 		retFieldTypes: retTypes(us),
 		colIDs:        colIDs,
 		buffer: allocBuf{
@@ -329,10 +327,7 @@ func iterTxnMemBuffer(ctx sessionctx.Context, kvRanges []kv.KeyRange, fn process
 		return err
 	}
 	for _, rg := range kvRanges {
-		iter, err := txn.GetMemBuffer().Iter(rg.StartKey, rg.EndKey)
-		if err != nil {
-			return err
-		}
+		iter := txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey)
 		for ; iter.Valid(); err = iter.Next() {
 			if err != nil {
 				return err
@@ -399,7 +394,6 @@ func buildMemIndexLookUpReader(us *UnionScanExec, idxLookUpReader *IndexLookUpEx
 		table:           idxLookUpReader.table.Meta(),
 		kvRanges:        kvRanges,
 		desc:            idxLookUpReader.desc,
-		addedRowsLen:    us.dirty.addedRows.Len(),
 		retFieldTypes:   retTypes(us),
 		outputOffset:    outputOffset,
 		belowHandleCols: us.belowHandleCols,
diff --git a/executor/union_scan.go b/executor/union_scan.go
index 00533dd0d6260..43f316b785cdd 100644
--- a/executor/union_scan.go
+++ b/executor/union_scan.go
@@ -17,88 +17,23 @@ import (
 	"context"
 	"fmt"
 	"runtime/trace"
-	"sync"
 
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/kv"
 	plannercore "github.com/pingcap/tidb/planner/core"
-	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
 )
 
-// DirtyDB stores uncommitted write operations for a transaction.
-// It is stored and retrieved by context.Value and context.SetValue method.
-type DirtyDB struct {
-	sync.Mutex
-
-	// tables is a map whose key is tableID.
-	tables map[int64]*DirtyTable
-}
-
-// GetDirtyTable gets the DirtyTable by id from the DirtyDB.
-func (udb *DirtyDB) GetDirtyTable(tid int64) *DirtyTable {
-	// The index join access the tables map parallelly.
-	// But the map throws panic in this case. So it's locked.
-	udb.Lock()
-	dt, ok := udb.tables[tid]
-	if !ok {
-		dt = &DirtyTable{
-			tid:         tid,
-			addedRows:   kv.NewHandleMap(),
-			deletedRows: kv.NewHandleMap(),
-		}
-		udb.tables[tid] = dt
-	}
-	udb.Unlock()
-	return dt
-}
-
-// DirtyTable stores uncommitted write operation for a transaction.
-type DirtyTable struct {
-	tid int64
-	// addedRows ...
-	// the key is handle.
-	addedRows   *kv.HandleMap
-	deletedRows *kv.HandleMap
-}
-
-// AddRow adds a row to the DirtyDB.
-func (dt *DirtyTable) AddRow(handle kv.Handle) {
-	dt.addedRows.Set(handle, true)
-}
-
-// DeleteRow deletes a row from the DirtyDB.
-func (dt *DirtyTable) DeleteRow(handle kv.Handle) {
-	dt.addedRows.Delete(handle)
-	dt.deletedRows.Set(handle, true)
-}
-
-// IsEmpty checks whether the table is empty.
-func (dt *DirtyTable) IsEmpty() bool {
-	return dt.addedRows.Len()+dt.deletedRows.Len() == 0
-}
-
-// GetDirtyDB returns the DirtyDB bind to the context.
-func GetDirtyDB(ctx sessionctx.Context) *DirtyDB {
-	var udb *DirtyDB
-	x := ctx.GetSessionVars().TxnCtx.DirtyDB
-	if x == nil {
-		udb = &DirtyDB{tables: make(map[int64]*DirtyTable)}
-		ctx.GetSessionVars().TxnCtx.DirtyDB = udb
-	} else {
-		udb = x.(*DirtyDB)
-	}
-	return udb
-}
-
 // UnionScanExec merges the rows from dirty table and the rows from distsql request.
 type UnionScanExec struct {
 	baseExecutor
 
-	dirty *DirtyTable
+	memBuf     kv.MemBuffer
+	memBufSnap kv.Getter
+
 	// usedIndex is the column offsets of the index which Src executor has used.
 	usedIndex []int
 	desc      bool
@@ -140,6 +75,18 @@ func (us *UnionScanExec) open(ctx context.Context) error {
 	}
 	defer trace.StartRegion(ctx, "UnionScanBuildRows").End()
 
+	txn, err := us.ctx.Txn(false)
+	if err != nil {
+		return err
+	}
+
+	mb := txn.GetMemBuffer()
+	mb.RLock()
+	defer mb.RUnlock()
+
+	us.memBuf = mb
+	us.memBufSnap = mb.SnapshotGetter()
+
 	// 1. select without virtual columns
 	// 2. build virtual columns and select with virtual columns
 	switch x := reader.(type) {
@@ -161,6 +108,8 @@ func (us *UnionScanExec) open(ctx context.Context) error {
 
 // Next implements the Executor Next interface.
 func (us *UnionScanExec) Next(ctx context.Context, req *chunk.Chunk) error {
+	us.memBuf.RLock()
+	defer us.memBuf.RUnlock()
 	req.GrowAndReset(us.maxChunkSize)
 	mutableRow := chunk.MutRowFromTypes(retTypes(us))
 	for i, batchSize := 0, req.Capacity(); i < batchSize; i++ {
@@ -265,10 +214,8 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err
 		if err != nil {
 			return nil, err
 		}
-		if _, ok := us.dirty.deletedRows.Get(snapshotHandle); ok {
-			continue
-		}
-		if _, ok := us.dirty.addedRows.Get(snapshotHandle); ok {
+		checkKey := us.table.RecordKey(snapshotHandle)
+		if _, err := us.memBufSnap.Get(context.TODO(), checkKey); err == nil {
 			// If src handle appears in added rows, it means there is conflict and the transaction will fail to
 			// commit, but for simplicity, we don't handle it here.
 			continue
diff --git a/executor/union_scan_test.go b/executor/union_scan_test.go
index 61b170fab8269..235fc0e1c6616 100644
--- a/executor/union_scan_test.go
+++ b/executor/union_scan_test.go
@@ -333,6 +333,31 @@ func (s *testSuite7) TestForUpdateUntouchedIndex(c *C) {
 	tk.MustExec("admin check table t")
 }
 
+func (s *testSuite7) TestUpdateScanningHandles(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t;")
+	tk.MustExec("create table t(a int primary key, b int);")
+	tk.MustExec("begin")
+	for i := 2; i < 100000; i++ {
+		tk.MustExec("insert into t values (?, ?)", i, i)
+	}
+	tk.MustExec("commit;")
+
+	tk.MustExec("set tidb_distsql_scan_concurrency = 1;")
+	tk.MustExec("set tidb_index_lookup_join_concurrency = 1;")
+	tk.MustExec("set tidb_projection_concurrency=1;")
+	tk.MustExec("set tidb_init_chunk_size=1;")
+	tk.MustExec("set tidb_max_chunk_size=32;")
+
+	tk.MustExec("begin")
+	tk.MustExec("insert into t values (1, 1);")
+	tk.MustExec("update /*+ INL_JOIN(t1) */ t t1, (select a, b from t) t2 set t1.b = t2.b where t1.a = t2.a + 1000;")
+	result := tk.MustQuery("select a, a-b from t where a > 1000 and a - b != 1000;")
+	c.Assert(result.Rows(), HasLen, 0)
+	tk.MustExec("rollback;")
+}
+
 // See https://github.com/pingcap/tidb/issues/19136
 func (s *testSuite7) TestForApplyAndUnionScan(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
diff --git a/kv/kv.go b/kv/kv.go
index 2633e1f9fbe5e..444762644664e 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -173,6 +173,14 @@ type MemBufferIterator interface {
 type MemBuffer interface {
 	RetrieverMutator
 
+	// RLock locks the MemBuffer for shared read.
+	// In most cases, MemBuffer will only be used by a single goroutine,
+	// but it will be read by multiple goroutines when combined with executor.UnionScanExec.
+	// To avoid races introduced by executor.UnionScanExec, MemBuffer exposes a read lock for it.
+	RLock()
+	// RUnlock unlocks the MemBuffer.
+	RUnlock()
+
 	// GetFlags returns the latest flags associated with key.
 	GetFlags(Key) (KeyFlags, error)
 	// IterWithFlags returns a MemBufferIterator.
@@ -202,6 +210,11 @@ type MemBuffer interface {
 	// InspectStage used to inspect the value updates in the given stage.
 	InspectStage(StagingHandle, func(Key, KeyFlags, []byte))
 
+	// SnapshotGetter returns a Getter for a snapshot of MemBuffer.
+	SnapshotGetter() Getter
+	// SnapshotIter returns an Iterator for a snapshot of MemBuffer.
+	SnapshotIter(k, upperbound Key) Iterator
+
 	// Size returns sum of keys and values length.
 	Size() int
 	// Len returns the number of entries in the DB.
diff --git a/kv/memdb.go b/kv/memdb.go
index b9fa2c3968d39..2f40d8c402cb5 100644
--- a/kv/memdb.go
+++ b/kv/memdb.go
@@ -17,6 +17,7 @@ import (
 	"bytes"
 	"context"
 	"reflect"
+	"sync"
 	"sync/atomic"
 	"unsafe"
 )
@@ -120,6 +121,9 @@ var tombstone = []byte{}
 // When discarding a newly added KV in `Cleanup`, the non-persistent flags will be cleared.
 // If there are persistent flags associated with key, we will keep this key in node without value.
 type memdb struct {
+	// This RWMutex is only used to ensure memdbSnapGetter.Get will not race with
+	// concurrent memdb.Set, memdb.SetWithFlags, memdb.Delete and memdb.UpdateFlags.
+	sync.RWMutex
 	root      memdbArenaAddr
 	allocator nodeAllocator
 	vlog      memdbVlog
@@ -145,6 +149,9 @@ func newMemDB() *memdb {
 }
 
 func (db *memdb) Staging() StagingHandle {
+	db.Lock()
+	defer db.Unlock()
+
 	db.stages = append(db.stages, db.vlog.checkpoint())
 	return StagingHandle(len(db.stages))
 }
@@ -155,6 +162,9 @@ func (db *memdb) Release(h StagingHandle) {
 		// Use panic to make debug easier.
 		panic("cannot release staging buffer")
 	}
+
+	db.Lock()
+	defer db.Unlock()
 	if int(h) == 1 {
 		tail := db.vlog.checkpoint()
 		if !db.stages[0].isSamePosition(&tail) {
@@ -174,6 +184,8 @@ func (db *memdb) Cleanup(h StagingHandle) {
 		panic("cannot cleanup staging buffer")
 	}
 
+	db.Lock()
+	defer db.Unlock()
 	cp := &db.stages[int(h)-1]
 	if !db.vlogInvalid {
 		db.vlog.revertToCheckpoint(db, cp)
@@ -276,6 +288,10 @@ func (db *memdb) set(key Key, value []byte, ops ...FlagsOp) error {
 			return ErrEntryTooLarge.GenWithStackByArgs(db.entrySizeLimit, size)
 		}
 	}
+
+	db.Lock()
+	defer db.Unlock()
+
 	if len(db.stages) == 0 {
 		db.dirty = true
 	}
diff --git a/kv/memdb_arena.go b/kv/memdb_arena.go
index 2c2e8af4a262a..804c1cd91060e 100644
--- a/kv/memdb_arena.go
+++ b/kv/memdb_arena.go
@@ -275,6 +275,18 @@ func (l *memdbVlog) getValue(addr memdbArenaAddr) []byte {
 	return block[valueOff:lenOff]
 }
 
+func (l *memdbVlog) getSnapshotValue(addr memdbArenaAddr, snap *memdbCheckpoint) ([]byte, bool) {
+	for !addr.isNull() {
+		if !l.canModify(snap, addr) {
+			return l.getValue(addr), true
+		}
+		var hdr memdbVlogHdr
+		hdr.load(l.blocks[addr.idx].buf[addr.off-memdbVlogHdrSize:])
+		addr = hdr.oldValue
+	}
+	return nil, false
+}
+
 func (l *memdbVlog) revertToCheckpoint(db *memdb, cp *memdbCheckpoint) {
 	cursor := l.checkpoint()
 	for !cp.isSamePosition(&cursor) {
diff --git a/kv/memdb_snapshot.go b/kv/memdb_snapshot.go
new file mode 100644
index 0000000000000..96ae69ad12431
--- /dev/null
+++ b/kv/memdb_snapshot.go
@@ -0,0 +1,111 @@
+// Copyright 2020 PingCAP, Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package kv
+
+import "context"
+
+func (db *memdb) SnapshotGetter() Getter {
+	return &memdbSnapGetter{
+		db: db,
+		cp: db.getSnapshot(),
+	}
+}
+
+func (db *memdb) SnapshotIter(start, end Key) Iterator {
+	it := &memdbSnapIter{
+		memdbIterator: &memdbIterator{
+			db:    db,
+			start: start,
+			end:   end,
+		},
+		cp: db.getSnapshot(),
+	}
+	it.init()
+	return it
+}
+
+func (db *memdb) getSnapshot() memdbCheckpoint {
+	if len(db.stages) > 0 {
+		return db.stages[0]
+	}
+	return db.vlog.checkpoint()
+}
+
+type memdbSnapGetter struct {
+	db *memdb
+	cp memdbCheckpoint
+}
+
+func (snap *memdbSnapGetter) Get(_ context.Context, key Key) ([]byte, error) {
+	x := snap.db.traverse(key, false)
+	if x.isNull() {
+		return nil, ErrNotExist
+	}
+	if x.vptr.isNull() {
+		// A flag-only key acts as if the value does not exist.
+		return nil, ErrNotExist
+	}
+	v, ok := snap.db.vlog.getSnapshotValue(x.vptr, &snap.cp)
+	if !ok {
+		return nil, ErrNotExist
+	}
+	return v, nil
+}
+
+type memdbSnapIter struct {
+	*memdbIterator
+	value []byte
+	cp    memdbCheckpoint
+}
+
+func (i *memdbSnapIter) Value() []byte {
+	return i.value
+}
+
+func (i *memdbSnapIter) Next() error {
+	i.value = nil
+	for i.Valid() {
+		if err := i.memdbIterator.Next(); err != nil {
+			return err
+		}
+		if i.setValue() {
+			return nil
+		}
+	}
+	return nil
+}
+
+func (i *memdbSnapIter) setValue() bool {
+	if !i.Valid() {
+		return false
+	}
+	if v, ok := i.db.vlog.getSnapshotValue(i.curr.vptr, &i.cp); ok {
+		i.value = v
+		return true
+	}
+	return false
+}
+
+func (i *memdbSnapIter) init() {
+	if len(i.start) == 0 {
+		i.seekToFirst()
+	} else {
+		i.seek(i.start)
+	}
+
+	if !i.setValue() {
+		err := i.Next()
+		_ = err // memdbIterator will never fail
+	}
+}
diff --git a/session/txn.go b/session/txn.go
index d2a072b498a9a..9858d134cf8e7 100644
--- a/session/txn.go
+++ b/session/txn.go
@@ -24,13 +24,12 @@ import (
 	"github.com/opentracing/opentracing-go"
 	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
+	"github.com/pingcap/parser/terror"
 	"github.com/pingcap/tidb/config"
-	"github.com/pingcap/tidb/executor"
 	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/sessionctx/binloginfo"
 	"github.com/pingcap/tidb/store/tikv/oracle"
-	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/util/logutil"
 	"github.com/pingcap/tipb/go-binlog"
@@ -52,7 +51,6 @@ type TxnState struct {
 	initCnt       int
 	stagingHandle kv.StagingHandle
 	mutations     map[int64]*binlog.TableMutation
-	dirtyTableOP  []dirtyTableOperation
 }
 
 func (st *TxnState) init() {
@@ -134,9 +132,6 @@ func (st *TxnState) GoString() string {
 	} else if st.Valid() {
 		s.WriteString("state=valid")
 		fmt.Fprintf(&s, ", txnStartTS=%d", st.Transaction.StartTS())
-		if len(st.dirtyTableOP) > 0 {
-			fmt.Fprintf(&s, ", len(dirtyTable)=%d, %#v", len(st.dirtyTableOP), st.dirtyTableOP)
-		}
 		if len(st.mutations) > 0 {
 			fmt.Fprintf(&s, ", len(mutations)=%d, %#v", len(st.mutations), st.mutations)
 		}
@@ -187,14 +182,6 @@ func (st *TxnState) changeToInvalid() {
 	st.txnFuture = nil
 }
 
-// dirtyTableOperation represents an operation to dirtyTable, we log the operation
-// first and apply the operation log when statement commit.
-type dirtyTableOperation struct {
-	kind   int
-	tid    int64
-	handle kv.Handle
-}
-
 var hasMockAutoIncIDRetry = int64(0)
 
 func enableMockAutoIncIDRetry() {
@@ -224,7 +211,7 @@ func ResetMockAutoRandIDRetryCount(failTimes int64) {
 // Commit overrides the Transaction interface.
 func (st *TxnState) Commit(ctx context.Context) error {
 	defer st.reset()
-	if len(st.mutations) != 0 || len(st.dirtyTableOP) != 0 || st.countHint() != 0 {
+	if len(st.mutations) != 0 || st.countHint() != 0 {
 		logutil.BgLogger().Error("the code should never run here",
 			zap.String("TxnState", st.GoString()),
 			zap.Int("staging handler", int(st.stagingHandle)),
@@ -274,18 +261,6 @@ func (st *TxnState) cleanup() {
 	for key := range st.mutations {
 		delete(st.mutations, key)
 	}
-	if st.dirtyTableOP != nil {
-		empty := dirtyTableOperation{}
-		for i := 0; i < len(st.dirtyTableOP); i++ {
-			st.dirtyTableOP[i] = empty
-		}
-		if len(st.dirtyTableOP) > 256 {
-			// Reduce memory footprint for the large transaction.
-			st.dirtyTableOP = nil
-		} else {
-			st.dirtyTableOP = st.dirtyTableOP[:0]
-		}
-	}
 }
 
 // KeysNeedToLock returns the keys need to be locked.
@@ -347,16 +322,6 @@ func mergeToMutation(m1, m2 *binlog.TableMutation) {
 	m1.Sequence = append(m1.Sequence, m2.Sequence...)
 }
 
-func mergeToDirtyDB(dirtyDB *executor.DirtyDB, op dirtyTableOperation) {
-	dt := dirtyDB.GetDirtyTable(op.tid)
-	switch op.kind {
-	case table.DirtyTableAddRow:
-		dt.AddRow(op.handle)
-	case table.DirtyTableDeleteRow:
-		dt.DeleteRow(op.handle)
-	}
-}
-
 type txnFailFuture struct{}
 
 func (txnFailFuture) Wait() (uint64, error) {
@@ -406,11 +371,13 @@ func (s *session) getTxnFuture(ctx context.Context) *txnFuture {
 // HasDirtyContent checks whether there's dirty update on the given table.
 // Put this function here is to avoid cycle import.
 func (s *session) HasDirtyContent(tid int64) bool {
-	x := s.GetSessionVars().TxnCtx.DirtyDB
-	if x == nil {
+	if s.txn.Transaction == nil {
 		return false
 	}
-	return !x.(*executor.DirtyDB).GetDirtyTable(tid).IsEmpty()
+	seekKey := tablecodec.EncodeTablePrefix(tid)
+	it, err := s.txn.GetMemBuffer().Iter(seekKey, nil)
+	terror.Log(err)
+	return it.Valid() && bytes.HasPrefix(it.Key(), seekKey)
 }
 
 // StmtCommit implements the sessionctx.Context interface.
@@ -427,13 +394,6 @@ func (s *session) StmtCommit() {
 		mutation := getBinlogMutation(s, tableID)
 		mergeToMutation(mutation, delta)
 	}
-
-	if len(st.dirtyTableOP) > 0 {
-		dirtyDB := executor.GetDirtyDB(s)
-		for _, op := range st.dirtyTableOP {
-			mergeToDirtyDB(dirtyDB, op)
-		}
-	}
 }
 
 // StmtRollback implements the sessionctx.Context interface.
@@ -449,7 +409,3 @@ func (s *session) StmtGetMutation(tableID int64) *binlog.TableMutation {
 	}
 	return st.mutations[tableID]
 }
-
-func (s *session) StmtAddDirtyTableOP(op int, tid int64, handle kv.Handle) {
-	s.txn.dirtyTableOP = append(s.txn.dirtyTableOP, dirtyTableOperation{op, tid, handle})
-}
diff --git a/sessionctx/context.go b/sessionctx/context.go
index c6e1c895fd38c..86a0f9b662296 100644
--- a/sessionctx/context.go
+++ b/sessionctx/context.go
@@ -82,8 +82,6 @@ type Context interface {
 	StmtRollback()
 	// StmtGetMutation gets the binlog mutation for current statement.
 	StmtGetMutation(int64) *binlog.TableMutation
-	// StmtAddDirtyTableOP adds the dirty table operation for current statement.
-	StmtAddDirtyTableOP(op int, physicalID int64, handle kv.Handle)
 	// DDLOwnerChecker returns owner.DDLOwnerChecker.
 	DDLOwnerChecker() owner.DDLOwnerChecker
 	// AddTableLock adds table lock to the session lock map.
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index 81f90b6449310..21e58944d203c 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -134,7 +134,6 @@ type stmtFuture struct {
 type TransactionContext struct {
 	forUpdateTS uint64
 	stmtFuture  oracle.Future
-	DirtyDB     interface{}
 	Binlog      interface{}
 	InfoSchema  interface{}
 	History     interface{}
@@ -254,7 +253,6 @@ func (tc *TransactionContext) SetPessimisticLockCache(key kv.Key, val []byte) {
 // Cleanup clears up transaction info that no longer use.
 func (tc *TransactionContext) Cleanup() {
 	// tc.InfoSchema = nil; we cannot do it now, because some operation like handleFieldList depend on this.
-	tc.DirtyDB = nil
 	tc.Binlog = nil
 	tc.History = nil
 	tc.TableDeltaMap = nil
diff --git a/table/tables/tables.go b/table/tables/tables.go
index d9cd1329f163e..22a363744c962 100644
--- a/table/tables/tables.go
+++ b/table/tables/tables.go
@@ -416,8 +416,6 @@ func (t *TableCommon) UpdateRecord(ctx context.Context, sctx sessionctx.Context,
 		return err
 	}
 	memBuffer.Release(sh)
-	sctx.StmtAddDirtyTableOP(table.DirtyTableDeleteRow, t.physicalTableID, h)
-	sctx.StmtAddDirtyTableOP(table.DirtyTableAddRow, t.physicalTableID, h)
 	if shouldWriteBinlog(sctx) {
 		if !t.meta.PKIsHandle {
 			binlogColIDs = append(binlogColIDs, model.ExtraHandleID)
@@ -763,7 +761,6 @@ func (t *TableCommon) AddRecord(sctx sessionctx.Context, r []types.Datum, opts .
 	}
 	memBuffer.Release(sh)
 
-	sctx.StmtAddDirtyTableOP(table.DirtyTableAddRow, t.physicalTableID, recordID)
 
 	if shouldWriteBinlog(sctx) {
 		// For insert, TiDB and Binlog can use same row and schema.
@@ -978,7 +975,6 @@ func (t *TableCommon) RemoveRecord(ctx sessionctx.Context, h kv.Handle, r []type
 		return err
 	}
 
-	ctx.StmtAddDirtyTableOP(table.DirtyTableDeleteRow, t.physicalTableID, h)
 	if shouldWriteBinlog(ctx) {
 		cols := t.Cols()
 		colIDs := make([]int64, 0, len(cols)+1)
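
Note (not part of the patch): after this change, any reader that needs a statement-consistent view of the transaction's uncommitted writes takes the MemBuffer's shared read lock and goes through the snapshot API added in kv/kv.go, which is what UnionScanExec.open and executor/mem_reader.go do above. A minimal usage sketch follows, assuming an open kv.Transaction; the helper name scanMemBufferSnapshot is hypothetical and only illustrates the intended calling pattern.

package example

import "github.com/pingcap/tidb/kv"

// scanMemBufferSnapshot collects the values visible in the membuffer snapshot
// for the range [start, end), holding the shared read lock for the whole scan.
func scanMemBufferSnapshot(txn kv.Transaction, start, end kv.Key) ([][]byte, error) {
	mb := txn.GetMemBuffer()
	mb.RLock() // guards against concurrent writes by the owning session
	defer mb.RUnlock()

	var values [][]byte
	for it := mb.SnapshotIter(start, end); it.Valid(); {
		values = append(values, it.Value())
		if err := it.Next(); err != nil {
			return nil, err
		}
	}
	return values, nil
}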