Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rowexec: allow ordered joinReader to stream matches to the first row #85731

Merged
merged 1 commit into from
Aug 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
92 changes: 92 additions & 0 deletions pkg/sql/opt/exec/execbuilder/testdata/lookup_join_limit
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ CREATE TABLE a (x INT PRIMARY KEY, y INT, z INT, INDEX (y));
CREATE TABLE b (x INT PRIMARY KEY);
INSERT INTO a VALUES (1, 1, 1), (2, 1, 1), (3, 2, 2), (4, 2, 2);
INSERT INTO b VALUES (1), (2), (3), (4);
CREATE TABLE xy (x INT, y INT, PRIMARY KEY(x, y));
INSERT INTO xy VALUES (1, 1), (1, 2), (1, 3), (2, 1);

# Query with an index join and a limit hint.
query T
Expand Down Expand Up @@ -344,3 +346,93 @@ regions: <hidden>
estimated row count: 1 (100% of the table; stats collected <hidden> ago)
table: a@a_y_idx
spans: FULL SCAN (SOFT LIMIT)

# Query with a lookup join and a limit. The lookup join has to preserve the
# input ordering.
query T
EXPLAIN (OPT, VERBOSE) SELECT a.x, a.y, xy.x, xy.y FROM a INNER LOOKUP JOIN xy ON xy.x = a.x ORDER BY a.y, a.x LIMIT 2
----
limit
├── columns: x:1 y:2 x:6 y:7
├── internal-ordering: +2,+(1|6)
├── cardinality: [0 - 2]
├── stats: [rows=2]
├── cost: 55.99
├── key: (6,7)
├── fd: (1)-->(2), (1)==(6), (6)==(1)
├── ordering: +2,+(1|6) [actual: +2,+1]
├── distribution: test
├── prune: (7)
├── interesting orderings: (+2,+1)
├── inner-join (lookup xy)
│ ├── columns: a.x:1 a.y:2 xy.x:6 xy.y:7
│ ├── flags: force lookup join (into right side)
│ ├── key columns: [1] = [6]
│ ├── stats: [rows=10, distinct(1)=1, null(1)=0, avgsize(1)=1, distinct(6)=1, null(6)=0, avgsize(6)=4]
│ ├── cost: 55.96
│ ├── key: (6,7)
│ ├── fd: (1)-->(2), (1)==(6), (6)==(1)
│ ├── ordering: +2,+(1|6) [actual: +2,+1]
│ ├── limit hint: 2.00
│ ├── distribution: test
│ ├── prune: (2,7)
│ ├── interesting orderings: (+1) (+2,+1) (+6,+7)
│ ├── scan a@a_y_idx
│ │ ├── columns: a.x:1 a.y:2
│ │ ├── stats: [rows=1, distinct(1)=1, null(1)=0, avgsize(1)=1]
│ │ ├── cost: 15.04
│ │ ├── key: (1)
│ │ ├── fd: (1)-->(2)
│ │ ├── ordering: +2,+1
│ │ ├── limit hint: 1.00
│ │ ├── distribution: test
│ │ ├── prune: (1,2)
│ │ ├── interesting orderings: (+1) (+2,+1)
│ │ └── unfiltered-cols: (1-5)
│ └── filters (true)
└── 2

# Perform a lookup join that preserves its input ordering. Make sure that the
# lookup join itself reads only two rows from KV.
query T
EXPLAIN ANALYZE SELECT a.x, a.y, xy.x, xy.y FROM a INNER LOOKUP JOIN xy ON xy.x = a.x ORDER BY a.y, a.x LIMIT 2
----
planning time: 10µs
execution time: 100µs
distribution: <hidden>
vectorized: <hidden>
rows read from KV: 5 (40 B, 5 gRPC calls)
maximum memory usage: <hidden>
network usage: <hidden>
regions: <hidden>
·
• limit
│ count: 2
└── • lookup join
│ nodes: <hidden>
│ regions: <hidden>
│ actual row count: 2
│ KV time: 0µs
│ KV contention time: 0µs
│ KV rows read: 2
│ KV bytes read: 16 B
│ KV gRPC calls: 2
│ estimated max memory allocated: 0 B
│ estimated max sql temp disk usage: 0 B
│ table: xy@xy_pkey
│ equality: (x) = (x)
└── • scan
nodes: <hidden>
regions: <hidden>
actual row count: 3
KV time: 0µs
KV contention time: 0µs
KV rows read: 3
KV bytes read: 24 B
KV gRPC calls: 3
estimated max memory allocated: 0 B
estimated row count: 1 (100% of the table; stats collected <hidden> ago)
table: a@a_y_idx
spans: FULL SCAN (SOFT LIMIT)
73 changes: 64 additions & 9 deletions pkg/sql/rowexec/joinreader_strategies.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,16 @@ type joinReaderOrderingStrategy struct {
// outputRowIdx contains the index into the inputRowIdx'th row of
// inputRowIdxToLookedUpRowIndices that we're about to emit.
outputRowIdx int
// notBufferedRow, if non-nil, contains a looked-up row that matches the
// first input row of the batch. Since joinReaderOrderingStrategy returns
// results in input order, it is safe to return looked-up rows that match
// the first input row immediately.
// TODO(drewk): If we had a way of knowing when no more lookups will be
// performed for a given span ID, it would be possible to start immediately
// returning results for the second row once the first was finished, and so
// on. This could significantly decrease the overhead of buffering
// looked-up rows.
notBufferedRow rowenc.EncDatumRow
}

groupingState *inputBatchGroupingState
Expand All @@ -527,6 +537,10 @@ type joinReaderOrderingStrategy struct {
// inputRowIdxToLookedUpRowIndices is a 1:1 mapping with the multimap
// with the same name, where each int64 indicates the memory usage of
// the corresponding []int that is currently registered with memAcc.
//
// Note that inputRowIdxToLookedUpRowIndices does not contain entries for
// the first input row, because matches to the first row are emitted
// immediately.
inputRowIdxToLookedUpRowIndices []int64
}

Expand Down Expand Up @@ -619,8 +633,11 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow(
ctx context.Context, row rowenc.EncDatumRow, spanID int,
) (joinReaderState, error) {
matchingInputRowIndices := s.getMatchingRowIndices(spanID)

// Avoid adding to the buffer if only the first input row was matched, since
// in this case we can just output the row immediately.
var containerIdx int
if !s.isPartialJoin {
if !s.isPartialJoin && (len(matchingInputRowIndices) != 1 || matchingInputRowIndices[0] != 0) {
// Replace missing values with nulls to appease the row container.
for i := range row {
if row[i].IsUnset() {
Expand All @@ -637,6 +654,12 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow(
// Update our map from input rows to looked up rows.
for _, inputRowIdx := range matchingInputRowIndices {
if !s.isPartialJoin {
if inputRowIdx == 0 {
// Don't add to inputRowIdxToLookedUpRowIndices in order to avoid
// emitting more than once.
s.emitCursor.notBufferedRow = row
continue
}
s.inputRowIdxToLookedUpRowIndices[inputRowIdx] = append(
s.inputRowIdxToLookedUpRowIndices[inputRowIdx], containerIdx)
continue
Expand All @@ -656,7 +679,14 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow(
// We failed our on-condition.
continue
}
s.groupingState.setMatched(inputRowIdx)
wasMatched := s.groupingState.setMatched(inputRowIdx)
if !wasMatched && inputRowIdx == 0 {
// This looked up row matches the first row, and we haven't seen a match
// for the first row yet. Don't add to inputRowIdxToLookedUpRowIndices
// in order to avoid emitting more than once.
s.emitCursor.notBufferedRow = row
continue
}
s.inputRowIdxToLookedUpRowIndices[inputRowIdx] = partialJoinSentinel
}
}
Expand All @@ -673,6 +703,12 @@ func (s *joinReaderOrderingStrategy) processLookedUpRow(
return jrStateUnknown, err
}

if s.emitCursor.notBufferedRow != nil {
// The looked-up row matched the first input row. Render and emit the
// result immediately, then return to performing lookups.
return jrEmittingRows, nil
}

return jrPerformingLookup, nil
}

Expand All @@ -699,7 +735,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit(

inputRow := s.inputRows[s.emitCursor.inputRowIdx]
lookedUpRows := s.inputRowIdxToLookedUpRowIndices[s.emitCursor.inputRowIdx]
if s.emitCursor.outputRowIdx >= len(lookedUpRows) {
if s.emitCursor.notBufferedRow == nil && s.emitCursor.outputRowIdx >= len(lookedUpRows) {
// We have no more rows for the current input row. Emit an outer or anti
// row if we didn't see a match, and bump to the next input row.
inputRowIdx := s.emitCursor.inputRowIdx
Expand All @@ -725,20 +761,39 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit(
return nil, jrEmittingRows, nil
}

lookedUpRowIdx := lookedUpRows[s.emitCursor.outputRowIdx]
s.emitCursor.outputRowIdx++
var nextState joinReaderState
if s.emitCursor.notBufferedRow != nil {
// Make sure we return to looking up rows after outputting one that matches
// the first input row.
nextState = jrPerformingLookup
defer func() { s.emitCursor.notBufferedRow = nil }()
} else {
// All lookups have finished, and we are currently iterating through the
// input rows and emitting them.
nextState = jrEmittingRows
defer func() { s.emitCursor.outputRowIdx++ }()
}

switch s.joinType {
case descpb.LeftSemiJoin:
// A semi-join match means we emit our input row. This is the case where
// we used the partialJoinSentinel.
return inputRow, jrEmittingRows, nil
return inputRow, nextState, nil
case descpb.LeftAntiJoin:
// An anti-join match means we emit nothing. This is the case where
// we used the partialJoinSentinel.
return nil, jrEmittingRows, nil
return nil, nextState, nil
}

lookedUpRow, err := s.lookedUpRows.GetRow(s.Ctx, lookedUpRowIdx, false /* skip */)
var err error
var lookedUpRow rowenc.EncDatumRow
if s.emitCursor.notBufferedRow != nil {
lookedUpRow = s.emitCursor.notBufferedRow
} else {
lookedUpRow, err = s.lookedUpRows.GetRow(
s.Ctx, lookedUpRows[s.emitCursor.outputRowIdx], false, /* skip */
)
}
if err != nil {
return nil, jrStateUnknown, err
}
Expand All @@ -758,7 +813,7 @@ func (s *joinReaderOrderingStrategy) nextRowToEmit(
}
}
}
return outputRow, jrEmittingRows, nil
return outputRow, nextState, nil
}

func (s *joinReaderOrderingStrategy) spilled() bool {
Expand Down
6 changes: 4 additions & 2 deletions pkg/sql/rowexec/joinreader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1288,9 +1288,11 @@ CREATE TABLE test.t (a INT, s STRING, INDEX (a, s))`); err != nil {
// DiskBackedIndexedRowContainer.
flowCtx.Cfg.TestingKnobs.MemoryLimitBytes = mon.DefaultPoolAllocationSize

// Input row is just a single 0.
// The two input rows are just a single 0 each. We use two input rows because
// matches to the first input row are never buffered.
inputRows := rowenc.EncDatumRows{
rowenc.EncDatumRow{rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(key))}},
rowenc.EncDatumRow{rowenc.EncDatum{Datum: tree.NewDInt(tree.DInt(key))}},
}
var fetchSpec descpb.IndexFetchSpec
if err := rowenc.InitIndexFetchSpec(
Expand Down Expand Up @@ -1341,7 +1343,7 @@ CREATE TABLE test.t (a INT, s STRING, INDEX (a, s))`); err != nil {
require.Equal(t, expected, actual)
count++
}
require.Equal(t, numRows, count)
require.Equal(t, numRows*len(inputRows), count)
require.True(t, jr.(*joinReader).Spilled())
}

Expand Down