Skip to content

Commit

Permalink
util: support spill offset into disk when spilling (#34212)
Browse files Browse the repository at this point in the history
ref #33877
  • Loading branch information
wshwsh12 authored May 5, 2022
1 parent 7925d5a commit ad6d620
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 97 deletions.
169 changes: 117 additions & 52 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"io"
"os"
"strconv"
"sync"

errors2 "github.com/pingcap/errors"
"github.com/pingcap/tidb/config"
Expand All @@ -32,27 +31,73 @@ import (

// ListInDisk represents a slice of chunks storing in temporary disk.
type ListInDisk struct {
fieldTypes []*types.FieldType
// offsets stores the offsets in disk of all RowPtr,
// the offset of one RowPtr is offsets[RowPtr.ChkIdx][RowPtr.RowIdx].
offsets [][]int64
fieldTypes []*types.FieldType
numRowsOfEachChunk []int
rowNumOfEachChunkFirstRow []int
totalNumRows int
diskTracker *disk.Tracker // track disk usage.

dataFile diskFileReaderWriter
offsetFile diskFileReaderWriter
}

// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file.
type diskFileReaderWriter struct {
disk *os.File
w io.WriteCloser
// offWrite is the current offset for writing.
offWrite int64

disk *os.File
w io.WriteCloser
bufFlushMutex sync.RWMutex
diskTracker *disk.Tracker // track disk usage.
numRowsInDisk int

checksumWriter *checksum.Writer
cipherWriter *encrypt.Writer
cipherWriter *encrypt.Writer // cipherWriter is only enable when config SpilledFileEncryptionMethod is "aes128-ctr"

// ctrCipher stores the key and nonce using by aes encrypt io layer
ctrCipher *encrypt.CtrCipher
}

func (l *diskFileReaderWriter) initWithFileName(fileName string) (err error) {
l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, fileName)
if err != nil {
return errors2.Trace(err)
}
var underlying io.WriteCloser = l.disk
if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext {
// The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr"
l.ctrCipher, err = encrypt.NewCtrCipher()
if err != nil {
return
}
l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher)
underlying = l.cipherWriter
}
l.checksumWriter = checksum.NewWriter(underlying)
l.w = l.checksumWriter
return
}

func (l *diskFileReaderWriter) getReader() io.ReaderAt {
var underlying io.ReaderAt = l.disk
if l.ctrCipher != nil {
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
}
if l.checksumWriter != nil {
underlying = NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
}
return underlying
}

func (l *diskFileReaderWriter) getSectionReader(off int64) *io.SectionReader {
checksumReader := l.getReader()
r := io.NewSectionReader(checksumReader, off, l.offWrite-off)
return r
}

func (l *diskFileReaderWriter) getWriter() io.Writer {
return l.w
}

var defaultChunkListInDiskPath = "chunk.ListInDisk"
var defaultChunkListInDiskOffsetPath = "chunk.ListInDiskOffset"

// NewListInDisk creates a new ListInDisk with field types.
func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
Expand All @@ -69,29 +114,17 @@ func (l *ListInDisk) initDiskFile() (err error) {
if err != nil {
return
}
l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, defaultChunkListInDiskPath+strconv.Itoa(l.diskTracker.Label()))
err = l.dataFile.initWithFileName(defaultChunkListInDiskPath + strconv.Itoa(l.diskTracker.Label()))
if err != nil {
return errors2.Trace(err)
}
var underlying io.WriteCloser = l.disk
if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext {
// The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr"
l.ctrCipher, err = encrypt.NewCtrCipher()
if err != nil {
return
}
l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher)
underlying = l.cipherWriter
return
}
l.checksumWriter = checksum.NewWriter(underlying)
l.w = l.checksumWriter
l.bufFlushMutex = sync.RWMutex{}
err = l.offsetFile.initWithFileName(defaultChunkListInDiskOffsetPath + strconv.Itoa(l.diskTracker.Label()))
return
}

// Len returns the number of rows in ListInDisk
func (l *ListInDisk) Len() int {
return l.numRowsInDisk
return l.totalNumRows
}

// GetDiskTracker returns the memory tracker of this List.
Expand All @@ -101,34 +134,45 @@ func (l *ListInDisk) GetDiskTracker() *disk.Tracker {

// Add adds a chunk to the ListInDisk. Caller must make sure the input chk
// is not empty and not used any more and has the same field types.
// Warning: do not mix Add and GetRow (always use GetRow after you have added all the chunks), and do not use Add concurrently.
// Warning: Do not use Add concurrently.
func (l *ListInDisk) Add(chk *Chunk) (err error) {
if chk.NumRows() == 0 {
return errors2.New("chunk appended to List should have at least 1 row")
}
if l.disk == nil {
if l.dataFile.disk == nil {
err = l.initDiskFile()
if err != nil {
return
}
}
chk2 := chunkInDisk{Chunk: chk, offWrite: l.offWrite}
n, err := chk2.WriteTo(l.w)
l.offWrite += n
// Append data
chkInDisk := chunkInDisk{Chunk: chk, offWrite: l.dataFile.offWrite}
n, err := chkInDisk.WriteTo(l.dataFile.getWriter())
l.dataFile.offWrite += n
if err != nil {
return
}
l.offsets = append(l.offsets, chk2.getOffsetsOfRows())
l.diskTracker.Consume(n)
l.numRowsInDisk += chk.NumRows()

// Append offsets
offsetsOfRows := chkInDisk.getOffsetsOfRows()
l.numRowsOfEachChunk = append(l.numRowsOfEachChunk, len(offsetsOfRows))
l.rowNumOfEachChunkFirstRow = append(l.rowNumOfEachChunkFirstRow, l.totalNumRows)
n2, err := offsetsOfRows.WriteTo(l.offsetFile.getWriter())
l.offsetFile.offWrite += n2
if err != nil {
return
}

l.diskTracker.Consume(n + n2)
l.totalNumRows += chk.NumRows()
return
}

// GetChunk gets a Chunk from the ListInDisk by chkIdx.
func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {
chk := NewChunkWithCapacity(l.fieldTypes, l.NumRowsOfChunk(chkIdx))
offsets := l.offsets[chkIdx]
for rowIdx := range offsets {
chkSize := l.numRowsOfEachChunk[chkIdx]
for rowIdx := 0; rowIdx < chkSize; rowIdx++ {
row, err := l.GetRow(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
if err != nil {
return chk, err
Expand All @@ -140,16 +184,11 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {

// GetRow gets a Row from the ListInDisk by RowPtr.
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
off, err := l.getOffset(ptr.ChkIdx, ptr.RowIdx)
if err != nil {
return
}
off := l.offsets[ptr.ChkIdx][ptr.RowIdx]
var underlying io.ReaderAt = l.disk
if l.ctrCipher != nil {
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
}
checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
r := io.NewSectionReader(checksumReader, off, l.offWrite-off)
r := l.dataFile.getSectionReader(off)
format := rowInDisk{numCol: len(l.fieldTypes)}
_, err = format.ReadFrom(r)
if err != nil {
Expand All @@ -159,22 +198,40 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
return row, err
}

func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) {
offsetInOffsetFile := l.rowNumOfEachChunkFirstRow[chkIdx] + int(rowIdx)
b := make([]byte, 8)
reader := l.offsetFile.getSectionReader(int64(offsetInOffsetFile) * 8)
n, err := io.ReadFull(reader, b)
if err != nil {
return 0, err
}
if n != 8 {
return 0, errors2.New("The file spilled is broken, can not get data offset from the disk")
}
return bytesToI64Slice(b)[0], nil
}

// NumRowsOfChunk returns the number of rows of a chunk in the ListInDisk.
func (l *ListInDisk) NumRowsOfChunk(chkID int) int {
return len(l.offsets[chkID])
return l.numRowsOfEachChunk[chkID]
}

// NumChunks returns the number of chunks in the ListInDisk.
func (l *ListInDisk) NumChunks() int {
return len(l.offsets)
return len(l.numRowsOfEachChunk)
}

// Close releases the disk resource.
func (l *ListInDisk) Close() error {
if l.disk != nil {
if l.dataFile.disk != nil {
l.diskTracker.Consume(-l.diskTracker.BytesConsumed())
terror.Call(l.disk.Close)
terror.Log(os.Remove(l.disk.Name()))
terror.Call(l.dataFile.disk.Close)
terror.Log(os.Remove(l.dataFile.disk.Name()))
}
if l.offsetFile.disk != nil {
terror.Call(l.offsetFile.disk.Close)
terror.Log(os.Remove(l.offsetFile.disk.Name()))
}
return nil
}
Expand All @@ -198,7 +255,15 @@ type chunkInDisk struct {
// offWrite is the current offset for writing.
offWrite int64
// offsetsOfRows stores the offset of each row.
offsetsOfRows []int64
offsetsOfRows offsetsOfRows
}

type offsetsOfRows []int64

// WriteTo serializes the offsetsOfRow, and writes to w.
func (off offsetsOfRows) WriteTo(w io.Writer) (written int64, err error) {
n, err := w.Write(i64SliceToBytes(off))
return int64(n), err
}

// WriteTo serializes the chunk into the format of chunkInDisk, and
Expand All @@ -222,7 +287,7 @@ func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) {
}

// getOffsetsOfRows gets the offset of each row.
func (chk *chunkInDisk) getOffsetsOfRows() []int64 { return chk.offsetsOfRows }
func (chk *chunkInDisk) getOffsetsOfRows() offsetsOfRows { return chk.offsetsOfRows }

// rowInDisk represents a Row in format of diskFormatRow.
type rowInDisk struct {
Expand Down
Loading

0 comments on commit ad6d620

Please sign in to comment.