Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: fix data race in ListInDisk #35199

Merged
merged 4 commits into from
Jun 8, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion executor/hash_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,9 @@ type hashRowContainer struct {

rowContainer *chunk.RowContainer
memTracker *memory.Tracker

// chkBuf buffers the data read from disk if rowContainer is spilled.
chkBuf *chunk.Chunk
}

func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer {
Expand Down Expand Up @@ -122,7 +125,7 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk
var matchedRow chunk.Row
matchedPtrs = matchedPtrs[:0]
for _, ptr := range innerPtrs {
matchedRow, err = c.rowContainer.GetRow(ptr)
matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf)
if err != nil {
return nil, nil, err
}
Expand Down Expand Up @@ -223,6 +226,7 @@ func (c *hashRowContainer) Len() uint64 {

// Close drops the chkBuf reference, closes the underlying rowContainer and
// returns the error from that close. The memTracker is detached by a deferred
// call, i.e. after rowContainer.Close has run.
func (c *hashRowContainer) Close() error {
// Deferred so the detach happens on the way out, after rowContainer.Close.
defer c.memTracker.Detach()
// Drop the reference to the chunk passed to GetRowAndAppendToChunk as the read buffer.
c.chkBuf = nil
return c.rowContainer.Close()
}

Expand Down
19 changes: 8 additions & 11 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"io"
"os"
"strconv"
"sync"

errors2 "github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
Expand All @@ -40,8 +39,6 @@ type ListInDisk struct {

dataFile diskFileReaderWriter
offsetFile diskFileReaderWriter

chkPool *sync.Pool // Using a Chunk Pool to avoid constructing a chunk structure for each GetRow()
}

// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file.
Expand Down Expand Up @@ -108,9 +105,6 @@ func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
fieldTypes: fieldTypes,
// TODO(fengliyuan): set the quota of disk usage.
diskTracker: disk.NewTracker(memory.LabelForChunkListInDisk, -1),
chkPool: &sync.Pool{New: func() interface{} {
return NewChunkWithCapacity(fieldTypes, 1024)
}},
}
return l
}
Expand Down Expand Up @@ -190,6 +184,12 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {

// GetRow gets a Row from the ListInDisk by RowPtr.
// It delegates to GetRowAndAppendToChunk with a nil chunk and discards the
// chunk that call returns, so only the Row and the error reach the caller.
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
row, _, err = l.GetRowAndAppendToChunk(ptr, nil)
return row, err
}

// GetRowAndAppendToChunk gets a Row from the ListInDisk by RowPtr. It returns the Row and the Chunk it references.
func (l *ListInDisk) GetRowAndAppendToChunk(ptr RowPtr, chk *Chunk) (row Row, _ *Chunk, err error) {
off, err := l.getOffset(ptr.ChkIdx, ptr.RowIdx)
if err != nil {
return
Expand All @@ -198,12 +198,10 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
format := rowInDisk{numCol: len(l.fieldTypes)}
_, err = format.ReadFrom(r)
if err != nil {
return row, err
return row, nil, err
}
chk := l.chkPool.Get().(*Chunk)
row, chk = format.toRow(l.fieldTypes, chk)
l.chkPool.Put(chk)
return row, err
return row, chk, err
}

func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) {
Expand Down Expand Up @@ -241,7 +239,6 @@ func (l *ListInDisk) Close() error {
terror.Call(l.offsetFile.disk.Close)
terror.Log(os.Remove(l.offsetFile.disk.Name()))
}
l.chkPool = nil
return nil
}

Expand Down
14 changes: 10 additions & 4 deletions util/chunk/row_container.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,16 +242,22 @@ func (c *RowContainer) GetChunk(chkIdx int) (*Chunk, error) {
}

// GetRow returns the row the ptr pointed to.
func (c *RowContainer) GetRow(ptr RowPtr) (Row, error) {
func (c *RowContainer) GetRow(ptr RowPtr) (row Row, err error) {
row, _, err = c.GetRowAndAppendToChunk(ptr, nil)
return row, err
}

// GetRowAndAppendToChunk gets a Row from the RowContainer by RowPtr.
func (c *RowContainer) GetRowAndAppendToChunk(ptr RowPtr, chk *Chunk) (row Row, _ *Chunk, err error) {
c.m.RLock()
defer c.m.RUnlock()
if c.alreadySpilled() {
if err := c.m.records.spillError; err != nil {
return Row{}, err
return Row{}, nil, err
}
return c.m.records.inDisk.GetRow(ptr)
return c.m.records.inDisk.GetRowAndAppendToChunk(ptr, chk)
}
return c.m.records.inMemory.GetRow(ptr), nil
return c.m.records.inMemory.GetRow(ptr), nil, nil
}

// GetMemTracker returns the memory tracker in records, panics if the RowContainer has already spilled.
Expand Down