From faef76dc1e65ef853981df011d263fd1dce4db09 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Alfonso=20Subiotto=20Marqu=C3=A9s?=
Date: Wed, 27 Sep 2017 15:47:50 -0400
Subject: [PATCH] distsql: pre-reserve memory needed to mark rows in HashJoiner
 build phase

A situation was uncovered by #18600 where the HashJoiner would run out
of memory in the probe phase. This was because we had assumed that we
wouldn't hit a memory limit if the buffer phase filled up at most 2/3
of the limit with both streams, since the marking infrastructure for
the chosen stream would take up only a fraction of the remaining 1/3.
This assumption failed to take into account memory limits shared with
other processors.

This change pre-reserves the memory needed for the probe phase during
the build phase, so that there is a single point in the code where we
fall back to disk, without relying on any assumptions about limits.
---
 pkg/sql/distsqlrun/hash_row_container.go |  27 +++-
 pkg/sql/distsqlrun/hashjoiner.go         | 182 +++++++++++++----------
 pkg/sql/distsqlrun/hashjoiner_test.go    |  21 +--
 3 files changed, 139 insertions(+), 91 deletions(-)

diff --git a/pkg/sql/distsqlrun/hash_row_container.go b/pkg/sql/distsqlrun/hash_row_container.go
index bf7e2adeecf8..003c29a8ca70 100644
--- a/pkg/sql/distsqlrun/hash_row_container.go
+++ b/pkg/sql/distsqlrun/hash_row_container.go
@@ -119,9 +119,7 @@ const sizeOfBool = int64(unsafe.Sizeof(false))
 // memRowContainer corresponding to matching rows.
 // NOTE: Once a row is marked, adding more rows to the hashMemRowContainer
 // results in undefined behavior. It is not necessary to do otherwise for the
-// current usage of hashMemRowContainer and allows us to assume that a memory
-// error can only occur at the start of the marking phase, thus not having to
-// deal with half-emitted buckets and marks when falling back to disk.
+// current usage of hashMemRowContainer.
 type hashMemRowContainer struct {
 	*memRowContainer
 	columnEncoder
@@ -134,6 +132,11 @@ type hashMemRowContainer struct {
 	// been marked. Used for iterating over unmarked rows.
 	marked []bool
 
+	// markMemoryReserved specifies whether the hashMemRowContainer's memory
+	// account already accounts for the memory needed to mark the rows in the
+	// hashMemRowContainer.
+	markMemoryReserved bool
+
 	// buckets contains the indices into memRowContainer for a given group
 	// key (which is the encoding of storedEqCols).
 	buckets map[string][]int
@@ -219,6 +222,20 @@ func (h *hashMemRowContainer) addRowToBucket(
 	return nil
 }
 
+// reserveMarkMemoryMaybe is a utility function to grow the
+// hashMemRowContainer's memory account by the memory needed to mark all rows.
+// It is a noop if h.markMemoryReserved is true.
+func (h *hashMemRowContainer) reserveMarkMemoryMaybe(ctx context.Context) error {
+	if h.markMemoryReserved {
+		return nil
+	}
+	if err := h.bucketsAcc.Grow(ctx, sizeOfBoolSlice+(sizeOfBool*int64(h.Len()))); err != nil {
+		return err
+	}
+	h.markMemoryReserved = true
+	return nil
+}
+
 // hashMemRowBucketIterator iterates over the rows in a bucket.
 type hashMemRowBucketIterator struct {
 	*hashMemRowContainer
@@ -267,8 +284,8 @@ func (i *hashMemRowBucketIterator) Mark(ctx context.Context, mark bool) error {
 		log.Fatal(ctx, "hash mem row container not set up for marking")
 	}
 	if i.marked == nil {
-		if err := i.bucketsAcc.Grow(ctx, sizeOfBoolSlice+(sizeOfBool*int64(i.Len()))); err != nil {
-			return err
+		if !i.markMemoryReserved {
+			panic("mark memory should have been reserved already")
 		}
 		i.marked = make([]bool, i.Len())
 	}
diff --git a/pkg/sql/distsqlrun/hashjoiner.go b/pkg/sql/distsqlrun/hashjoiner.go
index 1d0d66ecc43a..a74406f58f4f 100644
--- a/pkg/sql/distsqlrun/hashjoiner.go
+++ b/pkg/sql/distsqlrun/hashjoiner.go
@@ -69,7 +69,8 @@ func (p hashJoinPhase) String() string {
 // choose the right stream and read it and buffer it until the end.
 //
 // 2. Build phase: in this phase we build the buckets from the rows stored
-// in the first phase.
+// in the first phase. If temp storage is enabled and we ran out of memory
+// in the buffer phase, this phase falls back to disk.
 //
 // 3. Probe phase: in this phase we process all the rows from the other stream
 // and look for matching rows from the stored stream using the map.
@@ -176,16 +177,10 @@ func (h *hashJoiner) Run(ctx context.Context, wg *sync.WaitGroup) {
 		limitedMon.Start(ctx, rowContainerMon, mon.BoundAccount{})
 		defer limitedMon.Stop(ctx)
 
-		// Override initialBufferSize to be a third of this processor's memory
+		// Override initialBufferSize to be half of this processor's memory
 		// limit. We consume up to h.initialBufferSize bytes from each input
-		// stream. If the chosen stream is fully consumed and does not go over
-		// this limit, h.initialBufferSize bytes (the last third) are budgeted
-		// to construct a hash map from these rows. We do not expect the hash
-		// map structure to consume more than the memory used to store the rows
-		// themselves.
-		// This assumption allows us to only worry about falling back to disk in
-		// the buffer phase.
-		h.initialBufferSize = limit / 3
+		// stream.
+		h.initialBufferSize = limit / 2
 
 		rowContainerMon = &limitedMon
 	}
@@ -195,51 +190,60 @@ func (h *hashJoiner) Run(ctx context.Context, wg *sync.WaitGroup) {
 	defer h.rows[leftSide].Close(ctx)
 	defer h.rows[rightSide].Close(ctx)
 
-	var storedRows hashRowContainer
-	defer func() {
-		if storedRows != nil {
-			storedRows.Close(ctx)
-		}
-	}()
+	row, earlyExit, err := h.bufferPhase(ctx)
+
+	bufferPhaseOom := false
+	if pgErr, ok := pgerror.GetPGCause(err); ok && pgErr.Code == pgerror.CodeOutOfMemoryError {
+		bufferPhaseOom = true
+	}
 
-	if earlyExit, err := h.bufferPhase(ctx, useTempStorage, &storedRows); earlyExit || err != nil {
+	// Exit if earlyExit is true or we received an error that is not a memory
+	// error.
+	if earlyExit || !(err == nil || bufferPhaseOom) {
 		if err != nil {
 			// We got an error. We still want to drain. Any error encountered while
 			// draining will be swallowed, and the original error will be forwarded to
 			// the consumer.
-			log.Infof(ctx, "initial buffering phase error %s", err)
+			log.Infof(ctx, "buffer phase error %s", err)
 		}
 		DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
 		return
 	}
 
-	// From this point, we are done with the source for h.storedSide.
-	srcToClose := h.leftSource
-	if h.storedSide == leftSide {
-		srcToClose = h.rightSource
+	// Build hashRowContainer. If we didn't get an oom error in the buffer
+	// phase, the build phase will attempt to build an in-memory representation,
+	// falling back to disk according to useTempStorage.
+	storedRows, earlyExit, err := h.buildPhase(
+		ctx, useTempStorage, !bufferPhaseOom, /* attemptMemoryBuild */
+	)
+	if earlyExit || err != nil {
+		if err != nil {
+			// We got an error. We still want to drain. Any error encountered while
+			// draining will be swallowed, and the original error will be forwarded to
+			// the consumer.
+			log.Infof(ctx, "build phase error %s", err)
+		}
+		DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
+		return
 	}
+	defer storedRows.Close(ctx)
 
-	// If storedRows is not nil, there was a memory limit reached in the buffer
-	// phase so we have already fallen back to a disk-based hashRowContainer.
-	// Otherwise, we have to build an in-memory hashRowContainer from
-	// h.rows[h.storedSide].
-	if storedRows == nil {
-		storedMemRows := makeHashMemRowContainer(&h.rows[h.storedSide])
-		if err := storedMemRows.Init(
-			ctx,
-			shouldEmitUnmatchedRow(h.storedSide, h.joinType),
-			h.rows[h.storedSide].types,
-			h.eqCols[h.storedSide],
-		); err != nil {
-			// We got an error. We still want to drain. Any error encountered
-			// while draining will be swallowed, and the original error will be
-			// forwarded to the consumer.
-			err = errors.Wrap(err, "error creating hash row container")
-			log.Info(ctx, err)
+	// If the buffer phase returned a row, this row has not been added to any
+	// row container, since it is the row that caused a memory error. We add
+	// this row to our built container (which is a hashDiskRowContainer in this
+	// case).
+	if row != nil {
+		if err := storedRows.AddRow(ctx, row); err != nil {
+			log.Infof(ctx, "unable to add row to disk %s", err)
 			DrainAndClose(ctx, h.out.output, err /* cause */, h.leftSource, h.rightSource)
 			return
 		}
-		storedRows = &storedMemRows
+	}
+
+	// From this point, we are done with the source for h.storedSide.
+	srcToClose := h.leftSource
+	if h.storedSide == leftSide {
+		srcToClose = h.rightSource
 	}
 
 	log.VEventf(ctx, 1, "build phase complete")
@@ -300,25 +304,58 @@ func (h *hashJoiner) receiveRow(
 	}
 }
 
-// bufferPhase attempts to read a portion of both streams into memory (up to
-// h.initialBufferSize) in the hope that one of them is small and should be used
-// as h.storedSide. The phase attempts to consume all the rows from the chosen
-// side and falls back to disk if useTempStorage is true and the rows do not
-// fit in memory. In this case, an on-disk hash table is constructed from the
-// rows and storedRows is set to this hashRowContainer.
-// A successful initial buffering phase sets h.storedSide.
-func (h *hashJoiner) bufferPhase(
-	ctx context.Context, useTempStorage bool, storedRows *hashRowContainer,
-) (earlyExit bool, _ error) {
-	row, earlyExit, err := h.bufferPhaseImpl(ctx)
-	if pgErr, ok := pgerror.GetPGCause(err); earlyExit || !(ok && pgErr.Code == pgerror.CodeOutOfMemoryError) {
-		return earlyExit, err
+// buildPhase constructs our internal hash map of rows seen. This is done
+// entirely from one stream: h.storedSide (chosen during initial buffering).
+// Arguments:
+//  - useTempStorage specifies whether the build phase can fall back to temp
+//    storage if necessary.
+//  - attemptMemoryBuild specifies whether the build phase should attempt to
+//    build an in-memory hashRowContainer representation as opposed to
+//    immediately building an on-disk hashRowContainer.
+func (h *hashJoiner) buildPhase(
+	ctx context.Context, useTempStorage bool, attemptMemoryBuild bool,
+) (_ hashRowContainer, earlyExit bool, _ error) {
+	if attemptMemoryBuild {
+		storedMemRows := makeHashMemRowContainer(&h.rows[h.storedSide])
+		shouldMark := shouldEmitUnmatchedRow(h.storedSide, h.joinType)
+		if err := storedMemRows.Init(
+			ctx,
+			shouldMark,
+			h.rows[h.storedSide].types,
+			h.eqCols[h.storedSide],
+		); err != nil {
+			return nil, false, err
+		}
+		if !shouldMark {
+			return &storedMemRows, false, nil
+		}
+		// If we should mark, we pre-reserve the memory needed to mark
+		// in-memory rows in this phase to not have to worry about hitting a
+		// memory limit in the probe phase.
+		err := storedMemRows.reserveMarkMemoryMaybe(ctx)
+		if h.testingKnobMemFailPoint == build && rand.Float64() < h.testingKnobFailProbability {
+			err = pgerror.NewErrorf(
+				pgerror.CodeOutOfMemoryError,
+				"%s test induced error",
+				h.testingKnobMemFailPoint,
+			)
+		}
+		if err == nil {
+			return &storedMemRows, false, nil
+		}
+		storedMemRows.Close(ctx)
+		if pgErr, ok := pgerror.GetPGCause(err); !(ok && pgErr.Code == pgerror.CodeOutOfMemoryError) {
+			return nil, false, err
+		}
 	}
+
 	if !useTempStorage {
-		return false, errors.Wrap(err, "external storage for large queries disabled")
+		return nil, false, errors.New(
+			"attempted to fall back to disk but external storage for large queries disabled",
+		)
 	}
-	log.VEventf(ctx, 2, "buffer phase falling back to disk")
+	log.VEventf(ctx, 2, "build phase falling back to disk")
 
 	storedDiskRows := makeHashDiskRowContainer(h.flowCtx.diskMonitor, h.flowCtx.TempStorage)
 	if err := storedDiskRows.Init(
@@ -327,14 +364,7 @@ func (h *hashJoiner) bufferPhase(
 		h.rows[h.storedSide].types,
 		h.eqCols[h.storedSide],
 	); err != nil {
-		return false, err
-	}
-
-	// Add the row that caused the memory error.
-	if row != nil {
-		if err := storedDiskRows.AddRow(ctx, row); err != nil {
-			return false, err
-		}
+		return nil, false, err
 	}
 
 	// Transfer rows from memory.
@@ -342,24 +372,22 @@
 	defer i.Close()
 	for i.Rewind(); ; i.Next() {
 		if err := h.cancelChecker.Check(); err != nil {
-			return false, err
+			return nil, false, err
 		}
 		if ok, err := i.Valid(); err != nil {
-			return false, err
+			return nil, false, err
 		} else if !ok {
 			break
 		}
 		memRow, err := i.Row()
 		if err != nil {
-			return false, err
+			return nil, false, err
 		}
 		if err := storedDiskRows.AddRow(ctx, memRow); err != nil {
-			return false, err
+			return nil, false, err
 		}
 	}
 
-	*storedRows = &storedDiskRows
-
 	// Finish consuming the chosen source.
 	source := h.rightSource
 	if h.storedSide == leftSide {
@@ -367,23 +395,25 @@
 	}
 	for {
 		if err := h.cancelChecker.Check(); err != nil {
-			return false, err
+			return nil, false, err
 		}
 		row, earlyExit, err := h.receiveRow(ctx, source, h.storedSide)
 		if row == nil {
 			if err != nil {
-				return false, err
+				return nil, false, err
 			}
-			return earlyExit, nil
+			// Done consuming rows.
+			return &storedDiskRows, earlyExit, nil
 		}
 		if err := storedDiskRows.AddRow(ctx, row); err != nil {
-			return false, err
+			return nil, false, err
 		}
 	}
 }
 
-// bufferPhaseImpl is an initial phase where we read a portion of both streams,
-// in the hope that one of them is small.
+// bufferPhase reads a portion of both streams into memory (up to
+// h.initialBufferSize) in the hope that one of them is small and should be used
+// as h.storedSide. The phase consumes all the rows from the chosen side.
 //
 // Rows that contain NULLs on equality columns go straight to the output if it's
 // an outer join; otherwise they are discarded.
@@ -396,7 +426,7 @@ func (h *hashJoiner) bufferPhase(
 // row would have been added to.
 //
 // If earlyExit is set, the output doesn't need more rows.
-func (h *hashJoiner) bufferPhaseImpl(
+func (h *hashJoiner) bufferPhase(
 	ctx context.Context,
 ) (row sqlbase.EncDatumRow, earlyExit bool, _ error) {
 	srcs := [2]RowSource{h.leftSource, h.rightSource}
diff --git a/pkg/sql/distsqlrun/hashjoiner_test.go b/pkg/sql/distsqlrun/hashjoiner_test.go
index b4d8a1b8302f..ce64d2462156 100644
--- a/pkg/sql/distsqlrun/hashjoiner_test.go
+++ b/pkg/sql/distsqlrun/hashjoiner_test.go
@@ -569,16 +569,17 @@ func TestHashJoiner(t *testing.T) {
 	// Run tests with a probability of the run failing with a memory error.
 	// These verify that the hashJoiner falls back to disk correctly in all
 	// cases.
-	for i := 0; i < 5; i++ {
-		memFailPoint := buffer
-		t.Run(fmt.Sprintf("MemFailPoint=%s", memFailPoint), func(t *testing.T) {
-			if err := testFunc(t, func(h *hashJoiner) {
-				h.testingKnobMemFailPoint = memFailPoint
-				h.testingKnobFailProbability = 0.5
-			}); err != nil {
-				t.Fatal(err)
-			}
-		})
+	for _, memFailPoint := range []hashJoinPhase{buffer, build} {
+		for i := 0; i < 5; i++ {
+			t.Run(fmt.Sprintf("MemFailPoint=%s", memFailPoint), func(t *testing.T) {
+				if err := testFunc(t, func(h *hashJoiner) {
+					h.testingKnobMemFailPoint = memFailPoint
+					h.testingKnobFailProbability = 0.5
+				}); err != nil {
+					t.Fatal(err)
+				}
+			})
+		}
 	}
 
 	// Run test with a variety of memory limits.
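
The reservation pattern this patch introduces can be illustrated outside the
tree. The following is a minimal, self-contained Go sketch of the idea, not
the real implementation: the budget type is a hypothetical stand-in for the
mon.BoundAccount memory account, and only reserveMarkMemoryMaybe and the
panic in the mark path mirror the patch. Growing the account by the full cost
of the mark slice once, during the build phase, guarantees the probe phase can
allocate the slice without ever hitting the memory limit, which keeps the
disk fallback at a single point in the code.

package main

import (
	"errors"
	"fmt"
	"unsafe"
)

// budget is a toy memory account (a stand-in for mon.BoundAccount): it
// tracks usage against a fixed limit and fails Grow when the limit would
// be exceeded.
type budget struct {
	used, limit int64
}

func (b *budget) Grow(n int64) error {
	if b.used+n > b.limit {
		return errors.New("memory budget exceeded")
	}
	b.used += n
	return nil
}

const (
	sizeOfBoolSlice = int64(unsafe.Sizeof([]bool(nil)))
	sizeOfBool      = int64(unsafe.Sizeof(false))
)

// container mimics the relevant parts of hashMemRowContainer: it holds
// numRows buffered rows and may later need a parallel []bool to mark rows
// that found a match.
type container struct {
	numRows            int
	acc                *budget
	marked             []bool
	markMemoryReserved bool
}

// reserveMarkMemoryMaybe mirrors the patch: grow the account by the full
// cost of the mark slice up front, once, during the build phase. An error
// here is the single point where we would fall back to disk.
func (c *container) reserveMarkMemoryMaybe() error {
	if c.markMemoryReserved {
		return nil
	}
	if err := c.acc.Grow(sizeOfBoolSlice + sizeOfBool*int64(c.numRows)); err != nil {
		return err
	}
	c.markMemoryReserved = true
	return nil
}

// mark is the probe-phase side: the slice is allocated lazily but its cost
// is already accounted for, so marking cannot fail with an OOM error.
func (c *container) mark(i int) {
	if c.marked == nil {
		if !c.markMemoryReserved {
			panic("mark memory should have been reserved already")
		}
		c.marked = make([]bool, c.numRows)
	}
	c.marked[i] = true
}

func main() {
	c := &container{numRows: 1 << 20, acc: &budget{limit: 1 << 21}}
	if err := c.reserveMarkMemoryMaybe(); err != nil {
		fmt.Println("build phase: falling back to disk:", err)
		return
	}
	c.mark(42) // probe phase: guaranteed not to hit the memory limit
	fmt.Println("mark memory accounted for up front; used =", c.acc.used, "bytes")
}

Running the sketch prints the accounted size; shrinking the limit below
numRows bytes instead exercises the disk-fallback branch, analogous to the
build-phase fallback the patch adds.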