From 015cd4f7b0f02e19c445be25076d42e2e33b7b42 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 7 Jun 2022 12:05:06 +0800 Subject: [PATCH 1/3] fix race --- executor/hash_table.go | 3 ++- util/chunk/disk.go | 19 ++++++++----------- util/chunk/row_container.go | 14 ++++++++++---- 3 files changed, 20 insertions(+), 16 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index adf6f65832770..1509f5b9decb9 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -120,8 +120,9 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk matched = make([]chunk.Row, 0, len(innerPtrs)) var matchedRow chunk.Row matchedPtrs = make([]chunk.RowPtr, 0, len(innerPtrs)) + var chk *chunk.Chunk for _, ptr := range innerPtrs { - matchedRow, err = c.rowContainer.GetRow(ptr) + matchedRow, chk, err = c.rowContainer.GetRowAndAppendToChunk(ptr, chk) if err != nil { return } diff --git a/util/chunk/disk.go b/util/chunk/disk.go index 56e098d6a0c6d..089e9fd9b6a6a 100644 --- a/util/chunk/disk.go +++ b/util/chunk/disk.go @@ -18,7 +18,6 @@ import ( "io" "os" "strconv" - "sync" errors2 "github.com/pingcap/errors" "github.com/pingcap/tidb/config" @@ -40,8 +39,6 @@ type ListInDisk struct { dataFile diskFileReaderWriter offsetFile diskFileReaderWriter - - chkPool *sync.Pool // Using a Chunk Pool to avoid constructing a chunk structure for each GetRow() } // diskFileReaderWriter represents a Reader and a Writer for the temporary disk file. @@ -108,9 +105,6 @@ func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk { fieldTypes: fieldTypes, // TODO(fengliyuan): set the quota of disk usage. diskTracker: disk.NewTracker(memory.LabelForChunkListInDisk, -1), - chkPool: &sync.Pool{New: func() interface{} { - return NewChunkWithCapacity(fieldTypes, 1024) - }}, } return l } @@ -190,6 +184,12 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) { // GetRow gets a Row from the ListInDisk by RowPtr. 
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { + row, _, err = l.GetRowAndAppendToChunk(ptr, nil) + return row, err +} + +// GetRowAndAppendToChunk gets a Row from the ListInDisk by RowPtr. Return the Row and the Ref Chunk. +func (l *ListInDisk) GetRowAndAppendToChunk(ptr RowPtr, chk *Chunk) (row Row, _ *Chunk, err error) { off, err := l.getOffset(ptr.ChkIdx, ptr.RowIdx) if err != nil { return @@ -198,12 +198,10 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { format := rowInDisk{numCol: len(l.fieldTypes)} _, err = format.ReadFrom(r) if err != nil { - return row, err + return row, nil, err } - chk := l.chkPool.Get().(*Chunk) row, chk = format.toRow(l.fieldTypes, chk) - l.chkPool.Put(chk) - return row, err + return row, chk, err } func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) { @@ -241,7 +239,6 @@ func (l *ListInDisk) Close() error { terror.Call(l.offsetFile.disk.Close) terror.Log(os.Remove(l.offsetFile.disk.Name())) } - l.chkPool = nil return nil } diff --git a/util/chunk/row_container.go b/util/chunk/row_container.go index 20d5496fa533d..e80e8b7485b77 100644 --- a/util/chunk/row_container.go +++ b/util/chunk/row_container.go @@ -242,16 +242,22 @@ func (c *RowContainer) GetChunk(chkIdx int) (*Chunk, error) { } // GetRow returns the row the ptr pointed to. -func (c *RowContainer) GetRow(ptr RowPtr) (Row, error) { +func (c *RowContainer) GetRow(ptr RowPtr) (row Row, err error) { + row, _, err = c.GetRowAndAppendToChunk(ptr, nil) + return row, err +} + +// GetRowAndAppendToChunk gets a Row from the RowContainer by RowPtr. 
+func (c *RowContainer) GetRowAndAppendToChunk(ptr RowPtr, chk *Chunk) (row Row, _ *Chunk, err error) { c.m.RLock() defer c.m.RUnlock() if c.alreadySpilled() { if err := c.m.records.spillError; err != nil { - return Row{}, err + return Row{}, nil, err } - return c.m.records.inDisk.GetRow(ptr) + return c.m.records.inDisk.GetRowAndAppendToChunk(ptr, chk) } - return c.m.records.inMemory.GetRow(ptr), nil + return c.m.records.inMemory.GetRow(ptr), nil, nil } // GetMemTracker returns the memory tracker in records, panics if the RowContainer has already spilled. From 0873e76e9ce8ccabd8fcc50f2487915492caf3e0 Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 7 Jun 2022 14:45:44 +0800 Subject: [PATCH 2/3] address comments --- executor/hash_table.go | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index 1509f5b9decb9..95f4bd39d5959 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -86,6 +86,9 @@ type hashRowContainer struct { rowContainer *chunk.RowContainer memTracker *memory.Tracker + + // chkBuf buffer the data if rowContainer is spilled. 
+ chkBuf *chunk.Chunk } func newHashRowContainer(sCtx sessionctx.Context, estCount int, hCtx *hashContext, allTypes []*types.FieldType) *hashRowContainer { @@ -120,9 +123,8 @@ func (c *hashRowContainer) GetMatchedRowsAndPtrs(probeKey uint64, probeRow chunk matched = make([]chunk.Row, 0, len(innerPtrs)) var matchedRow chunk.Row matchedPtrs = make([]chunk.RowPtr, 0, len(innerPtrs)) - var chk *chunk.Chunk for _, ptr := range innerPtrs { - matchedRow, chk, err = c.rowContainer.GetRowAndAppendToChunk(ptr, chk) + matchedRow, c.chkBuf, err = c.rowContainer.GetRowAndAppendToChunk(ptr, c.chkBuf) if err != nil { return } @@ -223,6 +225,7 @@ func (c *hashRowContainer) Len() uint64 { func (c *hashRowContainer) Close() error { defer c.memTracker.Detach() + c.chkBuf = nil return c.rowContainer.Close() } From b3c63f73f9b099ccd15dee89dfec95f22403975d Mon Sep 17 00:00:00 2001 From: wshwsh12 <793703860@qq.com> Date: Tue, 7 Jun 2022 15:46:39 +0800 Subject: [PATCH 3/3] polish comments --- executor/hash_table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/executor/hash_table.go b/executor/hash_table.go index 95f4bd39d5959..c57405597aa71 100644 --- a/executor/hash_table.go +++ b/executor/hash_table.go @@ -87,7 +87,7 @@ type hashRowContainer struct { rowContainer *chunk.RowContainer memTracker *memory.Tracker - // chkBuf buffer the data if rowContainer is spilled. + // chkBuf buffers the data read from disk if rowContainer is spilled. chkBuf *chunk.Chunk }