Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

util: avoid column allocator reuse the column hold huge memory (#32554) #32577

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions session/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -1014,6 +1014,11 @@ func createSessionFunc(store kv.Storage) pools.Factory {
}
se.sessionVars.CommonGlobalLoaded = true
se.sessionVars.InRestrictedSQL = true
<<<<<<< HEAD
=======
// Internal session uses default format to prevent memory leak problem.
se.sessionVars.EnableChunkRPC = false
>>>>>>> cce1ebdeb... util: avoid column allocator reuse the column hold huge memory (#32554)
return se, nil
}
}
Expand All @@ -1034,6 +1039,11 @@ func createSessionWithDomainFunc(store kv.Storage) func(*domain.Domain) (pools.R
}
se.sessionVars.CommonGlobalLoaded = true
se.sessionVars.InRestrictedSQL = true
<<<<<<< HEAD
=======
// Internal session uses default format to prevent memory leak problem.
se.sessionVars.EnableChunkRPC = false
>>>>>>> cce1ebdeb... util: avoid column allocator reuse the column hold huge memory (#32554)
return se, nil
}
}
Expand Down
158 changes: 158 additions & 0 deletions util/chunk/alloc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chunk

import (
"github.com/cznic/mathutil"
"github.com/pingcap/tidb/types"
)

// Allocator is an interface defined to reduce object allocation.
// The typical usage is to call Reset() to recycle objects into a pool,
// and Alloc() allocates from the pool.
type Allocator interface {
	// Alloc returns a chunk with one column per entry of fields.
	// In the pooled implementation in this file the chunk's capacity is
	// min(capacity, maxChunkSize) and its required row count is maxChunkSize.
	Alloc(fields []*types.FieldType, capacity, maxChunkSize int) *Chunk
	// Reset recycles the objects handed out by Alloc() so that later Alloc()
	// calls can reuse them.
	// NOTE(review): chunks obtained before Reset() presumably must not be used
	// afterwards — confirm against callers.
	Reset()
}

// NewAllocator creates an Allocator.
// The returned allocator starts with an empty, initialized column pool and no
// cached chunks.
func NewAllocator() *allocator {
	a := new(allocator)
	// The column pool's map must exist before the first put().
	a.columnAlloc.init()
	return a
}

// Compile-time check that *allocator satisfies the Allocator interface.
var _ Allocator = (*allocator)(nil)

// allocator tries to reuse objects.
// Chunk column objects are obtained from a `poolColumnAllocator`.
// Every chunk handed out is remembered in `allocated`; on Reset() those chunks
// are taken apart, their columns go back into the `poolColumnAllocator`, and
// the emptied chunks are kept in `free` for the next Alloc().
type allocator struct {
	// allocated records the chunks handed out by Alloc() since the last Reset().
	allocated []*Chunk
	// free holds recycled chunks that Alloc() can hand out again.
	free []*Chunk
	// columnAlloc pools the chunk column objects.
	columnAlloc poolColumnAllocator
}

// Alloc implements the Allocator interface.
// It takes a recycled chunk from the free list when one is available (otherwise
// it creates a new one), sets its capacity to min(capacity, maxChunkSize) and
// its required rows to maxChunkSize, attaches one column per field obtained
// from the pooled column allocator, and records the chunk so that Reset() can
// recycle it later.
func (a *allocator) Alloc(fields []*types.FieldType, capacity, maxChunkSize int) *Chunk {
	var chk *Chunk
	if n := len(a.free); n == 0 {
		// Nothing to reuse: start a fresh chunk with room for every column.
		chk = &Chunk{columns: make([]*Column, 0, len(fields))}
	} else {
		// Pop the most recently recycled chunk off the free list.
		chk, a.free = a.free[n-1], a.free[:n-1]
	}

	chk.capacity = mathutil.Min(capacity, maxChunkSize)
	chk.requiredRows = maxChunkSize

	// Columns come from the pool column allocator, which may hand back recycled ones.
	for i := range fields {
		chk.columns = append(chk.columns, a.columnAlloc.NewColumn(fields[i], chk.capacity))
	}

	a.allocated = append(a.allocated, chk)
	return chk
}

const (
	// maxFreeChunks caps how many recycled chunks allocator.Reset() keeps in
	// the allocator's free list.
	maxFreeChunks = 64
	// maxFreeColumnsPerType caps how many columns a single freeList holds;
	// freeList.push() drops columns beyond this limit.
	maxFreeColumnsPerType = 256
)

// Reset implements the Allocator interface.
// The previous content of the free list is discarded. Every chunk handed out
// since the last Reset() has its columns offered back to the column pool, is
// reset, and is then cached in the free list, up to maxFreeChunks chunks.
func (a *allocator) Reset() {
	a.free = a.free[:0]
	for idx := range a.allocated {
		chk := a.allocated[idx]
		// Drop the reference so the slot does not keep the chunk alive.
		a.allocated[idx] = nil

		// Hand the columns back to the column allocator before the chunk is reset.
		for _, col := range chk.columns {
			a.columnAlloc.put(col)
		}
		chk.resetForReuse()

		// Don't cache too much data.
		if len(a.free) >= maxFreeChunks {
			continue
		}
		a.free = append(a.free, chk)
	}
	a.allocated = a.allocated[:0]
}

// Compile-time check that *poolColumnAllocator satisfies ColumnAllocator.
var _ ColumnAllocator = (*poolColumnAllocator)(nil)

// poolColumnAllocator is a ColumnAllocator that keeps recycled columns in
// per-size free lists.
type poolColumnAllocator struct {
	// pool maps a column's type size to the free list of recycled columns of
	// that size. It must be created with init() before put() is called.
	pool map[int]freeList
}

// NewColumn implements the ColumnAllocator interface.
// If the pool holds a recycled column whose size matches the fixed length of
// ft, that column is returned as-is and count is ignored. Otherwise a new
// column is created with newColumn(typeSize, count).
// NOTE(review): this method does not reset a recycled column before returning
// it — confirm that columns are cleared elsewhere before reuse.
func (alloc *poolColumnAllocator) NewColumn(ft *types.FieldType, count int) *Column {
	size := getFixedLen(ft)
	// A missing entry yields a nil freeList, which reports itself as empty.
	if cached := alloc.pool[size]; !cached.empty() {
		return cached.pop()
	}
	return newColumn(size, count)
}

// init installs a new, empty pool map, replacing any existing one.
func (alloc *poolColumnAllocator) init() {
	alloc.pool = map[int]freeList{}
}

// put offers col back to the pool for later reuse.
// Columns flagged avoidReusing and columns whose typeSize() is not positive
// are dropped. The free list for a size is created on first use.
func (alloc *poolColumnAllocator) put(col *Column) {
	if col.avoidReusing {
		return
	}
	size := col.typeSize()
	if size <= 0 {
		return
	}

	if alloc.pool[size] == nil {
		alloc.pool[size] = make(freeList, 8)
	}
	alloc.pool[size].push(col)
}

// freeList is the set of recycled columns of one type size.
// It is defined as a map, rather than a list, because when recycling chunk
// columns there could be duplicates: some chunk columns are just references to
// others. Keying by column pointer keeps each column in the set at most once.
type freeList map[*Column]struct{}

// empty reports whether the free list holds no columns.
// It is safe to call on a nil freeList.
func (l freeList) empty() bool {
	size := len(l)
	return size == 0
}

// pop removes one column from the free list and returns it.
// Which column is chosen depends on map iteration order. It returns nil when
// the list is empty.
func (l freeList) pop() *Column {
	var (
		picked *Column
		found  bool
	)
	for c := range l {
		picked, found = c, true
		break
	}
	if found {
		delete(l, picked)
	}
	return picked
}

// push adds c to the free list unless the list already holds
// maxFreeColumnsPerType columns, in which case c is dropped to save memory.
// Pushing a column that is already present leaves the list unchanged.
func (l freeList) push(c *Column) {
	if len(l) < maxFreeColumnsPerType {
		l[c] = struct{}{}
	}
}
204 changes: 204 additions & 0 deletions util/chunk/alloc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,204 @@
// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package chunk

import (
"testing"

"github.com/pingcap/tidb/parser/mysql"
"github.com/pingcap/tidb/types"
"github.com/stretchr/testify/require"
)

// TestAllocator checks the layout of chunks handed out by the allocator, both
// on first allocation and after Reset() has recycled them, and verifies that
// the chunk free list is capped at maxFreeChunks.
func TestAllocator(t *testing.T) {
	alloc := NewAllocator()

	fieldTypes := []*types.FieldType{
		{Tp: mysql.TypeVarchar},
		{Tp: mysql.TypeJSON},
		{Tp: mysql.TypeFloat},
		{Tp: mysql.TypeNewDecimal},
		{Tp: mysql.TypeDouble},
		{Tp: mysql.TypeLonglong},
		{Tp: mysql.TypeTimestamp},
		{Tp: mysql.TypeDatetime},
	}
	// Columns from this index on are expected to carry an element buffer;
	// the first two (varchar, JSON) are expected to have none.
	const firstFixedLenCol = 2

	initCap := 5
	maxChunkSize := 100

	chk := alloc.Alloc(fieldTypes, initCap, maxChunkSize)
	require.NotNil(t, chk)
	// check reads chk through the closure, so it always inspects the most
	// recently assigned chunk.
	check := func() {
		require.Equal(t, len(fieldTypes), chk.NumCols())
		for i := 0; i < firstFixedLenCol; i++ {
			require.Nil(t, chk.columns[i].elemBuf)
		}
		for i := firstFixedLenCol; i < len(fieldTypes); i++ {
			require.Equal(t, getFixedLen(fieldTypes[i]), len(chk.columns[i].elemBuf))
		}
		for i := firstFixedLenCol; i < len(fieldTypes); i++ {
			require.Equal(t, initCap*getFixedLen(fieldTypes[i]), cap(chk.columns[i].data))
		}
	}
	check()

	// Call Reset and alloc again, check the result.
	alloc.Reset()
	chk = alloc.Alloc(fieldTypes, initCap, maxChunkSize)
	check()

	// Allocate more chunks than the free list may hold; after Reset only
	// maxFreeChunks of them must be cached.
	for i := 0; i < maxFreeChunks+10; i++ {
		alloc.Alloc(fieldTypes, initCap, maxChunkSize)
	}
	alloc.Reset()
	require.Len(t, alloc.free, maxFreeChunks)
}

// TestColumnAllocator checks that the pooled and default column allocators
// produce the same columns as NewColumn, and that the pool keeps at most
// maxFreeColumnsPerType recycled columns per type size.
func TestColumnAllocator(t *testing.T) {
	fieldTypes := []*types.FieldType{
		{Tp: mysql.TypeVarchar},
		{Tp: mysql.TypeJSON},
		{Tp: mysql.TypeFloat},
		{Tp: mysql.TypeNewDecimal},
		{Tp: mysql.TypeDouble},
		{Tp: mysql.TypeLonglong},
		{Tp: mysql.TypeTimestamp},
		{Tp: mysql.TypeDatetime},
	}

	pooled := poolColumnAllocator{}
	pooled.init()
	var plain DefaultColumnAllocator

	// With an empty pool, all three ways of creating a column must agree.
	initCap := 5
	for i := range fieldTypes {
		want := NewColumn(fieldTypes[i], initCap)
		fromPool := pooled.NewColumn(fieldTypes[i], initCap)
		fromPlain := plain.NewColumn(fieldTypes[i], initCap)
		require.Equal(t, want, fromPool)
		require.Equal(t, fromPool, fromPlain)
	}

	// Create more columns of one type than the pool may cache, then hand
	// them all back.
	ft := fieldTypes[2]
	cols := make([]*Column, maxFreeColumnsPerType+10)
	for i := range cols {
		cols[i] = pooled.NewColumn(ft, 20)
	}
	for i := range cols {
		pooled.put(cols[i])
	}

	// The free list for that type must exist and be capped.
	bucket := pooled.pool[getFixedLen(ft)]
	require.NotNil(t, bucket)
	require.Len(t, bucket, maxFreeColumnsPerType)
}

// TestNoDuplicateColumnReuse covers https://github.com/pingcap/tidb/issues/29554.
// Some chunk columns are just references to other chunk columns, so when a
// chunk is recycled several of its column slots may point to the same column.
// The pool must never hand out the same column twice.
func TestNoDuplicateColumnReuse(t *testing.T) {
	fieldTypes := []*types.FieldType{
		{Tp: mysql.TypeVarchar},
		{Tp: mysql.TypeJSON},
		{Tp: mysql.TypeFloat},
		{Tp: mysql.TypeNewDecimal},
		{Tp: mysql.TypeDouble},
		{Tp: mysql.TypeLonglong},
		{Tp: mysql.TypeTimestamp},
		{Tp: mysql.TypeDatetime},
	}

	alloc := NewAllocator()
	for remaining := maxFreeChunks + 10; remaining > 0; remaining-- {
		// Create a column reference inside each chunk before it is recycled.
		alloc.Alloc(fieldTypes, 5, 10).MakeRef(1, 3)
	}
	alloc.Reset()

	// Drain every free list and make sure no column shows up twice.
	for _, bucket := range alloc.columnAlloc.pool {
		seen := make(map[*Column]bool)
		for !bucket.empty() {
			col := bucket.pop()
			require.False(t, seen[col])
			seen[col] = true
		}
	}
}

// TestAvoidColumnReuse covers https://github.com/pingcap/tidb/issues/31981.
// Some chunk columns reference the memory of an RPC message; such columns are
// flagged avoidReusing and must not be put back into the column pool.
func TestAvoidColumnReuse(t *testing.T) {
	fieldTypes := []*types.FieldType{
		{Tp: mysql.TypeVarchar},
		{Tp: mysql.TypeJSON},
		{Tp: mysql.TypeFloat},
		{Tp: mysql.TypeNewDecimal},
		{Tp: mysql.TypeDouble},
		{Tp: mysql.TypeLonglong},
		{Tp: mysql.TypeTimestamp},
		{Tp: mysql.TypeDatetime},
	}

	alloc := NewAllocator()
	for n := 0; n < maxFreeChunks+10; n++ {
		flagged := alloc.Alloc(fieldTypes, 5, 10)
		for i := range flagged.columns {
			flagged.columns[i].avoidReusing = true
		}
	}
	alloc.Reset()

	// None of the flagged columns may have been put back into the pool.
	for _, bucket := range alloc.columnAlloc.pool {
		require.True(t, bucket.empty())
	}

	// Decoding into a chunk must set the avoidReusing flag on its columns.
	chk := alloc.Alloc(fieldTypes, 5, 1024)
	const rows = 11
	for r := 0; r < rows; r++ {
		for i := range chk.columns {
			chk.columns[i].AppendNull()
		}
	}
	encoded := (&Codec{fieldTypes}).Encode(chk)

	decoder := NewDecoder(NewChunkWithCapacity(fieldTypes, 0), fieldTypes)
	decoder.Reset(encoded)
	decoder.ReuseIntermChk(chk)
	for i := range chk.columns {
		require.True(t, chk.columns[i].avoidReusing)
	}
}
3 changes: 3 additions & 0 deletions util/chunk/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ func (c *Codec) decodeColumn(buffer []byte, col *Column, ordinal int) (remained

// decode data.
col.data = buffer[:numDataBytes:numDataBytes]
// The column references the data of the gRPC response, so the memory of the gRPC message cannot be GCed if we
// reuse this column. Thus, we set `avoidReusing` to true.
col.avoidReusing = true
return buffer[numDataBytes:]
}

Expand Down
Loading