Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Multi-stage] Only track max joined rows within each block #13981

Merged
merged 1 commit into from
Sep 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,6 @@ public class HashJoinOperator extends MultiStageOperator {
private final JoinOverFlowMode _joinOverflowMode;

private boolean _isHashTableBuilt;
private int _currentRowsInHashTable;
private int _currentJoinedRows;
private TransferableBlock _upstreamErrorBlock;
private MultiStageQueryStats _leftSideStats;
private MultiStageQueryStats _rightSideStats;
Expand Down Expand Up @@ -150,8 +148,6 @@ public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator left
PlanNode.NodeHint nodeHint = node.getNodeHint();
_maxRowsInJoin = getMaxRowsInJoin(metadata, nodeHint);
_joinOverflowMode = getJoinOverflowMode(metadata, nodeHint);
_currentRowsInHashTable = 0;
_currentJoinedRows = 0;
}

@Override
Expand Down Expand Up @@ -225,17 +221,18 @@ protected TransferableBlock getNextBlock()
private void buildBroadcastHashTable()
throws ProcessingException {
long startTime = System.currentTimeMillis();
int numRowsInHashTable = 0;
TransferableBlock rightBlock = _rightInput.nextBlock();
while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
List<Object[]> container = rightBlock.getContainer();
// Row based overflow check.
if (container.size() + _currentRowsInHashTable > _maxRowsInJoin) {
if (container.size() + numRowsInHashTable > _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
throwProcessingExceptionForJoinRowLimitExceeded("Cannot build in memory hash table for join operator, "
+ "reached number of rows limit: " + _maxRowsInJoin);
throwProcessingExceptionForJoinRowLimitExceeded(
"Cannot build in memory hash table for join operator, reached number of rows limit: " + _maxRowsInJoin);
} else {
// Just fill up the buffer.
int remainingRows = _maxRowsInJoin - _currentRowsInHashTable;
int remainingRows = _maxRowsInJoin - numRowsInHashTable;
container = container.subList(0, remainingRows);
_statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
// setting only the rightTableOperator to be early terminated and awaits EOS block next.
Expand All @@ -252,7 +249,7 @@ private void buildBroadcastHashTable()
}
hashCollection.add(row);
}
_currentRowsInHashTable += container.size();
numRowsInHashTable += container.size();
sampleAndCheckInterruption();
rightBlock = _rightInput.nextBlock();
}
Expand Down Expand Up @@ -319,25 +316,6 @@ private List<Object[]> buildJoinedRows(TransferableBlock leftBlock)
}
}

private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
throws ProcessingException {
List<Object[]> container = leftBlock.getContainer();
List<Object[]> rows = new ArrayList<>(container.size());

for (Object[] leftRow : container) {
Object key = _leftKeySelector.getKey(leftRow);
// SEMI-JOIN only checks existence of the key
if (_broadcastRightTable.containsKey(key)) {
if (incrementJoinedRowsAndCheckLimit()) {
break;
}
rows.add(joinRow(leftRow, null));
}
}

return rows;
}

private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
throws ProcessingException {
List<Object[]> container = leftBlock.getContainer();
Expand All @@ -349,7 +327,7 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
List<Object[]> rightRows = _broadcastRightTable.get(key);
if (rightRows == null) {
if (needUnmatchedLeftRows()) {
if (incrementJoinedRowsAndCheckLimit()) {
if (isMaxRowsLimitReached(rows.size())) {
break;
}
rows.add(joinRow(leftRow, null));
Expand All @@ -359,13 +337,15 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
boolean hasMatchForLeftRow = false;
int numRightRows = rightRows.size();
rows.ensureCapacity(rows.size() + numRightRows);
boolean maxRowsLimitReached = false;
for (int i = 0; i < numRightRows; i++) {
Object[] rightRow = rightRows.get(i);
// TODO: Optimize this to avoid unnecessary object copy.
Object[] resultRow = joinRow(leftRow, rightRow);
if (_nonEquiEvaluators.isEmpty() || _nonEquiEvaluators.stream()
.allMatch(evaluator -> BooleanUtils.isTrueInternalValue(evaluator.apply(resultRow)))) {
if (incrementJoinedRowsAndCheckLimit()) {
if (isMaxRowsLimitReached(rows.size())) {
maxRowsLimitReached = true;
break;
}
rows.add(resultRow);
Expand All @@ -375,11 +355,11 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
}
}
}
if (_currentJoinedRows > _maxRowsInJoin) {
if (maxRowsLimitReached) {
break;
}
if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
if (incrementJoinedRowsAndCheckLimit()) {
if (isMaxRowsLimitReached(rows.size())) {
break;
}
rows.add(joinRow(leftRow, null));
Expand All @@ -389,18 +369,29 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
return rows;
}

private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock)
throws ProcessingException {
private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) {
List<Object[]> container = leftBlock.getContainer();
List<Object[]> rows = new ArrayList<>(container.size());

for (Object[] leftRow : container) {
Object key = _leftKeySelector.getKey(leftRow);
// SEMI-JOIN only checks existence of the key
if (_broadcastRightTable.containsKey(key)) {
rows.add(joinRow(leftRow, null));
}
}

return rows;
}

private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) {
List<Object[]> container = leftBlock.getContainer();
List<Object[]> rows = new ArrayList<>(container.size());

for (Object[] leftRow : container) {
Object key = _leftKeySelector.getKey(leftRow);
// ANTI-JOIN only checks non-existence of the key
if (!_broadcastRightTable.containsKey(key)) {
if (incrementJoinedRowsAndCheckLimit()) {
break;
}
Comment on lines -401 to -403
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't we need the rows limit check for semi and anti joins?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it because we want this protection mainly for cross joins and other similar join conditions where the number of joined rows can be much more than the sum of individual rows from the left and right blocks?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are going to apply the limit per block, right? semi and anti join (and I guess inner) cannot produce more rows that the ones received (and I guess we assume each received block will have an acceptable size)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is correct. We should never run a setting where it allows rows less than a block

rows.add(joinRow(leftRow, null));
}
}
Expand Down Expand Up @@ -475,18 +466,17 @@ private void earlyTerminateLeftInput() {
}

/**
* Increments {@link #_currentJoinedRows} and checks if the limit has been exceeded. If the limit has been exceeded,
* either an exception is thrown or the left input is early terminated based on the {@link #_joinOverflowMode}.
* Checks if we have reached the rows limit for joined rows. If the limit has been reached, either an exception is
* thrown or the left input is early terminated based on the {@link #_joinOverflowMode}.
*
* @return {@code true} if the limit has been exceeded, {@code false} otherwise
* @return {@code true} if the limit has been reached, {@code false} otherwise.
*/
private boolean incrementJoinedRowsAndCheckLimit()
private boolean isMaxRowsLimitReached(int numJoinedRows)
throws ProcessingException {
_currentJoinedRows++;
if (_currentJoinedRows > _maxRowsInJoin) {
if (numJoinedRows == _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
throwProcessingExceptionForJoinRowLimitExceeded("Cannot process join, reached number of rows limit: "
+ _maxRowsInJoin);
throwProcessingExceptionForJoinRowLimitExceeded(
"Cannot process join, reached number of rows limit: " + _maxRowsInJoin);
} else {
// Skip over remaining blocks until we reach the end of stream since we already breached the rows limit.
logger().info("Terminating join operator early as the maximum number of rows limit was reached: {}",
Expand All @@ -504,15 +494,15 @@ private void throwProcessingExceptionForJoinRowLimitExceeded(String reason)
throws ProcessingException {
ProcessingException resourceLimitExceededException =
new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
resourceLimitExceededException.setMessage(
reason + ". Consider increasing the limit for the maximum number of rows in a join either via the query "
+ "option '" + CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '"
+ PinotHintOptions.JOIN_HINT_OPTIONS + "'. Alternatively, if partial results are acceptable, the join"
+ " overflow mode can be set to '" + JoinOverFlowMode.BREAK.name() + "' either via the query option '"
+ CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '"
+ PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '"
+ PinotHintOptions.JOIN_HINT_OPTIONS + "'.");
resourceLimitExceededException.setMessage(reason
+ ". Consider increasing the limit for the maximum number of rows in a join either via the query option '"
+ CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '" + PinotHintOptions.JOIN_HINT_OPTIONS
+ "'. Alternatively, if partial results are acceptable, the join overflow mode can be set to '"
+ JoinOverFlowMode.BREAK.name() + "' either via the query option '"
+ CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '"
+ PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '" + PinotHintOptions.JOIN_HINT_OPTIONS
+ "'.");
throw resourceLimitExceededException;
}

Expand Down
Loading