
*: remove DirtyDB and DirtyTable to reduce memory usage #19042

Merged: 6 commits, Sep 4, 2020
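At a glance, the change replaces the per-table DirtyTable bookkeeping with direct reads of the transaction's MemBuffer. The following before/after sketch is assembled from the diffs below rather than quoted verbatim; error handling and surrounding context are trimmed.

// Before: the executor builder materialized a DirtyTable of added/deleted handles
// per physical table, duplicating state already held in the transaction MemBuffer.
physicalTableID := getPhysicalTableID(x.table)
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)

// After: UnionScanExec pins a read-locked, point-in-time view of the MemBuffer itself.
txn, err := us.ctx.Txn(false)
if err != nil {
    return err
}
mb := txn.GetMemBuffer()
mb.RLock()                          // shared lock: other goroutines may still write the buffer
defer mb.RUnlock()
us.memBuf = mb
us.memBufSnap = mb.SnapshotGetter() // snapshot getter used while merging buffered and snapshot rows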
executor/builder.go (6 changes: 0 additions & 6 deletions)
@@ -976,8 +976,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
// goroutines write empty DirtyTable to DirtyDB for this table concurrently. Although the DirtyDB looks
// safe for data race in all the cases, the map of golang will throw panic when it's accessed in parallel.
// So we lock it when getting dirty table.
physicalTableID := getPhysicalTableID(x.table)
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
@@ -992,8 +990,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
}
}
}
physicalTableID := getPhysicalTableID(x.table)
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
@@ -1007,8 +1003,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
}
}
}
physicalTableID := getPhysicalTableID(x.table)
us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)
us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
us.columns = x.columns
us.table = x.table
executor/mem_reader.go (8 changes: 1 addition & 7 deletions)
@@ -59,7 +59,6 @@ func buildMemIndexReader(us *UnionScanExec, idxReader *IndexReaderExecutor) *mem
kvRanges: kvRanges,
desc: us.desc,
conditions: us.conditions,
addedRows: make([][]types.Datum, 0, us.dirty.addedRows.Len()),
retFieldTypes: retTypes(us),
outputOffset: outputOffset,
belowHandleCols: us.belowHandleCols,
@@ -194,7 +193,6 @@ func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *mem
kvRanges: tblReader.kvRanges,
desc: us.desc,
conditions: us.conditions,
addedRows: make([][]types.Datum, 0, us.dirty.addedRows.Len()),
retFieldTypes: retTypes(us),
colIDs: colIDs,
buffer: allocBuf{
@@ -329,10 +327,7 @@ func iterTxnMemBuffer(ctx sessionctx.Context, kvRanges []kv.KeyRange, fn process
return err
}
for _, rg := range kvRanges {
iter, err := txn.GetMemBuffer().Iter(rg.StartKey, rg.EndKey)
if err != nil {
return err
}
iter := txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey)
for ; iter.Valid(); err = iter.Next() {
if err != nil {
return err
@@ -399,7 +394,6 @@ func buildMemIndexLookUpReader(us *UnionScanExec, idxLookUpReader *IndexLookUpEx
table: idxLookUpReader.table.Meta(),
kvRanges: kvRanges,
desc: idxLookUpReader.desc,
addedRowsLen: us.dirty.addedRows.Len(),
retFieldTypes: retTypes(us),
outputOffset: outputOffset,
belowHandleCols: us.belowHandleCols,
executor/union_scan.go (91 changes: 19 additions & 72 deletions)
@@ -17,88 +17,23 @@ import (
"context"
"fmt"
"runtime/trace"
"sync"

"github.com/pingcap/parser/model"
"github.com/pingcap/tidb/expression"
"github.com/pingcap/tidb/kv"
plannercore "github.com/pingcap/tidb/planner/core"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/chunk"
)

// DirtyDB stores uncommitted write operations for a transaction.
// It is stored and retrieved by context.Value and context.SetValue method.
type DirtyDB struct {
sync.Mutex

// tables is a map whose key is tableID.
tables map[int64]*DirtyTable
}

// GetDirtyTable gets the DirtyTable by id from the DirtyDB.
func (udb *DirtyDB) GetDirtyTable(tid int64) *DirtyTable {
// The index join access the tables map parallelly.
// But the map throws panic in this case. So it's locked.
udb.Lock()
dt, ok := udb.tables[tid]
if !ok {
dt = &DirtyTable{
tid: tid,
addedRows: kv.NewHandleMap(),
deletedRows: kv.NewHandleMap(),
}
udb.tables[tid] = dt
}
udb.Unlock()
return dt
}

// DirtyTable stores uncommitted write operation for a transaction.
type DirtyTable struct {
tid int64
// addedRows ...
// the key is handle.
addedRows *kv.HandleMap
deletedRows *kv.HandleMap
}

// AddRow adds a row to the DirtyDB.
func (dt *DirtyTable) AddRow(handle kv.Handle) {
dt.addedRows.Set(handle, true)
}

// DeleteRow deletes a row from the DirtyDB.
func (dt *DirtyTable) DeleteRow(handle kv.Handle) {
dt.addedRows.Delete(handle)
dt.deletedRows.Set(handle, true)
}

// IsEmpty checks whether the table is empty.
func (dt *DirtyTable) IsEmpty() bool {
return dt.addedRows.Len()+dt.deletedRows.Len() == 0
}

// GetDirtyDB returns the DirtyDB bind to the context.
func GetDirtyDB(ctx sessionctx.Context) *DirtyDB {
var udb *DirtyDB
x := ctx.GetSessionVars().TxnCtx.DirtyDB
if x == nil {
udb = &DirtyDB{tables: make(map[int64]*DirtyTable)}
ctx.GetSessionVars().TxnCtx.DirtyDB = udb
} else {
udb = x.(*DirtyDB)
}
return udb
}

// UnionScanExec merges the rows from dirty table and the rows from distsql request.
type UnionScanExec struct {
baseExecutor

dirty *DirtyTable
memBuf kv.MemBuffer
memBufSnap kv.Getter

// usedIndex is the column offsets of the index which Src executor has used.
usedIndex []int
desc bool
@@ -140,6 +75,18 @@ func (us *UnionScanExec) open(ctx context.Context) error {
}

defer trace.StartRegion(ctx, "UnionScanBuildRows").End()
txn, err := us.ctx.Txn(false)
if err != nil {
return err
}

mb := txn.GetMemBuffer()
mb.RLock()
defer mb.RUnlock()

us.memBuf = mb
us.memBufSnap = mb.SnapshotGetter()

// 1. select without virtual columns
// 2. build virtual columns and select with virtual columns
switch x := reader.(type) {
@@ -161,6 +108,8 @@ func (us *UnionScanExec) open(ctx context.Context) error {

// Next implements the Executor Next interface.
func (us *UnionScanExec) Next(ctx context.Context, req *chunk.Chunk) error {
us.memBuf.RLock()
defer us.memBuf.RUnlock()
req.GrowAndReset(us.maxChunkSize)
mutableRow := chunk.MutRowFromTypes(retTypes(us))
for i, batchSize := 0, req.Capacity(); i < batchSize; i++ {
@@ -265,10 +214,8 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err
if err != nil {
return nil, err
}
if _, ok := us.dirty.deletedRows.Get(snapshotHandle); ok {
continue
}
if _, ok := us.dirty.addedRows.Get(snapshotHandle); ok {
checkKey := us.table.RecordKey(snapshotHandle)
if _, err := us.memBufSnap.Get(context.TODO(), checkKey); err == nil {
// If src handle appears in added rows, it means there is conflict and the transaction will fail to
// commit, but for simplicity, we don't handle it here.
continue
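Condensing the union_scan.go hunks above: open() now pins the MemBuffer, and getSnapshotRow() replaces the two DirtyTable set lookups with a single probe of the MemBuffer snapshot. A trimmed sketch of that probe, which sits inside the existing row loop of getSnapshotRow (identifiers come straight from the diff):

// Old: two handle-set lookups against the DirtyTable.
//   if _, ok := us.dirty.deletedRows.Get(snapshotHandle); ok { continue }
//   if _, ok := us.dirty.addedRows.Get(snapshotHandle); ok { continue }
// New: one probe of the MemBuffer snapshot. A hit means this transaction has already
// written that handle, so the stale snapshot version is skipped and the MemBuffer
// side of the union scan supplies the current version, if any.
checkKey := us.table.RecordKey(snapshotHandle)
if _, err := us.memBufSnap.Get(context.TODO(), checkKey); err == nil {
    continue
}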
executor/union_scan_test.go (25 changes: 25 additions & 0 deletions)
@@ -333,6 +333,31 @@ func (s *testSuite7) TestForUpdateUntouchedIndex(c *C) {
tk.MustExec("admin check table t")
}

func (s *testSuite7) TestUpdateScanningHandles(c *C) {
tk := testkit.NewTestKit(c, s.store)
tk.MustExec("use test")
tk.MustExec("drop table if exists t;")
tk.MustExec("create table t(a int primary key, b int);")
tk.MustExec("begin")
for i := 2; i < 100000; i++ {
Contributor: Is this a bench test? So much data may slow the CI.

Contributor Author: It is a correctness test. If the snapshot getter reads the newly added keys, the result of this SQL will not be empty. The first prototype of this PR didn't implement the snapshot getter, and it failed to pass this test.

tk.MustExec("insert into t values (?, ?)", i, i)
}
tk.MustExec("commit;")

tk.MustExec("set tidb_distsql_scan_concurrency = 1;")
Contributor: Why set those concurrency variables?

Contributor Author: To slow down the execution; otherwise the concurrent scanner cannot scan the updated rows. This SQL is a special case: it tries to update the row at handle + 1000 while reading the handle. If the executor is slow enough, it can read some dirty data.

tk.MustExec("set tidb_index_lookup_join_concurrency = 1;")
tk.MustExec("set tidb_projection_concurrency=1;")
tk.MustExec("set tidb_init_chunk_size=1;")
tk.MustExec("set tidb_max_chunk_size=32;")

tk.MustExec("begin")
tk.MustExec("insert into t values (1, 1);")
tk.MustExec("update /*+ INL_JOIN(t1) */ t t1, (select a, b from t) t2 set t1.b = t2.b where t1.a = t2.a + 1000;")
result := tk.MustQuery("select a, a-b from t where a > 1000 and a - b != 1000;")
c.Assert(result.Rows(), HasLen, 0)
tk.MustExec("rollback;")
}

// See https://github.com/pingcap/tidb/issues/19136
func (s *testSuite7) TestForApplyAndUnionScan(c *C) {
tk := testkit.NewTestKit(c, s.store)
kv/kv.go (13 changes: 13 additions & 0 deletions)
@@ -173,6 +173,14 @@ type MemBufferIterator interface {
type MemBuffer interface {
RetrieverMutator

// RLock locks the MemBuffer for shared read.
// In most cases, a MemBuffer is only used by a single goroutine,
// but it can be read by multiple goroutines when combined with executor.UnionScanExec.
// To avoid the race introduced by executor.UnionScanExec, MemBuffer exposes a read lock for it.
RLock()
// RUnlock unlocks the MemBuffer.
RUnlock()

// GetFlags returns the latest flags associated with key.
GetFlags(Key) (KeyFlags, error)
// IterWithFlags returns a MemBufferIterator.
@@ -202,6 +210,11 @@ type MemBuffer interface {
// InspectStage used to inspect the value updates in the given stage.
InspectStage(StagingHandle, func(Key, KeyFlags, []byte))

// SnapshotGetter returns a Getter for a snapshot of MemBuffer.
SnapshotGetter() Getter
// SnapshotIter returns an Iterator for a snapshot of MemBuffer.
SnapshotIter(k, upperbound Key) Iterator

// Size returns sum of keys and values length.
Size() int
// Len returns the number of entries in the DB.
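A minimal usage sketch of RLock/RUnlock together with the two new snapshot APIs, written as a hypothetical helper; the rg range, recordKey, and process callback are placeholders, but the call shapes mirror executor/union_scan.go and executor/mem_reader.go in this PR (imports assumed: context and github.com/pingcap/tidb/kv).

func readTxnBuffer(ctx context.Context, txn kv.Transaction, rg kv.KeyRange, recordKey kv.Key, process func(k kv.Key, v []byte)) error {
    mb := txn.GetMemBuffer()
    mb.RLock() // hold the shared lock for the whole read, as UnionScanExec does
    defer mb.RUnlock()

    // Point lookups see a frozen view of the buffer: keys written after the snapshot
    // was taken (e.g. by the statement that is still running) are not visible here.
    snap := mb.SnapshotGetter()
    if _, err := snap.Get(ctx, recordKey); err == nil {
        // recordKey was already written by this transaction before the snapshot point.
    }

    // Range scans over the same frozen view.
    iter := mb.SnapshotIter(rg.StartKey, rg.EndKey)
    var err error
    for ; iter.Valid(); err = iter.Next() {
        if err != nil {
            return err
        }
        process(iter.Key(), iter.Value())
    }
    return nil
}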
kv/memdb.go (16 changes: 16 additions & 0 deletions)
@@ -17,6 +17,7 @@ import (
"bytes"
"context"
"reflect"
"sync"
"sync/atomic"
"unsafe"
)
@@ -120,6 +121,9 @@ var tombstone = []byte{}
// When discarding a newly added KV in `Cleanup`, the non-persistent flags will be cleared.
// If there are persistent flags associated with key, we will keep this key in node without value.
type memdb struct {
// This RWMutex is only used to ensure that memdbSnapGetter.Get will not race with
// concurrent memdb.Set, memdb.SetWithFlags, memdb.Delete and memdb.UpdateFlags.
sync.RWMutex
root memdbArenaAddr
allocator nodeAllocator
vlog memdbVlog
@@ -145,6 +149,9 @@ func newMemDB() *memdb {
}

func (db *memdb) Staging() StagingHandle {
db.Lock()
defer db.Unlock()

db.stages = append(db.stages, db.vlog.checkpoint())
return StagingHandle(len(db.stages))
}
@@ -155,6 +162,9 @@ func (db *memdb) Release(h StagingHandle) {
// Use panic to make debug easier.
panic("cannot release staging buffer")
}

db.Lock()
defer db.Unlock()
if int(h) == 1 {
tail := db.vlog.checkpoint()
if !db.stages[0].isSamePosition(&tail) {
@@ -174,6 +184,8 @@ func (db *memdb) Cleanup(h StagingHandle) {
panic("cannot cleanup staging buffer")
}

db.Lock()
defer db.Unlock()
cp := &db.stages[int(h)-1]
if !db.vlogInvalid {
db.vlog.revertToCheckpoint(db, cp)
@@ -276,6 +288,10 @@ func (db *memdb) set(key Key, value []byte, ops ...FlagsOp) error {
return ErrEntryTooLarge.GenWithStackByArgs(db.entrySizeLimit, size)
}
}

db.Lock()
defer db.Unlock()

if len(db.stages) == 0 {
db.dirty = true
}
kv/memdb_arena.go (12 changes: 12 additions & 0 deletions)
@@ -275,6 +275,18 @@ func (l *memdbVlog) getValue(addr memdbArenaAddr) []byte {
return block[valueOff:lenOff]
}

func (l *memdbVlog) getSnapshotValue(addr memdbArenaAddr, snap *memdbCheckpoint) ([]byte, bool) {
for !addr.isNull() {
if !l.canModify(snap, addr) {
return l.getValue(addr), true
}
var hdr memdbVlogHdr
hdr.load(l.blocks[addr.idx].buf[addr.off-memdbVlogHdrSize:])
addr = hdr.oldValue
}
return nil, false
}

func (l *memdbVlog) revertToCheckpoint(db *memdb, cp *memdbCheckpoint) {
cursor := l.checkpoint()
for !cp.isSamePosition(&cursor) {