Skip to content

Commit

Permalink
Add a PinotLogicalSortExchange to replace usage of LogicalSortExchang…
Browse files Browse the repository at this point in the history
…e as it will be sender and receiver aware

Add support for ordering based on collation keys in the MailboxReceiveOperator when PinotLogicalSortExchange is used.
MailboxSendOperator will be modified later to add sort support.
Modify SortOperator to avoid sorting if the input is already sorted. It should still apply offset + limit
  • Loading branch information
somandal committed Mar 16, 2023
1 parent 40a7911 commit bfa4811
Show file tree
Hide file tree
Showing 22 changed files with 1,124 additions and 233 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.calcite.rel.logical;

import org.apache.calcite.plan.Convention;
import org.apache.calcite.plan.RelOptCluster;
import org.apache.calcite.plan.RelTraitSet;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollationTraitDef;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelDistributionTraitDef;
import org.apache.calcite.rel.RelInput;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelWriter;
import org.apache.calcite.rel.core.SortExchange;


/**
 * Pinot's implementation of {@code SortExchange} which needs information about whether to sort on the sender
 * and/or receiver side of the exchange. Every {@code Exchange} is broken into a send and a receive node and the
 * decision on where to sort is made by the planner and this information has to be passed onto the send and receive
 * nodes for the correct execution.
 *
 * Note: This class does not extend {@code LogicalSortExchange} because its constructor which takes the list of
 * parameters is private.
 */
public class PinotLogicalSortExchange extends SortExchange {
  // Term names shared by explainTerms() (writer) and the RelInput constructor (reader) so that the two stay in sync.
  private static final String SORT_ON_SENDER_TERM = "isSortOnSender";
  private static final String SORT_ON_RECEIVER_TERM = "isSortOnReceiver";

  protected final boolean _isSortOnSender;
  protected final boolean _isSortOnReceiver;

  private PinotLogicalSortExchange(RelOptCluster cluster, RelTraitSet traitSet,
      RelNode input, RelDistribution distribution, RelCollation collation,
      boolean isSortOnSender, boolean isSortOnReceiver) {
    super(cluster, traitSet, input, distribution, collation);
    _isSortOnSender = isSortOnSender;
    _isSortOnReceiver = isSortOnReceiver;
  }

  /**
   * Creates a PinotLogicalSortExchange by parsing serialized output.
   *
   * The sort flags are read back from the same terms that {@link #explainTerms(RelWriter)} emits. When a term is
   * absent from the serialized form, the flags fall back to "sort on receiver only" (sender = false,
   * receiver = true), which is the value this constructor previously always used.
   *
   * @param input serialized representation of the node
   */
  public PinotLogicalSortExchange(RelInput input) {
    super(input);
    _isSortOnSender = input.getBoolean(SORT_ON_SENDER_TERM, false);
    _isSortOnReceiver = input.getBoolean(SORT_ON_RECEIVER_TERM, true);
  }

  /**
   * Creates a PinotLogicalSortExchange.
   *
   * The collation and distribution are canonized and installed, together with {@code Convention.NONE}, on a trait
   * set derived from the input's trait set.
   *
   * @param input Input relational expression
   * @param distribution Distribution specification
   * @param collation array of sort specifications
   * @param isSortOnSender whether to sort on the sender
   * @param isSortOnReceiver whether to sort on receiver
   * @return the newly created exchange node
   */
  public static PinotLogicalSortExchange create(
      RelNode input,
      RelDistribution distribution,
      RelCollation collation,
      boolean isSortOnSender,
      boolean isSortOnReceiver) {
    RelOptCluster cluster = input.getCluster();
    collation = RelCollationTraitDef.INSTANCE.canonize(collation);
    distribution = RelDistributionTraitDef.INSTANCE.canonize(distribution);
    RelTraitSet traitSet =
        input.getTraitSet().replace(Convention.NONE).replace(distribution).replace(collation);
    return new PinotLogicalSortExchange(cluster, traitSet, input, distribution,
        collation, isSortOnSender, isSortOnReceiver);
  }

  //~ Methods ----------------------------------------------------------------

  /**
   * Copies this node with new traits, input, distribution and collation; the sort flags are carried over unchanged.
   */
  @Override
  public SortExchange copy(RelTraitSet traitSet, RelNode newInput,
      RelDistribution newDistribution, RelCollation newCollation) {
    return new PinotLogicalSortExchange(this.getCluster(), traitSet, newInput,
        newDistribution, newCollation, _isSortOnSender, _isSortOnReceiver);
  }

  /**
   * Adds the two sort flags to the terms written by the parent class.
   */
  @Override
  public RelWriter explainTerms(RelWriter pw) {
    return super.explainTerms(pw)
        .item(SORT_ON_SENDER_TERM, _isSortOnSender)
        .item(SORT_ON_RECEIVER_TERM, _isSortOnReceiver);
  }

  /**
   * @return whether sorting is requested on the sending side of the exchange
   */
  public boolean isSortOnSender() {
    return _isSortOnSender;
  }

  /**
   * @return whether sorting is requested on the receiving side of the exchange
   */
  public boolean isSortOnReceiver() {
    return _isSortOnReceiver;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.core.SortExchange;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalSortExchange;
import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.calcite.rel.metadata.RelMdUtil;
import org.apache.calcite.rel.metadata.RelMetadataQuery;
import org.apache.calcite.rex.RexBuilder;
Expand Down Expand Up @@ -99,7 +99,7 @@ public void onMatch(RelOptRuleCall call) {
public interface Config extends RelRule.Config {

Config DEFAULT = ImmutableSortExchangeCopyRule.Config.of()
.withOperandFor(LogicalSort.class, LogicalSortExchange.class);
.withOperandFor(LogicalSort.class, PinotLogicalSortExchange.class);

@Override default PinotSortExchangeCopyRule toRule() {
return new PinotSortExchangeCopyRule(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.calcite.rel.RelDistributions;
import org.apache.calcite.rel.core.Sort;
import org.apache.calcite.rel.logical.LogicalSort;
import org.apache.calcite.rel.logical.LogicalSortExchange;
import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.calcite.tools.RelBuilderFactory;


Expand Down Expand Up @@ -62,10 +62,15 @@ public boolean matches(RelOptRuleCall call) {
@Override
public void onMatch(RelOptRuleCall call) {
Sort sort = call.rel(0);
LogicalSortExchange exchange = LogicalSortExchange.create(
// TODO: Assess whether sorting is needed on both sender and receiver side or only receiver side. Potentially add
// SqlHint support to determine this. For now setting sort only on receiver side as sender side sorting is
// not yet implemented.
PinotLogicalSortExchange exchange = PinotLogicalSortExchange.create(
sort.getInput(),
RelDistributions.hash(Collections.emptyList()),
sort.getCollation());
sort.getCollation(),
false,
true);
call.transformTo(LogicalSort.create(exchange, sort.getCollation(), sort.offset, sort.fetch));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Window;
import org.apache.calcite.rel.logical.LogicalExchange;
import org.apache.calcite.rel.logical.LogicalSortExchange;
import org.apache.calcite.rel.logical.LogicalWindow;
import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.tools.RelBuilderFactory;

Expand Down Expand Up @@ -86,8 +86,11 @@ public void onMatch(RelOptRuleCall call) {
} else if (windowGroup.keys.isEmpty() && !windowGroup.orderKeys.getKeys().isEmpty()) {
// Only ORDER BY
// Add a PinotLogicalSortExchange with collation on the order by key(s) and an empty hash partition key
LogicalSortExchange sortExchange = LogicalSortExchange.create(windowInput,
RelDistributions.hash(Collections.emptyList()), windowGroup.orderKeys);
// TODO: ORDER BY only type queries need to be sorted on both sender and receiver side for better performance.
// Sorted input data can use a k-way merge instead of a PriorityQueue for sorting. For now support to
// sort on the sender side is not available thus setting this up to only sort on the receiver.
PinotLogicalSortExchange sortExchange = PinotLogicalSortExchange.create(windowInput,
RelDistributions.hash(Collections.emptyList()), windowGroup.orderKeys, false, true);
call.transformTo(LogicalWindow.create(window.getTraitSet(), sortExchange, window.constants, window.getRowType(),
window.groups));
} else {
Expand All @@ -106,8 +109,12 @@ public void onMatch(RelOptRuleCall call) {
} else {
// PARTITION BY and ORDER BY on different key(s)
// Add a PinotLogicalSortExchange hashed on the partition by keys and collation based on order by keys
LogicalSortExchange sortExchange = LogicalSortExchange.create(windowInput,
RelDistributions.hash(windowGroup.keys.toList()), windowGroup.orderKeys);
// TODO: ORDER BY only type queries need to be sorted only on the receiver side unless a hint is set indicating
// that the data is already partitioned and sorting can be done on the sender side instead. This way
// sorting on the receiver side can be a no-op. Add support for this hint and pass it on. Until sender
// side sorting is implemented, setting this hint will throw an error on execution.
PinotLogicalSortExchange sortExchange = PinotLogicalSortExchange.create(windowInput,
RelDistributions.hash(windowGroup.keys.toList()), windowGroup.orderKeys, false, true);
call.transformTo(LogicalWindow.create(window.getTraitSet(), sortExchange, window.constants, window.getRowType(),
window.groups));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

import java.util.List;
import java.util.Map;
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.RelRoot;
import org.apache.calcite.rel.core.Exchange;
import org.apache.calcite.rel.core.SortExchange;
import org.apache.calcite.rel.logical.PinotLogicalSortExchange;
import org.apache.pinot.common.config.provider.TableCache;
import org.apache.pinot.query.context.PlannerContext;
import org.apache.pinot.query.planner.QueryPlan;
Expand Down Expand Up @@ -74,12 +77,12 @@ public QueryPlan makePlan(RelRoot relRoot) {
// global root needs to send results back to the ROOT, a.k.a. the client response node. the last stage only has one
// receiver so doesn't matter what the exchange type is. setting it to RANDOM_DISTRIBUTED by default.
StageNode globalSenderNode = new MailboxSendNode(globalStageRoot.getStageId(), globalStageRoot.getDataSchema(),
0, RelDistribution.Type.RANDOM_DISTRIBUTED, null);
0, RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false);
globalSenderNode.addInput(globalStageRoot);

StageNode globalReceiverNode =
new MailboxReceiveNode(0, globalStageRoot.getDataSchema(), globalStageRoot.getStageId(),
RelDistribution.Type.RANDOM_DISTRIBUTED, null, globalSenderNode);
RelDistribution.Type.RANDOM_DISTRIBUTED, null, null, false, false, globalSenderNode);

QueryPlan queryPlan = StageMetadataVisitor.attachMetadata(relRoot.fields, globalReceiverNode);

Expand All @@ -100,7 +103,19 @@ private StageNode walkRelPlan(RelNode node, int currentStageId) {
if (isExchangeNode(node)) {
StageNode nextStageRoot = walkRelPlan(node.getInput(0), getNewStageId());
RelDistribution distribution = ((Exchange) node).getDistribution();
return createSendReceivePair(nextStageRoot, distribution, currentStageId);
RelCollation collation = null;
boolean isSortOnSender = false;
boolean isSortOnReceiver = false;
if (isSortExchangeNode(node)) {
collation = ((SortExchange) node).getCollation();
if (node instanceof PinotLogicalSortExchange) {
// These flags only take meaning if the collation is not null or empty
isSortOnSender = ((PinotLogicalSortExchange) node).isSortOnSender();
isSortOnReceiver = ((PinotLogicalSortExchange) node).isSortOnReceiver();
}
}
return createSendReceivePair(nextStageRoot, distribution, collation, isSortOnSender, isSortOnReceiver,
currentStageId);
} else {
StageNode stageNode = RelToStageConverter.toStageNode(node, currentStageId);
List<RelNode> inputs = node.getInputs();
Expand All @@ -118,7 +133,8 @@ private void runPhysicalOptimizers(QueryPlan queryPlan) {
}
}

private StageNode createSendReceivePair(StageNode nextStageRoot, RelDistribution distribution, int currentStageId) {
private StageNode createSendReceivePair(StageNode nextStageRoot, RelDistribution distribution, RelCollation collation,
boolean isSortOnSender, boolean isSortOnReceiver, int currentStageId) {
List<Integer> distributionKeys = distribution.getKeys();
RelDistribution.Type exchangeType = distribution.getType();

Expand All @@ -129,9 +145,11 @@ private StageNode createSendReceivePair(StageNode nextStageRoot, RelDistribution
? new FieldSelectionKeySelector(distributionKeys) : null;

StageNode mailboxSender = new MailboxSendNode(nextStageRoot.getStageId(), nextStageRoot.getDataSchema(),
currentStageId, exchangeType, keySelector);
currentStageId, exchangeType, keySelector, collation == null ? null : collation.getFieldCollations(),
isSortOnSender);
StageNode mailboxReceiver = new MailboxReceiveNode(currentStageId, nextStageRoot.getDataSchema(),
nextStageRoot.getStageId(), exchangeType, keySelector, mailboxSender);
nextStageRoot.getStageId(), exchangeType, keySelector,
collation == null ? null : collation.getFieldCollations(), isSortOnSender, isSortOnReceiver, mailboxSender);
mailboxSender.addInput(nextStageRoot);

return mailboxReceiver;
Expand All @@ -141,6 +159,10 @@ private boolean isExchangeNode(RelNode node) {
return (node instanceof Exchange);
}

/**
 * Tells whether the given relational node is a {@code SortExchange} (or a subclass of it).
 *
 * @param node the node to inspect
 * @return {@code true} if {@code node} is an instance of {@code SortExchange}
 */
private boolean isSortExchangeNode(RelNode node) {
  return node instanceof SortExchange;
}

/**
 * Hands out the next stage id: returns the current value of the stage id counter and then advances the counter
 * by one.
 */
private int getNewStageId() {
return _stageIdCounter++;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@
*/
package org.apache.pinot.query.planner.stage;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.calcite.rel.RelDistribution;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.commons.collections.CollectionUtils;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.planner.partitioning.KeySelector;
import org.apache.pinot.query.planner.serde.ProtoProperties;

Expand All @@ -32,6 +39,14 @@ public class MailboxReceiveNode extends AbstractStageNode {
private RelDistribution.Type _exchangeType;
@ProtoProperties
private KeySelector<Object[], Object[]> _partitionKeySelector;
@ProtoProperties
private List<RexExpression> _collationKeys;
@ProtoProperties
private List<RelFieldCollation.Direction> _collationDirections;
@ProtoProperties
private boolean _isSortOnSender;
@ProtoProperties
private boolean _isSortOnReceiver;

// this is only available during planning and should not be relied
// on in any post-serialization code
Expand All @@ -43,11 +58,26 @@ public MailboxReceiveNode(int stageId) {

public MailboxReceiveNode(int stageId, DataSchema dataSchema, int senderStageId,
RelDistribution.Type exchangeType, @Nullable KeySelector<Object[], Object[]> partitionKeySelector,
@Nullable List<RelFieldCollation> fieldCollations, boolean isSortOnSender, boolean isSortOnReceiver,
StageNode sender) {
super(stageId, dataSchema);
_senderStageId = senderStageId;
_exchangeType = exchangeType;
_partitionKeySelector = partitionKeySelector;
if (!CollectionUtils.isEmpty(fieldCollations)) {
_collationKeys = new ArrayList<>(fieldCollations.size());
_collationDirections = new ArrayList<>(fieldCollations.size());
for (RelFieldCollation fieldCollation : fieldCollations) {
_collationDirections.add(fieldCollation.getDirection());
_collationKeys.add(new RexExpression.InputRef(fieldCollation.getFieldIndex()));
}
} else {
_collationKeys = Collections.emptyList();
_collationDirections = Collections.emptyList();
}
_isSortOnSender = isSortOnSender;
Preconditions.checkState(!isSortOnSender, "Input shouldn't be sorted as ordering on send is not yet implemented!");
_isSortOnReceiver = isSortOnReceiver;
_sender = sender;
}

Expand All @@ -67,6 +97,22 @@ public KeySelector<Object[], Object[]> getPartitionKeySelector() {
return _partitionKeySelector;
}

/**
 * Returns the collation key expressions held by this node. The full constructor fills this with one input
 * reference per field collation, or with an empty list when no collations are supplied.
 */
public List<RexExpression> getCollationKeys() {
return _collationKeys;
}

/**
 * Returns the sort directions held by this node. The full constructor adds one direction per field collation, in
 * the same order as the collation keys, or uses an empty list when no collations are supplied.
 */
public List<RelFieldCollation.Direction> getCollationDirections() {
return _collationDirections;
}

/**
 * Returns the sort-on-sender flag. The full constructor currently rejects a {@code true} value because ordering
 * on send is not yet implemented.
 */
public boolean isSortOnSender() {
return _isSortOnSender;
}

/**
 * Returns the sort-on-receiver flag given to this node at construction.
 */
public boolean isSortOnReceiver() {
return _isSortOnReceiver;
}

public StageNode getSender() {
return _sender;
}
Expand Down
Loading

0 comments on commit bfa4811

Please sign in to comment.