From 5eab3623fcd6609b4bec858638b900c081cee524 Mon Sep 17 00:00:00 2001 From: Shenghui Wu <793703860@qq.com> Date: Wed, 8 Jun 2022 10:26:29 +0800 Subject: [PATCH] util: fix data race in ListInDisk (#35199) close pingcap/tidb#35191 --- executor/hash_table.go | 6 +++++- util/chunk/disk.go | 19 ++++++++----------- util/chunk/row_container.go | 14 ++++++++++---- 3 files changed, 23 insertions(+), 16 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index cd108fabb5031..2794b3f2c2d83 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -86,6 +86,9 @@ type hashRowContainer struct { rowContainer *chunk.RowContainer memTracker *memory.Tracker + + // chkBuf buffer the data reads from the disk if rowContainer is spilled. + chkBuf *chunk.Chunk } func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer { @@ -122,7 +125,7 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk var matchedRow chunk.Row matchedPtrs = matchedPtrs[:0] for _, ptr := range innerPtrs { - matchedRow, err = c.rowContainer.GetRow(ptr) + matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf) if err != nil { return nil, nil, err } @@ -223,6 +226,7 @@ func (c *hashRowContainer) Len() uint64 { func (c *hashRowContainer) Close() error { defer c.memTracker.Detach() + c.chkBuf = nil return c.rowContainer.Close() } diff --git a/util/chunk/disk.go b/util/chunk/disk.go index 56e098d6a0c6d..089e9fd9b6a6a 100644 --- a/util/chunk/disk.go +++ b/util/chunk/disk.go @@ -18,7 +18,6 @@ import ( "io" "os" "strconv" - "sync" errors2 "github.com/pingcap/errors" "github.com/pingcap/tidb/config" @@ -40,8 +39,6 @@ type ListInDisk struct { dataFile diskFileReaderWriter offsetFile diskFileReaderWriter - - chkPool *sync.Pool // Using a Chunk Pool to avoid constructing a chunk structure for each GetRow() } // diskFileReaderWriter represents a Reader and a Writer for the temporary disk file. @@ -108,9 +105,6 @@ func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk { fieldTypes: fieldTypes, // TODO(fengliyuan): set the quota of disk usage. diskTracker: disk.NewTracker(memory.LabelForChunkListInDisk, -1), - chkPool: &sync.Pool{New: func() interface{} { - return NewChunkWithCapacity(fieldTypes, 1024) - }}, } return l } @@ -190,6 +184,12 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) { // GetRow gets a Row from the ListInDisk by RowPtr. func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { + row, _, err = l.GetRowAndAppendToChunk(ptr, nil) + return row, err +} + +// GetRowAndAppendToChunk gets a Row from the ListInDisk by RowPtr. Return the Row and the Ref Chunk. +func (l *ListInDisk) GetRowAndAppendToChunk(ptr RowPtr, chk *Chunk) (row Row, _ *Chunk, err error) { off, err := l.getOffset(ptr.ChkIdx, ptr.RowIdx) if err != nil { return @@ -198,12 +198,10 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { format := rowInDisk{numCol: len(l.fieldTypes)} _, err = format.ReadFrom(r) if err != nil { - return row, err + return row, nil, err } - chk := l.chkPool.Get().(*Chunk) row, chk = format.toRow(l.fieldTypes, chk) - l.chkPool.Put(chk) - return row, err + return row, chk, err } func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) { @@ -241,7 +239,6 @@ func (l *ListInDisk) Close() error { terror.Call(l.offsetFile.disk.Close) terror.Log(os.Remove(l.offsetFile.disk.Name())) } - l.chkPool = nil return nil } diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index 20d5496fa533d..e80e8b7485b77 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -242,16 +242,22 @@ func (c *RowContainer) GetChunk(chkIdx int) (*Chunk, error) { } // GetRow returns the row the ptr pointed to. -func (c *RowContainer) GetRow(ptr RowPtr) (Row, error) { +func (c *RowContainer) GetRow(ptr RowPtr) (row Row, err error) { + row, _, err = c.GetRowAndAppendToChunk(ptr, nil) + return row, err +} + +// GetRowAndAppendToChunk gets a Row from the RowContainer by RowPtr. +func (c *RowContainer) GetRowAndAppendToChunk(ptr RowPtr, chk *Chunk) (row Row, _ *Chunk, err error) { c.m.RLock() defer c.m.RUnlock() if c.alreadySpilled() { if err := c.m.records.spillError; err != nil { - return Row{}, err + return Row{}, nil, err } - return c.m.records.inDisk.GetRow(ptr) + return c.m.records.inDisk.GetRowAndAppendToChunk(ptr, chk) } - return c.m.records.inMemory.GetRow(ptr), nil + return c.m.records.inMemory.GetRow(ptr), nil, nil } // GetMemTracker returns the memory tracker in records, panics if the RowContainer has already spilled.