Skip to content

Commit

Permalink
Add a PinotLogicalSortExchange to replace usage of LogicalSortExchang…
Browse files Browse the repository at this point in the history
…e as it will be sender and receiver aware (#10408)

- MailboxSendOperator will be modified later to add sort support.
- Modify SortOperator to avoid sorting if the input is already sorted. It should still apply offset + limit
  • Loading branch information
somandal authored Mar 16, 2023
1 parent e01df9a commit bc9aa75
Show file tree
Hide file tree
Showing 22 changed files with 1,124 additions and 233 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.calcite.rel.logical;

import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelDistributionTraitDef;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.SortExchange;


/**
 * Pinot's implementation of {@code SortExchange} which needs information about whether to sort on the sender
 * and/or receiver side of the exchange. Every {@code Exchange} is broken into a send and a receive node and the
 * decision on where to sort is made by the planner and this information has to be passed onto the send and receive
 * nodes for the correct execution.
 *
 * Note: This class does not extend {@code LogicalSortExchange} because its constructor which takes the list of
 * parameters is private.
 */
public class PinotLogicalSortExchange extends SortExchange {

  // Term names shared by explainTerms (writer side) and the RelInput constructor (reader side) so that the
  // two can never drift apart.
  private static final String IS_SORT_ON_SENDER_TERM = "isSortOnSender";
  private static final String IS_SORT_ON_RECEIVER_TERM = "isSortOnReceiver";

  protected final boolean _isSortOnSender;
  protected final boolean _isSortOnReceiver;

  /**
   * Creates a PinotLogicalSortExchange from already-canonized traits. Use
   * {@link #create(RelNode, RelDistribution, RelCollation, boolean, boolean)} from outside this class.
   */
  private PinotLogicalSortExchange(RelOptCluster cluster, RelTraitSet traitSet,
      RelNode input, RelDistribution distribution, RelCollation collation,
      boolean isSortOnSender, boolean isSortOnReceiver) {
    super(cluster, traitSet, input, distribution, collation);
    _isSortOnSender = isSortOnSender;
    _isSortOnReceiver = isSortOnReceiver;
  }

  /**
   * Creates a PinotLogicalSortExchange by parsing serialized output.
   *
   * The sort flags are read back from the same terms that {@link #explainTerms(RelWriter)} emits. When a term is
   * absent the flags fall back to "sort on receiver only", which is the value this constructor previously
   * hard-coded.
   *
   * @param input serialized representation of the node
   */
  public PinotLogicalSortExchange(RelInput input) {
    super(input);
    _isSortOnSender = input.getBoolean(IS_SORT_ON_SENDER_TERM, false);
    _isSortOnReceiver = input.getBoolean(IS_SORT_ON_RECEIVER_TERM, true);
  }

  /**
   * Creates a PinotLogicalSortExchange.
   *
   * The distribution and collation are canonized, and the trait set is derived from the input's trait set with
   * {@code Convention.NONE}, the distribution and the collation replaced.
   *
   * @param input Input relational expression
   * @param distribution Distribution specification
   * @param collation array of sort specifications
   * @param isSortOnSender whether to sort on the sender
   * @param isSortOnReceiver whether to sort on receiver
   * @return the new exchange node
   */
  public static PinotLogicalSortExchange create(
      RelNode input,
      RelDistribution distribution,
      RelCollation collation,
      boolean isSortOnSender,
      boolean isSortOnReceiver) {
    RelOptCluster cluster = input.getCluster();
    collation = RelCollationTraitDef.INSTANCE.canonize(collation);
    distribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
    RelTraitSet traitSet =
        input.getTraitSet().replace(Convention.NONE).replace(distribution).replace(collation);
    return new PinotLogicalSortExchange(cluster, traitSet, input, distribution,
        collation, isSortOnSender, isSortOnReceiver);
  }

  //~ Methods ----------------------------------------------------------------

  /**
   * Copies this node with new traits, input, distribution and collation; both sort flags are carried over
   * unchanged.
   */
  @Override
  public SortExchange copy(RelTraitSet traitSet, RelNode newInput,
      RelDistribution newDistribution, RelCollation newCollation) {
    return new PinotLogicalSortExchange(this.getCluster(), traitSet, newInput,
        newDistribution, newCollation, _isSortOnSender, _isSortOnReceiver);
  }

  /**
   * Appends the two sort flags to the terms written by the parent class.
   */
  @Override
  public RelWriter explainTerms(RelWriter pw) {
    return super.explainTerms(pw)
        .item(IS_SORT_ON_SENDER_TERM, _isSortOnSender)
        .item(IS_SORT_ON_RECEIVER_TERM, _isSortOnReceiver);
  }

  /**
   * @return whether the sender side of the exchange should sort
   */
  public boolean isSortOnSender() {
    return _isSortOnSender;
  }

  /**
   * @return whether the receiver side of the exchange should sort
   */
  public boolean isSortOnReceiver() {
    return _isSortOnReceiver;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.SortExchange;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalSortExchange;
import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.calcite.rel.metadata.RelMdUtil;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexBuilder;
Expand Down Expand Up @@ -99,7 +99,7 @@ public void onMatch(RelOptRuleCall call) {
public interface Config extends RelRule.Config {

Config DEFAULT = ImmutableSortExchangeCopyRule.Config.of()
.withOperandFor(LogicalSort.class, LogicalSortExchange.class);
.withOperandFor(LogicalSort.class, PinotLogicalSortExchange.class);

@Override default PinotSortExchangeCopyRule toRule() {
return new PinotSortExchangeCopyRule(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalSortExchange;
import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.calcite.tools.RelBuilderFactory;


Expand Down Expand Up @@ -62,10 +62,15 @@ public boolean matches(RelOptRuleCall call) {
@Override
public void onMatch(RelOptRuleCall call) {
Sort sort = call.rel(0);
LogicalSortExchange exchange = LogicalSortExchange.create(
// TODO: Assess whether sorting is needed on both sender and receiver side or only receiver side. Potentially add
// SqlHint support to determine this. For now setting sort only on receiver side as sender side sorting is
// not yet implemented.
PinotLogicalSortExchange exchange = PinotLogicalSortExchange.create(
sort.getInput(),
RelDistributions.hash(Collections.emptyList()),
sort.getCollation());
sort.getCollation(),
false,
true);
call.transformTo(LogicalSort.create(exchange, sort.getCollation(), sort.offset, sort.fetch));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalSortExchange;
import org.apache.calcite.rel.logical.LogicalWindow;
import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.RelBuilderFactory;

Expand Down Expand Up @@ -86,8 +86,11 @@ public void onMatch(RelOptRuleCall call) {
} else if (windowGroup.keys.isEmpty() && !windowGroup.orderKeys.getKeys().isEmpty()) {
// Only ORDER BY
// Add a PinotLogicalSortExchange with collation on the order by key(s) and an empty hash partition key
LogicalSortExchange sortExchange = LogicalSortExchange.create(windowInput,
RelDistributions.hash(Collections.emptyList()), windowGroup.orderKeys);
// TODO: ORDER BY only type queries need to be sorted on both sender and receiver side for better performance.
// Sorted input data can use a k-way merge instead of a PriorityQueue for sorting. For now support to
// sort on the sender side is not available thus setting this up to only sort on the receiver.
PinotLogicalSortExchange sortExchange = PinotLogicalSortExchange.create(windowInput,
RelDistributions.hash(Collections.emptyList()), windowGroup.orderKeys, false, true);
call.transformTo(LogicalWindow.create(window.getTraitSet(), sortExchange, window.constants, window.getRowType(),
window.groups));
} else {
Expand All @@ -106,8 +109,12 @@ public void onMatch(RelOptRuleCall call) {
} else {
// PARTITION BY and ORDER BY on different key(s)
// Add a PinotLogicalSortExchange hashed on the partition by keys and collation based on order by keys
LogicalSortExchange sortExchange = LogicalSortExchange.create(windowInput,
RelDistributions.hash(windowGroup.keys.toList()), windowGroup.orderKeys);
// TODO: ORDER BY only type queries need to be sorted only on the receiver side unless a hint is set indicating
// that the data is already partitioned and sorting can be done on the sender side instead. This way
// sorting on the receiver side can be a no-op. Add support for this hint and pass it on. Until sender
// side sorting is implemented, setting this hint will throw an error on execution.
PinotLogicalSortExchange sortExchange = PinotLogicalSortExchange.create(windowInput,
RelDistributions.hash(windowGroup.keys.toList()), windowGroup.orderKeys, false, true);
call.transformTo(LogicalWindow.create(window.getTraitSet(), sortExchange, window.constants, window.getRowType(),
window.groups));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.SortExchange;
import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.QueryPlan;
Expand Down Expand Up @@ -74,12 +77,12 @@ public QueryPlan makePlan(RelRoot relRoot) {
// The global root needs to send results back to the ROOT, a.k.a. the client response node. The last stage only has
// one receiver, so the exchange type does not matter; it is set to RANDOM_DISTRIBUTED by default.
StageNode globalSenderNode = new MailboxSendNode(globalStageRoot.getStageId(), globalStageRoot.getDataSchema(),
0, RelDistribution.Type.RANDOM_DISTRIBUTED, null);
0, RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false);
globalSenderNode.addInput(globalStageRoot);

StageNode globalReceiverNode =
new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getStageId(),
RelDistribution.Type.RANDOM_DISTRIBUTED, null, globalSenderNode);
RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false, false, globalSenderNode);

QueryPlan queryPlan = StageMetadataVisitor.attachMetadata(relRoot.fields, globalReceiverNode);

Expand All @@ -100,7 +103,19 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
if (isExchangeNode(node)) {
StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
RelDistribution distribution = ((Exchange) node).getDistribution();
return createSendReceivePair(nextStageRoot, distribution, currentStageId);
RelCollation collation = null;
boolean isSortOnSender = false;
boolean isSortOnReceiver = false;
if (isSortExchangeNode(node)) {
collation = ((SortExchange) node).getCollation();
if (node instanceof PinotLogicalSortExchange) {
// These flags only take meaning if the collation is not null or empty
isSortOnSender = ((PinotLogicalSortExchange) node).isSortOnSender();
isSortOnReceiver = ((PinotLogicalSortExchange) node).isSortOnReceiver();
}
}
return createSendReceivePair(nextStageRoot, distribution, collation, isSortOnSender, isSortOnReceiver,
currentStageId);
} else {
StageNode stageNode = RelToStageConverter.toStageNode(node, currentStageId);
List<RelNode> inputs = node.getInputs();
Expand All @@ -118,7 +133,8 @@ private void runPhysicalOptimizers(QueryPlan queryPlan) {
}
}

private StageNode createSendReceivePair(StageNode nextStageRoot, RelDistribution distribution, int currentStageId) {
private StageNode createSendReceivePair(StageNode nextStageRoot, RelDistribution distribution, RelCollation collation,
boolean isSortOnSender, boolean isSortOnReceiver, int currentStageId) {
List<Integer> distributionKeys = distribution.getKeys();
RelDistribution.Type exchangeType = distribution.getType();

Expand All @@ -129,9 +145,11 @@ private StageNode createSendReceivePair(StageNode nextStageRoot, RelDistribution
? new FieldSelectionKeySelector(distributionKeys) : null;

StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
currentStageId, exchangeType, keySelector);
currentStageId, exchangeType, keySelector, collation == null ? null : collation.getFieldCollations(),
isSortOnSender);
StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
nextStageRoot.getStageId(), exchangeType, keySelector, mailboxSender);
nextStageRoot.getStageId(), exchangeType, keySelector,
collation == null ? null : collation.getFieldCollations(), isSortOnSender, isSortOnReceiver, mailboxSender);
mailboxSender.addInput(nextStageRoot);

return mailboxReceiver;
Expand All @@ -141,6 +159,10 @@ private boolean isExchangeNode(RelNode node) {
return (node instanceof Exchange);
}

/**
 * Tells whether the given plan node is a {@link SortExchange}.
 *
 * @param node the relational node to inspect
 * @return {@code true} if {@code node} is an instance of {@code SortExchange}
 */
private boolean isSortExchangeNode(RelNode node) {
  return node instanceof SortExchange;
}

/**
 * Hands out the current value of the stage id counter and then advances the counter by one.
 *
 * @return the stage id to use for the next stage
 */
private int getNewStageId() {
  int stageId = _stageIdCounter;
  _stageIdCounter++;
  return stageId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
*/
package org.apache.pinot.query.planner.stage;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.serde.ProtoProperties;

Expand All @@ -32,6 +39,14 @@ public class MailboxReceiveNode extends AbstractStageNode {
private RelDistribution.Type _exchangeType;
@ProtoProperties
private KeySelector<Object[], Object[]> _partitionKeySelector;
@ProtoProperties
private List<RexExpression> _collationKeys;
@ProtoProperties
private List<RelFieldCollation.Direction> _collationDirections;
@ProtoProperties
private boolean _isSortOnSender;
@ProtoProperties
private boolean _isSortOnReceiver;

// this is only available during planning and should not be relied
// on in any post-serialization code
Expand All @@ -43,11 +58,26 @@ public MailboxReceiveNode(int stageId) {

/**
 * Builds a receive node for the given stage.
 *
 * When field collations are supplied, each one contributes an input reference (by field index) to the collation
 * keys and its direction to the collation directions; otherwise both lists are empty.
 *
 * @param stageId id of the stage this node belongs to
 * @param dataSchema schema of the received data
 * @param senderStageId id of the stage that sends to this node
 * @param exchangeType distribution type of the exchange
 * @param partitionKeySelector key selector used for partitioning, may be null
 * @param fieldCollations sort specification of the exchange, may be null or empty
 * @param isSortOnSender whether the sender sorts; must currently be {@code false}
 * @param isSortOnReceiver whether the receiver sorts
 * @param sender the matching sender node
 * @throws IllegalStateException if {@code isSortOnSender} is {@code true}
 */
public MailboxReceiveNode(int stageId, DataSchema dataSchema, int senderStageId,
    RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
    @Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender, boolean isSortOnReceiver,
    StageNode sender) {
  super(stageId, dataSchema);
  _senderStageId = senderStageId;
  _exchangeType = exchangeType;
  _partitionKeySelector = partitionKeySelector;
  if (CollectionUtils.isEmpty(fieldCollations)) {
    // No sort specification: expose empty lists rather than null.
    _collationKeys = Collections.emptyList();
    _collationDirections = Collections.emptyList();
  } else {
    int numCollations = fieldCollations.size();
    _collationKeys = new ArrayList<>(numCollations);
    _collationDirections = new ArrayList<>(numCollations);
    for (RelFieldCollation collation : fieldCollations) {
      _collationDirections.add(collation.getDirection());
      _collationKeys.add(new RexExpression.InputRef(collation.getFieldIndex()));
    }
  }
  _isSortOnSender = isSortOnSender;
  // Sender-side ordering is not implemented yet, so reject any plan that asks for it.
  Preconditions.checkState(!isSortOnSender, "Input shouldn't be sorted as ordering on send is not yet implemented!");
  _isSortOnReceiver = isSortOnReceiver;
  _sender = sender;
}

Expand All @@ -67,6 +97,22 @@ public KeySelector<Object[], Object[]> getPartitionKeySelector() {
return _partitionKeySelector;
}

/**
 * Returns the collation keys of this receive node.
 *
 * The full constructor fills this with one input reference per supplied field collation, or with an empty list
 * when no field collations were given.
 * NOTE(review): the value after construction via the single-argument constructor is not visible here - confirm
 * whether it can be null.
 */
public List<RexExpression> getCollationKeys() {
return _collationKeys;
}

/**
 * Returns the sort directions of this receive node.
 *
 * The full constructor adds one direction per supplied field collation, in the same order as the collation keys,
 * or sets an empty list when no field collations were given.
 * NOTE(review): the value after construction via the single-argument constructor is not visible here - confirm
 * whether it can be null.
 */
public List<RelFieldCollation.Direction> getCollationDirections() {
return _collationDirections;
}

/**
 * Returns the sort-on-sender flag of this receive node.
 *
 * The full constructor currently rejects {@code true} for this flag with a state check, because ordering on the
 * send side is not yet implemented.
 */
public boolean isSortOnSender() {
return _isSortOnSender;
}

/**
 * Returns the sort-on-receiver flag of this receive node, as passed to the full constructor.
 */
public boolean isSortOnReceiver() {
return _isSortOnReceiver;
}

/**
 * Returns the sender node that was passed to the full constructor.
 *
 * NOTE(review): a nearby field comment states that a value is only available during planning and should not be
 * relied on in post-serialization code - presumably it refers to this sender reference; confirm.
 */
public StageNode getSender() {
return _sender;
}
Expand Down
Loading

0 comments on commit bc9aa75

Please sign in to comment.