cherrypick-1.1: distsql: pre-reserve memory needed to mark rows in HashJoiner build phase #18975

Merged 1 commit on Oct 3, 2017.
27 changes: 22 additions & 5 deletions pkg/sql/distsqlrun/hash_row_container.go
@@ -119,9 +119,7 @@ const sizeOfBool = int64(unsafe.Sizeof(false))
// memRowContainer corresponding to matching rows.
// NOTE: Once a row is marked, adding more rows to the hashMemRowContainer
// results in undefined behavior. It is not necessary to do otherwise for the
// current usage of hashMemRowContainer and allows us to assume that a memory
// error can only occur at the start of the marking phase, thus not having to
// deal with half-emitted buckets and marks when falling back to disk.
// current usage of hashMemRowContainer.
type hashMemRowContainer struct {
*memRowContainer
columnEncoder
@@ -134,6 +132,11 @@ type hashMemRowContainer struct {
// been marked. Used for iterating over unmarked rows.
marked []bool

// markMemoryReserved specifies whether the hashMemRowContainer's memory
// account already accounts for the memory needed to mark the rows in the
// hashMemRowContainer.
markMemoryReserved bool

// buckets contains the indices into memRowContainer for a given group
// key (which is the encoding of storedEqCols).
buckets map[string][]int
@@ -219,6 +222,20 @@ func (h *hashMemRowContainer) addRowToBucket(
return nil
}

// reserveMarkMemoryMaybe is a utility function to grow the
// hashMemRowContainer's memory account by the memory needed to mark all rows.
// It is a noop if h.markMemoryReserved is true.
func (h *hashMemRowContainer) reserveMarkMemoryMaybe(ctx context.Context) error {
if h.markMemoryReserved {
return nil
}
if err := h.bucketsAcc.Grow(ctx, sizeOfBoolSlice+(sizeOfBool*int64(h.Len()))); err != nil {
return err
}
h.markMemoryReserved = true
return nil
}

// hashMemRowBucketIterator iterates over the rows in a bucket.
type hashMemRowBucketIterator struct {
*hashMemRowContainer
Expand Down Expand Up @@ -267,8 +284,8 @@ func (i *hashMemRowBucketIterator) Mark(ctx context.Context, mark bool) error {
log.Fatal(ctx, "hash mem row container not set up for marking")
}
if i.marked == nil {
if err := i.bucketsAcc.Grow(ctx, sizeOfBoolSlice+(sizeOfBool*int64(i.Len()))); err != nil {
return err
if !i.markMemoryReserved {
panic("mark memory should have been reserved already")
}
i.marked = make([]bool, i.Len())
}
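
For readers skimming the diff, a standalone sketch of the invariant this file now enforces: grow the memory account once, up front, and marking can never fail afterwards. byteAccount and container below are simplified stand-ins for mon.BoundAccount and hashMemRowContainer (the sizeOfBoolSlice overhead term is dropped), so this is illustrative rather than the actual CockroachDB code.

package main

import (
	"errors"
	"fmt"
	"unsafe"
)

const sizeOfBool = int64(unsafe.Sizeof(false))

// byteAccount is a stand-in for mon.BoundAccount: a counter with a hard
// limit that grow may refuse to exceed.
type byteAccount struct {
	used, limit int64
}

func (a *byteAccount) grow(n int64) error {
	if a.used+n > a.limit {
		return errors.New("memory budget exceeded")
	}
	a.used += n
	return nil
}

// container mirrors the shape of hashMemRowContainer for this illustration.
type container struct {
	numRows            int
	acc                byteAccount
	marked             []bool
	markMemoryReserved bool
}

// reserveMarkMemoryMaybe grows the account once, up front; a failure here
// still leaves a clean point at which to fall back to disk.
func (c *container) reserveMarkMemoryMaybe() error {
	if c.markMemoryReserved {
		return nil
	}
	if err := c.acc.grow(sizeOfBool * int64(c.numRows)); err != nil {
		return err
	}
	c.markMemoryReserved = true
	return nil
}

// mark allocates the marked slice lazily but assumes the reservation already
// happened, matching the panic added in the diff.
func (c *container) mark(i int) {
	if c.marked == nil {
		if !c.markMemoryReserved {
			panic("mark memory should have been reserved already")
		}
		c.marked = make([]bool, c.numRows)
	}
	c.marked[i] = true
}

func main() {
	c := container{numRows: 4, acc: byteAccount{limit: 64}}
	if err := c.reserveMarkMemoryMaybe(); err != nil {
		fmt.Println("fall back to disk:", err)
		return
	}
	c.mark(2)
	fmt.Println(c.marked) // [false false true false]
}
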
182 changes: 106 additions & 76 deletions pkg/sql/distsqlrun/hashjoiner.go
@@ -69,7 +69,8 @@ func (p hashJoinPhase) String() string {
// choose the right stream and read it and buffer it until the end.
//
// 2. Build phase: in this phase we build the buckets from the rows stored
// in the first phase.
// in the first phase. If temp storage is enabled and we ran out of memory
// in the buffer phase, this phase falls back to disk.
//
// 3. Probe phase: in this phase we process all the rows from the other stream
// and look for matching rows from the stored stream using the map.
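
As a toy illustration of the build and probe phases described above (the buffering phase, row marking, and disk spill are omitted, and the row type is invented for the example):

package main

import "fmt"

type row struct{ key, val string }

// hashJoin is a toy rendition of phases 2 and 3: build buckets from the
// stored stream, then probe them with the other stream.
func hashJoin(stored, probe []row) [][2]string {
	// Build phase: map each join key to the stored rows carrying it.
	buckets := make(map[string][]string)
	for _, r := range stored {
		buckets[r.key] = append(buckets[r.key], r.val)
	}
	// Probe phase: stream the other side once, emitting matches per bucket.
	var out [][2]string
	for _, r := range probe {
		for _, v := range buckets[r.key] {
			out = append(out, [2]string{v, r.val})
		}
	}
	return out
}

func main() {
	left := []row{{"a", "l1"}, {"b", "l2"}}
	right := []row{{"a", "r1"}, {"a", "r2"}, {"c", "r3"}}
	fmt.Println(hashJoin(left, right)) // [[l1 r1] [l1 r2]]
}
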
@@ -176,16 +177,10 @@ func (h *hashJoiner) Run(ctx context.Context, wg *sync.WaitGroup) {
limitedMon.Start(ctx, rowContainerMon, mon.BoundAccount{})
defer limitedMon.Stop(ctx)

// Override initialBufferSize to be a third of this processor's memory
// Override initialBufferSize to be half of this processor's memory
// limit. We consume up to h.initialBufferSize bytes from each input
// stream. If the chosen stream is fully consumed and does not go over
// this limit, h.initialBufferSize bytes (the last third) are budgeted
// to construct a hash map from these rows. We do not expect the hash
// map structure to consume more than the memory used to store the rows
// themselves.
// This assumption allows us to only worry about falling back to disk in
// the buffer phase.
h.initialBufferSize = limit / 3
// stream.
h.initialBufferSize = limit / 2

rowContainerMon = &limitedMon
}
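
Illustrative arithmetic only for the budget change above, assuming a 48 MiB processor limit: the old scheme gave each stream a third and held the last third back for the hash map, while the new scheme can give each stream half because the build phase is now able to spill to disk.

package main

import "fmt"

func main() {
	const limit = int64(48 << 20)
	fmt.Println("old per-stream buffer:", limit/3) // 16 MiB, last third held back
	fmt.Println("new per-stream buffer:", limit/2) // 24 MiB, nothing held back
}
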
@@ -195,51 +190,60 @@ func (h *hashJoiner) Run(ctx context.Context, wg *sync.WaitGroup) {
defer h.rows[leftSide].Close(ctx)
defer h.rows[rightSide].Close(ctx)

var storedRows hashRowContainer
defer func() {
if storedRows != nil {
storedRows.Close(ctx)
}
}()
row, earlyExit, err := h.bufferPhase(ctx)

bufferPhaseOom := false
if pgErr, ok := pgerror.GetPGCause(err); ok && pgErr.Code == pgerror.CodeOutOfMemoryError {
bufferPhaseOom = true
}

if earlyExit, err := h.bufferPhase(ctx, useTempStorage, &storedRows); earlyExit || err != nil {
// Exit if earlyExit is true or we received an error that is not a memory
// error.
if earlyExit || !(err == nil || bufferPhaseOom) {
if err != nil {
// We got an error. We still want to drain. Any error encountered while
// draining will be swallowed, and the original error will be forwarded to
// the consumer.
log.Infof(ctx, "initial buffering phase error %s", err)
log.Infof(ctx, "buffer phase error %s", err)
}
DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
return
}

// From this point, we are done with the source for h.storedSide.
srcToClose := h.leftSource
if h.storedSide == leftSide {
srcToClose = h.rightSource
// Build hashRowContainer. If we didn't get an oom error in the buffer
// phase, the build phase will attempt to build an in-memory representation,
// falling back to disk according to useTempStorage.
storedRows, earlyExit, err := h.buildPhase(
ctx, useTempStorage, !bufferPhaseOom, /* attemptMemoryBuild */
)
if earlyExit || err != nil {
if err != nil {
// We got an error. We still want to drain. Any error encountered while
// draining will be swallowed, and the original error will be forwarded to
// the consumer.
log.Infof(ctx, "build phase error %s", err)
}
DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
return
}
defer storedRows.Close(ctx)

// If storedRows is not nil, there was a memory limit reached in the buffer
// phase so we have already fallen back to a disk-based hashRowContainer.
// Otherwise, we have to build an in-memory hashRowContainer from
// h.rows[h.storedSide].
if storedRows == nil {
storedMemRows := makeHashMemRowContainer(&h.rows[h.storedSide])
if err := storedMemRows.Init(
ctx,
shouldEmitUnmatchedRow(h.storedSide, h.joinType),
h.rows[h.storedSide].types,
h.eqCols[h.storedSide],
); err != nil {
// We got an error. We still want to drain. Any error encountered
// while draining will be swallowed, and the original error will be
// forwarded to the consumer.
err = errors.Wrap(err, "error creating hash row container")
log.Info(ctx, err)
// If the buffer phase returned a row, this row has not been added to any
// row container, since it is the row that caused a memory error. We add
// this row to our built container (which is a hashDiskRowContainer in this
// case).
if row != nil {
if err := storedRows.AddRow(ctx, row); err != nil {
log.Infof(ctx, "unable to add row to disk %s", err)
DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
return
}
storedRows = &storedMemRows
}

// From this point, we are done with the source for h.storedSide.
srcToClose := h.leftSource
if h.storedSide == leftSide {
srcToClose = h.rightSource
}

log.VEventf(ctx, 1, "build phase complete")
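
Condensed, the new control flow in Run looks like the sketch below. bufferPhase and buildPhase here are trivial stand-ins, and errors.Is substitutes for the pgerror.GetPGCause / CodeOutOfMemoryError classification in the real code:

package main

import (
	"errors"
	"fmt"
)

var errOOM = errors.New("out of memory")

func bufferPhase() (leftover string, err error) { return "row-42", errOOM }

func buildPhase(attemptMemoryBuild bool) (string, error) {
	if attemptMemoryBuild {
		return "hashMemRowContainer", nil
	}
	return "hashDiskRowContainer", nil
}

func main() {
	leftover, err := bufferPhase()
	bufferOOM := errors.Is(err, errOOM)
	if err != nil && !bufferOOM {
		fmt.Println("drain and close:", err) // non-memory errors are fatal
		return
	}
	stored, err := buildPhase(!bufferOOM /* attemptMemoryBuild */)
	if err != nil {
		fmt.Println("drain and close:", err)
		return
	}
	if bufferOOM && leftover != "" {
		// The row that tripped the limit was never stored; it must be added
		// to the (disk-backed) container before probing.
		fmt.Println("adding", leftover, "to", stored)
	}
	fmt.Println("probe phase using", stored)
}
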
@@ -300,25 +304,58 @@ func (h *hashJoiner) receiveRow(
}
}

// bufferPhase attempts to read a portion of both streams into memory (up to
// h.initialBufferSize) in the hope that one of them is small and should be used
// as h.storedSide. The phase attempts to consume all the rows from the chosen
// side and falls back to disk if useTempStorage is true and the rows do not
// fit in memory. In this case, an on-disk hash table is constructed from the
// rows and storedRows is set to this hashRowContainer.
// A successful initial buffering phase sets h.storedSide.
func (h *hashJoiner) bufferPhase(
ctx context.Context, useTempStorage bool, storedRows *hashRowContainer,
) (earlyExit bool, _ error) {
row, earlyExit, err := h.bufferPhaseImpl(ctx)
if pgErr, ok := pgerror.GetPGCause(err); earlyExit || !(ok && pgErr.Code == pgerror.CodeOutOfMemoryError) {
return earlyExit, err
// buildPhase constructs our internal hash map of rows seen. This is done
// entirely from one stream: h.storedSide (chosen during initial buffering).
// Arguments:
// - useTempStorage specifies whether the build phase can fall back to temp
// storage if necessary.
// - attemptMemoryBuild specifies whether the build phase should attempt to
// build an in-memory hashRowContainer representation as opposed to
// immediately building an on-disk hashRowContainer.
func (h *hashJoiner) buildPhase(
ctx context.Context, useTempStorage bool, attemptMemoryBuild bool,
) (_ hashRowContainer, earlyExit bool, _ error) {
if attemptMemoryBuild {
storedMemRows := makeHashMemRowContainer(&h.rows[h.storedSide])
shouldMark := shouldEmitUnmatchedRow(h.storedSide, h.joinType)
if err := storedMemRows.Init(
ctx,
shouldMark,
h.rows[h.storedSide].types,
h.eqCols[h.storedSide],
); err != nil {
return nil, false, err
}
if !shouldMark {
return &storedMemRows, false, nil
}
// If we should mark, we pre-reserve the memory needed to mark
// in-memory rows in this phase to not have to worry about hitting a
// memory limit in the probe phase.
err := storedMemRows.reserveMarkMemoryMaybe(ctx)
if h.testingKnobMemFailPoint == build && rand.Float64() < h.testingKnobFailProbability {
err = pgerror.NewErrorf(
pgerror.CodeOutOfMemoryError,
"%s test induced error",
h.testingKnobMemFailPoint,
)
}
if err == nil {
return &storedMemRows, false, nil
}
storedMemRows.Close(ctx)
if pgErr, ok := pgerror.GetPGCause(err); !(ok && pgErr.Code == pgerror.CodeOutOfMemoryError) {
return nil, false, err
}
}

if !useTempStorage {
return false, errors.Wrap(err, "external storage for large queries disabled")
return nil, false, errors.New(
"attempted to fall back to disk but external storage for large queries disabled",
)
}

log.VEventf(ctx, 2, "buffer phase falling back to disk")
log.VEventf(ctx, 2, "build phase falling back to disk")

storedDiskRows := makeHashDiskRowContainer(h.flowCtx.diskMonitor, h.flowCtx.TempStorage)
if err := storedDiskRows.Init(
@@ -327,63 +364,56 @@ func (h *hashJoiner) bufferPhase(
h.rows[h.storedSide].types,
h.eqCols[h.storedSide],
); err != nil {
return false, err
}

// Add the row that caused the memory error.
if row != nil {
if err := storedDiskRows.AddRow(ctx, row); err != nil {
return false, err
}
return nil, false, err
}

// Transfer rows from memory.
i := h.rows[h.storedSide].NewIterator(ctx)
defer i.Close()
for i.Rewind(); ; i.Next() {
if err := h.cancelChecker.Check(); err != nil {
return false, err
return nil, false, err
}
if ok, err := i.Valid(); err != nil {
return false, err
return nil, false, err
} else if !ok {
break
}
memRow, err := i.Row()
if err != nil {
return false, err
return nil, false, err
}
if err := storedDiskRows.AddRow(ctx, memRow); err != nil {
return false, err
return nil, false, err
}
}

*storedRows = &storedDiskRows

// Finish consuming the chosen source.
source := h.rightSource
if h.storedSide == leftSide {
source = h.leftSource
}
for {
if err := h.cancelChecker.Check(); err != nil {
return false, err
return nil, false, err
}
row, earlyExit, err := h.receiveRow(ctx, source, h.storedSide)
if row == nil {
if err != nil {
return false, err
return nil, false, err
}
return earlyExit, nil
// Done consuming rows.
return &storedDiskRows, earlyExit, nil
}
if err := storedDiskRows.AddRow(ctx, row); err != nil {
return false, err
return nil, false, err
}
}
}
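
The transfer loop in the middle of buildPhase follows a common spill pattern; a minimal sketch, with addToDisk standing in for hashDiskRowContainer.AddRow and ctx.Err substituting for cancelChecker.Check:

package main

import (
	"context"
	"fmt"
)

// spillToDisk re-adds every buffered in-memory row to a disk-backed
// container, checking for query cancellation on each row.
func spillToDisk(ctx context.Context, memRows []string, addToDisk func(string) error) error {
	for _, r := range memRows {
		if err := ctx.Err(); err != nil {
			return err
		}
		if err := addToDisk(r); err != nil {
			return err
		}
	}
	return nil
}

func main() {
	var disk []string
	err := spillToDisk(context.Background(), []string{"r1", "r2"}, func(r string) error {
		disk = append(disk, r)
		return nil
	})
	fmt.Println(disk, err) // [r1 r2] <nil>
}
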

// bufferPhaseImpl is an initial phase where we read a portion of both streams,
// in the hope that one of them is small.
// bufferPhase reads a portion of both streams into memory (up to
// h.initialBufferSize) in the hope that one of them is small and should be used
// as h.storedSide. The phase consumes all the rows from the chosen side.
//
// Rows that contain NULLs on equality columns go straight to the output if it's
// an outer join; otherwise they are discarded.
Expand All @@ -396,7 +426,7 @@ func (h *hashJoiner) bufferPhase(
// row would have been added to.
//
// If earlyExit is set, the output doesn't need more rows.
func (h *hashJoiner) bufferPhaseImpl(
func (h *hashJoiner) bufferPhase(
ctx context.Context,
) (row sqlbase.EncDatumRow, earlyExit bool, _ error) {
srcs := [2]RowSource{h.leftSource, h.rightSource}
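
A small sketch of the NULL rule stated in the bufferPhase comment (the row representation and helper name are invented for the example): a NULL in any equality column can never match, so outer joins emit the row unmatched while other joins drop it.

package main

import "fmt"

func routeRow(row []interface{}, eqCols []int, outerJoin bool) string {
	for _, c := range eqCols {
		if row[c] == nil {
			if outerJoin {
				return "emit unmatched"
			}
			return "discard"
		}
	}
	return "buffer"
}

func main() {
	fmt.Println(routeRow([]interface{}{nil, "x"}, []int{0}, true))  // emit unmatched
	fmt.Println(routeRow([]interface{}{nil, "x"}, []int{0}, false)) // discard
	fmt.Println(routeRow([]interface{}{1, "x"}, []int{0}, false))   // buffer
}
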
21 changes: 11 additions & 10 deletions pkg/sql/distsqlrun/hashjoiner_test.go
@@ -569,16 +569,17 @@ func TestHashJoiner(t *testing.T) {
// Run tests with a probability of the run failing with a memory error.
// These verify that the hashJoiner falls back to disk correctly in all
// cases.
for i := 0; i < 5; i++ {
memFailPoint := buffer
t.Run(fmt.Sprintf("MemFailPoint=%s", memFailPoint), func(t *testing.T) {
if err := testFunc(t, func(h *hashJoiner) {
h.testingKnobMemFailPoint = memFailPoint
h.testingKnobFailProbability = 0.5
}); err != nil {
t.Fatal(err)
}
})
for _, memFailPoint := range []hashJoinPhase{buffer, build} {
for i := 0; i < 5; i++ {
t.Run(fmt.Sprintf("MemFailPoint=%s", memFailPoint), func(t *testing.T) {
if err := testFunc(t, func(h *hashJoiner) {
h.testingKnobMemFailPoint = memFailPoint
h.testingKnobFailProbability = 0.5
}); err != nil {
t.Fatal(err)
}
})
}
}

// Run test with a variety of memory limits.
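
The knob exercised by the test loop above follows a simple fault-injection pattern; a minimal sketch, with a synthetic error in place of the pgerror carrying CodeOutOfMemoryError that the real knob raises so the fallback path treats it like a genuine budget failure:

package main

import (
	"fmt"
	"math/rand"
)

// maybeInjectOOM returns an induced error with the given probability when
// execution reaches the configured fail point.
func maybeInjectOOM(phase, failPoint string, failProbability float64) error {
	if phase == failPoint && rand.Float64() < failProbability {
		return fmt.Errorf("%s test induced error", phase)
	}
	return nil
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(maybeInjectOOM("build", "build", 0.5))
	}
}
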