Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rowexec: increase the batch size for join reader ordering strategy #84433

Merged
merged 1 commit into from
Jul 15, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/information_schema
Original file line number Diff line number Diff line change
Expand Up @@ -4657,7 +4657,7 @@ integer_datetimes on
intervalstyle postgres
intervalstyle_enabled on
is_superuser on
join_reader_ordering_strategy_batch_size 10 KiB
join_reader_ordering_strategy_batch_size 100 KiB
large_full_scan_rows 1000
lc_collate C.UTF-8
lc_ctype C.UTF-8
Expand Down
4 changes: 2 additions & 2 deletions pkg/sql/logictest/testdata/logic_test/pg_catalog
Original file line number Diff line number Diff line change
Expand Up @@ -4186,7 +4186,7 @@ inject_retry_errors_enabled off NULL
integer_datetimes on NULL NULL NULL string
intervalstyle postgres NULL NULL NULL string
is_superuser on NULL NULL NULL string
join_reader_ordering_strategy_batch_size 10 KiB NULL NULL NULL string
join_reader_ordering_strategy_batch_size 100 KiB NULL NULL NULL string
large_full_scan_rows 1000 NULL NULL NULL string
lc_collate C.UTF-8 NULL NULL NULL string
lc_ctype C.UTF-8 NULL NULL NULL string
Expand Down Expand Up @@ -4310,7 +4310,7 @@ inject_retry_errors_enabled off NULL
integer_datetimes on NULL user NULL on on
intervalstyle postgres NULL user NULL postgres postgres
is_superuser on NULL user NULL on on
join_reader_ordering_strategy_batch_size 10 KiB NULL user NULL 10 KiB 10 KiB
join_reader_ordering_strategy_batch_size 100 KiB NULL user NULL 100 KiB 100 KiB
large_full_scan_rows 1000 NULL user NULL 1000 1000
lc_collate C.UTF-8 NULL user NULL C.UTF-8 C.UTF-8
lc_ctype C.UTF-8 NULL user NULL C.UTF-8 C.UTF-8
Expand Down
2 changes: 1 addition & 1 deletion pkg/sql/logictest/testdata/logic_test/show_source
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ inject_retry_errors_enabled off
integer_datetimes on
intervalstyle postgres
is_superuser on
join_reader_ordering_strategy_batch_size 10 KiB
join_reader_ordering_strategy_batch_size 100 KiB
large_full_scan_rows 1000
lc_collate C.UTF-8
lc_ctype C.UTF-8
Expand Down
19 changes: 19 additions & 0 deletions pkg/sql/rowexec/joinreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,9 +1047,28 @@ func (jr *joinReader) performLookup() (joinReaderState, *execinfrapb.ProducerMet
log.VEvent(jr.Ctx, 1, "done joining rows")
jr.strategy.prepareToEmit(jr.Ctx)

// Check if the strategy spilled to disk and reduce the batch size if it
// did.
// TODO(yuzefovich): we should probably also grow the batch size bytes limit
// dynamically if we haven't spilled and are not close to spilling (say not
// exceeding half of the memory limit of the disk-backed container), up to
// some limit. (This would only apply to the joinReaderOrderingStrategy
// since other strategies cannot spill in the first place.) Probably it'd be
// good to look at not just the current batch of input rows, but to keep
// some statistics over the last several batches to make a more informed
// decision.
if jr.strategy.spilled() && jr.batchSizeBytes > joinReaderMinBatchSize {
jr.batchSizeBytes = jr.batchSizeBytes / 2
if jr.batchSizeBytes < joinReaderMinBatchSize {
jr.batchSizeBytes = joinReaderMinBatchSize
}
}

return jrEmittingRows, nil
}

const joinReaderMinBatchSize = 10 << 10 /* 10 KiB */

// emitRow returns the next row from jr.toEmit, if present. Otherwise it
// prepares for another input batch.
func (jr *joinReader) emitRow() (
Expand Down
6 changes: 2 additions & 4 deletions pkg/sql/rowexec/joinreader_strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -451,7 +451,7 @@ var partialJoinSentinel = []int{-1}
//
// Say the joinReader looks up rows in order: (red, x), then (blue, y). Once
// (red, x) is fetched, it is handed to
// joinReaderOderingStrategy.processLookedUpRow(), which will match it against
// joinReaderOrderingStrategy.processLookedUpRow(), which will match it against
// all the corresponding input rows, producing (1, x), (4, x). These two rows
// are not emitted because that would violate the input ordering (well, (1, x)
// could be emitted, but we're not smart enough). So, they are buffered until
Expand Down Expand Up @@ -535,7 +535,7 @@ type joinReaderOrderingStrategy struct {
testingInfoSpilled bool
}

const joinReaderOrderingStrategyBatchSizeDefault = 10 << 10 /* 10 KiB */
const joinReaderOrderingStrategyBatchSizeDefault = 100 << 10 /* 100 KiB */

// JoinReaderOrderingStrategyBatchSize determines the size of input batches used
// to construct a single lookup KV batch by joinReaderOrderingStrategy.
Expand All @@ -548,8 +548,6 @@ var JoinReaderOrderingStrategyBatchSize = settings.RegisterByteSizeSetting(
)

func (s *joinReaderOrderingStrategy) getLookupRowsBatchSizeHint(sd *sessiondata.SessionData) int64 {
// TODO(asubiotto): Eventually we might want to adjust this batch size
// dynamically based on whether the result row container spilled or not.
if sd.JoinReaderOrderingStrategyBatchSize == 0 {
// In some tests the session data might not be set - use the default
// value then.
Expand Down