-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathallocator.go
349 lines (324 loc) · 13.2 KB
/
allocator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
// Copyright 2019 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package colmem
import (
"context"
"fmt"
"time"
"unsafe"
"github.com/cockroachdb/cockroach/pkg/col/coldata"
"github.com/cockroachdb/cockroach/pkg/col/typeconv"
"github.com/cockroachdb/cockroach/pkg/sql/colexecbase/colexecerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/types"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
)
// Allocator is a memory management tool for vectorized components. It provides
// new batches (and appends to existing ones) within a fixed memory budget. If
// the budget is exceeded, it will panic with an error.
//
// In the future this can also be used to pool coldata.Vec allocations.
type Allocator struct {
	// ctx is the context passed to every Grow/Shrink call made on acc.
	ctx context.Context
	// acc is the bound memory account against which every allocation made
	// through this Allocator is registered (and from which releases are
	// subtracted).
	acc *mon.BoundAccount
	// factory is handed to coldata whenever a new batch or vector is created.
	factory coldata.ColumnFactory
}
// selVectorSize returns the number of bytes occupied by a selection vector
// able to hold capacity elements, at sizeOfInt bytes per element.
func selVectorSize(capacity int) int64 {
	numBytes := capacity * sizeOfInt
	return int64(numBytes)
}
// getVecMemoryFootprint returns the number of bytes accounted for vec. Flat
// bytes vectors report their own size; every other vector is estimated from
// its type and capacity via EstimateBatchSizeBytes.
func getVecMemoryFootprint(vec coldata.Vec) int64 {
	switch vec.CanonicalTypeFamily() {
	case types.BytesFamily:
		return int64(vec.Bytes().Size())
	default:
		vecTypes := []*types.T{vec.Type()}
		return int64(EstimateBatchSizeBytes(vecTypes, vec.Capacity()))
	}
}
// getVecsMemoryFootprint returns the sum of getVecMemoryFootprint over all of
// vecs (0 for an empty slice).
func getVecsMemoryFootprint(vecs []coldata.Vec) int64 {
	total := int64(0)
	for i := range vecs {
		total += getVecMemoryFootprint(vecs[i])
	}
	return total
}
// GetProportionalBatchMemSize returns the memory size of the batch that is
// proportional to given 'length'. This method returns the estimated memory
// footprint *only* of the first 'length' tuples in 'b'.
//
// A zero length yields 0, as does coldata.ZeroBatch (which does not support
// changing its selection vector, so it cannot be inspected below). Vectors
// with zero capacity contribute nothing, which avoids an integer division by
// zero.
func GetProportionalBatchMemSize(b coldata.Batch, length int64) int64 {
	if length == 0 || b == coldata.ZeroBatch {
		return 0
	}
	// We need the capacity of the internal selection vector even if b doesn't
	// currently use it, so we temporarily enable it and then restore the
	// original state.
	usesSel := b.Selection() != nil
	b.SetSelection(true)
	selCapacity := cap(b.Selection())
	b.SetSelection(usesSel)
	proportionalBatchMemSize := int64(0)
	if selCapacity > 0 {
		proportionalBatchMemSize = selVectorSize(selCapacity) * length / int64(selCapacity)
	}
	for _, vec := range b.ColVecs() {
		if vec.CanonicalTypeFamily() == types.BytesFamily {
			// Flat bytes vectors report the size of their first 'length'
			// elements directly.
			proportionalBatchMemSize += int64(vec.Bytes().ProportionalSize(length))
			continue
		}
		vecCapacity := int64(vec.Capacity())
		if vecCapacity == 0 {
			// Nothing to scale; also guards the division below.
			continue
		}
		// Scale the capacity-based estimate by the fraction in use.
		proportionalBatchMemSize += getVecMemoryFootprint(vec) * length / vecCapacity
	}
	return proportionalBatchMemSize
}
// NewAllocator constructs a new Allocator instance that registers its
// allocations against acc (using ctx for the account operations) and creates
// vectors through factory.
func NewAllocator(
	ctx context.Context, acc *mon.BoundAccount, factory coldata.ColumnFactory,
) *Allocator {
	a := new(Allocator)
	a.ctx = ctx
	a.acc = acc
	a.factory = factory
	return a
}
// NewMemBatch allocates a new in-memory coldata.Batch with columns of the
// given types, sized to coldata.BatchSize(). It is shorthand for
// NewMemBatchWithSize with that size.
func (a *Allocator) NewMemBatch(typs []*types.T) coldata.Batch {
	defaultSize := coldata.BatchSize()
	return a.NewMemBatchWithSize(typs, defaultSize)
}
// NewMemBatchWithSize allocates a new in-memory coldata.Batch with the given
// column size. The estimated footprint (selection vector plus columns) is
// registered with the memory account before the batch is created; if the
// account cannot grow, an internal error is raised instead.
func (a *Allocator) NewMemBatchWithSize(typs []*types.T, size int) coldata.Batch {
	selBytes := selVectorSize(size)
	colBytes := int64(EstimateBatchSizeBytes(typs, size))
	if err := a.acc.Grow(a.ctx, selBytes+colBytes); err != nil {
		colexecerror.InternalError(err)
	}
	return coldata.NewMemBatchWithSize(typs, size, a.factory)
}
// NewMemBatchNoCols creates a "skeleton" of new in-memory coldata.Batch. It
// allocates (and accounts for) memory for the selection vector but does *not*
// allocate any memory for the column vectors - those will have to be added
// separately.
func (a *Allocator) NewMemBatchNoCols(typs []*types.T, size int) coldata.Batch {
	// Only the selection vector is registered with the account here; the
	// column vectors are not accounted for by this method.
	estimatedMemoryUsage := selVectorSize(size)
	if err := a.acc.Grow(a.ctx, estimatedMemoryUsage); err != nil {
		colexecerror.InternalError(err)
	}
	return coldata.NewMemBatchNoCols(typs, size)
}
// RetainBatch adds the size of the batch to the memory account. This shouldn't
// need to be used regularly, since most memory accounting necessary is done
// through PerformOperation. Use this if you want to explicitly manage the
// memory accounted for.
// NOTE: when calculating memory footprint, this method looks at the capacities
// of the vectors and does *not* pay attention to the length of the batch.
func (a *Allocator) RetainBatch(b coldata.Batch) {
	if b == coldata.ZeroBatch {
		// coldata.ZeroBatch takes up no space but also doesn't support the change
		// of the selection vector, so we need to handle it separately.
		return
	}
	// We need to get the capacity of the internal selection vector, even if b
	// currently doesn't use it, so we temporarily set selection to true.
	usesSel := b.Selection() != nil
	b.SetSelection(true)
	batchMemSize := selVectorSize(cap(b.Selection())) + getVecsMemoryFootprint(b.ColVecs())
	// Restore the selection state *before* growing the account: if Grow fails,
	// colexecerror.InternalError unwinds out of this method, and b must not be
	// left with its selection vector forcibly enabled.
	b.SetSelection(usesSel)
	if err := a.acc.Grow(a.ctx, batchMemSize); err != nil {
		colexecerror.InternalError(err)
	}
}
// ReleaseBatch subtracts the size of the batch from the memory account. This
// shouldn't need to be used regularly, since all accounts are closed by
// Flow.Cleanup. Use this if you want to explicitly manage the memory used; for
// example, to release a batch before writing it to disk.
// NOTE: the footprint is derived from the capacities of the selection vector
// and column vectors; the length of the batch is ignored.
func (a *Allocator) ReleaseBatch(b coldata.Batch) {
	if b == coldata.ZeroBatch {
		// coldata.ZeroBatch occupies no memory and cannot have its selection
		// vector toggled, so there is nothing to do.
		return
	}
	// Temporarily enable the selection vector so that its capacity can be
	// read even when b is not currently using it.
	hadSel := b.Selection() != nil
	b.SetSelection(true)
	selBytes := selVectorSize(cap(b.Selection()))
	vecBytes := getVecsMemoryFootprint(b.ColVecs())
	b.SetSelection(hadSel)
	a.ReleaseMemory(selBytes + vecBytes)
}
// NewMemColumn returns a new coldata.Vec of type t sized for n elements, after
// registering its estimated footprint with the memory account (raising an
// internal error if the account cannot grow).
func (a *Allocator) NewMemColumn(t *types.T, n int) coldata.Vec {
	colTypes := []*types.T{t}
	if err := a.acc.Grow(a.ctx, int64(EstimateBatchSizeBytes(colTypes, n))); err != nil {
		colexecerror.InternalError(err)
	}
	return coldata.NewMemColumn(t, n, a.factory)
}
// MaybeAppendColumn might append a newly allocated coldata.Vec of the given
// type to b at position colIdx. Behavior of the function depends on how colIdx
// compares to the width of b:
// 1. if colIdx < b.Width(), then we expect that correctly-typed vector is
// already present in position colIdx. If that's not the case, we will panic.
// 2. if colIdx == b.Width(), then we will append a newly allocated coldata.Vec
// of the given type.
// 3. if colIdx > b.Width(), then we will panic because such condition
// indicates an error in setting up vector type enforcers during the planning
// stage.
// NOTE: b must be non-zero length batch.
func (a *Allocator) MaybeAppendColumn(b coldata.Batch, t *types.T, colIdx int) {
	if b.Length() == 0 {
		colexecerror.InternalError("trying to add a column to zero length batch")
	}
	width := b.Width()
	if colIdx < width {
		presentVec := b.ColVec(colIdx)
		presentType := presentVec.Type()
		if presentType.Identical(t) {
			// We already have the vector of the desired type in place.
			if presentVec.CanonicalTypeFamily() == types.BytesFamily {
				// Flat bytes vector needs to be reset before the vector can be
				// reused.
				presentVec.Bytes().Reset()
			}
			return
		}
		if presentType.Family() == types.UnknownFamily {
			// We already have an unknown vector in place. If this is expected,
			// then it will not be accessed and we're good; if this is not
			// expected, then an error will occur later.
			return
		}
		// We have a vector with an unexpected type, so we panic.
		colexecerror.InternalError(errors.Errorf(
			"trying to add a column of %s type at index %d but %s vector already present",
			t, colIdx, presentType,
		))
	} else if colIdx > width {
		// We have a batch of unexpected width which indicates an error in the
		// planning stage.
		colexecerror.InternalError(errors.Errorf(
			"trying to add a column of %s type at index %d but batch has width %d",
			t, colIdx, width,
		))
	}
	// colIdx == width: append a fresh vector. NewMemColumn itself registers the
	// vector's estimated footprint with the memory account, so the account must
	// not also be grown here; doing so would count the same column twice.
	b.AppendCol(a.NewMemColumn(t, coldata.BatchSize()))
}
// PerformOperation runs 'operation', which is expected to modify 'destVecs',
// and then adjusts the memory account by the change in the vectors' combined
// footprint (which may be negative).
// NOTE: columnar vectors that the operation does not touch should be left out
// of 'destVecs', since measuring them only adds accounting overhead.
func (a *Allocator) PerformOperation(destVecs []coldata.Vec, operation func()) {
	footprintBefore := getVecsMemoryFootprint(destVecs)
	// The account is updated only after the operation has run. This keeps the
	// bookkeeping simple at the cost of a small, acceptable accounting drift
	// while the operation is in progress.
	operation()
	delta := getVecsMemoryFootprint(destVecs) - footprintBefore
	a.AdjustMemoryUsage(delta)
}
// GetAccount returns the bound memory account backing this allocator. The
// pointer itself is returned, so changes made through it are visible to the
// allocator.
func (a *Allocator) GetAccount() *mon.BoundAccount {
	account := a.acc
	return account
}
// Used reports how many bytes are currently registered with this allocator's
// memory account.
func (a *Allocator) Used() int64 {
	account := a.GetAccount()
	return account.Used()
}
// AdjustMemoryUsage changes the number of bytes registered through this
// allocator by delta. A positive delta grows the account and raises an
// internal error if the growth is refused. A non-positive delta (zero
// included) is forwarded to ReleaseMemory as a release of -delta bytes.
func (a *Allocator) AdjustMemoryUsage(delta int64) {
	if delta <= 0 {
		a.ReleaseMemory(-delta)
		return
	}
	if err := a.acc.Grow(a.ctx, delta); err != nil {
		colexecerror.InternalError(err)
	}
}
// ReleaseMemory shrinks the memory account by size bytes, clamped to the
// amount currently in use, so the account never goes below zero. A negative
// size is a programming error and raises an internal error.
func (a *Allocator) ReleaseMemory(size int64) {
	if size < 0 {
		colexecerror.InternalError(fmt.Sprintf("unexpectedly negative size in ReleaseMemory: %d", size))
	}
	if used := a.acc.Used(); used < size {
		size = used
	}
	a.acc.Shrink(a.ctx, size)
}
// In-memory sizes, in bytes, of the Go values held by column vectors, as
// reported by unsafe.Sizeof for the current platform.
const (
	// Scalar types. Untyped constants passed to unsafe.Sizeof take their
	// default type (bool, int, float64).
	sizeOfBool    = int(unsafe.Sizeof(false))
	sizeOfInt     = int(unsafe.Sizeof(0))
	sizeOfInt16   = int(unsafe.Sizeof(int16(0)))
	sizeOfInt32   = int(unsafe.Sizeof(int32(0)))
	sizeOfInt64   = int(unsafe.Sizeof(int64(0)))
	sizeOfFloat64 = int(unsafe.Sizeof(0.0))

	// Struct types: only the struct header is measured, not anything its
	// pointer fields refer to.
	sizeOfTime     = int(unsafe.Sizeof(time.Time{}))
	sizeOfDuration = int(unsafe.Sizeof(duration.Duration{}))

	// Interface header of a tree.Datum (excluding the value it points to).
	sizeOfDatum = int(unsafe.Sizeof(tree.Datum(nil)))
)
// SizeOfBatchSizeSelVector is the size (in bytes) of a selection vector of
// coldata.BatchSize() length.
var SizeOfBatchSizeSelVector = coldata.BatchSize() * sizeOfInt
// EstimateBatchSizeBytes returns an estimated amount of bytes needed to
// store a batch in memory that has column types vecTypes and batchLength rows.
// WARNING: This only is correct for fixed width types, and returns an
// estimate for non fixed width types. In future it might be possible to
// remove the need for estimation by specifying batch sizes in terms of bytes.
// An unhandled canonical type family raises an internal error.
func EstimateBatchSizeBytes(vecTypes []*types.T, batchLength int) int {
	// estimatedDecimalSize is a rough per-row guess for decimals: just like
	// with byte arrays, we can't tell how much space is used to hold the
	// arbitrary precision decimal objects.
	const estimatedDecimalSize = 50
	// bytesPerRow accumulates the number of bytes needed to represent a single
	// row of the batch across all columns.
	bytesPerRow := 0
	for _, t := range vecTypes {
		switch typeconv.TypeFamilyToCanonicalTypeFamily(t.Family()) {
		case types.BoolFamily:
			bytesPerRow += sizeOfBool
		case types.BytesFamily:
			// For byte arrays, we initially allocate BytesInitialAllocationFactor
			// number of bytes (plus an int32 for the offset) for each row, so we use
			// the sum of two values as the estimate. However, later, the exact
			// memory footprint will be used: whenever a modification of Bytes takes
			// place, the Allocator will measure the old footprint and the updated
			// one and will update the memory account accordingly.
			bytesPerRow += coldata.BytesInitialAllocationFactor + sizeOfInt32
		case types.IntFamily:
			switch t.Width() {
			case 16:
				bytesPerRow += sizeOfInt16
			case 32:
				bytesPerRow += sizeOfInt32
			default:
				bytesPerRow += sizeOfInt64
			}
		case types.FloatFamily:
			bytesPerRow += sizeOfFloat64
		case types.DecimalFamily:
			bytesPerRow += estimatedDecimalSize
		case types.TimestampTZFamily:
			// time.Time consists of two 64 bit integers and a pointer to
			// time.Location. We only account for these three fields (i.e.
			// sizeOfTime bytes, three machine words) without paying attention to
			// the full time.Location struct. The reason is that it is likely that
			// time.Location's are cached and are shared among all the timestamps,
			// so if we were to include that in the estimation, we would
			// significantly overestimate.
			// TODO(yuzefovich): figure out whether the caching does take place.
			bytesPerRow += sizeOfTime
		case types.IntervalFamily:
			bytesPerRow += sizeOfDuration
		case typeconv.DatumVecCanonicalTypeFamily:
			// In datum vec we need to account for memory underlying the struct
			// that is the implementation of tree.Datum interface (for example,
			// tree.DBoolFalse) as well as for the overhead of storing that
			// implementation in the slice of tree.Datums.
			// NOTE(review): the second return value of tree.DatumTypeSize is
			// ignored; presumably it reports whether the type has a variable
			// size, in which case only the fixed part is counted here - confirm.
			implementationSize, _ := tree.DatumTypeSize(t)
			bytesPerRow += int(implementationSize) + sizeOfDatum
		default:
			colexecerror.InternalError(fmt.Sprintf("unhandled type %s", t))
		}
	}
	return bytesPerRow * batchLength
}