*: remove DirtyDB and DirtyTable to reduce memory usage (#19042)
Zejun Li authored Sep 4, 2020
1 parent 5717194 commit ce849c3
Showing 12 changed files with 204 additions and 144 deletions.
executor/builder.go (6 changes: 0 additions & 6 deletions)
@@ -976,8 +976,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
 		// goroutines write empty DirtyTable to DirtyDB for this table concurrently. Although the DirtyDB looks
 		// safe for data race in all the cases, the map of golang will throw panic when it's accessed in parallel.
 		// So we lock it when getting dirty table.
-		physicalTableID := getPhysicalTableID(x.table)
-		us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)
 		us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
 		us.columns = x.columns
 		us.table = x.table
@@ -992,8 +990,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
 				}
 			}
 		}
-		physicalTableID := getPhysicalTableID(x.table)
-		us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)
 		us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
 		us.columns = x.columns
 		us.table = x.table
@@ -1007,8 +1003,6 @@ func (b *executorBuilder) buildUnionScanFromReader(reader Executor, v *plannerco
 				}
 			}
 		}
-		physicalTableID := getPhysicalTableID(x.table)
-		us.dirty = GetDirtyDB(b.ctx).GetDirtyTable(physicalTableID)
 		us.conditions, us.conditionsWithVirCol = plannercore.SplitSelCondsWithVirtualColumn(v.Conditions)
 		us.columns = x.columns
 		us.table = x.table
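The locking comment kept above refers to the hazard that motivated DirtyDB's mutex: Go maps panic on concurrent access, and index join worker goroutines could hit the same map in parallel. Below is a minimal, self-contained sketch of the guarded-map pattern the deleted GetDirtyTable followed; the registry and dirtyTable names are illustrative stand-ins, not the removed TiDB code.

package main

import (
	"fmt"
	"sync"
)

// dirtyTable is a toy stand-in for the removed DirtyTable.
type dirtyTable struct {
	tid int64
}

// registry mirrors the shape of the removed DirtyDB: a mutex-guarded map
// keyed by physical table ID. Without the mutex, concurrent lookups from
// index join worker goroutines would panic, because Go maps are not safe
// for concurrent use.
type registry struct {
	sync.Mutex
	tables map[int64]*dirtyTable
}

func (r *registry) get(tid int64) *dirtyTable {
	r.Lock()
	defer r.Unlock()
	dt, ok := r.tables[tid]
	if !ok {
		dt = &dirtyTable{tid: tid}
		r.tables[tid] = dt
	}
	return dt
}

func main() {
	r := &registry{tables: make(map[int64]*dirtyTable)}
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(tid int64) {
			defer wg.Done()
			_ = r.get(tid % 2) // concurrent access to the same map is safe under the mutex
		}(int64(i))
	}
	wg.Wait()
	fmt.Println("distinct tables:", len(r.tables))
}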
executor/mem_reader.go (8 changes: 1 addition & 7 deletions)
@@ -59,7 +59,6 @@ func buildMemIndexReader(us *UnionScanExec, idxReader *IndexReaderExecutor) *mem
 		kvRanges: kvRanges,
 		desc: us.desc,
 		conditions: us.conditions,
-		addedRows: make([][]types.Datum, 0, us.dirty.addedRows.Len()),
 		retFieldTypes: retTypes(us),
 		outputOffset: outputOffset,
 		belowHandleCols: us.belowHandleCols,
@@ -194,7 +193,6 @@ func buildMemTableReader(us *UnionScanExec, tblReader *TableReaderExecutor) *mem
 		kvRanges: tblReader.kvRanges,
 		desc: us.desc,
 		conditions: us.conditions,
-		addedRows: make([][]types.Datum, 0, us.dirty.addedRows.Len()),
 		retFieldTypes: retTypes(us),
 		colIDs: colIDs,
 		buffer: allocBuf{
@@ -329,10 +327,7 @@ func iterTxnMemBuffer(ctx sessionctx.Context, kvRanges []kv.KeyRange, fn process
 		return err
 	}
 	for _, rg := range kvRanges {
-		iter, err := txn.GetMemBuffer().Iter(rg.StartKey, rg.EndKey)
-		if err != nil {
-			return err
-		}
+		iter := txn.GetMemBuffer().SnapshotIter(rg.StartKey, rg.EndKey)
 		for ; iter.Valid(); err = iter.Next() {
 			if err != nil {
 				return err
@@ -399,7 +394,6 @@ func buildMemIndexLookUpReader(us *UnionScanExec, idxLookUpReader *IndexLookUpEx
 		table: idxLookUpReader.table.Meta(),
 		kvRanges: kvRanges,
 		desc: idxLookUpReader.desc,
-		addedRowsLen: us.dirty.addedRows.Len(),
 		retFieldTypes: retTypes(us),
 		outputOffset: outputOffset,
 		belowHandleCols: us.belowHandleCols,
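The only behavioral change above is the switch from Iter, which can fail, to SnapshotIter, which directly returns an iterator over a snapshot of the transaction buffer. Below is a sketch of the same walk shape iterTxnMemBuffer uses; the iterator interface and sliceIter type are illustrative stand-ins for the kv package types.

package main

import "fmt"

// iterator mirrors the subset of kv.Iterator that iterTxnMemBuffer uses;
// the real SnapshotIter walks a snapshot of the transaction memdb.
type iterator interface {
	Valid() bool
	Key() string
	Value() []byte
	Next() error
}

type pair struct{ k, v string }

type sliceIter struct {
	kvs []pair
	pos int
}

func (it *sliceIter) Valid() bool   { return it.pos < len(it.kvs) }
func (it *sliceIter) Key() string   { return it.kvs[it.pos].k }
func (it *sliceIter) Value() []byte { return []byte(it.kvs[it.pos].v) }
func (it *sliceIter) Next() error   { it.pos++; return nil }

func main() {
	var it iterator = &sliceIter{kvs: []pair{{"a", "1"}, {"b", "2"}}}
	// Same loop shape as iterTxnMemBuffer: Next can fail, so its error is
	// checked at the top of each step.
	var err error
	for ; it.Valid(); err = it.Next() {
		if err != nil {
			panic(err)
		}
		fmt.Println(it.Key(), string(it.Value()))
	}
}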
executor/union_scan.go (91 changes: 19 additions & 72 deletions)
@@ -17,88 +17,23 @@ import (
 	"context"
 	"fmt"
 	"runtime/trace"
-	"sync"
 
 	"github.com/pingcap/parser/model"
 	"github.com/pingcap/tidb/expression"
 	"github.com/pingcap/tidb/kv"
 	plannercore "github.com/pingcap/tidb/planner/core"
-	"github.com/pingcap/tidb/sessionctx"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/chunk"
 )
 
-// DirtyDB stores uncommitted write operations for a transaction.
-// It is stored and retrieved by context.Value and context.SetValue method.
-type DirtyDB struct {
-	sync.Mutex
-
-	// tables is a map whose key is tableID.
-	tables map[int64]*DirtyTable
-}
-
-// GetDirtyTable gets the DirtyTable by id from the DirtyDB.
-func (udb *DirtyDB) GetDirtyTable(tid int64) *DirtyTable {
-	// The index join access the tables map parallelly.
-	// But the map throws panic in this case. So it's locked.
-	udb.Lock()
-	dt, ok := udb.tables[tid]
-	if !ok {
-		dt = &DirtyTable{
-			tid: tid,
-			addedRows: kv.NewHandleMap(),
-			deletedRows: kv.NewHandleMap(),
-		}
-		udb.tables[tid] = dt
-	}
-	udb.Unlock()
-	return dt
-}
-
-// DirtyTable stores uncommitted write operation for a transaction.
-type DirtyTable struct {
-	tid int64
-	// addedRows ...
-	// the key is handle.
-	addedRows *kv.HandleMap
-	deletedRows *kv.HandleMap
-}
-
-// AddRow adds a row to the DirtyDB.
-func (dt *DirtyTable) AddRow(handle kv.Handle) {
-	dt.addedRows.Set(handle, true)
-}
-
-// DeleteRow deletes a row from the DirtyDB.
-func (dt *DirtyTable) DeleteRow(handle kv.Handle) {
-	dt.addedRows.Delete(handle)
-	dt.deletedRows.Set(handle, true)
-}
-
-// IsEmpty checks whether the table is empty.
-func (dt *DirtyTable) IsEmpty() bool {
-	return dt.addedRows.Len()+dt.deletedRows.Len() == 0
-}
-
-// GetDirtyDB returns the DirtyDB bind to the context.
-func GetDirtyDB(ctx sessionctx.Context) *DirtyDB {
-	var udb *DirtyDB
-	x := ctx.GetSessionVars().TxnCtx.DirtyDB
-	if x == nil {
-		udb = &DirtyDB{tables: make(map[int64]*DirtyTable)}
-		ctx.GetSessionVars().TxnCtx.DirtyDB = udb
-	} else {
-		udb = x.(*DirtyDB)
-	}
-	return udb
-}
-
 // UnionScanExec merges the rows from dirty table and the rows from distsql request.
 type UnionScanExec struct {
 	baseExecutor
 
-	dirty *DirtyTable
+	memBuf kv.MemBuffer
+	memBufSnap kv.Getter
+
 	// usedIndex is the column offsets of the index which Src executor has used.
 	usedIndex []int
 	desc bool
@@ -140,6 +75,18 @@ func (us *UnionScanExec) open(ctx context.Context) error {
 	}
 
 	defer trace.StartRegion(ctx, "UnionScanBuildRows").End()
+	txn, err := us.ctx.Txn(false)
+	if err != nil {
+		return err
+	}
+
+	mb := txn.GetMemBuffer()
+	mb.RLock()
+	defer mb.RUnlock()
+
+	us.memBuf = mb
+	us.memBufSnap = mb.SnapshotGetter()
+
 	// 1. select without virtual columns
 	// 2. build virtual columns and select with virtual columns
 	switch x := reader.(type) {
@@ -161,6 +108,8 @@ func (us *UnionScanExec) open(ctx context.Context) error {
 
 // Next implements the Executor Next interface.
 func (us *UnionScanExec) Next(ctx context.Context, req *chunk.Chunk) error {
+	us.memBuf.RLock()
+	defer us.memBuf.RUnlock()
 	req.GrowAndReset(us.maxChunkSize)
 	mutableRow := chunk.MutRowFromTypes(retTypes(us))
 	for i, batchSize := 0, req.Capacity(); i < batchSize; i++ {
@@ -265,10 +214,8 @@ func (us *UnionScanExec) getSnapshotRow(ctx context.Context) ([]types.Datum, err
 		if err != nil {
 			return nil, err
 		}
-		if _, ok := us.dirty.deletedRows.Get(snapshotHandle); ok {
-			continue
-		}
-		if _, ok := us.dirty.addedRows.Get(snapshotHandle); ok {
+		checkKey := us.table.RecordKey(snapshotHandle)
+		if _, err := us.memBufSnap.Get(context.TODO(), checkKey); err == nil {
 			// If src handle appears in added rows, it means there is conflict and the transaction will fail to
 			// commit, but for simplicity, we don't handle it here.
 			continue
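The replacement visibility rule: instead of consulting DirtyTable's addedRows and deletedRows handle maps, getSnapshotRow probes the MemBuffer snapshot for the row's record key. Any hit means the transaction touched that row, either rewriting it or leaving a delete tombstone, so the snapshot version is skipped and the buffer side supplies the current data. A toy sketch of that probe follows, assuming illustrative getter and errNotExist names rather than the real TiDB API.

package main

import (
	"errors"
	"fmt"
)

// errNotExist stands in for the kv package's not-exist error.
var errNotExist = errors.New("not exist")

// getter mimics the snapshot Getter the union scan probes.
type getter map[string][]byte

func (g getter) Get(key string) ([]byte, error) {
	v, ok := g[key]
	if !ok {
		return nil, errNotExist
	}
	return v, nil
}

func main() {
	// The transaction buffer holds one rewritten row and one delete tombstone.
	memBufSnap := getter{
		"t/r/1": []byte("new-value"), // updated in this txn
		"t/r/2": {},                  // deleted in this txn (tombstone)
	}

	// Rows coming back from the storage snapshot.
	for _, recordKey := range []string{"t/r/1", "t/r/2", "t/r/3"} {
		// Any hit in the buffer means the txn touched this row, so the
		// snapshot version must be skipped; the buffer supplies the
		// current version (or nothing, for a delete).
		if _, err := memBufSnap.Get(recordKey); err == nil {
			fmt.Println(recordKey, "-> skip snapshot row")
			continue
		}
		fmt.Println(recordKey, "-> emit snapshot row")
	}
}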
executor/union_scan_test.go (25 changes: 25 additions & 0 deletions)
@@ -333,6 +333,31 @@ func (s *testSuite7) TestForUpdateUntouchedIndex(c *C) {
 	tk.MustExec("admin check table t")
 }
 
+func (s *testSuite7) TestUpdateScanningHandles(c *C) {
+	tk := testkit.NewTestKit(c, s.store)
+	tk.MustExec("use test")
+	tk.MustExec("drop table if exists t;")
+	tk.MustExec("create table t(a int primary key, b int);")
+	tk.MustExec("begin")
+	for i := 2; i < 100000; i++ {
+		tk.MustExec("insert into t values (?, ?)", i, i)
+	}
+	tk.MustExec("commit;")
+
+	tk.MustExec("set tidb_distsql_scan_concurrency = 1;")
+	tk.MustExec("set tidb_index_lookup_join_concurrency = 1;")
+	tk.MustExec("set tidb_projection_concurrency=1;")
+	tk.MustExec("set tidb_init_chunk_size=1;")
+	tk.MustExec("set tidb_max_chunk_size=32;")
+
+	tk.MustExec("begin")
+	tk.MustExec("insert into t values (1, 1);")
+	tk.MustExec("update /*+ INL_JOIN(t1) */ t t1, (select a, b from t) t2 set t1.b = t2.b where t1.a = t2.a + 1000;")
+	result := tk.MustQuery("select a, a-b from t where a > 1000 and a - b != 1000;")
+	c.Assert(result.Rows(), HasLen, 0)
+	tk.MustExec("rollback;")
+}
+
 // See https://github.com/pingcap/tidb/issues/19136
 func (s *testSuite7) TestForApplyAndUnionScan(c *C) {
 	tk := testkit.NewTestKit(c, s.store)
kv/kv.go (13 changes: 13 additions & 0 deletions)
@@ -173,6 +173,14 @@ type MemBufferIterator interface {
 type MemBuffer interface {
 	RetrieverMutator
 
+	// RLock locks the MemBuffer for shared read.
+	// In most cases a MemBuffer is only used by a single goroutine,
+	// but it can be read by multiple goroutines when combined with executor.UnionScanExec.
+	// To avoid races introduced by executor.UnionScanExec, MemBuffer exposes a read lock for it.
+	RLock()
+	// RUnlock unlocks the MemBuffer.
+	RUnlock()
+
 	// GetFlags returns the latest flags associated with key.
 	GetFlags(Key) (KeyFlags, error)
 	// IterWithFlags returns a MemBufferIterator.
@@ -202,6 +210,11 @@ type MemBuffer interface {
 	// InspectStage used to inspect the value updates in the given stage.
 	InspectStage(StagingHandle, func(Key, KeyFlags, []byte))
 
+	// SnapshotGetter returns a Getter for a snapshot of MemBuffer.
+	SnapshotGetter() Getter
+	// SnapshotIter returns an Iterator over a snapshot of MemBuffer.
+	SnapshotIter(k, upperbound Key) Iterator
+
 	// Size returns sum of keys and values length.
 	Size() int
 	// Len returns the number of entries in the DB.
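The contract these new methods document: mutating calls take the memdb write lock internally (see kv/memdb.go below), while readers such as UnionScanExec bracket their reads with RLock and RUnlock. A minimal sketch of this discipline follows, using a toy map-backed buffer rather than the real skiplist-based memdb.

package main

import (
	"fmt"
	"sync"
)

// buffer is a toy stand-in for the memdb behind kv.MemBuffer.
type buffer struct {
	sync.RWMutex
	data map[string]string
}

// Set takes the write lock internally, mirroring how memdb.set does.
func (b *buffer) Set(k, v string) {
	b.Lock()
	defer b.Unlock()
	b.data[k] = v
}

// get must be called with the read lock held, as UnionScanExec does
// around its reads.
func (b *buffer) get(k string) string { return b.data[k] }

func main() {
	b := &buffer{data: map[string]string{"a": "1"}}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() { // concurrent readers, e.g. index join worker goroutines
			defer wg.Done()
			b.RLock()
			defer b.RUnlock()
			_ = b.get("a")
		}()
	}
	b.Set("b", "2") // a concurrent mutation is serialized by the write lock
	wg.Wait()
	fmt.Println("no race:", b.get("b"))
}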
kv/memdb.go (16 changes: 16 additions & 0 deletions)
@@ -17,6 +17,7 @@ import (
 	"bytes"
 	"context"
 	"reflect"
+	"sync"
 	"sync/atomic"
 	"unsafe"
 )
@@ -120,6 +121,9 @@ var tombstone = []byte{}
 // When discarding a newly added KV in `Cleanup`, the non-persistent flags will be cleared.
 // If there are persistent flags associated with key, we will keep this key in node without value.
 type memdb struct {
+	// This RWMutex is only used to ensure that memdbSnapGetter.Get will not race with
+	// concurrent memdb.Set, memdb.SetWithFlags, memdb.Delete and memdb.UpdateFlags.
+	sync.RWMutex
 	root memdbArenaAddr
 	allocator nodeAllocator
 	vlog memdbVlog
@@ -145,6 +149,9 @@ func newMemDB() *memdb {
 }
 
 func (db *memdb) Staging() StagingHandle {
+	db.Lock()
+	defer db.Unlock()
+
 	db.stages = append(db.stages, db.vlog.checkpoint())
 	return StagingHandle(len(db.stages))
 }
@@ -155,6 +162,9 @@ func (db *memdb) Release(h StagingHandle) {
 		// Use panic to make debug easier.
 		panic("cannot release staging buffer")
 	}
+
+	db.Lock()
+	defer db.Unlock()
 	if int(h) == 1 {
 		tail := db.vlog.checkpoint()
 		if !db.stages[0].isSamePosition(&tail) {
@@ -174,6 +184,8 @@ func (db *memdb) Cleanup(h StagingHandle) {
 		panic("cannot cleanup staging buffer")
 	}
 
+	db.Lock()
+	defer db.Unlock()
 	cp := &db.stages[int(h)-1]
 	if !db.vlogInvalid {
 		db.vlog.revertToCheckpoint(db, cp)
@@ -276,6 +288,10 @@ func (db *memdb) set(key Key, value []byte, ops ...FlagsOp) error {
 			return ErrEntryTooLarge.GenWithStackByArgs(db.entrySizeLimit, size)
 		}
 	}
+
+	db.Lock()
+	defer db.Unlock()
+
 	if len(db.stages) == 0 {
 		db.dirty = true
 	}
kv/memdb_arena.go (12 changes: 12 additions & 0 deletions)
@@ -275,6 +275,18 @@ func (l *memdbVlog) getValue(addr memdbArenaAddr) []byte {
 	return block[valueOff:lenOff]
 }
 
+func (l *memdbVlog) getSnapshotValue(addr memdbArenaAddr, snap *memdbCheckpoint) ([]byte, bool) {
+	for !addr.isNull() {
+		if !l.canModify(snap, addr) {
+			return l.getValue(addr), true
+		}
+		var hdr memdbVlogHdr
+		hdr.load(l.blocks[addr.idx].buf[addr.off-memdbVlogHdrSize:])
+		addr = hdr.oldValue
+	}
+	return nil, false
+}
+
 func (l *memdbVlog) revertToCheckpoint(db *memdb, cp *memdbCheckpoint) {
 	cursor := l.checkpoint()
 	for !cp.isSamePosition(&cursor) {
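getSnapshotValue walks a key's old-value chain from its newest version backwards until it finds one that was already present at the snapshot checkpoint; an exhausted chain means the key did not exist in the snapshot. Below is a self-contained sketch of that walk, with a toy version node and a sequence-number check standing in for canModify.

package main

import "fmt"

// version is a toy node in an old-value chain: each write prepends a new
// version that points at the one it replaced, like memdbVlogHdr.oldValue.
type version struct {
	value []byte
	seq   int      // position in the vlog; stands in for memdbArenaAddr
	old   *version // previous version of the same key
}

// snapshotValue walks the chain until it reaches a version written at or
// before the snapshot point, mirroring memdbVlog.getSnapshotValue.
func snapshotValue(v *version, snapSeq int) ([]byte, bool) {
	for v != nil {
		if v.seq <= snapSeq { // not modifiable since the snapshot
			return v.value, true
		}
		v = v.old
	}
	return nil, false // key did not exist when the snapshot was taken
}

func main() {
	v1 := &version{value: []byte("committed-before-snap"), seq: 3}
	v2 := &version{value: []byte("written-after-snap"), seq: 7, old: v1}

	val, ok := snapshotValue(v2, 5)
	fmt.Println(string(val), ok) // committed-before-snap true

	_, ok = snapshotValue(&version{value: []byte("x"), seq: 9}, 5)
	fmt.Println(ok) // false
}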