Skip to content

Commit

Permalink
[Multi-stage] Only track max joined rows within each block
Browse files (browse the repository at this point in the history)
  • Loading branch information
Jackie-Jiang committed Sep 11, 2024
1 parent 83a71d2 commit 845e9f6
Showing 1 changed file with 44 additions and 54 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -110,8 +110,6 @@ public class HashJoinOperator extends MultiStageOperator {
private final JoinOverFlowMode _joinOverflowMode;

private boolean _isHashTableBuilt;
private int _currentRowsInHashTable;
private int _currentJoinedRows;
private TransferableBlock _upstreamErrorBlock;
private MultiStageQueryStats _leftSideStats;
private MultiStageQueryStats _rightSideStats;
Expand Down Expand Up @@ -150,8 +148,6 @@ public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator left
PlanNode.NodeHint nodeHint = node.getNodeHint();
_maxRowsInJoin = getMaxRowsInJoin(metadata, nodeHint);
_joinOverflowMode = getJoinOverflowMode(metadata, nodeHint);
_currentRowsInHashTable = 0;
_currentJoinedRows = 0;
}

@Override
Expand Down Expand Up @@ -225,17 +221,18 @@ protected TransferableBlock getNextBlock()
private void buildBroadcastHashTable()
throws ProcessingException {
long startTime = System.currentTimeMillis();
int numRowsInHashTable = 0;
TransferableBlock rightBlock = _rightInput.nextBlock();
while (!TransferableBlockUtils.isEndOfStream(rightBlock)) {
List<Object[]> container = rightBlock.getContainer();
// Row based overflow check.
if (container.size() + _currentRowsInHashTable > _maxRowsInJoin) {
if (container.size() + numRowsInHashTable > _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
throwProcessingExceptionForJoinRowLimitExceeded("Cannot build in memory hash table for join operator, "
+ "reached number of rows limit: " + _maxRowsInJoin);
throwProcessingExceptionForJoinRowLimitExceeded(
"Cannot build in memory hash table for join operator, reached number of rows limit: " + _maxRowsInJoin);
} else {
// Just fill up the buffer.
int remainingRows = _maxRowsInJoin - _currentRowsInHashTable;
int remainingRows = _maxRowsInJoin - numRowsInHashTable;
container = container.subList(0, remainingRows);
_statMap.merge(StatKey.MAX_ROWS_IN_JOIN_REACHED, true);
// setting only the rightTableOperator to be early terminated and awaits EOS block next.
Expand All @@ -252,7 +249,7 @@ private void buildBroadcastHashTable()
}
hashCollection.add(row);
}
_currentRowsInHashTable += container.size();
numRowsInHashTable += container.size();
sampleAndCheckInterruption();
rightBlock = _rightInput.nextBlock();
}
Expand Down Expand Up @@ -319,25 +316,6 @@ private List<Object[]> buildJoinedRows(TransferableBlock leftBlock)
}
}

/**
 * Builds the output rows of a SEMI join for a single left-side block.
 *
 * <p>For every row in the left block, the join key is extracted with {@code _leftKeySelector} and looked up in
 * {@code _broadcastRightTable}. A left row is emitted (via {@code joinRow(leftRow, null)}) only when its key is
 * present in the right table. Before each emission {@code incrementJoinedRowsAndCheckLimit()} is called; when it
 * returns {@code true} the loop stops and the rows collected so far are returned.
 *
 * <p>NOTE(review): this variant appears to be the pre-change side of the diff (it still uses
 * {@code incrementJoinedRowsAndCheckLimit()}); a second definition with the same name follows later in the page.
 *
 * @param leftBlock the left-side block whose container rows are probed against the right table
 * @return the joined rows produced from this block, possibly truncated if the joined-rows limit check fired
 * @throws ProcessingException declared because {@code incrementJoinedRowsAndCheckLimit()} may throw it
 *         (presumably when the overflow mode is THROW - confirm against that method)
 */
private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock)
throws ProcessingException {
List<Object[]> container = leftBlock.getContainer();
// Pre-size the result to the number of left rows in this block.
List<Object[]> rows = new ArrayList<>(container.size());

for (Object[] leftRow : container) {
Object key = _leftKeySelector.getKey(leftRow);
// SEMI-JOIN only checks existence of the key
if (_broadcastRightTable.containsKey(key)) {
// The limit check runs before the row is added, so the row that trips the limit is not emitted.
if (incrementJoinedRowsAndCheckLimit()) {
break;
}
// The right side is passed as null: only the existence of the key was checked, no right row is fetched.
rows.add(joinRow(leftRow, null));
}
}

return rows;
}

private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
throws ProcessingException {
List<Object[]> container = leftBlock.getContainer();
Expand All @@ -349,7 +327,7 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
List<Object[]> rightRows = _broadcastRightTable.get(key);
if (rightRows == null) {
if (needUnmatchedLeftRows()) {
if (incrementJoinedRowsAndCheckLimit()) {
if (isMaxRowsLimitReached(rows.size())) {
break;
}
rows.add(joinRow(leftRow, null));
Expand All @@ -359,13 +337,15 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
boolean hasMatchForLeftRow = false;
int numRightRows = rightRows.size();
rows.ensureCapacity(rows.size() + numRightRows);
boolean maxRowsLimitReached = false;
for (int i = 0; i < numRightRows; i++) {
Object[] rightRow = rightRows.get(i);
// TODO: Optimize this to avoid unnecessary object copy.
Object[] resultRow = joinRow(leftRow, rightRow);
if (_nonEquiEvaluators.isEmpty() || _nonEquiEvaluators.stream()
.allMatch(evaluator -> BooleanUtils.isTrueInternalValue(evaluator.apply(resultRow)))) {
if (incrementJoinedRowsAndCheckLimit()) {
if (isMaxRowsLimitReached(rows.size())) {
maxRowsLimitReached = true;
break;
}
rows.add(resultRow);
Expand All @@ -375,11 +355,11 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
}
}
}
if (_currentJoinedRows > _maxRowsInJoin) {
if (maxRowsLimitReached) {
break;
}
if (!hasMatchForLeftRow && needUnmatchedLeftRows()) {
if (incrementJoinedRowsAndCheckLimit()) {
if (isMaxRowsLimitReached(rows.size())) {
break;
}
rows.add(joinRow(leftRow, null));
Expand All @@ -389,18 +369,29 @@ private List<Object[]> buildJoinedDataBlockDefault(TransferableBlock leftBlock)
return rows;
}

private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock)
throws ProcessingException {
/**
 * Builds the output rows of a SEMI join for a single left-side block.
 *
 * <p>For every row in the left block, the join key is extracted with {@code _leftKeySelector} and looked up in
 * {@code _broadcastRightTable}. A left row is emitted (via {@code joinRow(leftRow, null)}) only when its key is
 * present in the right table.
 *
 * <p>No joined-rows limit check is performed in this variant and no checked exception is declared.
 * NOTE(review): presumably this is safe because at most one output row is produced per left row, so the result
 * cannot exceed the left block's size - confirm against the intended limit semantics.
 *
 * @param leftBlock the left-side block whose container rows are probed against the right table
 * @return the joined rows produced from this block
 */
private List<Object[]> buildJoinedDataBlockSemi(TransferableBlock leftBlock) {
List<Object[]> container = leftBlock.getContainer();
// Pre-size the result to the number of left rows in this block.
List<Object[]> rows = new ArrayList<>(container.size());

for (Object[] leftRow : container) {
Object key = _leftKeySelector.getKey(leftRow);
// SEMI-JOIN only checks existence of the key
if (_broadcastRightTable.containsKey(key)) {
// The right side is passed as null: only the existence of the key was checked, no right row is fetched.
rows.add(joinRow(leftRow, null));
}
}

return rows;
}

private List<Object[]> buildJoinedDataBlockAnti(TransferableBlock leftBlock) {
List<Object[]> container = leftBlock.getContainer();
List<Object[]> rows = new ArrayList<>(container.size());

for (Object[] leftRow : container) {
Object key = _leftKeySelector.getKey(leftRow);
// ANTI-JOIN only checks non-existence of the key
if (!_broadcastRightTable.containsKey(key)) {
if (incrementJoinedRowsAndCheckLimit()) {
break;
}
rows.add(joinRow(leftRow, null));
}
}
Expand Down Expand Up @@ -475,18 +466,17 @@ private void earlyTerminateLeftInput() {
}

/**
* Increments {@link #_currentJoinedRows} and checks if the limit has been exceeded. If the limit has been exceeded,
* either an exception is thrown or the left input is early terminated based on the {@link #_joinOverflowMode}.
* Checks if we have reached the rows limit for joined rows. If the limit has been reached, either an exception is
* thrown or the left input is early terminated based on the {@link #_joinOverflowMode}.
*
* @return {@code true} if the limit has been exceeded, {@code false} otherwise
* @return {@code true} if the limit has been reached, {@code false} otherwise.
*/
private boolean incrementJoinedRowsAndCheckLimit()
private boolean isMaxRowsLimitReached(int numJoinedRows)
throws ProcessingException {
_currentJoinedRows++;
if (_currentJoinedRows > _maxRowsInJoin) {
if (numJoinedRows == _maxRowsInJoin) {
if (_joinOverflowMode == JoinOverFlowMode.THROW) {
throwProcessingExceptionForJoinRowLimitExceeded("Cannot process join, reached number of rows limit: "
+ _maxRowsInJoin);
throwProcessingExceptionForJoinRowLimitExceeded(
"Cannot process join, reached number of rows limit: " + _maxRowsInJoin);
} else {
// Skip over remaining blocks until we reach the end of stream since we already breached the rows limit.
logger().info("Terminating join operator early as the maximum number of rows limit was reached: {}",
Expand All @@ -504,15 +494,15 @@ private void throwProcessingExceptionForJoinRowLimitExceeded(String reason)
throws ProcessingException {
ProcessingException resourceLimitExceededException =
new ProcessingException(QueryException.SERVER_RESOURCE_LIMIT_EXCEEDED_ERROR_CODE);
resourceLimitExceededException.setMessage(
reason + ". Consider increasing the limit for the maximum number of rows in a join either via the query "
+ "option '" + CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '"
+ PinotHintOptions.JOIN_HINT_OPTIONS + "'. Alternatively, if partial results are acceptable, the join"
+ " overflow mode can be set to '" + JoinOverFlowMode.BREAK.name() + "' either via the query option '"
+ CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '"
+ PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '"
+ PinotHintOptions.JOIN_HINT_OPTIONS + "'.");
resourceLimitExceededException.setMessage(reason
+ ". Consider increasing the limit for the maximum number of rows in a join either via the query option '"
+ CommonConstants.Broker.Request.QueryOptionKey.MAX_ROWS_IN_JOIN + "' or the '"
+ PinotHintOptions.JoinHintOptions.MAX_ROWS_IN_JOIN + "' hint in the '" + PinotHintOptions.JOIN_HINT_OPTIONS
+ "'. Alternatively, if partial results are acceptable, the join overflow mode can be set to '"
+ JoinOverFlowMode.BREAK.name() + "' either via the query option '"
+ CommonConstants.Broker.Request.QueryOptionKey.JOIN_OVERFLOW_MODE + "' or the '"
+ PinotHintOptions.JoinHintOptions.JOIN_OVERFLOW_MODE + "' hint in the '" + PinotHintOptions.JOIN_HINT_OPTIONS
+ "'.");
throw resourceLimitExceededException;
}

Expand Down

0 comments on commit 845e9f6

Please sign in to comment.