diff --git a/util/chunk/disk.go b/util/chunk/disk.go index 516d335149b54..ccbc1e0c2365e 100644 --- a/util/chunk/disk.go +++ b/util/chunk/disk.go @@ -18,7 +18,6 @@ import ( "io" "os" "strconv" - "sync" errors2 "github.com/pingcap/errors" "github.com/pingcap/tidb/config" @@ -32,27 +31,73 @@ import ( // ListInDisk represents a slice of chunks storing in temporary disk. type ListInDisk struct { - fieldTypes []*types.FieldType - // offsets stores the offsets in disk of all RowPtr, - // the offset of one RowPtr is offsets[RowPtr.ChkIdx][RowPtr.RowIdx]. - offsets [][]int64 + fieldTypes []*types.FieldType + numRowsOfEachChunk []int + rowNumOfEachChunkFirstRow []int + totalNumRows int + diskTracker *disk.Tracker // track disk usage. + + dataFile diskFileReaderWriter + offsetFile diskFileReaderWriter +} + +// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file. +type diskFileReaderWriter struct { + disk *os.File + w io.WriteCloser // offWrite is the current offset for writing. offWrite int64 - disk *os.File - w io.WriteCloser - bufFlushMutex sync.RWMutex - diskTracker *disk.Tracker // track disk usage. - numRowsInDisk int - checksumWriter *checksum.Writer - cipherWriter *encrypt.Writer + cipherWriter *encrypt.Writer // cipherWriter is only enable when config SpilledFileEncryptionMethod is "aes128-ctr" // ctrCipher stores the key and nonce using by aes encrypt io layer ctrCipher *encrypt.CtrCipher } +func (l *diskFileReaderWriter) initWithFileName(fileName string) (err error) { + l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, fileName) + if err != nil { + return errors2.Trace(err) + } + var underlying io.WriteCloser = l.disk + if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext { + // The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr" + l.ctrCipher, err = encrypt.NewCtrCipher() + if err != nil { + return + } + l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher) + underlying = l.cipherWriter + } + l.checksumWriter = checksum.NewWriter(underlying) + l.w = l.checksumWriter + return +} + +func (l *diskFileReaderWriter) getReader() io.ReaderAt { + var underlying io.ReaderAt = l.disk + if l.ctrCipher != nil { + underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) + } + if l.checksumWriter != nil { + underlying = NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) + } + return underlying +} + +func (l *diskFileReaderWriter) getSectionReader(off int64) *io.SectionReader { + checksumReader := l.getReader() + r := io.NewSectionReader(checksumReader, off, l.offWrite-off) + return r +} + +func (l *diskFileReaderWriter) getWriter() io.Writer { + return l.w +} + var defaultChunkListInDiskPath = "chunk.ListInDisk" +var defaultChunkListInDiskOffsetPath = "chunk.ListInDiskOffset" // NewListInDisk creates a new ListInDisk with field types. func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk { @@ -69,29 +114,17 @@ func (l *ListInDisk) initDiskFile() (err error) { if err != nil { return } - l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, defaultChunkListInDiskPath+strconv.Itoa(l.diskTracker.Label())) + err = l.dataFile.initWithFileName(defaultChunkListInDiskPath + strconv.Itoa(l.diskTracker.Label())) if err != nil { - return errors2.Trace(err) - } - var underlying io.WriteCloser = l.disk - if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext { - // The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr" - l.ctrCipher, err = encrypt.NewCtrCipher() - if err != nil { - return - } - l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher) - underlying = l.cipherWriter + return } - l.checksumWriter = checksum.NewWriter(underlying) - l.w = l.checksumWriter - l.bufFlushMutex = sync.RWMutex{} + err = l.offsetFile.initWithFileName(defaultChunkListInDiskOffsetPath + strconv.Itoa(l.diskTracker.Label())) return } // Len returns the number of rows in ListInDisk func (l *ListInDisk) Len() int { - return l.numRowsInDisk + return l.totalNumRows } // GetDiskTracker returns the memory tracker of this List. @@ -101,34 +134,45 @@ func (l *ListInDisk) GetDiskTracker() *disk.Tracker { // Add adds a chunk to the ListInDisk. Caller must make sure the input chk // is not empty and not used any more and has the same field types. -// Warning: do not mix Add and GetRow (always use GetRow after you have added all the chunks), and do not use Add concurrently. +// Warning: Do not use Add concurrently. func (l *ListInDisk) Add(chk *Chunk) (err error) { if chk.NumRows() == 0 { return errors2.New("chunk appended to List should have at least 1 row") } - if l.disk == nil { + if l.dataFile.disk == nil { err = l.initDiskFile() if err != nil { return } } - chk2 := chunkInDisk{Chunk: chk, offWrite: l.offWrite} - n, err := chk2.WriteTo(l.w) - l.offWrite += n + // Append data + chkInDisk := chunkInDisk{Chunk: chk, offWrite: l.dataFile.offWrite} + n, err := chkInDisk.WriteTo(l.dataFile.getWriter()) + l.dataFile.offWrite += n if err != nil { return } - l.offsets = append(l.offsets, chk2.getOffsetsOfRows()) - l.diskTracker.Consume(n) - l.numRowsInDisk += chk.NumRows() + + // Append offsets + offsetsOfRows := chkInDisk.getOffsetsOfRows() + l.numRowsOfEachChunk = append(l.numRowsOfEachChunk, len(offsetsOfRows)) + l.rowNumOfEachChunkFirstRow = append(l.rowNumOfEachChunkFirstRow, l.totalNumRows) + n2, err := offsetsOfRows.WriteTo(l.offsetFile.getWriter()) + l.offsetFile.offWrite += n2 + if err != nil { + return + } + + l.diskTracker.Consume(n + n2) + l.totalNumRows += chk.NumRows() return } // GetChunk gets a Chunk from the ListInDisk by chkIdx. func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) { chk := NewChunkWithCapacity(l.fieldTypes, l.NumRowsOfChunk(chkIdx)) - offsets := l.offsets[chkIdx] - for rowIdx := range offsets { + chkSize := l.numRowsOfEachChunk[chkIdx] + for rowIdx := 0; rowIdx < chkSize; rowIdx++ { row, err := l.GetRow(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)}) if err != nil { return chk, err @@ -140,16 +184,11 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) { // GetRow gets a Row from the ListInDisk by RowPtr. func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { + off, err := l.getOffset(ptr.ChkIdx, ptr.RowIdx) if err != nil { return } - off := l.offsets[ptr.ChkIdx][ptr.RowIdx] - var underlying io.ReaderAt = l.disk - if l.ctrCipher != nil { - underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) - } - checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) - r := io.NewSectionReader(checksumReader, off, l.offWrite-off) + r := l.dataFile.getSectionReader(off) format := rowInDisk{numCol: len(l.fieldTypes)} _, err = format.ReadFrom(r) if err != nil { @@ -159,22 +198,40 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) { return row, err } +func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) { + offsetInOffsetFile := l.rowNumOfEachChunkFirstRow[chkIdx] + int(rowIdx) + b := make([]byte, 8) + reader := l.offsetFile.getSectionReader(int64(offsetInOffsetFile) * 8) + n, err := io.ReadFull(reader, b) + if err != nil { + return 0, err + } + if n != 8 { + return 0, errors2.New("The file spilled is broken, can not get data offset from the disk") + } + return bytesToI64Slice(b)[0], nil +} + // NumRowsOfChunk returns the number of rows of a chunk in the ListInDisk. func (l *ListInDisk) NumRowsOfChunk(chkID int) int { - return len(l.offsets[chkID]) + return l.numRowsOfEachChunk[chkID] } // NumChunks returns the number of chunks in the ListInDisk. func (l *ListInDisk) NumChunks() int { - return len(l.offsets) + return len(l.numRowsOfEachChunk) } // Close releases the disk resource. func (l *ListInDisk) Close() error { - if l.disk != nil { + if l.dataFile.disk != nil { l.diskTracker.Consume(-l.diskTracker.BytesConsumed()) - terror.Call(l.disk.Close) - terror.Log(os.Remove(l.disk.Name())) + terror.Call(l.dataFile.disk.Close) + terror.Log(os.Remove(l.dataFile.disk.Name())) + } + if l.offsetFile.disk != nil { + terror.Call(l.offsetFile.disk.Close) + terror.Log(os.Remove(l.offsetFile.disk.Name())) } return nil } @@ -198,7 +255,15 @@ type chunkInDisk struct { // offWrite is the current offset for writing. offWrite int64 // offsetsOfRows stores the offset of each row. - offsetsOfRows []int64 + offsetsOfRows offsetsOfRows +} + +type offsetsOfRows []int64 + +// WriteTo serializes the offsetsOfRow, and writes to w. +func (off offsetsOfRows) WriteTo(w io.Writer) (written int64, err error) { + n, err := w.Write(i64SliceToBytes(off)) + return int64(n), err } // WriteTo serializes the chunk into the format of chunkInDisk, and @@ -222,7 +287,7 @@ func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) { } // getOffsetsOfRows gets the offset of each row. -func (chk *chunkInDisk) getOffsetsOfRows() []int64 { return chk.offsetsOfRows } +func (chk *chunkInDisk) getOffsetsOfRows() offsetsOfRows { return chk.offsetsOfRows } // rowInDisk represents a Row in format of diskFormatRow. type rowInDisk struct { diff --git a/util/chunk/disk_test.go b/util/chunk/disk_test.go index c22b525477a02..ff0d56116ff70 100644 --- a/util/chunk/disk_test.go +++ b/util/chunk/disk_test.go @@ -31,8 +31,6 @@ import ( "github.com/pingcap/tidb/parser/mysql" "github.com/pingcap/tidb/types" "github.com/pingcap/tidb/types/json" - "github.com/pingcap/tidb/util/checksum" - "github.com/pingcap/tidb/util/encrypt" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -73,15 +71,15 @@ func TestListInDisk(t *testing.T) { defer func() { err := l.Close() require.NoError(t, err) - require.NotNil(t, l.disk) - _, err = os.Stat(l.disk.Name()) + require.NotNil(t, l.dataFile.disk) + _, err = os.Stat(l.dataFile.disk.Name()) require.True(t, os.IsNotExist(err)) }() for _, chk := range chks { err := l.Add(chk) assert.NoError(t, err) } - require.True(t, strings.HasPrefix(l.disk.Name(), filepath.Join(os.TempDir(), "oom-use-tmp-storage"))) + require.True(t, strings.HasPrefix(l.dataFile.disk.Name(), filepath.Join(os.TempDir(), "oom-use-tmp-storage"))) assert.Equal(t, numChk, l.NumChunks()) assert.Greater(t, l.GetDiskTracker().BytesConsumed(), int64(0)) @@ -145,25 +143,47 @@ type listInDiskWriteDisk struct { ListInDisk } +func (l *diskFileReaderWriter) flushForTest() (err error) { + err = l.disk.Close() + if err != nil { + return + } + l.w = nil + // the l.disk is the underlying object of the l.w, it will be closed + // after calling l.w.Close, we need to reopen it before reading rows. + l.disk, err = os.Open(l.disk.Name()) + if err != nil { + return errors2.Trace(err) + } + return nil +} + func newListInDiskWriteDisk(fieldTypes []*types.FieldType) (*listInDiskWriteDisk, error) { l := listInDiskWriteDisk{*NewListInDisk(fieldTypes)} disk, err := os.CreateTemp(config.GetGlobalConfig().TempStoragePath, strconv.Itoa(l.diskTracker.Label())) if err != nil { return nil, err } - l.disk = disk - l.w = disk + l.dataFile.disk = disk + l.dataFile.w = disk + + disk2, err := os.CreateTemp(config.GetGlobalConfig().TempStoragePath, "offset"+strconv.Itoa(l.diskTracker.Label())) + if err != nil { + return nil, err + } + l.offsetFile.disk = disk2 + l.offsetFile.w = disk2 return &l, nil } func (l *listInDiskWriteDisk) GetRow(ptr RowPtr) (row Row, err error) { - err = l.flush() + err = l.flushForTest() + off, err := l.getOffset(ptr.ChkIdx, ptr.RowIdx) if err != nil { return } - off := l.offsets[ptr.ChkIdx][ptr.RowIdx] - r := io.NewSectionReader(l.disk, off, l.offWrite-off) + r := io.NewSectionReader(l.dataFile.disk, off, l.dataFile.offWrite-off) format := rowInDisk{numCol: len(l.fieldTypes)} _, err = format.ReadFrom(r) if err != nil { @@ -173,31 +193,12 @@ func (l *listInDiskWriteDisk) GetRow(ptr RowPtr) (row Row, err error) { return row, err } -func (l *listInDiskWriteDisk) flush() (err error) { - // buffered is not zero only after Add and before GetRow, after the first flush, buffered will always be zero, - // hence we use a RWLock to allow quicker quit. - l.bufFlushMutex.RLock() - checksumWriter := l.w - l.bufFlushMutex.RUnlock() - if checksumWriter == nil { - return nil - } - l.bufFlushMutex.Lock() - defer l.bufFlushMutex.Unlock() - if l.w != nil { - err = l.w.Close() - if err != nil { - return - } - l.w = nil - // the l.disk is the underlying object of the l.w, it will be closed - // after calling l.w.Close, we need to reopen it before reading rows. - l.disk, err = os.Open(l.disk.Name()) - if err != nil { - return errors2.Trace(err) - } +func (l *listInDiskWriteDisk) flushForTest() (err error) { + err = l.dataFile.flushForTest() + if err != nil { + return err } - return + return l.offsetFile.flushForTest() } func checkRow(t *testing.T, row1, row2 Row) { @@ -299,11 +300,7 @@ func testReaderWithCache(t *testing.T) { require.NoError(t, err) require.Equal(t, chk.GetRow(0).GetDatumRow(field), row.GetDatumRow(field)) - var underlying io.ReaderAt = l.disk - if l.ctrCipher != nil { - underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) - } - checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) + checksumReader := l.dataFile.getReader() // Read all data. data := make([]byte, 1024) @@ -372,12 +369,7 @@ func testReaderWithCacheNoFlush(t *testing.T) { row, err := l.GetRow(RowPtr{0, 0}) require.NoError(t, err) require.Equal(t, chk.GetRow(0).GetDatumRow(field), row.GetDatumRow(field)) - - var underlying io.ReaderAt = l.disk - if l.ctrCipher != nil { - underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset()) - } - checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset()) + checksumReader := l.dataFile.getReader() // Read all data. data := make([]byte, 1024)