Skip to content

Commit

Permalink
Planner: support Join (#5320)
Browse files Browse the repository at this point in the history
ref #4739
  • Loading branch information
SeaRise authored Jul 25, 2022
1 parent 42b4055 commit a528f48
Show file tree
Hide file tree
Showing 13 changed files with 631 additions and 46 deletions.
39 changes: 36 additions & 3 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <Flash/Planner/plans/PhysicalExchangeReceiver.h>
#include <Flash/Planner/plans/PhysicalExchangeSender.h>
#include <Flash/Planner/plans/PhysicalFilter.h>
#include <Flash/Planner/plans/PhysicalJoin.h>
#include <Flash/Planner/plans/PhysicalLimit.h>
#include <Flash/Planner/plans/PhysicalMockExchangeReceiver.h>
#include <Flash/Planner/plans/PhysicalMockExchangeSender.h>
Expand Down Expand Up @@ -117,6 +118,33 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec
dagContext().table_scan_executor_id = executor_id;
break;
}
case tipb::ExecType::TypeJoin:
{
auto right = popBack();
auto left = popBack();

/// Both sides of the join need to have non-root-final-projection to ensure that
/// there are no duplicate columns in the blocks on the build and probe sides.

/// After DAGQueryBlock removed, `dagContext().isTest() && right->tp() != PlanType::Source`
/// and `dagContext().isTest() && right->tp() != PlanType::Source` will be removed.
if (dagContext().isTest() && right->tp() != PlanType::Source)
{
pushBack(right);
buildFinalProjection(fmt::format("{}_r_", executor_id), false);
right = popBack();
}

if (dagContext().isTest() && right->tp() != PlanType::Source)
{
pushBack(left);
buildFinalProjection(fmt::format("{}_l_", executor_id), false);
left = popBack();
}

pushBack(PhysicalJoin::build(context, executor_id, log, executor->join(), left, right));
break;
}
default:
throw TiFlashException(fmt::format("{} executor is not supported", executor->tp()), Errors::Planner::Unimplemented);
}
Expand Down Expand Up @@ -162,23 +190,28 @@ PhysicalPlanNodePtr PhysicalPlan::popBack()
return back;
}

void PhysicalPlan::buildSource(const BlockInputStreams & source_streams)
void PhysicalPlan::buildSource(const String & executor_id, const BlockInputStreams & source_streams)
{
pushBack(PhysicalSource::build(source_streams, log));
pushBack(PhysicalSource::build(executor_id, source_streams, log));
}

void PhysicalPlan::outputAndOptimize()
{
RUNTIME_ASSERT(!root_node, log, "root_node shoud be nullptr before `outputAndOptimize`");
RUNTIME_ASSERT(cur_plan_nodes.size() == 1, log, "There can only be one plan node output, but here are {}", cur_plan_nodes.size());
root_node = popBack();

root_node = popBack();
LOG_FMT_DEBUG(
log,
"build unoptimized physical plan: \n{}",
toString());

root_node = optimize(context, root_node);
LOG_FMT_DEBUG(
log,
"build optimized physical plan: \n{}",
toString());

RUNTIME_ASSERT(root_node, log, "root_node shoudn't be nullptr after `outputAndOptimize`");
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Planner/PhysicalPlan.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ class PhysicalPlan

void build(const String & executor_id, const tipb::Executor * executor);

void buildSource(const BlockInputStreams & source_streams);
void buildSource(const String & executor_id, const BlockInputStreams & source_streams);

void buildFinalProjection(const String & column_prefix, bool is_root);

Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Planner/PlanType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ String PlanType::toString() const
return "TableScan";
case MockTableScan:
return "MockTableScan";
case Join:
return "Join";
default:
throw TiFlashException("Unknown PlanType", Errors::Planner::Internal);
}
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Planner/PlanType.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ struct PlanType
Projection = 9,
TableScan = 10,
MockTableScan = 11,
Join = 12,
};
PlanTypeEnum enum_value;

Expand Down
14 changes: 6 additions & 8 deletions dbms/src/Flash/Planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ bool Planner::isSupported(const DAGQueryBlock & query_block)
return query_block.source
&& (query_block.source->tp() == tipb::ExecType::TypeProjection
|| query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver
|| query_block.source->tp() == tipb::ExecType::TypeJoin
|| query_block.isTableScanSource());
};
return has_supported_source(query_block) && disable_fine_frained_shuffle(query_block);
Expand All @@ -113,21 +114,18 @@ void Planner::restorePipelineConcurrency(DAGPipeline & pipeline)
void Planner::executeImpl(DAGPipeline & pipeline)
{
PhysicalPlan physical_plan{context, log->identifier()};
for (const auto & input_streams : input_streams_vec)
assert(query_block.children.size() == input_streams_vec.size());
for (size_t i = 0; i < input_streams_vec.size(); ++i)
{
RUNTIME_ASSERT(!input_streams.empty(), log, "input streams cannot be empty");
physical_plan.buildSource(input_streams);
RUNTIME_ASSERT(!input_streams_vec[i].empty(), log, "input streams cannot be empty");
assert(query_block.children[i] && query_block.children[i]->root && query_block.children[i]->root->has_executor_id());
physical_plan.buildSource(query_block.children[i]->root->executor_id(), input_streams_vec[i]);
}

analyzePhysicalPlan(physical_plan, query_block);

physical_plan.outputAndOptimize();

LOG_FMT_DEBUG(
log,
"build physical plan: \n{}",
physical_plan.toString());

physical_plan.transform(pipeline, context, max_streams);
}
} // namespace DB
65 changes: 65 additions & 0 deletions dbms/src/Flash/Planner/plans/PhysicalBinary.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Exception.h>
#include <Flash/Planner/PhysicalPlanNode.h>
#include <common/logger_useful.h>

namespace DB
{
/**
* A physical plan node with two children: left and right.
*/
class PhysicalBinary : public PhysicalPlanNode
{
public:
PhysicalBinary(
const String & executor_id_,
const PlanType & type_,
const NamesAndTypes & schema_,
const String & req_id,
const PhysicalPlanNodePtr & left_,
const PhysicalPlanNodePtr & right_)
: PhysicalPlanNode(executor_id_, type_, schema_, req_id)
, left(left_)
, right(right_)
{
RUNTIME_ASSERT(left, log, "children(0) shouldn't be nullptr");
RUNTIME_ASSERT(right, log, "children(1) shouldn't be nullptr");
}

PhysicalPlanNodePtr children(size_t i) const override
{
RUNTIME_ASSERT(i <= 1, log, "child_index({}) should not >= childrenSize({})", i, childrenSize());
return i == 0 ? left : right;
}

void setChild(size_t i, const PhysicalPlanNodePtr & new_child) override
{
RUNTIME_ASSERT(i <= 1, log, "child_index({}) should not >= childrenSize({})", i, childrenSize());
RUNTIME_ASSERT(new_child, log, "new_child for child_index({}) shouldn't be nullptr", i);
RUNTIME_ASSERT(new_child.get() != this, log, "new_child for child_index({}) shouldn't be itself", i);
auto & child = i == 0 ? left : right;
child = new_child;
}

size_t childrenSize() const override { return 2; };

protected:
PhysicalPlanNodePtr left;
PhysicalPlanNodePtr right;
};
} // namespace DB
Loading

0 comments on commit a528f48

Please sign in to comment.