Skip to content

Commit

Permalink
util: fix data race in ListInDisk (pingcap#35199)
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 authored Jun 8, 2022
1 parent a523d76 commit 5eab362
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 16 deletions.
6 changes: 5 additions & 1 deletion executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ type hashRowContainer struct {

rowContainer *chunk.RowContainer
memTracker *memory.Tracker

// chkBuf buffer the data reads from the disk if rowContainer is spilled.
chkBuf *chunk.Chunk
}

func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer {
Expand Down Expand Up @@ -122,7 +125,7 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
var matchedRow chunk.Row
matchedPtrs = matchedPtrs[:0]
for _, ptr := range innerPtrs {
matchedRow, err = c.rowContainer.GetRow(ptr)
matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -223,6 +226,7 @@ func (c *hashRowContainer) Len() uint64 {

func (c *hashRowContainer) Close() error {
defer c.memTracker.Detach()
c.chkBuf = nil
return c.rowContainer.Close()
}

Expand Down
19 changes: 8 additions & 11 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"io"
"os"
"strconv"
"sync"

errors2 "github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
Expand All @@ -40,8 +39,6 @@ type ListInDisk struct {

dataFile diskFileReaderWriter
offsetFile diskFileReaderWriter

chkPool *sync.Pool // Using a Chunk Pool to avoid constructing a chunk structure for each GetRow()
}

// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file.
Expand Down Expand Up @@ -108,9 +105,6 @@ func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
fieldTypes: fieldTypes,
// TODO(fengliyuan): set the quota of disk usage.
diskTracker: disk.NewTracker(memory.LabelForChunkListInDisk, -1),
chkPool: &sync.Pool{New: func() interface{} {
return NewChunkWithCapacity(fieldTypes, 1024)
}},
}
return l
}
Expand Down Expand Up @@ -190,6 +184,12 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {

// GetRow gets a Row from the ListInDisk by RowPtr.
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
row, _, err = l.GetRowAndAppendToChunk(ptr, nil)
return row, err
}

// GetRowAndAppendToChunk gets a Row from the ListInDisk by RowPtr. Return the Row and the Ref Chunk.
func (l *ListInDisk) GetRowAndAppendToChunk(ptr RowPtr, chk *Chunk) (row Row, _ *Chunk, err error) {
off, err := l.getOffset(ptr.ChkIdx, ptr.RowIdx)
if err != nil {
return
Expand All @@ -198,12 +198,10 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
format := rowInDisk{numCol: len(l.fieldTypes)}
_, err = format.ReadFrom(r)
if err != nil {
return row, err
return row, nil, err
}
chk := l.chkPool.Get().(*Chunk)
row, chk = format.toRow(l.fieldTypes, chk)
l.chkPool.Put(chk)
return row, err
return row, chk, err
}

func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) {
Expand Down Expand Up @@ -241,7 +239,6 @@ func (l *ListInDisk) Close() error {
terror.Call(l.offsetFile.disk.Close)
terror.Log(os.Remove(l.offsetFile.disk.Name()))
}
l.chkPool = nil
return nil
}

Expand Down
14 changes: 10 additions & 4 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,22 @@ func (c *RowContainer) GetChunk(chkIdx int) (*Chunk, error) {
}

// GetRow returns the row the ptr pointed to.
func (c *RowContainer) GetRow(ptr RowPtr) (Row, error) {
func (c *RowContainer) GetRow(ptr RowPtr) (row Row, err error) {
row, _, err = c.GetRowAndAppendToChunk(ptr, nil)
return row, err
}

// GetRowAndAppendToChunk gets a Row from the RowContainer by RowPtr.
func (c *RowContainer) GetRowAndAppendToChunk(ptr RowPtr, chk *Chunk) (row Row, _ *Chunk, err error) {
c.m.RLock()
defer c.m.RUnlock()
if c.alreadySpilled() {
if err := c.m.records.spillError; err != nil {
return Row{}, err
return Row{}, nil, err
}
return c.m.records.inDisk.GetRow(ptr)
return c.m.records.inDisk.GetRowAndAppendToChunk(ptr, chk)
}
return c.m.records.inMemory.GetRow(ptr), nil
return c.m.records.inMemory.GetRow(ptr), nil, nil
}

// GetMemTracker returns the memory tracker in records, panics if the RowContainer has already spilled.
Expand Down

0 comments on commit 5eab362

Please sign in to comment.