Commit afe05b9

HIVE-28490: Stop removing retainable DPP sources (Seonggon Namgung, reviewed by Denys Kuzmenko)

Closes #5425

ngsg authored Nov 19, 2024
1 parent d85c239 commit afe05b9
Showing 3 changed files with 1,532 additions and 89 deletions.
SharedWorkOptimizer.java:
@@ -28,6 +28,7 @@
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -1195,7 +1196,7 @@ private static boolean areMergeableExtendedCheck(ParseContext pctx, SharedWorkOp
       Operator<?> op = dppsOp1.get(i);
       if (op instanceof ReduceSinkOperator) {
         Set<Operator<?>> ascendants =
-            findAscendantWorkOperators(pctx, optimizerCache, op);
+            findAscendantOperators(optimizerCache, op);
         if (ascendants.contains(tsOp2)) {
           // This should not happen, we cannot merge
           return false;
@@ -1206,7 +1207,7 @@ private static boolean areMergeableExtendedCheck(ParseContext pctx, SharedWorkOp
       Operator<?> op = dppsOp2.get(i);
       if (op instanceof ReduceSinkOperator) {
         Set<Operator<?>> ascendants =
-            findAscendantWorkOperators(pctx, optimizerCache, op);
+            findAscendantOperators(optimizerCache, op);
         if (ascendants.contains(tsOp1)) {
           // This should not happen, we cannot merge
           return false;
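Both call sites above perform the same safety check with the new helper: if a ReduceSink DPP source feeding one table scan has the other table scan among its ascendants, merging the two scans could create a cyclic dependency, so areMergeableExtendedCheck refuses the merge. The switch to findAscendantOperators means the check now follows actual parent edges of individual operators rather than expanding every operator of each intermediate work.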
@@ -1633,8 +1634,7 @@ private static Set<Operator<?>> gatherDPPBranchOps(ParseContext pctx,
         Collection<Operator<?>> c = optimizerCache.tableScanToDPPSource
                 .get((TableScanOperator) op);
         for (Operator<?> dppSource : c) {
-          Set<Operator<?>> ascendants =
-              findAscendantWorkOperators(pctx, optimizerCache, dppSource);
+          Set<Operator<?>> ascendants = findAscendantOperators(optimizerCache, dppSource);
           if (!Collections.disjoint(ascendants, discardedOps)) {
             // Remove branch
             removeBranch(dppSource, dppBranches, ops, optimizerCache);
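gatherDPPBranchOps benefits from the same precision: a DPP branch is now pruned only when one of the operators its source genuinely depends on is in discardedOps, not when any operator merely co-located in the same work was discarded. This narrower test appears to be what lets retainable DPP sources survive, as the commit title suggests.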
@@ -1938,97 +1938,28 @@ public boolean accept(Operator<?> s, Operator<?> t, OpEdge opEdge) {

   }

-  private static Set<Operator<?>> findParentWorkOperators(ParseContext pctx,
-          SharedWorkOptimizerCache optimizerCache, Operator<?> start) {
-    return findParentWorkOperators(pctx, optimizerCache, start, ImmutableSet.of());
-  }
-
-  private static Set<Operator<?>> findParentWorkOperators(ParseContext pctx,
-          SharedWorkOptimizerCache optimizerCache, Operator<?> start,
-          Set<Operator<?>> excludeOps) {
-    // Find operators in work
-    Set<Operator<?>> workOps = findWorkOperators(optimizerCache, start);
-    // Gather input works operators
-    Set<Operator<?>> set = new HashSet<Operator<?>>();
-    for (Operator<?> op : workOps) {
-      if (op.getParentOperators() != null) {
-        for (Operator<?> parent : op.getParentOperators()) {
-          if (parent instanceof ReduceSinkOperator && !excludeOps.contains(parent)) {
-            set.addAll(findWorkOperators(optimizerCache, parent));
-          }
-        }
-      }
-      if (op instanceof TableScanOperator) {
-        // Check for DPP and semijoin DPP
-        for (Operator<?> parent : optimizerCache.tableScanToDPPSource.get((TableScanOperator) op)) {
-          if (!excludeOps.contains(parent)) {
-            set.addAll(findWorkOperators(optimizerCache, parent));
-          }
-        }
-      }
-    }
-    return set;
-  }
-
-  private static Set<Operator<?>> findAscendantWorkOperators(ParseContext pctx,
-          SharedWorkOptimizerCache optimizerCache, Operator<?> start) {
-    // Find operators in work
-    Set<Operator<?>> workOps = findWorkOperators(optimizerCache, start);
-    // Gather input works operators
-    Set<Operator<?>> result = new HashSet<Operator<?>>();
-    Set<Operator<?>> set;
-    while (!workOps.isEmpty()) {
-      set = new HashSet<Operator<?>>();
-      for (Operator<?> op : workOps) {
-        if (op.getParentOperators() != null) {
-          for (Operator<?> parent : op.getParentOperators()) {
-            if (parent instanceof ReduceSinkOperator) {
-              set.addAll(findWorkOperators(optimizerCache, parent));
-            }
-          }
-        } else if (op instanceof TableScanOperator) {
-          // Check for DPP and semijoin DPP
-          for (Operator<?> parent : optimizerCache.tableScanToDPPSource.get((TableScanOperator) op)) {
-            set.addAll(findWorkOperators(optimizerCache, parent));
-          }
-        }
-      }
-      workOps = set;
-      result.addAll(set);
-    }
-    return result;
-  }
-
-  private static Set<Operator<?>> findChildWorkOperators(ParseContext pctx,
-          SharedWorkOptimizerCache optimizerCache, Operator<?> start, boolean traverseEventOperators) {
-    // Find operators in work
-    Set<Operator<?>> workOps = findWorkOperators(optimizerCache, start);
-    // Gather output works operators
-    Set<Operator<?>> set = new HashSet<Operator<?>>();
-    for (Operator<?> op : workOps) {
-      if (op instanceof ReduceSinkOperator) {
-        if (op.getChildOperators() != null) {
-          // All children of RS are descendants
-          for (Operator<?> child : op.getChildOperators()) {
-            set.addAll(findWorkOperators(optimizerCache, child));
-          }
-        }
-        // Semijoin DPP work is considered a child because work needs
-        // to finish for it to execute
-        SemiJoinBranchInfo sjbi = pctx.getRsToSemiJoinBranchInfo().get(op);
-        if (sjbi != null) {
-          set.addAll(findWorkOperators(optimizerCache, sjbi.getTsOp()));
-        }
-      } else if (op.getConf() instanceof DynamicPruningEventDesc) {
-        // DPP work is considered a child because work needs
-        // to finish for it to execute
-        if (traverseEventOperators) {
-          set.addAll(findWorkOperators(
-              optimizerCache, ((DynamicPruningEventDesc) op.getConf()).getTableScan()));
-        }
-      }
-    }
-    return set;
-  }
+  private static Set<Operator<?>> findAscendantOperators(SharedWorkOptimizerCache optimizerCache,
+      Operator<?> start) {
+    Set<Operator<?>> visited = new HashSet<>();
+    visited.add(start);
+
+    // Gather input operators
+    Queue<Operator<?>> remaining = new LinkedList<>(start.getParentOperators());
+    while (!remaining.isEmpty()) {
+      Operator<?> op = remaining.poll();
+      if (!visited.contains(op)) {
+        visited.add(op);
+        remaining.addAll(op.getParentOperators());
+        if (op instanceof TableScanOperator) {
+          // Check for DPP and semijoin DPP
+          remaining.addAll(optimizerCache.tableScanToDPPSource.get((TableScanOperator) op));
+        }
+      }
+    }
+    return visited;
+  }

   private static Set<Operator<?>> findDescendantWorkOperators(ParseContext pctx,
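For readers skimming the diff, here is a minimal, self-contained sketch (not Hive code) of the traversal pattern the new findAscendantOperators follows: a breadth-first walk over parent edges with a visited set, in which table scans additionally enqueue their DPP sources. The Node class and its dppSources field are hypothetical stand-ins for Hive's Operator graph and the tableScanToDPPSource cache.

    import java.util.ArrayDeque;
    import java.util.ArrayList;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Queue;
    import java.util.Set;

    final class AscendantSketch {

      static final class Node {
        final String name;
        final List<Node> parents = new ArrayList<>();
        final List<Node> dppSources = new ArrayList<>(); // stand-in for tableScanToDPPSource
        boolean isTableScan;

        Node(String name) { this.name = name; }

        @Override
        public String toString() { return name; }
      }

      // Mirrors the shape of the new method: the start node is included in the
      // result, and no node is expanded twice thanks to the visited set.
      static Set<Node> findAscendants(Node start) {
        Set<Node> visited = new HashSet<>();
        visited.add(start);
        Queue<Node> remaining = new ArrayDeque<>(start.parents);
        while (!remaining.isEmpty()) {
          Node op = remaining.poll();
          if (visited.add(op)) { // add() returns false when op was already visited
            remaining.addAll(op.parents);
            if (op.isTableScan) {
              remaining.addAll(op.dppSources); // DPP sources count as ascendants of a scan
            }
          }
        }
        return visited;
      }

      public static void main(String[] args) {
        // Tiny plan: dppSource -(DPP)-> scan -> filter -> join
        Node dppSource = new Node("dppSource");
        Node scan = new Node("scan");
        scan.isTableScan = true;
        scan.dppSources.add(dppSource);
        Node filter = new Node("filter");
        filter.parents.add(scan);
        Node join = new Node("join");
        join.parents.add(filter);

        System.out.println(findAscendants(join)); // e.g. [join, filter, scan, dppSource]
      }
    }

Unlike the removed findAscendantWorkOperators, which repeatedly expanded entire works through findWorkOperators, this visits each operator at most once and returns only operators the start actually depends on.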
New qtest file (the third changed file, presumably its .q.out golden output, is not shown in this view):
@@ -0,0 +1,79 @@
create table x2_date_dim (d_date_sk bigint, d_week_seq string, d_date string);
create table x2_item (i_item_sk bigint, i_item_id string);
create table x2_store_returns
(sr_returned_date_sk bigint, sr_item_sk bigint, sr_return_quantity int, sr_some_field string, sr_other_field string);
create table x2_catalog_returns
(cr_returned_date_sk bigint, cr_item_sk bigint, cr_return_quantity int, cr_some_field string, cr_other_field string);

alter table x2_date_dim update statistics set('numRows'='35', 'rawDataSize'='81449');
alter table x2_item update statistics set('numRows'='12345', 'rawDataSize'='123456');
alter table x2_store_returns update statistics set('numRows'='123456', 'rawDataSize'='1234567');
alter table x2_catalog_returns update statistics set('numRows'='123456', 'rawDataSize'='1234567');

set hive.auto.convert.join=true;
set hive.tez.dynamic.semijoin.reduction=true;
set hive.tez.bigtable.minsize.semijoin.reduction=30; -- This should be less than numRows of x2_date_dim
set hive.tez.dynamic.semijoin.reduction.threshold=0.0; -- In order not to remove any SemiJoin branch
set hive.tez.dynamic.semijoin.reduction.for.mapjoin=true; -- In order not to remove any SemiJoin branch

-- Check that the query plan before shared-work optimization contains the following pattern:
-- date_dim ─┐
-- date_dim ─┴ MapJoin ─(DPP)─ date_dim ─ (... catalog_returns)
-- date_dim ─┐
-- date_dim ─┴ MapJoin ─(DPP)─ date_dim ─ (... store_returns)

set hive.optimize.shared.work=false;
explain
with sr_items as (
select i_item_id item_id, sum(sr_return_quantity) sr_item_qty
from x2_store_returns, x2_item, x2_date_dim
where
sr_item_sk = i_item_sk and
d_date in (
select d_date from x2_date_dim
where d_week_seq in (
select d_week_seq from x2_date_dim where d_date in ('1998-01-02','1998-10-15','1998-11-10'))) and
sr_returned_date_sk = d_date_sk group by i_item_id
),
cr_items as (
select i_item_id item_id, sum(cr_return_quantity) cr_item_qty
from x2_catalog_returns, x2_item, x2_date_dim
where
cr_item_sk = i_item_sk and
d_date in (
select d_date from x2_date_dim
where d_week_seq in (
select d_week_seq from x2_date_dim where d_date in ('1998-01-02','1998-10-15','1998-11-10'))) and
cr_returned_date_sk = d_date_sk group by i_item_id
)
select sr_items.item_id, sr_item_qty, cr_item_qty
from sr_items, cr_items
where sr_items.item_id=cr_items.item_id;

set hive.optimize.shared.work=true;
explain
with sr_items as (
select i_item_id item_id, sum(sr_return_quantity) sr_item_qty
from x2_store_returns, x2_item, x2_date_dim
where
sr_item_sk = i_item_sk and
d_date in (
select d_date from x2_date_dim
where d_week_seq in (
select d_week_seq from x2_date_dim where d_date in ('1998-01-02','1998-10-15','1998-11-10'))) and
sr_returned_date_sk = d_date_sk group by i_item_id
),
cr_items as (
select i_item_id item_id, sum(cr_return_quantity) cr_item_qty
from x2_catalog_returns, x2_item, x2_date_dim
where
cr_item_sk = i_item_sk and
d_date in (
select d_date from x2_date_dim
where d_week_seq in (
select d_week_seq from x2_date_dim where d_date in ('1998-01-02','1998-10-15','1998-11-10'))) and
cr_returned_date_sk = d_date_sk group by i_item_id
)
select sr_items.item_id, sr_item_qty, cr_item_qty
from sr_items, cr_items
where sr_items.item_id=cr_items.item_id;
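Taken together, the test defines two CTEs whose nested x2_date_dim filtering is identical, then EXPLAINs the query once with hive.optimize.shared.work=false and once with it enabled; the expected difference, reading from the commit title, is presumably that merging the duplicated subtrees no longer removes DPP sources that the retained table scans still need.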