diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java index a25c2e0b54ff..9436ec4bf08f 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/QueryRunner.java @@ -54,6 +54,7 @@ import org.apache.pinot.query.runtime.operator.MailboxSendOperator; import org.apache.pinot.query.runtime.operator.OpChain; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.runtime.plan.PhysicalPlanVisitor; import org.apache.pinot.query.runtime.plan.PlanRequestContext; import org.apache.pinot.query.runtime.plan.ServerRequestPlanVisitor; @@ -209,12 +210,13 @@ private void runLeafStage(DistributedStagePlan distributedStagePlan, Map aggCalls, - List groupSet, DataSchema inputSchema, long requestId, int stageId, - VirtualServerAddress virtualServerAddress) { - this(inputOperator, dataSchema, aggCalls, groupSet, inputSchema, AggregateOperator.AggregateAccumulator.AGG_MERGERS, - requestId, stageId, virtualServerAddress); + public AggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator, DataSchema dataSchema, + List aggCalls, List groupSet, DataSchema inputSchema) { + this(context, inputOperator, dataSchema, aggCalls, groupSet, inputSchema, + AggregateOperator.AggregateAccumulator.AGG_MERGERS); } @VisibleForTesting - AggregateOperator(MultiStageOperator inputOperator, DataSchema dataSchema, List aggCalls, - List groupSet, DataSchema inputSchema, - Map> mergers, long requestId, int stageId, - VirtualServerAddress serverAddress) { - super(requestId, stageId, serverAddress); + AggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator, DataSchema dataSchema, + List aggCalls, List groupSet, DataSchema inputSchema, + Map> mergers) { + super(context); _inputOperator = inputOperator; _groupSet = groupSet; _upstreamErrorBlock = null; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java index 0b819fdd4642..2851aa528af7 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/FilterOperator.java @@ -25,11 +25,11 @@ import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.operands.TransformOperand; import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,9 +55,9 @@ public class FilterOperator extends MultiStageOperator { private final DataSchema _dataSchema; private TransferableBlock _upstreamErrorBlock; - public FilterOperator(MultiStageOperator upstreamOperator, DataSchema dataSchema, RexExpression filter, - long requestId, int stageId, VirtualServerAddress serverAddress) { 
- super(requestId, stageId, serverAddress); + public FilterOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator, DataSchema dataSchema, + RexExpression filter) { + super(context); _upstreamOperator = upstreamOperator; _dataSchema = dataSchema; _filterOperand = TransformOperand.toTransformOperand(filter, dataSchema); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java index a813f35db36b..9bc8c28407b6 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/HashJoinOperator.java @@ -35,11 +35,11 @@ import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.partitioning.KeySelector; import org.apache.pinot.query.planner.stage.JoinNode; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.operands.TransformOperand; import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,9 +88,9 @@ public class HashJoinOperator extends MultiStageOperator { private KeySelector _leftKeySelector; private KeySelector _rightKeySelector; - public HashJoinOperator(MultiStageOperator leftTableOperator, MultiStageOperator rightTableOperator, - DataSchema leftSchema, JoinNode node, long requestId, int stageId, VirtualServerAddress serverAddress) { - super(requestId, stageId, serverAddress); + public HashJoinOperator(OpChainExecutionContext context, MultiStageOperator leftTableOperator, + MultiStageOperator rightTableOperator, DataSchema leftSchema, JoinNode node) { + super(context); Preconditions.checkState(SUPPORTED_JOIN_TYPES.contains(node.getJoinRelType()), "Join type: " + node.getJoinRelType() + " is not supported!"); _joinType = node.getJoinRelType(); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java index d2b9c4db16af..9af6e8819d7c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperator.java @@ -39,8 +39,8 @@ import org.apache.pinot.core.operator.blocks.results.GroupByResultsBlock; import org.apache.pinot.core.operator.blocks.results.SelectionResultsBlock; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,9 +67,9 @@ public class LeafStageTransferableBlockOperator extends MultiStageOperator { private final DataSchema _desiredDataSchema; private int _currentIndex; - public LeafStageTransferableBlockOperator(List baseResultBlock, DataSchema dataSchema, - long requestId, int stageId, 
VirtualServerAddress serverAddress) { - super(requestId, stageId, serverAddress); + public LeafStageTransferableBlockOperator(OpChainExecutionContext context, + List baseResultBlock, DataSchema dataSchema) { + super(context); _baseResultBlock = baseResultBlock; _desiredDataSchema = dataSchema; _errorBlock = baseResultBlock.stream().filter(e -> !e.getExceptions().isEmpty()).findFirst().orElse(null); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java index 36d3fe511fba..029e58e9a2a5 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/LiteralValueOperator.java @@ -25,9 +25,9 @@ import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,9 +40,9 @@ public class LiteralValueOperator extends MultiStageOperator { private final TransferableBlock _rexLiteralBlock; private boolean _isLiteralBlockReturned; - public LiteralValueOperator(DataSchema dataSchema, List> rexLiteralRows, - long requestId, int stageId, VirtualServerAddress serverAddress) { - super(requestId, stageId, serverAddress); + public LiteralValueOperator(OpChainExecutionContext context, DataSchema dataSchema, + List> rexLiteralRows) { + super(context); _dataSchema = dataSchema; _rexLiteralBlock = constructBlock(rexLiteralRows); _isLiteralBlockReturned = false; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java index 74bd60e10d9f..520392985e2c 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperator.java @@ -36,6 +36,7 @@ import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.service.QueryConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,12 +77,20 @@ private static MailboxIdentifier toMailboxId(VirtualServer sender, long jobId, i receiverStageId); } + public MailboxReceiveOperator(OpChainExecutionContext context, RelDistribution.Type exchangeType, int senderStageId, + int receiverStageId) { + this(context, context.getMetadataMap().get(senderStageId).getServerInstances(), exchangeType, senderStageId, + receiverStageId, context.getTimeoutMs()); + } + // TODO: Move deadlineInNanoSeconds to OperatorContext. 
- public MailboxReceiveOperator(MailboxService mailboxService, - List sendingStageInstances, RelDistribution.Type exchangeType, VirtualServerAddress receiver, - long jobId, int senderStageId, int receiverStageId, Long timeoutMs) { - super(jobId, senderStageId, receiver); - _mailboxService = mailboxService; + //TODO: Remove boxed timeoutMs value from here and use long deadlineMs from context. + public MailboxReceiveOperator(OpChainExecutionContext context, List sendingStageInstances, + RelDistribution.Type exchangeType, int senderStageId, int receiverStageId, Long timeoutMs) { + super(context); + _mailboxService = context.getMailboxService(); + VirtualServerAddress receiver = context.getServer(); + long jobId = context.getRequestId(); Preconditions.checkState(SUPPORTED_EXCHANGE_TYPES.contains(exchangeType), "Exchange/Distribution type: " + exchangeType + " is not supported!"); long timeoutNano = (timeoutMs != null ? timeoutMs : QueryConfig.DEFAULT_MAILBOX_TIMEOUT_MS) * 1_000_000L; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java index f4e79765276e..940e668cdc7a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MailboxSendOperator.java @@ -38,6 +38,7 @@ import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -68,24 +69,23 @@ interface MailboxIdGenerator { MailboxIdentifier generate(VirtualServer server); } - public MailboxSendOperator(MailboxService mailboxService, - MultiStageOperator dataTableBlockBaseOperator, List receivingStageInstances, - RelDistribution.Type exchangeType, KeySelector keySelector, - VirtualServerAddress sendingServer, long jobId, int senderStageId, int receiverStageId, long deadlineMs) { - this(mailboxService, dataTableBlockBaseOperator, receivingStageInstances, exchangeType, keySelector, - server -> toMailboxId(server, jobId, senderStageId, receiverStageId, sendingServer), BlockExchange::getExchange, - jobId, senderStageId, receiverStageId, sendingServer, deadlineMs); + public MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator dataTableBlockBaseOperator, + RelDistribution.Type exchangeType, KeySelector keySelector, int senderStageId, + int receiverStageId) { + this(context, dataTableBlockBaseOperator, exchangeType, keySelector, + (server) -> toMailboxId(server, context.getRequestId(), senderStageId, receiverStageId, context.getServer()), + BlockExchange::getExchange, receiverStageId); } @VisibleForTesting - MailboxSendOperator(MailboxService mailboxService, - MultiStageOperator dataTableBlockBaseOperator, List receivingStageInstances, + MailboxSendOperator(OpChainExecutionContext context, MultiStageOperator dataTableBlockBaseOperator, RelDistribution.Type exchangeType, KeySelector keySelector, - MailboxIdGenerator mailboxIdGenerator, BlockExchangeFactory blockExchangeFactory, long jobId, int senderStageId, - int receiverStageId, VirtualServerAddress serverAddress, long deadlineMs) { - super(jobId, senderStageId, serverAddress); + MailboxIdGenerator mailboxIdGenerator, 
BlockExchangeFactory blockExchangeFactory, int receiverStageId) { + super(context); _dataTableBlockBaseOperator = dataTableBlockBaseOperator; - + MailboxService mailboxService = context.getMailboxService(); + List receivingStageInstances = + context.getMetadataMap().get(receiverStageId).getServerInstances(); List receivingMailboxes; if (exchangeType == RelDistribution.Type.SINGLETON) { // TODO: this logic should be moved into SingletonExchange @@ -112,8 +112,9 @@ public MailboxSendOperator(MailboxService mailboxService, } BlockSplitter splitter = TransferableBlockUtils::splitBlock; - _exchange = blockExchangeFactory.build(mailboxService, receivingMailboxes, exchangeType, keySelector, splitter, - deadlineMs); + _exchange = + blockExchangeFactory.build(context.getMailboxService(), receivingMailboxes, exchangeType, keySelector, splitter, + context.getDeadlineMs()); Preconditions.checkState(SUPPORTED_EXCHANGE_TYPE.contains(exchangeType), String.format("Exchange type '%s' is not supported yet", exchangeType)); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java index 55fbc8830d34..8f97f57d4625 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/MultiStageOperator.java @@ -24,10 +24,10 @@ import java.util.Map; import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.core.common.Operator; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.spi.exception.EarlyTerminationException; import org.apache.pinot.spi.trace.InvocationScope; import org.apache.pinot.spi.trace.Tracing; @@ -38,20 +38,18 @@ public abstract class MultiStageOperator implements Operator, private static final org.slf4j.Logger LOGGER = LoggerFactory.getLogger(MultiStageOperator.class); // TODO: Move to OperatorContext class. 
- protected final long _requestId; - protected final int _stageId; - protected final VirtualServerAddress _serverAddress; protected final OperatorStats _operatorStats; protected final Map _operatorStatsMap; private final String _operatorId; + private final OpChainExecutionContext _context; - public MultiStageOperator(long requestId, int stageId, VirtualServerAddress serverAddress) { - _requestId = requestId; - _stageId = stageId; - _operatorStats = new OperatorStats(requestId, stageId, serverAddress, toExplainString()); - _serverAddress = serverAddress; + public MultiStageOperator(OpChainExecutionContext context) { + _context = context; + _operatorStats = + new OperatorStats(_context, toExplainString()); _operatorStatsMap = new HashMap<>(); - _operatorId = Joiner.on("_").join(toExplainString(), _requestId, _stageId, _serverAddress); + _operatorId = + Joiner.on("_").join(toExplainString(), _context.getRequestId(), _context.getStageId(), _context.getServer()); } public Map getOperatorStatsMap() { diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java index 6de4e920407c..84fac943c20a 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OpChain.java @@ -24,6 +24,7 @@ import org.apache.pinot.core.common.Operator; import org.apache.pinot.query.mailbox.MailboxIdentifier; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; /** @@ -44,6 +45,10 @@ public OpChain(MultiStageOperator root, List receivingMailbox _stats = new OpChainStats(_id.toString()); } + public OpChain(OpChainExecutionContext context, MultiStageOperator root, List receivingMailboxes) { + this(root, receivingMailboxes, context.getServer().virtualId(), context.getRequestId(), context.getStageId()); + } + public Operator getRoot() { return _root; } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java index 715be163bb85..cb01623ee490 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/OperatorStats.java @@ -25,6 +25,7 @@ import org.apache.pinot.common.datatable.DataTable; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; public class OperatorStats { @@ -42,6 +43,11 @@ public class OperatorStats { private long _startTimeMs = -1; private final Map _executionStats; + public OperatorStats(OpChainExecutionContext context, String operatorType) { + this(context.getRequestId(), context.getStageId(), context.getServer(), operatorType); + } + + //TODO: remove this constructor after the context constructor can be used in serialization and deserialization public OperatorStats(long requestId, int stageId, VirtualServerAddress serverAddress, String operatorType) { _stageId = stageId; _requestId = requestId; diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java 
b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java index 2298dbb4c443..9eeea863a9ea 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/SortOperator.java @@ -31,9 +31,9 @@ import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.core.query.selection.SelectionOperatorUtils; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,18 +54,18 @@ public class SortOperator extends MultiStageOperator { private boolean _isSortedBlockConstructed; private TransferableBlock _upstreamErrorBlock; - public SortOperator(MultiStageOperator upstreamOperator, List collationKeys, - List collationDirections, int fetch, int offset, DataSchema dataSchema, - long requestId, int stageId, VirtualServerAddress serverAddress) { - this(upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema, - SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY, requestId, stageId, serverAddress); + public SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator, + List collationKeys, List collationDirections, int fetch, int offset, + DataSchema dataSchema) { + this(context, upstreamOperator, collationKeys, collationDirections, fetch, offset, dataSchema, + SelectionOperatorUtils.MAX_ROW_HOLDER_INITIAL_CAPACITY); } @VisibleForTesting - SortOperator(MultiStageOperator upstreamOperator, List collationKeys, + SortOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator, List collationKeys, List collationDirections, int fetch, int offset, DataSchema dataSchema, - int defaultHolderCapacity, long requestId, int stageId, VirtualServerAddress serverAddress) { - super(requestId, stageId, serverAddress); + int defaultHolderCapacity) { + super(context); _upstreamOperator = upstreamOperator; _fetch = fetch; _offset = Math.max(offset, 0); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java index 03f54ee6f455..e53bcfdac232 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/TransformOperator.java @@ -26,11 +26,11 @@ import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.operands.TransformOperand; import org.apache.pinot.query.runtime.operator.utils.FunctionInvokeUtils; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,9 +55,9 @@ public class TransformOperator extends MultiStageOperator { private final DataSchema _resultSchema; private TransferableBlock 
_upstreamErrorBlock; - public TransformOperator(MultiStageOperator upstreamOperator, DataSchema resultSchema, List transforms, - DataSchema upstreamDataSchema, long requestId, int stageId, VirtualServerAddress serverAddress) { - super(requestId, stageId, serverAddress); + public TransformOperator(OpChainExecutionContext context, MultiStageOperator upstreamOperator, + DataSchema resultSchema, List transforms, DataSchema upstreamDataSchema) { + super(context); Preconditions.checkState(!transforms.isEmpty(), "transform operand should not be empty."); Preconditions.checkState(resultSchema.size() == transforms.size(), "result schema size:" + resultSchema.size() + " doesn't match transform operand size:" + transforms.size()); diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java index 55a93df43fb3..0d30dd394d27 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperator.java @@ -37,10 +37,10 @@ import org.apache.pinot.core.data.table.Key; import org.apache.pinot.query.planner.logical.RexExpression; import org.apache.pinot.query.planner.stage.WindowNode; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.utils.AggregationUtils; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -88,26 +88,23 @@ public class WindowAggregateOperator extends MultiStageOperator { private boolean _readyToConstruct; private boolean _hasReturnedWindowAggregateBlock; - public WindowAggregateOperator(MultiStageOperator inputOperator, List groupSet, - List orderSet, List orderSetDirection, + public WindowAggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator, + List groupSet, List orderSet, List orderSetDirection, List orderSetNullDirection, List aggCalls, int lowerBound, int upperBound, WindowNode.WindowFrameType windowFrameType, List constants, - DataSchema resultSchema, DataSchema inputSchema, long requestId, int stageId, - VirtualServerAddress virtualServerAddress) { - this(inputOperator, groupSet, orderSet, orderSetDirection, orderSetNullDirection, aggCalls, lowerBound, - upperBound, windowFrameType, constants, resultSchema, inputSchema, AggregationUtils.Accumulator.MERGERS, - requestId, stageId, virtualServerAddress); + DataSchema resultSchema, DataSchema inputSchema) { + this(context, inputOperator, groupSet, orderSet, orderSetDirection, orderSetNullDirection, aggCalls, lowerBound, + upperBound, windowFrameType, constants, resultSchema, inputSchema, AggregationUtils.Accumulator.MERGERS); } @VisibleForTesting - public WindowAggregateOperator(MultiStageOperator inputOperator, List groupSet, - List orderSet, List orderSetDirection, + public WindowAggregateOperator(OpChainExecutionContext context, MultiStageOperator inputOperator, + List groupSet, List orderSet, List orderSetDirection, List orderSetNullDirection, List aggCalls, int lowerBound, int upperBound, WindowNode.WindowFrameType windowFrameType, List constants, - DataSchema resultSchema, DataSchema inputSchema, Map> mergers, long requestId, int 
stageId,
-      VirtualServerAddress virtualServerAddress) {
-    super(requestId, stageId, virtualServerAddress);
+      DataSchema resultSchema, DataSchema inputSchema,
+      Map> mergers) {
+    super(context);
     boolean isPartitionByOnly = isPartitionByOnlyQuery(groupSet, orderSet, orderSetDirection, orderSetNullDirection);
     Preconditions.checkState(orderSet == null || orderSet.isEmpty() || isPartitionByOnly,
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
new file mode 100644
index 000000000000..1f500ce8b28f
--- /dev/null
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/OpChainExecutionContext.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.query.runtime.plan;
+
+import java.util.Map;
+import org.apache.pinot.query.mailbox.MailboxService;
+import org.apache.pinot.query.planner.StageMetadata;
+import org.apache.pinot.query.routing.VirtualServerAddress;
+import org.apache.pinot.query.runtime.blocks.TransferableBlock;
+
+
+/**
+ * The {@code OpChainExecutionContext} class contains the information derived from the PlanRequestContext.
+ * Members of this class should not be changed once initialized.
+ * This information is then used by the OpChain to create the Operators for a query.
+ */
+public class OpChainExecutionContext {
+  private final MailboxService<TransferableBlock> _mailboxService;
+  private final long _requestId;
+  private final int _stageId;
+  private final VirtualServerAddress _server;
+  private final long _timeoutMs;
+  private final long _deadlineMs;
+  private final Map<Integer, StageMetadata> _metadataMap;
+
+  public OpChainExecutionContext(MailboxService<TransferableBlock> mailboxService, long requestId, int stageId,
+      VirtualServerAddress server, long timeoutMs, long deadlineMs, Map<Integer, StageMetadata> metadataMap) {
+    _mailboxService = mailboxService;
+    _requestId = requestId;
+    _stageId = stageId;
+    _server = server;
+    _timeoutMs = timeoutMs;
+    _deadlineMs = deadlineMs;
+    _metadataMap = metadataMap;
+  }
+
+  public OpChainExecutionContext(PlanRequestContext planRequestContext) {
+    this(planRequestContext.getMailboxService(), planRequestContext.getRequestId(), planRequestContext.getStageId(),
+        planRequestContext.getServer(), planRequestContext.getTimeoutMs(), planRequestContext.getDeadlineMs(),
+        planRequestContext.getMetadataMap());
+  }
+
+  public MailboxService<TransferableBlock> getMailboxService() {
+    return _mailboxService;
+  }
+
+  public long getRequestId() {
+    return _requestId;
+  }
+
+  public int getStageId() {
+    return _stageId;
+  }
+
+  public VirtualServerAddress getServer() {
+    return _server;
+  }
+
+  public long getTimeoutMs() {
+    return _timeoutMs;
+  }
+
+  public long getDeadlineMs() {
+    return _deadlineMs;
+  }
+
+  public Map<Integer, StageMetadata> getMetadataMap() {
+    return _metadataMap;
+  }
+}
diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
index 8ff1c041d1ec..4a9892a4dd85 100644
--- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
+++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PhysicalPlanVisitor.java
@@ -18,8 +18,6 @@
  */
 package org.apache.pinot.query.runtime.plan;
 
-import java.util.List;
-import org.apache.pinot.query.planner.StageMetadata;
 import org.apache.pinot.query.planner.stage.AggregateNode;
 import org.apache.pinot.query.planner.stage.FilterNode;
 import org.apache.pinot.query.planner.stage.JoinNode;
@@ -32,7 +30,6 @@
 import org.apache.pinot.query.planner.stage.TableScanNode;
 import org.apache.pinot.query.planner.stage.ValueNode;
 import org.apache.pinot.query.planner.stage.WindowNode;
-import org.apache.pinot.query.routing.VirtualServer;
 import org.apache.pinot.query.runtime.operator.AggregateOperator;
 import org.apache.pinot.query.runtime.operator.FilterOperator;
 import org.apache.pinot.query.runtime.operator.HashJoinOperator;
@@ -59,17 +56,14 @@ public class PhysicalPlanVisitor implements StageNodeVisitor sendingInstances = context.getMetadataMap().get(node.getSenderStageId()).getServerInstances();
     MailboxReceiveOperator mailboxReceiveOperator =
-        new MailboxReceiveOperator(context.getMailboxService(), sendingInstances,
-            node.getExchangeType(), context.getServer(),
-            context.getRequestId(), node.getSenderStageId(), node.getStageId(), context.getTimeoutMs());
+        new MailboxReceiveOperator(context.getOpChainExecutionContext(), node.getExchangeType(),
+            node.getSenderStageId(), node.getStageId());
     context.addReceivingMailboxes(mailboxReceiveOperator.getSendingMailbox());
     return mailboxReceiveOperator;
   }
@@ -77,34 +71,31 @@ public MultiStageOperator visitMailboxReceive(MailboxReceiveNode node, PlanReque
   @Override
   public MultiStageOperator visitMailboxSend(MailboxSendNode node, PlanRequestContext context) {
MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); - StageMetadata receivingStageMetadata = context.getMetadataMap().get(node.getReceiverStageId()); - return new MailboxSendOperator(context.getMailboxService(), nextOperator, - receivingStageMetadata.getServerInstances(), node.getExchangeType(), node.getPartitionKeySelector(), - context.getServer(), context.getRequestId(), node.getStageId(), node.getReceiverStageId(), - context.getDeadlineMs()); + return new MailboxSendOperator(context.getOpChainExecutionContext(), nextOperator, node.getExchangeType(), + node.getPartitionKeySelector(), node.getStageId(), node.getReceiverStageId()); } @Override public MultiStageOperator visitAggregate(AggregateNode node, PlanRequestContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); - return new AggregateOperator(nextOperator, node.getDataSchema(), node.getAggCalls(), node.getGroupSet(), - node.getInputs().get(0).getDataSchema(), context._requestId, context._stageId, context.getServer()); + return new AggregateOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(), + node.getAggCalls(), node.getGroupSet(), node.getInputs().get(0).getDataSchema()); } @Override public MultiStageOperator visitWindow(WindowNode node, PlanRequestContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); - return new WindowAggregateOperator(nextOperator, node.getGroupSet(), node.getOrderSet(), - node.getOrderSetDirection(), node.getOrderSetNullDirection(), node.getAggCalls(), node.getLowerBound(), - node.getUpperBound(), node.getWindowFrameType(), node.getConstants(), node.getDataSchema(), - node.getInputs().get(0).getDataSchema(), context._requestId, context._stageId, context.getServer()); + return new WindowAggregateOperator(context.getOpChainExecutionContext(), nextOperator, node.getGroupSet(), + node.getOrderSet(), node.getOrderSetDirection(), node.getOrderSetNullDirection(), node.getAggCalls(), + node.getLowerBound(), node.getUpperBound(), node.getWindowFrameType(), node.getConstants(), + node.getDataSchema(), node.getInputs().get(0).getDataSchema()); } @Override public MultiStageOperator visitFilter(FilterNode node, PlanRequestContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); - return new FilterOperator(nextOperator, node.getDataSchema(), node.getCondition(), context.getRequestId(), - context.getStageId(), context.getServer()); + return new FilterOperator(context.getOpChainExecutionContext(), + nextOperator, node.getDataSchema(), node.getCondition()); } @Override @@ -115,22 +106,22 @@ public MultiStageOperator visitJoin(JoinNode node, PlanRequestContext context) { MultiStageOperator leftOperator = left.visit(this, context); MultiStageOperator rightOperator = right.visit(this, context); - return new HashJoinOperator(leftOperator, rightOperator, left.getDataSchema(), node, context.getRequestId(), - context.getStageId(), context.getServer()); + return new HashJoinOperator(context.getOpChainExecutionContext(), leftOperator, rightOperator, left.getDataSchema(), + node); } @Override public MultiStageOperator visitProject(ProjectNode node, PlanRequestContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); - return new TransformOperator(nextOperator, node.getDataSchema(), node.getProjects(), - node.getInputs().get(0).getDataSchema(), context.getRequestId(), context.getStageId(), context.getServer()); + 
return new TransformOperator(context.getOpChainExecutionContext(), nextOperator, node.getDataSchema(), + node.getProjects(), node.getInputs().get(0).getDataSchema()); } @Override public MultiStageOperator visitSort(SortNode node, PlanRequestContext context) { MultiStageOperator nextOperator = node.getInputs().get(0).visit(this, context); - return new SortOperator(nextOperator, node.getCollationKeys(), node.getCollationDirections(), node.getFetch(), - node.getOffset(), node.getDataSchema(), context.getRequestId(), context.getStageId(), context.getServer()); + return new SortOperator(context.getOpChainExecutionContext(), nextOperator, node.getCollationKeys(), + node.getCollationDirections(), node.getFetch(), node.getOffset(), node.getDataSchema()); } @Override @@ -140,7 +131,6 @@ public MultiStageOperator visitTableScan(TableScanNode node, PlanRequestContext @Override public MultiStageOperator visitValue(ValueNode node, PlanRequestContext context) { - return new LiteralValueOperator(node.getDataSchema(), node.getLiteralRows(), context.getRequestId(), - context.getStageId(), context.getServer()); + return new LiteralValueOperator(context.getOpChainExecutionContext(), node.getDataSchema(), node.getLiteralRows()); } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java index dac0f9757587..fb610ebb4616 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/runtime/plan/PlanRequestContext.java @@ -39,7 +39,7 @@ public class PlanRequestContext { protected final VirtualServerAddress _server; protected final Map _metadataMap; protected final List _receivingMailboxes = new ArrayList<>(); - + private final OpChainExecutionContext _opChainExecutionContext; public PlanRequestContext(MailboxService mailboxService, long requestId, int stageId, long timeoutMs, long deadlineMs, VirtualServerAddress server, Map metadataMap) { @@ -50,6 +50,7 @@ public PlanRequestContext(MailboxService mailboxService, long _deadlineMs = deadlineMs; _server = server; _metadataMap = metadataMap; + _opChainExecutionContext = new OpChainExecutionContext(this); } public long getRequestId() { @@ -87,4 +88,8 @@ public void addReceivingMailboxes(List ids) { public List getReceivingMailboxes() { return ImmutableList.copyOf(_receivingMailboxes); } + + public OpChainExecutionContext getOpChainExecutionContext() { + return _opChainExecutionContext; + } } diff --git a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java index 65aab54df073..a295ef06f6d9 100644 --- a/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java +++ b/pinot-query-runtime/src/main/java/org/apache/pinot/query/service/dispatch/QueryDispatcher.java @@ -57,6 +57,7 @@ import org.apache.pinot.query.runtime.operator.OperatorStats; import org.apache.pinot.query.runtime.operator.utils.OperatorUtils; import org.apache.pinot.query.runtime.plan.DistributedStagePlan; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.runtime.plan.serde.QueryPlanSerDeUtils; import org.apache.pinot.query.service.QueryConfig; import org.roaringbitmap.RoaringBitmap; @@ -173,10 +174,10 @@ int submit(long requestId, 
QueryPlan queryPlan, long timeoutMs, Map mailboxService, Map statsAggregatorMap) { MailboxReceiveNode reduceNode = (MailboxReceiveNode) queryPlan.getQueryStageMap().get(reduceStageId); - MailboxReceiveOperator mailboxReceiveOperator = createReduceStageOperator(mailboxService, - queryPlan.getStageMetadataMap().get(reduceNode.getSenderStageId()).getServerInstances(), requestId, - reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(), - new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0), timeoutMs); + MailboxReceiveOperator mailboxReceiveOperator = + createReduceStageOperator(mailboxService, queryPlan.getStageMetadataMap(), requestId, + reduceNode.getSenderStageId(), reduceStageId, reduceNode.getDataSchema(), + new VirtualServerAddress(mailboxService.getHostname(), mailboxService.getMailboxPort(), 0), timeoutMs); List resultDataBlocks = reduceMailboxReceive(mailboxReceiveOperator, timeoutMs, statsAggregatorMap, queryPlan); return toResultTable(resultDataBlocks, queryPlan.getQueryResultFields(), @@ -275,12 +276,13 @@ private static DataSchema toResultSchema(DataSchema inputSchema, List mailboxService, - List sendingInstances, long jobId, int stageId, int reducerStageId, DataSchema dataSchema, + Map stageMetadataMap, long jobId, int stageId, int reducerStageId, DataSchema dataSchema, VirtualServerAddress server, long timeoutMs) { + OpChainExecutionContext context = + new OpChainExecutionContext(mailboxService, jobId, stageId, server, timeoutMs, timeoutMs, stageMetadataMap); // timeout is set for reduce stage MailboxReceiveOperator mailboxReceiveOperator = - new MailboxReceiveOperator(mailboxService, sendingInstances, - RelDistribution.Type.RANDOM_DISTRIBUTED, server, jobId, stageId, reducerStageId, timeoutMs); + new MailboxReceiveOperator(context, RelDistribution.Type.RANDOM_DISTRIBUTED, stageId, reducerStageId); return mailboxReceiveOperator; } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java index 85c9b7c2a30e..fce25e7e3f66 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/AggregateOperatorTest.java @@ -77,7 +77,8 @@ public void shouldHandleUpstreamErrorBlocks() { DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress); + AggregateOperator operator = + new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, outSchema, calls, group, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); // build @@ -97,7 +98,8 @@ public void shouldHandleEndOfStreamBlockWithNoOtherInputs() { DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress); + AggregateOperator operator = + new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, outSchema, calls, group, inSchema); // When: TransferableBlock block = 
operator.nextBlock(); @@ -119,7 +121,8 @@ public void shouldHandleUpstreamNoOpBlocksWhileConstructing() { .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress); + AggregateOperator operator = + new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, outSchema, calls, group, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); // build when reading NoOp block @@ -142,7 +145,8 @@ public void shouldAggregateSingleInputBlock() { .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress); + AggregateOperator operator = + new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, outSchema, calls, group, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); @@ -167,7 +171,8 @@ public void shouldAggregateSingleInputBlockWithLiteralInput() { .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress); + AggregateOperator operator = + new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, outSchema, calls, group, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); @@ -199,8 +204,8 @@ public void shouldCallMergerWhenAggregatingMultipleRows() { Mockito.when(merger.initialize(Mockito.any(), Mockito.any())).thenReturn(1d); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); AggregateOperator operator = - new AggregateOperator(_input, outSchema, calls, group, inSchema, ImmutableMap.of("SUM", cdt -> merger), 1, 2, - _serverAddress); + new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, outSchema, calls, group, inSchema, + ImmutableMap.of("SUM", cdt -> merger)); // When: TransferableBlock resultBlock = operator.nextBlock(); // (output result) @@ -220,9 +225,9 @@ public void testGroupByAggregateWithHashCollision() { // Create an aggregation call with sum for first column and group by second column. 
RexExpression.FunctionCall agg = getSum(new RexExpression.InputRef(0)); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new ColumnDataType[]{INT, INT}); - AggregateOperator sum0GroupBy1 = - new AggregateOperator(upstreamOperator, OperatorTestUtil.getDataSchema(OperatorTestUtil.OP_1), - Arrays.asList(agg), Arrays.asList(new RexExpression.InputRef(1)), inSchema, 1, 2, _serverAddress); + AggregateOperator sum0GroupBy1 = new AggregateOperator(OperatorTestUtil.getDefaultContext(), upstreamOperator, + OperatorTestUtil.getDataSchema(OperatorTestUtil.OP_1), Arrays.asList(agg), + Arrays.asList(new RexExpression.InputRef(1)), inSchema); TransferableBlock result = sum0GroupBy1.getNextBlock(); while (result.isNoOpBlock()) { result = sum0GroupBy1.getNextBlock(); @@ -245,7 +250,8 @@ public void shouldThrowOnUnknownAggFunction() { DataSchema inSchema = new DataSchema(new String[]{"unknown"}, new ColumnDataType[]{DOUBLE}); // When: - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress); + AggregateOperator operator = + new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, outSchema, calls, group, inSchema); } @Test @@ -262,7 +268,8 @@ public void shouldReturnErrorBlockOnUnexpectedInputType() { .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema outSchema = new DataSchema(new String[]{"sum"}, new ColumnDataType[]{DOUBLE}); - AggregateOperator operator = new AggregateOperator(_input, outSchema, calls, group, inSchema, 1, 2, _serverAddress); + AggregateOperator operator = + new AggregateOperator(OperatorTestUtil.getDefaultContext(), _input, outSchema, calls, group, inSchema); // When: TransferableBlock block = operator.nextBlock(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java index 26e40cc4ab82..ba02d44fe174 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/FilterOperatorTest.java @@ -25,7 +25,6 @@ import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.spi.data.FieldSpec; @@ -43,13 +42,9 @@ public class FilterOperatorTest { @Mock private MultiStageOperator _upstreamOperator; - @Mock - private VirtualServerAddress _serverAddress; - @BeforeMethod public void setUp() { _mocks = MockitoAnnotations.openMocks(this); - Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString()); } @AfterMethod @@ -66,7 +61,8 @@ public void shouldPropagateUpstreamErrorBlock() { DataSchema inputSchema = new DataSchema(new String[]{"boolCol"}, new DataSchema.ColumnDataType[]{ DataSchema.ColumnDataType.BOOLEAN }); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress); + FilterOperator op = + new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, booleanLiteral); TransferableBlock errorBlock = op.getNextBlock(); Assert.assertTrue(errorBlock.isErrorBlock()); 
DataBlock error = errorBlock.getDataBlock(); @@ -81,7 +77,8 @@ public void shouldPropagateUpstreamEOS() { DataSchema.ColumnDataType.INT }); Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress); + FilterOperator op = + new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, booleanLiteral); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertTrue(dataBlock.isEndOfStreamBlock()); } @@ -94,7 +91,8 @@ public void shouldPropagateUpstreamNoop() { DataSchema.ColumnDataType.INT }); Mockito.when(_upstreamOperator.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress); + FilterOperator op = + new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, booleanLiteral); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertTrue(dataBlock.isNoOpBlock()); } @@ -109,7 +107,8 @@ public void shouldHandleTrueBooleanLiteralFilter() { Mockito.when(_upstreamOperator.nextBlock()) .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{0}, new Object[]{1})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress); + FilterOperator op = + new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, booleanLiteral); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List result = dataBlock.getContainer(); @@ -127,7 +126,8 @@ public void shouldHandleFalseBooleanLiteralFilter() { }); Mockito.when(_upstreamOperator.nextBlock()) .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2})); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress); + FilterOperator op = + new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, booleanLiteral); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List result = dataBlock.getContainer(); @@ -142,7 +142,8 @@ public void shouldThrowOnNonBooleanTypeBooleanLiteral() { }); Mockito.when(_upstreamOperator.nextBlock()) .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2})); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, booleanLiteral, 1, 2, _serverAddress); + FilterOperator op = + new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, booleanLiteral); TransferableBlock errorBlock = op.getNextBlock(); Assert.assertTrue(errorBlock.isErrorBlock()); DataBlock data = errorBlock.getDataBlock(); @@ -157,7 +158,7 @@ public void shouldThrowOnNonBooleanTypeInputRef() { }); Mockito.when(_upstreamOperator.nextBlock()) .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1}, new Object[]{2})); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref0, 1, 2, _serverAddress); + FilterOperator op = new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, ref0); TransferableBlock errorBlock = op.getNextBlock(); Assert.assertTrue(errorBlock.isErrorBlock()); DataBlock data = errorBlock.getDataBlock(); @@ -172,7 +173,7 @@ public 
void shouldHandleBooleanInputRef() { }); Mockito.when(_upstreamOperator.nextBlock()) .thenReturn(OperatorTestUtil.block(inputSchema, new Object[]{1, true}, new Object[]{2, false})); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, ref1, 1, 2, _serverAddress); + FilterOperator op = new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, ref1); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List result = dataBlock.getContainer(); @@ -192,7 +193,8 @@ public void shouldHandleAndFilter() { RexExpression.FunctionCall andCall = new RexExpression.FunctionCall(SqlKind.AND, FieldSpec.DataType.BOOLEAN, "AND", ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1))); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, andCall, 1, 2, _serverAddress); + FilterOperator op = + new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, andCall); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List result = dataBlock.getContainer(); @@ -212,7 +214,8 @@ public void shouldHandleOrFilter() { RexExpression.FunctionCall orCall = new RexExpression.FunctionCall(SqlKind.OR, FieldSpec.DataType.BOOLEAN, "OR", ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1))); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, orCall, 1, 2, _serverAddress); + FilterOperator op = + new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, orCall); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List result = dataBlock.getContainer(); @@ -234,7 +237,8 @@ public void shouldHandleNotFilter() { RexExpression.FunctionCall notCall = new RexExpression.FunctionCall(SqlKind.NOT, FieldSpec.DataType.BOOLEAN, "NOT", ImmutableList.of(new RexExpression.InputRef(0))); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, notCall, 1, 2, _serverAddress); + FilterOperator op = + new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, notCall); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List result = dataBlock.getContainer(); @@ -253,7 +257,8 @@ public void shouldHandleGreaterThanFilter() { RexExpression.FunctionCall greaterThan = new RexExpression.FunctionCall(SqlKind.GREATER_THAN, FieldSpec.DataType.BOOLEAN, "greaterThan", ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.InputRef(1))); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, greaterThan, 1, 2, _serverAddress); + FilterOperator op = + new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, greaterThan); TransferableBlock dataBlock = op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List result = dataBlock.getContainer(); @@ -273,7 +278,8 @@ public void shouldHandleBooleanFunction() { new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWith", ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.Literal(FieldSpec.DataType.STRING, "star"))); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith, 1, 2, _serverAddress); + FilterOperator op = + new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, startsWith); TransferableBlock dataBlock = 
op.getNextBlock(); Assert.assertFalse(dataBlock.isErrorBlock()); List result = dataBlock.getContainer(); @@ -294,6 +300,7 @@ public void shouldThrowOnUnfoundFunction() { new RexExpression.FunctionCall(SqlKind.OTHER, FieldSpec.DataType.BOOLEAN, "startsWithError", ImmutableList.of(new RexExpression.InputRef(0), new RexExpression.Literal(FieldSpec.DataType.STRING, "star"))); - FilterOperator op = new FilterOperator(_upstreamOperator, inputSchema, startsWith, 1, 2, _serverAddress); + FilterOperator op = + new FilterOperator(OperatorTestUtil.getDefaultContext(), _upstreamOperator, inputSchema, startsWith); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java index c77107a58cd4..c9a48f9ba93b 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/HashJoinOperatorTest.java @@ -96,7 +96,7 @@ public void shouldHandleHashJoinKeyCollisionInnerJoin() { JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses); HashJoinOperator joinOnString = - new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = joinOnString.nextBlock(); while (result.isNoOpBlock()) { @@ -134,7 +134,7 @@ public void shouldHandleInnerJoinOnInt() { JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); HashJoinOperator joinOnInt = - new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = joinOnInt.nextBlock(); while (result.isNoOpBlock()) { result = joinOnInt.nextBlock(); @@ -169,7 +169,7 @@ public void shouldHandleJoinOnEmptySelector() { JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses); HashJoinOperator joinOnInt = - new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = joinOnInt.nextBlock(); while (result.isNoOpBlock()) { result = joinOnInt.nextBlock(); @@ -210,7 +210,8 @@ public void shouldHandleLeftJoin() { }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.LEFT, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -244,7 +245,8 @@ public void shouldPassLeftTableEOS() { List joinClauses = new ArrayList<>(); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), 
joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -275,7 +277,8 @@ public void shouldHandleLeftJoinOneToN() { }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.LEFT, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -310,7 +313,8 @@ public void shouldPassRightTableEOS() { JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -348,7 +352,8 @@ public void shouldHandleInequiJoinOnString() { }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { result = join.nextBlock(); @@ -386,7 +391,8 @@ public void shouldHandleInequiJoinOnInt() { }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, getJoinKeys(new ArrayList<>(), new ArrayList<>()), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { result = join.nextBlock(); @@ -421,7 +427,7 @@ public void shouldHandleRightJoin() { JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.RIGHT, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); HashJoinOperator joinOnNum = - new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = joinOnNum.nextBlock(); while (result.isNoOpBlock()) { result = joinOnNum.nextBlock(); @@ -470,7 +476,8 @@ public void shouldHandleSemiJoin() { }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.SEMI, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + HashJoinOperator join = + new 
HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { result = join.nextBlock(); @@ -509,7 +516,8 @@ public void shouldHandleFullJoin() { }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.FULL, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { result = join.nextBlock(); @@ -561,7 +569,8 @@ public void shouldHandleAntiJoin() { }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.ANTI, getJoinKeys(Arrays.asList(1), Arrays.asList(1)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { result = join.nextBlock(); @@ -599,7 +608,8 @@ public void shouldPropagateRightTableError() { }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -632,7 +642,8 @@ public void shouldPropagateLeftTableError() { }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); while (result.isNoOpBlock()) { @@ -668,7 +679,8 @@ public void shouldHandleNoOpBlock() { }); JoinNode node = new JoinNode(1, resultSchema, leftSchema, rightSchema, JoinRelType.INNER, getJoinKeys(Arrays.asList(0), Arrays.asList(0)), joinClauses); - HashJoinOperator join = new HashJoinOperator(_leftOperator, _rightOperator, leftSchema, node, 1, 2, _serverAddress); + HashJoinOperator join = + new HashJoinOperator(OperatorTestUtil.getDefaultContext(), _leftOperator, _rightOperator, leftSchema, node); TransferableBlock result = join.nextBlock(); // first no-op consumes first right data block. 
Assert.assertTrue(result.isNoOpBlock()); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java index a8332f22d222..9d04d07df93d 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LeafStageTransferableBlockOperatorTest.java @@ -75,7 +75,7 @@ public void shouldReturnDataBlockThenMetadataBlock() { List resultsBlockList = Collections.singletonList(new InstanceResponseBlock( new SelectionResultsBlock(schema, Arrays.asList(new Object[]{"foo", 1}, new Object[]{"", 2})), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -101,7 +101,7 @@ public void shouldHandleDesiredDataSchemaConversionCorrectly() { new SelectionResultsBlock(resultSchema, Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(resultsBlockList, desiredSchema, 1, 2, _serverAddress); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, desiredSchema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -123,7 +123,7 @@ public void shouldHandleCanonicalizationCorrectly() { new SelectionResultsBlock(schema, Arrays.asList(new Object[]{1, 1660000000000L}, new Object[]{0, 1600000000000L})), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -148,7 +148,7 @@ public void shouldReturnMultipleDataBlockThenMetadataBlock() { queryContext), new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); // When: TransferableBlock resultBlock1 = operator.nextBlock(); @@ -178,7 +178,7 @@ public void shouldGetErrorBlockWhenInstanceResponseContainsError() { errorBlock, new InstanceResponseBlock(new SelectionResultsBlock(schema, Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -199,7 +199,7 @@ public void shouldReorderWhenQueryContextAskForNotInOrderGroupByAsDistinct() { new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema, Arrays.asList(new Record(new Object[]{1, "foo"}), new Record(new Object[]{2, "bar"})))), queryContext)); LeafStageTransferableBlockOperator operator = 
- new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -220,7 +220,7 @@ public void shouldParsedBlocksSuccessfullyWithDistinctQuery() { new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(schema, Arrays.asList(new Record(new Object[]{"foo", 1}), new Record(new Object[]{"bar", 2})))), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -244,7 +244,7 @@ public void shouldReorderWhenQueryContextAskForGroupByOutOfOrder() { List resultsBlockList = Collections.singletonList( new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -267,7 +267,7 @@ public void shouldNotErrorOutWhenQueryContextAskForGroupByOutOfOrderWithHaving() List resultsBlockList = Collections.singletonList( new InstanceResponseBlock(new GroupByResultsBlock(schema, Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -286,7 +286,7 @@ public void shouldNotErrorOutWhenDealingWithAggregationResults() { List resultsBlockList = Collections.singletonList(new InstanceResponseBlock( new AggregationResultsBlock(queryContext.getAggregationFunctions(), Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(resultsBlockList, schema, 1, 2, _serverAddress); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), resultsBlockList, schema); // When: TransferableBlock resultBlock = operator.nextBlock(); @@ -308,7 +308,7 @@ public void shouldNotErrorOutWhenIncorrectDataSchemaProvidedWithEmptyRowsSelecti List responseBlockList = Collections.singletonList( new InstanceResponseBlock(new SelectionResultsBlock(resultSchema, Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2, _serverAddress); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), responseBlockList, desiredSchema); TransferableBlock resultBlock = operator.nextBlock(); // Then: @@ -331,7 +331,7 @@ public void shouldNotErrorOutWhenIncorrectDataSchemaProvidedWithEmptyRowsDistinc new DistinctResultsBlock(mock(DistinctAggregationFunction.class), new DistinctTable(resultSchema, Collections.emptyList())), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2, _serverAddress); + new 
LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), responseBlockList, desiredSchema); TransferableBlock resultBlock = operator.nextBlock(); // Then: @@ -353,7 +353,7 @@ public void shouldNotErrorOutWhenIncorrectDataSchemaProvidedWithEmptyRowsGroupBy List responseBlockList = Collections.singletonList( new InstanceResponseBlock(new GroupByResultsBlock(resultSchema, Collections.emptyList()), queryContext)); LeafStageTransferableBlockOperator operator = - new LeafStageTransferableBlockOperator(responseBlockList, desiredSchema, 1, 2, _serverAddress); + new LeafStageTransferableBlockOperator(OperatorTestUtil.getDefaultContext(), responseBlockList, desiredSchema); TransferableBlock resultBlock = operator.nextBlock(); // Then: diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java index 749431e090ac..8276c647aacd 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/LiteralValueOperatorTest.java @@ -66,7 +66,7 @@ public void shouldReturnLiteralBlock() { List> literals = ImmutableList.of( ImmutableList.of(new RexExpression.Literal(DataType.STRING, "foo"), new RexExpression.Literal(DataType.INT, 1)), ImmutableList.of(new RexExpression.Literal(DataType.STRING, ""), new RexExpression.Literal(DataType.INT, 2))); - LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 1, 2, _serverAddress); + LiteralValueOperator operator = new LiteralValueOperator(OperatorTestUtil.getDefaultContext(), schema, literals); // When: TransferableBlock transferableBlock = operator.nextBlock(); @@ -82,7 +82,7 @@ public void shouldHandleEmptyLiteralRows() { // Given: DataSchema schema = new DataSchema(new String[]{}, new ColumnDataType[]{}); List> literals = ImmutableList.of(ImmutableList.of()); - LiteralValueOperator operator = new LiteralValueOperator(schema, literals, 1, 2, _serverAddress); + LiteralValueOperator operator = new LiteralValueOperator(OperatorTestUtil.getDefaultContext(), schema, literals); // When: TransferableBlock transferableBlock = operator.nextBlock(); diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java index d325612ff140..ebd712a0c452 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxReceiveOperatorTest.java @@ -20,6 +20,7 @@ import com.google.common.collect.ImmutableList; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.common.datablock.MetadataBlock; @@ -32,6 +33,7 @@ import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -79,9 +81,11 @@ public void tearDown() public void shouldTimeoutOnExtraLongSleep() throws InterruptedException { // 
shorter timeoutMs should result in error. + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 10L, 10L, + new HashMap<>()); MailboxReceiveOperator receiveOp = - new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), RelDistribution.Type.SINGLETON, _testAddr, 456, - 789, DEFAULT_RECEIVER_STAGE_ID, 10L); + new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 10L); Thread.sleep(200L); TransferableBlock mailbox = receiveOp.nextBlock(); Assert.assertTrue(mailbox.isErrorBlock()); @@ -89,15 +93,15 @@ public void shouldTimeoutOnExtraLongSleep() Assert.assertTrue(errorBlock.getExceptions().containsKey(QueryException.EXECUTION_TIMEOUT_ERROR_CODE)); // longer timeout or default timeout (10s) doesn't result in error. - receiveOp = - new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), RelDistribution.Type.SINGLETON, _testAddr, 456, - 789, DEFAULT_RECEIVER_STAGE_ID, 2000L); + context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, 2000L, 2000L, + new HashMap<>()); + receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, 2000L); Thread.sleep(200L); mailbox = receiveOp.nextBlock(); Assert.assertFalse(mailbox.isErrorBlock()); - receiveOp = - new MailboxReceiveOperator(_mailboxService, new ArrayList<>(), RelDistribution.Type.SINGLETON, _testAddr, 456, - 789, DEFAULT_RECEIVER_STAGE_ID, null); + context = new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + receiveOp = new MailboxReceiveOperator(context, new ArrayList<>(), RelDistribution.Type.SINGLETON, 456, 789, null); Thread.sleep(200L); mailbox = receiveOp.nextBlock(); Assert.assertFalse(mailbox.isErrorBlock()); @@ -116,8 +120,12 @@ public void shouldThrowReceiveSingletonFromMultiMatchMailboxServer() { Mockito.when(_server2.getHostname()).thenReturn("singleton"); Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.SINGLETON, _testAddr, 456, 789, DEFAULT_RECEIVER_STAGE_ID, null); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + MailboxReceiveOperator receiveOp = + new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON, 456, + 789, null); } @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*RANGE_DISTRIBUTED.*") @@ -131,8 +139,11 @@ public void shouldThrowRangeDistributionNotSupported() { Mockito.when(_server2.getHostname()).thenReturn("singleton"); Mockito.when(_server2.getQueryMailboxPort()).thenReturn(123); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.RANGE_DISTRIBUTED, _testAddr, 456, 789, DEFAULT_RECEIVER_STAGE_ID, null); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, 1, DEFAULT_RECEIVER_STAGE_ID, _testAddr, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), + RelDistribution.Type.RANGE_DISTRIBUTED, 456, 789, null); } @Test @@ -156,8 
+167,13 @@ public void shouldReceiveSingletonNoMatchMailboxServer() { String toHost = "toHost"; VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, DEFAULT_RECEIVER_STAGE_ID, null); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + + MailboxReceiveOperator receiveOp = + new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON, + stageId, DEFAULT_RECEIVER_STAGE_ID, null); // Receive end of stream block directly when there is no match. Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock()); @@ -184,14 +200,19 @@ public void shouldReceiveSingletonCloseMailbox() { String toHost = "toHost"; VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0); - JsonMailboxIdentifier expectedMailboxId = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(serverHost, server2port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(serverHost, server2port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox); Mockito.when(_mailbox.isClosed()).thenReturn(true); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, DEFAULT_RECEIVER_STAGE_ID, null); + + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + + MailboxReceiveOperator receiveOp = + new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON, + stageId, DEFAULT_RECEIVER_STAGE_ID, null); + // Receive end of stream block directly when mailbox is close. Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock()); } @@ -218,16 +239,19 @@ public void shouldReceiveSingletonNullMailbox() String toHost = "toHost"; VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0); - JsonMailboxIdentifier expectedMailboxId = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(serverHost, server2port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(serverHost, server2port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox); Mockito.when(_mailbox.isClosed()).thenReturn(false); // Receive null mailbox during timeout. 
Mockito.when(_mailbox.receive()).thenReturn(null); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, DEFAULT_RECEIVER_STAGE_ID, null); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + + MailboxReceiveOperator receiveOp = + new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON, + stageId, DEFAULT_RECEIVER_STAGE_ID, null); // Receive NoOpBlock. Assert.assertTrue(receiveOp.nextBlock().isNoOpBlock()); } @@ -254,15 +278,18 @@ public void shouldReceiveEosDirectlyFromSender() String toHost = "toHost"; VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0); - JsonMailboxIdentifier expectedMailboxId = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(serverHost, server2port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(serverHost, server2port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox); Mockito.when(_mailbox.isClosed()).thenReturn(false); Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, DEFAULT_RECEIVER_STAGE_ID, null); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + + MailboxReceiveOperator receiveOp = + new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON, + stageId, DEFAULT_RECEIVER_STAGE_ID, null); // Receive EosBloc. 
Assert.assertTrue(receiveOp.nextBlock().isEndOfStreamBlock()); } @@ -289,17 +316,20 @@ public void shouldReceiveSingletonMailbox() String toHost = "toHost"; VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0); - JsonMailboxIdentifier expectedMailboxId = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(serverHost, server2port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(serverHost, server2port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox); Mockito.when(_mailbox.isClosed()).thenReturn(false); Object[] expRow = new Object[]{1, 1}; DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT}); Mockito.when(_mailbox.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow)); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, DEFAULT_RECEIVER_STAGE_ID, null); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + + MailboxReceiveOperator receiveOp = + new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON, + stageId, DEFAULT_RECEIVER_STAGE_ID, null); TransferableBlock receivedBlock = receiveOp.nextBlock(); List resultRows = receivedBlock.getContainer(); Assert.assertEquals(resultRows.size(), 1); @@ -328,16 +358,19 @@ public void shouldReceiveSingletonErrorMailbox() String toHost = "toHost"; VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0); - JsonMailboxIdentifier expectedMailboxId = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(serverHost, server2port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(serverHost, server2port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId)).thenReturn(_mailbox); Mockito.when(_mailbox.isClosed()).thenReturn(false); Exception e = new Exception("errorBlock"); Mockito.when(_mailbox.receive()).thenReturn(TransferableBlockUtils.getErrorTransferableBlock(e)); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.SINGLETON, toAddress, jobId, stageId, DEFAULT_RECEIVER_STAGE_ID, null); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + + MailboxReceiveOperator receiveOp = + new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.SINGLETON, + stageId, DEFAULT_RECEIVER_STAGE_ID, null); TransferableBlock receivedBlock = receiveOp.nextBlock(); Assert.assertTrue(receivedBlock.isErrorBlock()); MetadataBlock error = (MetadataBlock) receivedBlock.getDataBlock(); @@ -363,24 +396,25 @@ public void shouldReceiveMailboxFromTwoServersOneClose() String toHost = "toHost"; 
VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0); - JsonMailboxIdentifier expectedMailboxId1 = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(server1Host, server1Port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox); Mockito.when(_mailbox.isClosed()).thenReturn(true); - JsonMailboxIdentifier expectedMailboxId2 = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(server2Host, server2Port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2); Mockito.when(_mailbox2.isClosed()).thenReturn(false); Object[] expRow = new Object[]{1, 1}; DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT}); Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow)); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId, DEFAULT_RECEIVER_STAGE_ID, null); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + + MailboxReceiveOperator receiveOp = + new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, + stageId, DEFAULT_RECEIVER_STAGE_ID, null); TransferableBlock receivedBlock = receiveOp.nextBlock(); List resultRows = receivedBlock.getContainer(); Assert.assertEquals(resultRows.size(), 1); @@ -406,25 +440,26 @@ public void shouldReceiveMailboxFromTwoServersOneNull() String toHost = "toHost"; VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0); - JsonMailboxIdentifier expectedMailboxId1 = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(server1Host, server1Port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox); Mockito.when(_mailbox.isClosed()).thenReturn(false); Mockito.when(_mailbox.receive()).thenReturn(null); - JsonMailboxIdentifier expectedMailboxId2 = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(server2Host, server2Port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); 
Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2); Mockito.when(_mailbox2.isClosed()).thenReturn(false); Object[] expRow = new Object[]{1, 1}; DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT}); Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow)); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId, DEFAULT_RECEIVER_STAGE_ID, null); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + + MailboxReceiveOperator receiveOp = + new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, + stageId, DEFAULT_RECEIVER_STAGE_ID, null); TransferableBlock receivedBlock = receiveOp.nextBlock(); List resultRows = receivedBlock.getContainer(); Assert.assertEquals(resultRows.size(), 1); @@ -451,10 +486,8 @@ public void shouldReceiveMailboxFromTwoServers() VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0); DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT}); - JsonMailboxIdentifier expectedMailboxId1 = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(server1Host, server1Port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox); Mockito.when(_mailbox.isClosed()).thenReturn(false); Object[] expRow1 = new Object[]{1, 1}; @@ -464,15 +497,18 @@ public void shouldReceiveMailboxFromTwoServers() TransferableBlockUtils.getEndOfStreamTransferableBlock()); Object[] expRow3 = new Object[]{3, 3}; - JsonMailboxIdentifier expectedMailboxId2 = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(server2Host, server2Port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2); Mockito.when(_mailbox2.isClosed()).thenReturn(false); Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3)); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId, DEFAULT_RECEIVER_STAGE_ID, null); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + + MailboxReceiveOperator receiveOp = + new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, + stageId, DEFAULT_RECEIVER_STAGE_ID, null); // Receive first block from first server. 
TransferableBlock receivedBlock = receiveOp.nextBlock(); List resultRows = receivedBlock.getContainer(); @@ -511,25 +547,26 @@ public void shouldGetReceptionReceiveErrorMailbox() VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0); DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT}); - JsonMailboxIdentifier expectedMailboxId1 = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(server1Host, server1Port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox); Mockito.when(_mailbox.isClosed()).thenReturn(false); Mockito.when(_mailbox.receive()) .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("mailboxError"))); Object[] expRow3 = new Object[]{3, 3}; - JsonMailboxIdentifier expectedMailboxId2 = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(server2Host, server2Port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2); Mockito.when(_mailbox2.isClosed()).thenReturn(false); Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3)); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId, DEFAULT_RECEIVER_STAGE_ID, null); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + + MailboxReceiveOperator receiveOp = + new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, + stageId, DEFAULT_RECEIVER_STAGE_ID, null); // Receive error block from first server. 
TransferableBlock receivedBlock = receiveOp.nextBlock(); Assert.assertTrue(receivedBlock.isErrorBlock()); @@ -557,24 +594,25 @@ public void shouldThrowReceiveWhenOneServerReceiveThrowException() VirtualServerAddress toAddress = new VirtualServerAddress(toHost, toPort, 0); DataSchema inSchema = new DataSchema(new String[]{"col1", "col2"}, new DataSchema.ColumnDataType[]{INT, INT}); - JsonMailboxIdentifier expectedMailboxId1 = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(server1Host, server1Port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId1 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(server1Host, server1Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId1)).thenReturn(_mailbox); Mockito.when(_mailbox.isClosed()).thenReturn(false); Mockito.when(_mailbox.receive()).thenThrow(new Exception("mailboxError")); Object[] expRow3 = new Object[]{3, 3}; - JsonMailboxIdentifier expectedMailboxId2 = - new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), - new VirtualServerAddress(server2Host, server2Port, 0), - toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); + JsonMailboxIdentifier expectedMailboxId2 = new JsonMailboxIdentifier(String.format("%s_%s", jobId, stageId), + new VirtualServerAddress(server2Host, server2Port, 0), toAddress, stageId, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_mailboxService.getReceivingMailbox(expectedMailboxId2)).thenReturn(_mailbox2); Mockito.when(_mailbox2.isClosed()).thenReturn(false); Mockito.when(_mailbox2.receive()).thenReturn(OperatorTestUtil.block(inSchema, expRow3)); - MailboxReceiveOperator receiveOp = new MailboxReceiveOperator(_mailboxService, ImmutableList.of(_server1, _server2), - RelDistribution.Type.HASH_DISTRIBUTED, toAddress, jobId, stageId, DEFAULT_RECEIVER_STAGE_ID, null); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, jobId, DEFAULT_RECEIVER_STAGE_ID, toAddress, Long.MAX_VALUE, + Long.MAX_VALUE, new HashMap<>()); + + MailboxReceiveOperator receiveOp = + new MailboxReceiveOperator(context, ImmutableList.of(_server1, _server2), RelDistribution.Type.HASH_DISTRIBUTED, + stageId, DEFAULT_RECEIVER_STAGE_ID, null); TransferableBlock receivedBlock = receiveOp.nextBlock(); Assert.assertTrue(receivedBlock.isErrorBlock(), "server-1 should have returned an error-block"); } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java index ea7df4982996..13735ed883d7 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/MailboxSendOperatorTest.java @@ -20,17 +20,21 @@ import com.google.common.collect.ImmutableList; import java.util.Arrays; +import java.util.Collections; +import java.util.Map; import org.apache.calcite.rel.RelDistribution; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.mailbox.JsonMailboxIdentifier; import org.apache.pinot.query.mailbox.MailboxService; +import org.apache.pinot.query.planner.StageMetadata; import org.apache.pinot.query.planner.partitioning.KeySelector; import 
org.apache.pinot.query.routing.VirtualServer; import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.query.runtime.operator.exchange.BlockExchange; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.mockito.ArgumentCaptor; import org.mockito.Mock; import org.mockito.Mockito; @@ -65,8 +69,7 @@ public class MailboxSendOperatorTest { public void setUp() { _mocks = MockitoAnnotations.openMocks(this); Mockito.when(_exchangeFactory.build(Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), Mockito.any(), - Mockito.anyLong())) - .thenReturn(_exchange); + Mockito.anyLong())).thenReturn(_exchange); Mockito.when(_server.getHostname()).thenReturn("mock"); Mockito.when(_server.getQueryMailboxPort()).thenReturn(0); @@ -84,11 +87,13 @@ public void shouldSwallowNoOpBlockFromUpstream() throws Exception { long deadlineMs = System.currentTimeMillis() + 10_000; // Given: - MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server), - RelDistribution.Type.HASH_DISTRIBUTED, _selector, - server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, - DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID, - new VirtualServerAddress(_server), deadlineMs); + OpChainExecutionContext context = getOpChainContext(deadlineMs); + + MailboxSendOperator operator = + new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, + server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, + DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID); + Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()); // When: @@ -104,11 +109,12 @@ public void shouldSendErrorBlock() throws Exception { long deadlineMs = System.currentTimeMillis() + 10_000; // Given: - MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server), - RelDistribution.Type.HASH_DISTRIBUTED, _selector, - server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, - DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID, - new VirtualServerAddress(_server), deadlineMs); + OpChainExecutionContext context = getOpChainContext(deadlineMs); + + MailboxSendOperator operator = + new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, + server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, + DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID); TransferableBlock errorBlock = TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!")); Mockito.when(_input.nextBlock()).thenReturn(errorBlock); @@ -125,11 +131,12 @@ public void shouldSendErrorBlockWhenInputThrows() throws Exception { long deadlineMs = System.currentTimeMillis() + 10_000; // Given: - MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server), - RelDistribution.Type.HASH_DISTRIBUTED, _selector, - server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, - DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID, - new 
VirtualServerAddress(_server), deadlineMs); + OpChainExecutionContext context = getOpChainContext(deadlineMs); + + MailboxSendOperator operator = + new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, + server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, + DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID); Mockito.when(_input.nextBlock()).thenThrow(new RuntimeException("foo!")); ArgumentCaptor captor = ArgumentCaptor.forClass(TransferableBlock.class); @@ -147,11 +154,13 @@ public void shouldSendEosBlock() throws Exception { long deadlineMs = System.currentTimeMillis() + 10_000; // Given: - MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server), - RelDistribution.Type.HASH_DISTRIBUTED, _selector, - server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, - DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID, - new VirtualServerAddress(_server), deadlineMs); + OpChainExecutionContext context = getOpChainContext(deadlineMs); + + MailboxSendOperator operator = + new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, + server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, + DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID); + TransferableBlock eosBlock = TransferableBlockUtils.getEndOfStreamTransferableBlock(); Mockito.when(_input.nextBlock()).thenReturn(eosBlock); @@ -168,11 +177,12 @@ public void shouldSendDataBlock() throws Exception { long deadlineMs = System.currentTimeMillis() + 10_000; // Given: - MailboxSendOperator operator = new MailboxSendOperator(_mailboxService, _input, ImmutableList.of(_server), - RelDistribution.Type.HASH_DISTRIBUTED, _selector, - server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, - DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, 1, DEFAULT_SENDER_STAGE_ID, DEFAULT_RECEIVER_STAGE_ID, - new VirtualServerAddress(_server), deadlineMs); + OpChainExecutionContext context = getOpChainContext(deadlineMs); + + MailboxSendOperator operator = + new MailboxSendOperator(context, _input, RelDistribution.Type.HASH_DISTRIBUTED, _selector, + server -> new JsonMailboxIdentifier("123", "0@from:1", "0@to:2", DEFAULT_SENDER_STAGE_ID, + DEFAULT_RECEIVER_STAGE_ID), _exchangeFactory, DEFAULT_RECEIVER_STAGE_ID); TransferableBlock dataBlock = block(new DataSchema(new String[]{}, new DataSchema.ColumnDataType[]{})); Mockito.when(_input.nextBlock()).thenReturn(dataBlock) .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()); @@ -189,4 +199,14 @@ public void shouldSendDataBlock() private static TransferableBlock block(DataSchema schema, Object[]... 
rows) { return new TransferableBlock(Arrays.asList(rows), schema, DataBlock.Type.ROW); } + + private OpChainExecutionContext getOpChainContext(long deadlineMs) { + StageMetadata stageMetadata = new StageMetadata(); + stageMetadata.setServerInstances(ImmutableList.of(_server)); + Map stageMetadataMap = Collections.singletonMap(DEFAULT_RECEIVER_STAGE_ID, stageMetadata); + OpChainExecutionContext context = + new OpChainExecutionContext(_mailboxService, 1, DEFAULT_SENDER_STAGE_ID, new VirtualServerAddress(_server), + deadlineMs, deadlineMs, stageMetadataMap); + return context; + } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java index 39921e324d92..cd1f3ffe8dd2 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/OperatorTestUtil.java @@ -19,18 +19,20 @@ package org.apache.pinot.query.runtime.operator; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import org.apache.pinot.common.datablock.DataBlock; import org.apache.pinot.common.utils.DataSchema; +import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; +import org.apache.pinot.query.runtime.plan.OpChainExecutionContext; import org.apache.pinot.query.testutils.MockDataBlockOperatorFactory; - public class OperatorTestUtil { // simple key-value collision schema/data test set: "Aa" and "BB" have same hash code in java. - private static final List> SIMPLE_KV_DATA_ROWS = Arrays.asList( - Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}), - Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "Aa"})); + private static final List> SIMPLE_KV_DATA_ROWS = + Arrays.asList(Arrays.asList(new Object[]{1, "Aa"}, new Object[]{2, "BB"}, new Object[]{3, "BB"}), + Arrays.asList(new Object[]{1, "AA"}, new Object[]{2, "Aa"})); private static final MockDataBlockOperatorFactory MOCK_OPERATOR_FACTORY; public static final DataSchema SIMPLE_KV_DATA_SCHEMA = new DataSchema(new String[]{"foo", "bar"}, @@ -40,10 +42,8 @@ public class OperatorTestUtil { public static final String OP_2 = "op2"; static { - MOCK_OPERATOR_FACTORY = new MockDataBlockOperatorFactory() - .registerOperator(OP_1, SIMPLE_KV_DATA_SCHEMA) - .registerOperator(OP_2, SIMPLE_KV_DATA_SCHEMA) - .addRows(OP_1, SIMPLE_KV_DATA_ROWS.get(0)) + MOCK_OPERATOR_FACTORY = new MockDataBlockOperatorFactory().registerOperator(OP_1, SIMPLE_KV_DATA_SCHEMA) + .registerOperator(OP_2, SIMPLE_KV_DATA_SCHEMA).addRows(OP_1, SIMPLE_KV_DATA_ROWS.get(0)) .addRows(OP_2, SIMPLE_KV_DATA_ROWS.get(1)); } @@ -61,4 +61,16 @@ public static DataSchema getDataSchema(String operatorName) { public static TransferableBlock block(DataSchema schema, Object[]... 
rows) { return new TransferableBlock(Arrays.asList(rows), schema, DataBlock.Type.ROW); } + + public static OpChainExecutionContext getDefaultContext() { + VirtualServerAddress virtualServerAddress = new VirtualServerAddress("mock", 80, 0); + return new OpChainExecutionContext(null, 1, 2, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE, + new HashMap<>()); + } + + public static OpChainExecutionContext getContext(long requestId, int stageId, + VirtualServerAddress virtualServerAddress) { + return new OpChainExecutionContext(null, requestId, stageId, virtualServerAddress, Long.MAX_VALUE, Long.MAX_VALUE, + new HashMap<>()); + } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java index 04c3b8c700b4..11e525221d18 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/SortOperatorTest.java @@ -69,7 +69,8 @@ public void shouldHandleUpstreamErrorBlock() { List collation = collation(0); List directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema); Mockito.when(_input.nextBlock()) .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"))); @@ -87,7 +88,8 @@ public void shouldHandleUpstreamNoOpBlock() { List collation = collation(0); List directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema); Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()); @@ -104,7 +106,8 @@ public void shouldCreateEmptyBlockOnUpstreamEOS() { List collation = collation(0); List directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema); Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); @@ -121,7 +124,8 @@ public void shouldConsumeAndSortInputOneBlockWithTwoRows() { List collation = collation(0); List directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema); Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); @@ -143,7 +147,8 @@ public void 
shouldConsumeAndSortOnNonZeroIdxCollation() { List collation = collation(1); List directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"ignored", "sort"}, new DataSchema.ColumnDataType[]{INT, INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema); Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{1, 2}, new Object[]{2, 1})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); @@ -165,7 +170,8 @@ public void shouldConsumeAndSortInputOneBlockWithTwoRowsNonNumeric() { List collation = collation(0); List directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{STRING}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema); Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{"b"}, new Object[]{"a"})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); @@ -187,7 +193,8 @@ public void shouldConsumeAndSortDescending() { List collation = collation(0); List directions = ImmutableList.of(Direction.DESCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema); Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); @@ -209,7 +216,8 @@ public void shouldOffsetSortInputOneBlockWithThreeRows() { List collation = collation(0); List directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 1, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 1, schema); Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); @@ -231,7 +239,8 @@ public void shouldOffsetLimitSortInputOneBlockWithThreeRows() { List collation = collation(0); List directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 1, 1, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 1, 1, schema); Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); @@ -252,7 +261,8 @@ public void shouldRespectDefaultLimit() { List collation = collation(0); List directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new 
DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 0, 0, schema, 1, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 0, 0, schema, 1); Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); @@ -273,7 +283,8 @@ public void shouldFetchAllWithNegativeFetch() { List collation = collation(0); List directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, -1, 0, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, -1, 0, schema); Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2}, new Object[]{1}, new Object[]{3})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); @@ -293,7 +304,8 @@ public void shouldConsumeAndSortTwoInputBlocksWithOneRowEach() { List collation = collation(0); List directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema); Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2})) .thenReturn(block(schema, new Object[]{1})) @@ -316,7 +328,8 @@ public void shouldBreakTiesUsingSecondCollationKey() { List collation = collation(0, 1); List directions = ImmutableList.of(Direction.ASCENDING, Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema); Mockito.when(_input.nextBlock()) .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3})) @@ -340,7 +353,8 @@ public void shouldBreakTiesUsingSecondCollationKeyWithDifferentDirection() { List collation = collation(0, 1); List directions = ImmutableList.of(Direction.ASCENDING, Direction.DESCENDING); DataSchema schema = new DataSchema(new String[]{"first", "second"}, new DataSchema.ColumnDataType[]{INT, INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema); Mockito.when(_input.nextBlock()) .thenReturn(block(schema, new Object[]{1, 2}, new Object[]{1, 1}, new Object[]{1, 3})) @@ -364,7 +378,8 @@ public void shouldHandleNoOpUpstreamBlockWhileConstructing() { List collation = collation(0); List directions = ImmutableList.of(Direction.ASCENDING); DataSchema schema = new DataSchema(new String[]{"sort"}, new DataSchema.ColumnDataType[]{INT}); - SortOperator op = new SortOperator(_input, collation, directions, 10, 0, schema, 1, 2, _serverAddress); + SortOperator op = + new SortOperator(OperatorTestUtil.getDefaultContext(), _input, collation, directions, 10, 0, schema); 
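Every SortOperator hunk above makes the same substitution: the trailing requestId, stageId, serverAddress arguments disappear and a single OpChainExecutionContext obtained from the new OperatorTestUtil helpers takes their place. The sketch below restates that pattern as a standalone snippet; it reuses only the constructor shapes and helper names visible in this diff, while the schema, collation and fetch/offset values, and the assumption that OperatorTestUtil is importable from the test classpath, are illustrative rather than taken from the PR.

```java
// Illustrative sketch only: building operators against the context-based
// constructors introduced in this PR. Values below are made up for the example.
import com.google.common.collect.ImmutableList;
import java.util.List;
import org.apache.calcite.rel.RelFieldCollation;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.query.planner.logical.RexExpression;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.operator.MultiStageOperator;
import org.apache.pinot.query.runtime.operator.OperatorTestUtil;
import org.apache.pinot.query.runtime.operator.SortOperator;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;

class SortOperatorConstructionSketch {

  SortOperator buildWithDefaultContext(MultiStageOperator input) {
    DataSchema schema = new DataSchema(new String[]{"sort"},
        new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT});
    List<RexExpression> collation = ImmutableList.of(new RexExpression.InputRef(0));
    List<RelFieldCollation.Direction> directions =
        ImmutableList.of(RelFieldCollation.Direction.ASCENDING);

    // Before this PR, the identity triple was passed to every operator:
    //   new SortOperator(input, collation, directions, 10, 0, schema, 1, 2, serverAddress);
    // After this PR, the same state travels inside one OpChainExecutionContext:
    OpChainExecutionContext context = OperatorTestUtil.getDefaultContext();
    return new SortOperator(context, input, collation, directions, 10, 0, schema);
  }

  OpChainExecutionContext buildExplicitContext() {
    // For tests that care about request/stage identity, the parameterized helper
    // keeps the old explicit values available.
    return OperatorTestUtil.getContext(1, 2, new VirtualServerAddress("mock", 80, 0));
  }
}
```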
Mockito.when(_input.nextBlock()).thenReturn(block(schema, new Object[]{2})) .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()).thenReturn(block(schema, new Object[]{1})) diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java index f990882f7426..46279f03a23f 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/TransformOperatorTest.java @@ -26,7 +26,6 @@ import org.apache.pinot.common.exception.QueryException; import org.apache.pinot.common.utils.DataSchema; import org.apache.pinot.query.planner.logical.RexExpression; -import org.apache.pinot.query.routing.VirtualServerAddress; import org.apache.pinot.query.runtime.blocks.TransferableBlock; import org.apache.pinot.query.runtime.blocks.TransferableBlockUtils; import org.apache.pinot.spi.data.FieldSpec; @@ -48,13 +47,9 @@ public class TransformOperatorTest { @Mock private MultiStageOperator _upstreamOp; - @Mock - private VirtualServerAddress _serverAddress; - @BeforeMethod public void setUp() { _mocks = MockitoAnnotations.openMocks(this); - Mockito.when(_serverAddress.toString()).thenReturn(new VirtualServerAddress("mock", 80, 0).toString()); } @AfterMethod @@ -76,8 +71,8 @@ public void shouldHandleRefTransform() { RexExpression.InputRef ref0 = new RexExpression.InputRef(0); RexExpression.InputRef ref1 = new RexExpression.InputRef(1); TransformOperator op = - new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0, ref1), upStreamSchema, 1, 2, - _serverAddress); + new TransformOperator(OperatorTestUtil.getDefaultContext(), + _upstreamOp, resultSchema, ImmutableList.of(ref0, ref1), upStreamSchema); TransferableBlock result = op.nextBlock(); Assert.assertTrue(!result.isErrorBlock()); @@ -101,8 +96,8 @@ public void shouldHandleLiteralTransform() { RexExpression.Literal boolLiteral = new RexExpression.Literal(FieldSpec.DataType.BOOLEAN, true); RexExpression.Literal strLiteral = new RexExpression.Literal(FieldSpec.DataType.STRING, "str"); TransformOperator op = - new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1, - 2, _serverAddress); + new TransformOperator(OperatorTestUtil.getDefaultContext(), + _upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema); TransferableBlock result = op.nextBlock(); // Literal operands should just output original literals. 
Assert.assertTrue(!result.isErrorBlock()); @@ -132,8 +127,8 @@ public void shouldHandlePlusMinusFuncTransform() { DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE}); TransformOperator op = - new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2, - _serverAddress); + new TransformOperator(OperatorTestUtil.getDefaultContext(), + _upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema); TransferableBlock result = op.nextBlock(); Assert.assertTrue(!result.isErrorBlock()); List resultRows = result.getContainer(); @@ -161,8 +156,8 @@ public void shouldThrowOnTypeMismatchFuncTransform() { DataSchema resultSchema = new DataSchema(new String[]{"plusR", "minusR"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.DOUBLE, DataSchema.ColumnDataType.DOUBLE}); TransformOperator op = - new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema, 1, 2, - _serverAddress); + new TransformOperator(OperatorTestUtil.getDefaultContext(), + _upstreamOp, resultSchema, ImmutableList.of(plus01, minus01), upStreamSchema); TransferableBlock result = op.nextBlock(); Assert.assertTrue(result.isErrorBlock()); @@ -182,8 +177,8 @@ public void shouldPropagateUpstreamError() { DataSchema resultSchema = new DataSchema(new String[]{"inCol", "strCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.INT, DataSchema.ColumnDataType.STRING}); TransformOperator op = - new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1, - 2, _serverAddress); + new TransformOperator(OperatorTestUtil.getDefaultContext(), + _upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema); TransferableBlock result = op.nextBlock(); Assert.assertTrue(result.isErrorBlock()); DataBlock data = result.getDataBlock(); @@ -206,8 +201,8 @@ public void testNoopBlock() { DataSchema resultSchema = new DataSchema(new String[]{"boolCol", "strCol"}, new DataSchema.ColumnDataType[]{DataSchema.ColumnDataType.BOOLEAN, DataSchema.ColumnDataType.STRING}); TransformOperator op = - new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema, 1, - 2, _serverAddress); + new TransformOperator(OperatorTestUtil.getDefaultContext(), + _upstreamOp, resultSchema, ImmutableList.of(boolLiteral, strLiteral), upStreamSchema); TransferableBlock result = op.nextBlock(); // First block has two rows Assert.assertFalse(result.isErrorBlock()); @@ -239,7 +234,8 @@ public void testWrongNumTransform() { DataSchema.ColumnDataType.STRING, DataSchema.ColumnDataType.STRING }); TransformOperator transform = - new TransformOperator(_upstreamOp, resultSchema, new ArrayList<>(), upStreamSchema, 1, 2, _serverAddress); + new TransformOperator(OperatorTestUtil.getDefaultContext(), _upstreamOp, resultSchema, new ArrayList<>(), + upStreamSchema); } @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*doesn't match " @@ -252,6 +248,7 @@ public void testMismatchedSchemaOperandSize() { }); RexExpression.InputRef ref0 = new RexExpression.InputRef(0); TransformOperator transform = - new TransformOperator(_upstreamOp, resultSchema, ImmutableList.of(ref0), upStreamSchema, 1, 2, _serverAddress); + new TransformOperator(OperatorTestUtil.getDefaultContext(), _upstreamOp, resultSchema, 
ImmutableList.of(ref0), + upStreamSchema); } } diff --git a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java index 42606e5422c6..ea63f8573125 100644 --- a/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java +++ b/pinot-query-runtime/src/test/java/org/apache/pinot/query/runtime/operator/WindowAggregateOperatorTest.java @@ -78,11 +78,12 @@ public void testShouldHandleUpstreamErrorBlocks() { .thenReturn(TransferableBlockUtils.getErrorTransferableBlock(new Exception("foo!"))); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new DataSchema.ColumnDataType[]{INT, INT}); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, - WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); // build @@ -101,11 +102,12 @@ public void testShouldHandleEndOfStreamBlockWithNoOtherInputs() { Mockito.when(_input.nextBlock()).thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new DataSchema.ColumnDataType[]{INT, INT}); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, - WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); // When: TransferableBlock block = operator.nextBlock(); @@ -126,11 +128,12 @@ public void testShouldHandleUpstreamNoOpBlocksWhileConstructing() { .thenReturn(TransferableBlockUtils.getNoOpTransferableBlock()) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls,
Integer.MIN_VALUE, Integer.MAX_VALUE, - WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); // build when reading NoOp block @@ -156,11 +159,12 @@ public void testShouldHandleUpstreamNoOpBlocksWhileConstructingMultipleRows() { .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{1, 2})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, - WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); // build when reading NoOp block @@ -189,11 +193,12 @@ public void testShouldWindowAggregateOverSingleInputBlock() { Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, - WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); @@ -218,12 +223,13 @@ public void testShouldWindowAggregateOverSingleInputBlockWithSameOrderByKeys() { Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); - WindowAggregateOperator 
operator = new WindowAggregateOperator(_input, group, order, - Arrays.asList(RelFieldCollation.Direction.ASCENDING), Arrays.asList(RelFieldCollation.NullDirection.LAST), - calls, Integer.MIN_VALUE, Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, Collections.emptyList(), - outSchema, inSchema, 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, order, + Arrays.asList(RelFieldCollation.Direction.ASCENDING), Arrays.asList(RelFieldCollation.NullDirection.LAST), + calls, Integer.MIN_VALUE, Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, Collections.emptyList(), + outSchema, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); @@ -246,12 +252,12 @@ public void testShouldWindowAggregateOverSingleInputBlockWithoutPartitionByKeys( Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 1})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, - Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, 1, 2, - _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, + Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); @@ -275,11 +281,12 @@ public void testShouldWindowAggregateOverSingleInputBlockWithLiteralInput() { Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, 3})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, - WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); // When: TransferableBlock block1 = operator.nextBlock(); @@ -309,12 +316,13 @@ public void testShouldCallMergerWhenWindowAggregatingMultipleRows() { AggregationUtils.Merger merger = Mockito.mock(AggregationUtils.Merger.class); 
Mockito.when(merger.merge(Mockito.any(), Mockito.any())).thenReturn(12d); Mockito.when(merger.initialize(Mockito.any(), Mockito.any())).thenReturn(1d); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, - WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, - ImmutableMap.of("SUM", cdt -> merger), 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, + ImmutableMap.of("SUM", cdt -> merger)); // When: TransferableBlock resultBlock = operator.nextBlock(); // (output result) @@ -332,28 +340,27 @@ public void testShouldCallMergerWhenWindowAggregatingMultipleRows() { "Expected three columns (original two columns, agg literal value)"); } - @Test public void testPartitionByWindowAggregateWithHashCollision() { MultiStageOperator upstreamOperator = OperatorTestUtil.getOperator(OperatorTestUtil.OP_1); // Create an aggregation call with sum for first column and group by second column. RexExpression.FunctionCall agg = getSum(new RexExpression.InputRef(0)); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new DataSchema.ColumnDataType[]{INT, INT}); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, INT, DOUBLE}); WindowAggregateOperator sum0PartitionBy1 = - new WindowAggregateOperator(upstreamOperator, Arrays.asList(new RexExpression.InputRef(1)), - Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Arrays.asList(agg), - Integer.MIN_VALUE, Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, - inSchema, 1, 2, _serverAddress); + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), upstreamOperator, + Arrays.asList(new RexExpression.InputRef(1)), Collections.emptyList(), Collections.emptyList(), + Collections.emptyList(), Arrays.asList(agg), Integer.MIN_VALUE, Integer.MAX_VALUE, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); TransferableBlock result = sum0PartitionBy1.getNextBlock(); while (result.isNoOpBlock()) { result = sum0PartitionBy1.getNextBlock(); } List resultRows = result.getContainer(); - List expectedRows = Arrays.asList(new Object[]{2, "BB", 5.0}, new Object[]{3, "BB", 5.0}, - new Object[]{1, "Aa", 1}); + List expectedRows = + Arrays.asList(new Object[]{2, "BB", 5.0}, new Object[]{3, "BB", 5.0}, new Object[]{1, "Aa", 1}); Assert.assertEquals(resultRows.size(), expectedRows.size()); Assert.assertEquals(resultRows.get(0), expectedRows.get(0)); Assert.assertEquals(resultRows.get(1), expectedRows.get(1)); @@ -371,9 +378,10 @@ public void testShouldThrowOnUnknownAggFunction() { DataSchema inSchema = new DataSchema(new String[]{"unknown"}, new 
DataSchema.ColumnDataType[]{DOUBLE}); // When: - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, - WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, 1, 2, _serverAddress); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); } @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = ".*Unexpected aggregation " @@ -388,9 +396,10 @@ public void testShouldThrowOnUnknownRankAggFunction() { DataSchema inSchema = new DataSchema(new String[]{"unknown"}, new DataSchema.ColumnDataType[]{DOUBLE}); // When: - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, - WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, 1, 2, _serverAddress); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); } @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Order by is not yet " @@ -403,16 +412,16 @@ public void testShouldThrowOnNonEmptyOrderByKeysNotMatchingPartitionByKeys() { List order = ImmutableList.of(new RexExpression.InputRef(1)); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new DataSchema.ColumnDataType[]{INT, STRING}); - Mockito.when(_input.nextBlock()) - .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"})) + Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, order, - Arrays.asList(RelFieldCollation.Direction.ASCENDING), Arrays.asList(RelFieldCollation.NullDirection.LAST), - calls, Integer.MIN_VALUE, Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, Collections.emptyList(), - outSchema, inSchema, 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, order, + Arrays.asList(RelFieldCollation.Direction.ASCENDING), Arrays.asList(RelFieldCollation.NullDirection.LAST), + calls, Integer.MIN_VALUE, Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, Collections.emptyList(), + outSchema, inSchema); } @Test @@ -426,18 +435,18 @@ public void testShouldThrowOnNonEmptyOrderByKeysMatchingPartitionByKeysWithDiffe List order = ImmutableList.of(new RexExpression.InputRef(1)); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new DataSchema.ColumnDataType[]{INT, 
STRING}); - Mockito.when(_input.nextBlock()) - .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"})) + Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"})) .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "bar"})) .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{3, "foo"})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, order, - Arrays.asList(RelFieldCollation.Direction.DESCENDING), Arrays.asList(RelFieldCollation.NullDirection.LAST), - calls, Integer.MIN_VALUE, Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, Collections.emptyList(), - outSchema, inSchema, 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, order, + Arrays.asList(RelFieldCollation.Direction.DESCENDING), Arrays.asList(RelFieldCollation.NullDirection.LAST), + calls, Integer.MIN_VALUE, Integer.MAX_VALUE, WindowNode.WindowFrameType.RANGE, Collections.emptyList(), + outSchema, inSchema); // When: TransferableBlock resultBlock = operator.nextBlock(); // (output result) @@ -460,16 +469,15 @@ public void testShouldThrowOnCustomFramesRows() { List group = ImmutableList.of(new RexExpression.InputRef(0)); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new DataSchema.ColumnDataType[]{INT, STRING}); - Mockito.when(_input.nextBlock()) - .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"})) + Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, - WindowNode.WindowFrameType.ROW, Collections.emptyList(), outSchema, inSchema, 1, 2, - _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, + WindowNode.WindowFrameType.ROW, Collections.emptyList(), outSchema, inSchema); } @Test @@ -480,16 +488,16 @@ public void testShouldNotThrowCurrentRowPartitionByOrderByOnSameKey() { List order = ImmutableList.of(new RexExpression.InputRef(1)); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new DataSchema.ColumnDataType[]{INT, STRING}); - Mockito.when(_input.nextBlock()) - .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"})) + Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", 
"sum"}, - new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, order, - Arrays.asList(RelFieldCollation.Direction.ASCENDING), Arrays.asList(RelFieldCollation.NullDirection.LAST), - calls, Integer.MIN_VALUE, 0, WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, - inSchema, 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, order, + Arrays.asList(RelFieldCollation.Direction.ASCENDING), Arrays.asList(RelFieldCollation.NullDirection.LAST), + calls, Integer.MIN_VALUE, 0, WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, + inSchema); // When: TransferableBlock block1 = operator.nextBlock(); @@ -512,16 +520,15 @@ public void testShouldThrowOnCustomFramesCustomPreceding() { List group = ImmutableList.of(new RexExpression.InputRef(0)); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new DataSchema.ColumnDataType[]{INT, STRING}); - Mockito.when(_input.nextBlock()) - .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"})) + Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls, 5, Integer.MAX_VALUE, - WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, 1, 2, - _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, 5, Integer.MAX_VALUE, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); } @Test(expectedExceptions = IllegalStateException.class, expectedExceptionsMessageRegExp = "Only default frame is " @@ -533,15 +540,15 @@ public void testShouldThrowOnCustomFramesCustomFollowing() { List group = ImmutableList.of(new RexExpression.InputRef(0)); DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new DataSchema.ColumnDataType[]{INT, STRING}); - Mockito.when(_input.nextBlock()) - .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"})) + Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "foo"})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, 5, - WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, 
STRING, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, 5, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); } @Test @@ -553,16 +560,16 @@ public void testShouldReturnErrorBlockOnUnexpectedInputType() { DataSchema inSchema = new DataSchema(new String[]{"group", "arg"}, new DataSchema.ColumnDataType[]{INT, STRING}); // TODO: it is necessary to produce two values here, the operator only throws on second // (see the comment in WindowAggregate operator) - Mockito.when(_input.nextBlock()) - .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "metallica"})) + Mockito.when(_input.nextBlock()).thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "metallica"})) .thenReturn(OperatorTestUtil.block(inSchema, new Object[]{2, "pink floyd"})) .thenReturn(TransferableBlockUtils.getEndOfStreamTransferableBlock()); - DataSchema outSchema = new DataSchema(new String[]{"group", "arg", "sum"}, - new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); - WindowAggregateOperator operator = new WindowAggregateOperator(_input, group, Collections.emptyList(), - Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, - WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema, 1, 2, _serverAddress); + DataSchema outSchema = + new DataSchema(new String[]{"group", "arg", "sum"}, new DataSchema.ColumnDataType[]{INT, STRING, DOUBLE}); + WindowAggregateOperator operator = + new WindowAggregateOperator(OperatorTestUtil.getDefaultContext(), _input, group, Collections.emptyList(), + Collections.emptyList(), Collections.emptyList(), calls, Integer.MIN_VALUE, Integer.MAX_VALUE, + WindowNode.WindowFrameType.RANGE, Collections.emptyList(), outSchema, inSchema); // When: TransferableBlock block = operator.nextBlock();
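The WindowAggregateOperator hunks follow the same refactor rule: each constructor keeps its operator-specific arguments and sheds only the trailing requestId, stageId, _serverAddress triple in favor of OperatorTestUtil.getDefaultContext(). As a reference for what that helper (added in the OperatorTestUtil hunk near the top of this section) actually bundles, here is an annotated sketch; the roles attached to the positional OpChainExecutionContext arguments (a mailbox service left as null, effectively unbounded timeout/deadline values, and an empty metadata map) are inferred from the values the tests pass, not stated by the PR.

```java
// Annotated sketch of the context factories added to OperatorTestUtil.
// Argument roles marked "assumed" are inferences, not documented by this diff.
import java.util.HashMap;
import org.apache.pinot.query.routing.VirtualServerAddress;
import org.apache.pinot.query.runtime.plan.OpChainExecutionContext;

final class TestContextSketch {

  // Fixed requestId=1, stageId=2 and a placeholder "mock" server: sufficient for
  // operator unit tests that never exchange data over mailboxes.
  static OpChainExecutionContext defaultContext() {
    VirtualServerAddress address = new VirtualServerAddress("mock", 80, 0);
    return new OpChainExecutionContext(
        null,             // assumed: mailbox service, unused in these unit tests
        1,                // requestId
        2,                // stageId
        address,          // virtual server address (host, port, virtual id)
        Long.MAX_VALUE,   // assumed: timeout, effectively "never expire"
        Long.MAX_VALUE,   // assumed: deadline, same idea
        new HashMap<>()); // assumed: empty request/stage metadata
  }

  // Parameterized variant for tests that assert on request or stage identity.
  static OpChainExecutionContext context(long requestId, int stageId, VirtualServerAddress address) {
    return new OpChainExecutionContext(null, requestId, stageId, address, Long.MAX_VALUE, Long.MAX_VALUE,
        new HashMap<>());
  }
}
```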