Skip to content


util: support spill offset into disk when spilling (pingcap#34212)
Browse files Browse the repository at this point in the history
  • Loading branch information
wshwsh12 authored May 5, 2022
1 parent 7925d5a commit ad6d620
Show file tree
Hide file tree
Showing 2 changed files with 154 additions and 97 deletions.
169 changes: 117 additions & 52 deletions util/chunk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (

errors2 ""
Expand All @@ -32,27 +31,73 @@ import (

// ListInDisk represents a slice of chunks storing in temporary disk.
type ListInDisk struct {
fieldTypes []*types.FieldType
// offsets stores the offsets in disk of all RowPtr,
// the offset of one RowPtr is offsets[RowPtr.ChkIdx][RowPtr.RowIdx].
offsets [][]int64
fieldTypes []*types.FieldType
numRowsOfEachChunk []int
rowNumOfEachChunkFirstRow []int
totalNumRows int
diskTracker *disk.Tracker // track disk usage.

dataFile diskFileReaderWriter
offsetFile diskFileReaderWriter

// diskFileReaderWriter represents a Reader and a Writer for the temporary disk file.
type diskFileReaderWriter struct {
disk *os.File
w io.WriteCloser
// offWrite is the current offset for writing.
offWrite int64

disk *os.File
w io.WriteCloser
bufFlushMutex sync.RWMutex
diskTracker *disk.Tracker // track disk usage.
numRowsInDisk int

checksumWriter *checksum.Writer
cipherWriter *encrypt.Writer
cipherWriter *encrypt.Writer // cipherWriter is only enable when config SpilledFileEncryptionMethod is "aes128-ctr"

// ctrCipher stores the key and nonce using by aes encrypt io layer
ctrCipher *encrypt.CtrCipher

func (l *diskFileReaderWriter) initWithFileName(fileName string) (err error) {
l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, fileName)
if err != nil {
return errors2.Trace(err)
var underlying io.WriteCloser = l.disk
if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext {
// The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr"
l.ctrCipher, err = encrypt.NewCtrCipher()
if err != nil {
l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher)
underlying = l.cipherWriter
l.checksumWriter = checksum.NewWriter(underlying)
l.w = l.checksumWriter

func (l *diskFileReaderWriter) getReader() io.ReaderAt {
var underlying io.ReaderAt = l.disk
if l.ctrCipher != nil {
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
if l.checksumWriter != nil {
underlying = NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
return underlying

func (l *diskFileReaderWriter) getSectionReader(off int64) *io.SectionReader {
checksumReader := l.getReader()
r := io.NewSectionReader(checksumReader, off, l.offWrite-off)
return r

func (l *diskFileReaderWriter) getWriter() io.Writer {
return l.w

var defaultChunkListInDiskPath = "chunk.ListInDisk"
var defaultChunkListInDiskOffsetPath = "chunk.ListInDiskOffset"

// NewListInDisk creates a new ListInDisk with field types.
func NewListInDisk(fieldTypes []*types.FieldType) *ListInDisk {
Expand All @@ -69,29 +114,17 @@ func (l *ListInDisk) initDiskFile() (err error) {
if err != nil {
l.disk, err = os.CreateTemp(config.GetGlobalConfig().TempStoragePath, defaultChunkListInDiskPath+strconv.Itoa(l.diskTracker.Label()))
err = l.dataFile.initWithFileName(defaultChunkListInDiskPath + strconv.Itoa(l.diskTracker.Label()))
if err != nil {
return errors2.Trace(err)
var underlying io.WriteCloser = l.disk
if config.GetGlobalConfig().Security.SpilledFileEncryptionMethod != config.SpilledFileEncryptionMethodPlaintext {
// The possible values of SpilledFileEncryptionMethod are "plaintext", "aes128-ctr"
l.ctrCipher, err = encrypt.NewCtrCipher()
if err != nil {
l.cipherWriter = encrypt.NewWriter(l.disk, l.ctrCipher)
underlying = l.cipherWriter
l.checksumWriter = checksum.NewWriter(underlying)
l.w = l.checksumWriter
l.bufFlushMutex = sync.RWMutex{}
err = l.offsetFile.initWithFileName(defaultChunkListInDiskOffsetPath + strconv.Itoa(l.diskTracker.Label()))

// Len returns the number of rows in ListInDisk
func (l *ListInDisk) Len() int {
return l.numRowsInDisk
return l.totalNumRows

// GetDiskTracker returns the memory tracker of this List.
Expand All @@ -101,34 +134,45 @@ func (l *ListInDisk) GetDiskTracker() *disk.Tracker {

// Add adds a chunk to the ListInDisk. Caller must make sure the input chk
// is not empty and not used any more and has the same field types.
// Warning: do not mix Add and GetRow (always use GetRow after you have added all the chunks), and do not use Add concurrently.
// Warning: Do not use Add concurrently.
func (l *ListInDisk) Add(chk *Chunk) (err error) {
if chk.NumRows() == 0 {
return errors2.New("chunk appended to List should have at least 1 row")
if l.disk == nil {
if l.dataFile.disk == nil {
err = l.initDiskFile()
if err != nil {
chk2 := chunkInDisk{Chunk: chk, offWrite: l.offWrite}
n, err := chk2.WriteTo(l.w)
l.offWrite += n
// Append data
chkInDisk := chunkInDisk{Chunk: chk, offWrite: l.dataFile.offWrite}
n, err := chkInDisk.WriteTo(l.dataFile.getWriter())
l.dataFile.offWrite += n
if err != nil {
l.offsets = append(l.offsets, chk2.getOffsetsOfRows())
l.numRowsInDisk += chk.NumRows()

// Append offsets
offsetsOfRows := chkInDisk.getOffsetsOfRows()
l.numRowsOfEachChunk = append(l.numRowsOfEachChunk, len(offsetsOfRows))
l.rowNumOfEachChunkFirstRow = append(l.rowNumOfEachChunkFirstRow, l.totalNumRows)
n2, err := offsetsOfRows.WriteTo(l.offsetFile.getWriter())
l.offsetFile.offWrite += n2
if err != nil {

l.diskTracker.Consume(n + n2)
l.totalNumRows += chk.NumRows()

// GetChunk gets a Chunk from the ListInDisk by chkIdx.
func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {
chk := NewChunkWithCapacity(l.fieldTypes, l.NumRowsOfChunk(chkIdx))
offsets := l.offsets[chkIdx]
for rowIdx := range offsets {
chkSize := l.numRowsOfEachChunk[chkIdx]
for rowIdx := 0; rowIdx < chkSize; rowIdx++ {
row, err := l.GetRow(RowPtr{ChkIdx: uint32(chkIdx), RowIdx: uint32(rowIdx)})
if err != nil {
return chk, err
Expand All @@ -140,16 +184,11 @@ func (l *ListInDisk) GetChunk(chkIdx int) (*Chunk, error) {

// GetRow gets a Row from the ListInDisk by RowPtr.
func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
off, err := l.getOffset(ptr.ChkIdx, ptr.RowIdx)
if err != nil {
off := l.offsets[ptr.ChkIdx][ptr.RowIdx]
var underlying io.ReaderAt = l.disk
if l.ctrCipher != nil {
underlying = NewReaderWithCache(encrypt.NewReader(l.disk, l.ctrCipher), l.cipherWriter.GetCache(), l.cipherWriter.GetCacheDataOffset())
checksumReader := NewReaderWithCache(checksum.NewReader(underlying), l.checksumWriter.GetCache(), l.checksumWriter.GetCacheDataOffset())
r := io.NewSectionReader(checksumReader, off, l.offWrite-off)
r := l.dataFile.getSectionReader(off)
format := rowInDisk{numCol: len(l.fieldTypes)}
_, err = format.ReadFrom(r)
if err != nil {
Expand All @@ -159,22 +198,40 @@ func (l *ListInDisk) GetRow(ptr RowPtr) (row Row, err error) {
return row, err

func (l *ListInDisk) getOffset(chkIdx uint32, rowIdx uint32) (int64, error) {
offsetInOffsetFile := l.rowNumOfEachChunkFirstRow[chkIdx] + int(rowIdx)
b := make([]byte, 8)
reader := l.offsetFile.getSectionReader(int64(offsetInOffsetFile) * 8)
n, err := io.ReadFull(reader, b)
if err != nil {
return 0, err
if n != 8 {
return 0, errors2.New("The file spilled is broken, can not get data offset from the disk")
return bytesToI64Slice(b)[0], nil

// NumRowsOfChunk returns the number of rows of a chunk in the ListInDisk.
func (l *ListInDisk) NumRowsOfChunk(chkID int) int {
return len(l.offsets[chkID])
return l.numRowsOfEachChunk[chkID]

// NumChunks returns the number of chunks in the ListInDisk.
func (l *ListInDisk) NumChunks() int {
return len(l.offsets)
return len(l.numRowsOfEachChunk)

// Close releases the disk resource.
func (l *ListInDisk) Close() error {
if l.disk != nil {
if l.dataFile.disk != nil {
if l.offsetFile.disk != nil {
return nil
Expand All @@ -198,7 +255,15 @@ type chunkInDisk struct {
// offWrite is the current offset for writing.
offWrite int64
// offsetsOfRows stores the offset of each row.
offsetsOfRows []int64
offsetsOfRows offsetsOfRows

type offsetsOfRows []int64

// WriteTo serializes the offsetsOfRow, and writes to w.
func (off offsetsOfRows) WriteTo(w io.Writer) (written int64, err error) {
n, err := w.Write(i64SliceToBytes(off))
return int64(n), err

// WriteTo serializes the chunk into the format of chunkInDisk, and
Expand All @@ -222,7 +287,7 @@ func (chk *chunkInDisk) WriteTo(w io.Writer) (written int64, err error) {

// getOffsetsOfRows gets the offset of each row.
func (chk *chunkInDisk) getOffsetsOfRows() []int64 { return chk.offsetsOfRows }
func (chk *chunkInDisk) getOffsetsOfRows() offsetsOfRows { return chk.offsetsOfRows }

// rowInDisk represents a Row in format of diskFormatRow.
type rowInDisk struct {
Expand Down

0 comments on commit ad6d620

Please sign in to comment.