diff --git a/dbms/src/Flash/Planner/PhysicalPlan.cpp b/dbms/src/Flash/Planner/PhysicalPlan.cpp index 745abff296e..c1d0c7e0f7b 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.cpp +++ b/dbms/src/Flash/Planner/PhysicalPlan.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -117,6 +118,33 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec dagContext().table_scan_executor_id = executor_id; break; } + case tipb::ExecType::TypeJoin: + { + auto right = popBack(); + auto left = popBack(); + + /// Both sides of the join need to have non-root-final-projection to ensure that + /// there are no duplicate columns in the blocks on the build and probe sides. + + /// After DAGQueryBlock is removed, the `dagContext().isTest() && right->tp() != PlanType::Source` + /// and `dagContext().isTest() && left->tp() != PlanType::Source` guards will be removed. + if (dagContext().isTest() && right->tp() != PlanType::Source) + { + pushBack(right); + buildFinalProjection(fmt::format("{}_r_", executor_id), false); + right = popBack(); + } + + if (dagContext().isTest() && left->tp() != PlanType::Source) + { + pushBack(left); + buildFinalProjection(fmt::format("{}_l_", executor_id), false); + left = popBack(); + } + + pushBack(PhysicalJoin::build(context, executor_id, log, executor->join(), left, right)); + break; + } default: throw TiFlashException(fmt::format("{} executor is not supported", executor->tp()), Errors::Planner::Unimplemented); } @@ -162,23 +190,28 @@ PhysicalPlanNodePtr PhysicalPlan::popBack() return back; } -void PhysicalPlan::buildSource(const BlockInputStreams & source_streams) +void PhysicalPlan::buildSource(const String & executor_id, const BlockInputStreams & source_streams) { - pushBack(PhysicalSource::build(source_streams, log)); + pushBack(PhysicalSource::build(executor_id, source_streams, log)); } void PhysicalPlan::outputAndOptimize() { RUNTIME_ASSERT(!root_node, log, "root_node shoud be nullptr before 
`outputAndOptimize`"); RUNTIME_ASSERT(cur_plan_nodes.size() == 1, log, "There can only be one plan node output, but here are {}", cur_plan_nodes.size()); - root_node = popBack(); + root_node = popBack(); LOG_FMT_DEBUG( log, "build unoptimized physical plan: \n{}", toString()); root_node = optimize(context, root_node); + LOG_FMT_DEBUG( + log, + "build optimized physical plan: \n{}", + toString()); + RUNTIME_ASSERT(root_node, log, "root_node shoudn't be nullptr after `outputAndOptimize`"); } diff --git a/dbms/src/Flash/Planner/PhysicalPlan.h b/dbms/src/Flash/Planner/PhysicalPlan.h index 0dfdda1b941..00b7669f9e5 100644 --- a/dbms/src/Flash/Planner/PhysicalPlan.h +++ b/dbms/src/Flash/Planner/PhysicalPlan.h @@ -36,7 +36,7 @@ class PhysicalPlan void build(const String & executor_id, const tipb::Executor * executor); - void buildSource(const BlockInputStreams & source_streams); + void buildSource(const String & executor_id, const BlockInputStreams & source_streams); void buildFinalProjection(const String & column_prefix, bool is_root); diff --git a/dbms/src/Flash/Planner/PlanType.cpp b/dbms/src/Flash/Planner/PlanType.cpp index 5759c1a2288..84e56ce1e8d 100644 --- a/dbms/src/Flash/Planner/PlanType.cpp +++ b/dbms/src/Flash/Planner/PlanType.cpp @@ -45,6 +45,8 @@ String PlanType::toString() const return "TableScan"; case MockTableScan: return "MockTableScan"; + case Join: + return "Join"; default: throw TiFlashException("Unknown PlanType", Errors::Planner::Internal); } diff --git a/dbms/src/Flash/Planner/PlanType.h b/dbms/src/Flash/Planner/PlanType.h index 6ef25af94f1..f05d852b08b 100644 --- a/dbms/src/Flash/Planner/PlanType.h +++ b/dbms/src/Flash/Planner/PlanType.h @@ -34,6 +34,7 @@ struct PlanType Projection = 9, TableScan = 10, MockTableScan = 11, + Join = 12, }; PlanTypeEnum enum_value; diff --git a/dbms/src/Flash/Planner/Planner.cpp b/dbms/src/Flash/Planner/Planner.cpp index 1481eadb905..fff63442afe 100644 --- a/dbms/src/Flash/Planner/Planner.cpp +++ 
b/dbms/src/Flash/Planner/Planner.cpp @@ -94,6 +94,7 @@ bool Planner::isSupported(const DAGQueryBlock & query_block) return query_block.source && (query_block.source->tp() == tipb::ExecType::TypeProjection || query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver + || query_block.source->tp() == tipb::ExecType::TypeJoin || query_block.isTableScanSource()); }; return has_supported_source(query_block) && disable_fine_frained_shuffle(query_block); @@ -113,21 +114,18 @@ void Planner::restorePipelineConcurrency(DAGPipeline & pipeline) void Planner::executeImpl(DAGPipeline & pipeline) { PhysicalPlan physical_plan{context, log->identifier()}; - for (const auto & input_streams : input_streams_vec) + assert(query_block.children.size() == input_streams_vec.size()); + for (size_t i = 0; i < input_streams_vec.size(); ++i) { - RUNTIME_ASSERT(!input_streams.empty(), log, "input streams cannot be empty"); - physical_plan.buildSource(input_streams); + RUNTIME_ASSERT(!input_streams_vec[i].empty(), log, "input streams cannot be empty"); + assert(query_block.children[i] && query_block.children[i]->root && query_block.children[i]->root->has_executor_id()); + physical_plan.buildSource(query_block.children[i]->root->executor_id(), input_streams_vec[i]); } analyzePhysicalPlan(physical_plan, query_block); physical_plan.outputAndOptimize(); - LOG_FMT_DEBUG( - log, - "build physical plan: \n{}", - physical_plan.toString()); - physical_plan.transform(pipeline, context, max_streams); } } // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalBinary.h b/dbms/src/Flash/Planner/plans/PhysicalBinary.h new file mode 100644 index 00000000000..fe9688a1ee1 --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalBinary.h @@ -0,0 +1,65 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include + +namespace DB +{ +/** + * A physical plan node with two children: left and right. + */ +class PhysicalBinary : public PhysicalPlanNode +{ +public: + PhysicalBinary( + const String & executor_id_, + const PlanType & type_, + const NamesAndTypes & schema_, + const String & req_id, + const PhysicalPlanNodePtr & left_, + const PhysicalPlanNodePtr & right_) + : PhysicalPlanNode(executor_id_, type_, schema_, req_id) + , left(left_) + , right(right_) + { + RUNTIME_ASSERT(left, log, "children(0) shouldn't be nullptr"); + RUNTIME_ASSERT(right, log, "children(1) shouldn't be nullptr"); + } + + PhysicalPlanNodePtr children(size_t i) const override + { + RUNTIME_ASSERT(i <= 1, log, "child_index({}) should not >= childrenSize({})", i, childrenSize()); + return i == 0 ? left : right; + } + + void setChild(size_t i, const PhysicalPlanNodePtr & new_child) override + { + RUNTIME_ASSERT(i <= 1, log, "child_index({}) should not >= childrenSize({})", i, childrenSize()); + RUNTIME_ASSERT(new_child, log, "new_child for child_index({}) shouldn't be nullptr", i); + RUNTIME_ASSERT(new_child.get() != this, log, "new_child for child_index({}) shouldn't be itself", i); + auto & child = i == 0 ? 
left : right; + child = new_child; + } + + size_t childrenSize() const override { return 2; } + +protected: + PhysicalPlanNodePtr left; + PhysicalPlanNodePtr right; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalJoin.cpp b/dbms/src/Flash/Planner/plans/PhysicalJoin.cpp new file mode 100644 index 00000000000..e79fe547dbf --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalJoin.cpp @@ -0,0 +1,257 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace DB +{ +namespace FailPoints +{ +extern const char minimum_block_size_for_cross_join[]; +} // namespace FailPoints + +namespace +{ +void recordJoinExecuteInfo( + DAGContext & dag_context, + const String & executor_id, + const String & build_side_executor_id, + const JoinPtr & join_ptr) +{ + JoinExecuteInfo join_execute_info; + join_execute_info.build_side_root_executor_id = build_side_executor_id; + join_execute_info.join_ptr = join_ptr; + assert(join_execute_info.join_ptr); + dag_context.getJoinExecuteInfoMap()[executor_id] = std::move(join_execute_info); +} +} // namespace + +PhysicalPlanNodePtr PhysicalJoin::build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::Join & join, + const PhysicalPlanNodePtr & left, + const PhysicalPlanNodePtr & right) +{ + assert(left); + assert(right); + + left->finalize(); + right->finalize(); + + const Block & left_input_header = left->getSampleBlock(); + const Block & right_input_header = right->getSampleBlock(); + + JoinInterpreterHelper::TiFlashJoin tiflash_join{join}; + + const auto & probe_plan = tiflash_join.build_side_index == 0 ? right : left; + const auto & build_plan = tiflash_join.build_side_index == 0 ? 
left : right; + + const Block & probe_side_header = probe_plan->getSampleBlock(); + const Block & build_side_header = build_plan->getSampleBlock(); + + String match_helper_name = tiflash_join.genMatchHelperName(left_input_header, right_input_header); + NamesAndTypesList columns_added_by_join = tiflash_join.genColumnsAddedByJoin(build_side_header, match_helper_name); + NamesAndTypes join_output_schema = tiflash_join.genJoinOutputColumns(left_input_header, right_input_header, match_helper_name); + + auto & dag_context = *context.getDAGContext(); + + /// add necessary transformation if the join key is an expression + + bool is_tiflash_right_join = tiflash_join.isTiFlashRightJoin(); + + // prepare probe side + auto [probe_side_prepare_actions, probe_key_names, probe_filter_column_name] = JoinInterpreterHelper::prepareJoin( + context, + probe_side_header, + tiflash_join.getProbeJoinKeys(), + tiflash_join.join_key_types, + true, + is_tiflash_right_join, + tiflash_join.getProbeConditions()); + RUNTIME_ASSERT(probe_side_prepare_actions, log, "probe_side_prepare_actions cannot be nullptr"); + + // prepare build side + auto [build_side_prepare_actions, build_key_names, build_filter_column_name] = JoinInterpreterHelper::prepareJoin( + context, + build_side_header, + tiflash_join.getBuildJoinKeys(), + tiflash_join.join_key_types, + false, + is_tiflash_right_join, + tiflash_join.getBuildConditions()); + RUNTIME_ASSERT(build_side_prepare_actions, log, "build_side_prepare_actions cannot be nullptr"); + + auto [other_condition_expr, other_filter_column_name, other_eq_filter_from_in_column_name] + = tiflash_join.genJoinOtherConditionAction(context, left_input_header, right_input_header, probe_side_prepare_actions); + + const Settings & settings = context.getSettingsRef(); + size_t max_block_size_for_cross_join = settings.max_block_size; + fiu_do_on(FailPoints::minimum_block_size_for_cross_join, { max_block_size_for_cross_join = 1; }); + + JoinPtr join_ptr = std::make_shared( + 
probe_key_names, + build_key_names, + true, + SizeLimits(settings.max_rows_in_join, settings.max_bytes_in_join, settings.join_overflow_mode), + tiflash_join.kind, + tiflash_join.strictness, + log->identifier(), + tiflash_join.join_key_collators, + probe_filter_column_name, + build_filter_column_name, + other_filter_column_name, + other_eq_filter_from_in_column_name, + other_condition_expr, + max_block_size_for_cross_join, + match_helper_name); + + recordJoinExecuteInfo(dag_context, executor_id, build_plan->execId(), join_ptr); + + auto physical_join = std::make_shared( + executor_id, + join_output_schema, + log->identifier(), + probe_plan, + build_plan, + join_ptr, + columns_added_by_join, + probe_side_prepare_actions, + build_side_prepare_actions, + is_tiflash_right_join, + PhysicalPlanHelper::constructBlockFromSchema(join_output_schema)); + return physical_join; +} + +void PhysicalJoin::probeSideTransform(DAGPipeline & probe_pipeline, Context & context) +{ + const auto & settings = context.getSettingsRef(); + auto & dag_context = *context.getDAGContext(); + + /// probe side streams + executeExpression(probe_pipeline, probe_side_prepare_actions, log, "append join key and join filters for probe side"); + auto join_probe_actions = PhysicalPlanHelper::newActions(probe_pipeline.firstStream()->getHeader(), context); + join_probe_actions->add(ExpressionAction::ordinaryJoin(join_ptr, columns_added_by_join)); + /// add join input stream + if (has_non_joined) + { + auto & join_execute_info = dag_context.getJoinExecuteInfoMap()[execId()]; + size_t not_joined_concurrency = join_ptr->getNotJoinedStreamConcurrency(); + const auto & input_header = probe_pipeline.firstStream()->getHeader(); + for (size_t i = 0; i < not_joined_concurrency; ++i) + { + auto non_joined_stream = join_ptr->createStreamWithNonJoinedRows(input_header, i, not_joined_concurrency, settings.max_block_size); + non_joined_stream->setExtraInfo("add stream with non_joined_data if full_or_right_join"); + 
probe_pipeline.streams_with_non_joined_data.push_back(non_joined_stream); + join_execute_info.non_joined_streams.push_back(non_joined_stream); + } + } + String join_probe_extra_info = fmt::format("join probe, join_executor_id = {}", execId()); + for (auto & stream : probe_pipeline.streams) + { + stream = std::make_shared(stream, join_probe_actions, log->identifier()); + stream->setExtraInfo(join_probe_extra_info); + } +} + +void PhysicalJoin::buildSideTransform(DAGPipeline & build_pipeline, Context & context, size_t max_streams) +{ + auto & dag_context = *context.getDAGContext(); + const auto & settings = context.getSettingsRef(); + + size_t join_build_concurrency = settings.join_concurrent_build ? std::min(max_streams, build_pipeline.streams.size()) : 1; + + /// build side streams + executeExpression(build_pipeline, build_side_prepare_actions, log, "append join key and join filters for build side"); + // add a HashJoinBuildBlockInputStream to build a shared hash table + auto get_concurrency_build_index = JoinInterpreterHelper::concurrencyBuildIndexGenerator(join_build_concurrency); + String join_build_extra_info = fmt::format("join build, build_side_root_executor_id = {}", build()->execId()); + auto & join_execute_info = dag_context.getJoinExecuteInfoMap()[execId()]; + build_pipeline.transform([&](auto & stream) { + stream = std::make_shared(stream, join_ptr, get_concurrency_build_index(), log->identifier()); + stream->setExtraInfo(join_build_extra_info); + join_execute_info.join_build_streams.push_back(stream); + }); + // for test, join executor need the return blocks to output. 
+ executeUnion(build_pipeline, max_streams, log, /*ignore_block=*/!dag_context.isTest(), "for join"); + + SubqueryForSet build_query; + build_query.source = build_pipeline.firstStream(); + build_query.join = join_ptr; + join_ptr->init(build_query.source->getHeader(), join_build_concurrency); + dag_context.addSubquery(execId(), std::move(build_query)); +} + +void PhysicalJoin::transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) +{ + /// The build side is transformed first, in its own pipeline; the probe side then reuses the caller's pipeline. + { + DAGPipeline build_pipeline; + build()->transform(build_pipeline, context, max_streams); + buildSideTransform(build_pipeline, context, max_streams); + } + + { + DAGPipeline & probe_pipeline = pipeline; + probe()->transform(probe_pipeline, context, max_streams); + probeSideTransform(probe_pipeline, context); + } + + doSchemaProject(pipeline, context); +} + +void PhysicalJoin::doSchemaProject(DAGPipeline & pipeline, Context & context) +{ + /// add a projection that keeps only the columns listed in `schema`, removing all the useless columns + NamesWithAliases schema_project_cols; + for (const auto & c : schema) + { + /// no need to care about duplicated column names here because + /// uniqueness is guaranteed by the children physical plan nodes + schema_project_cols.emplace_back(c.name, c.name); + } + assert(!schema_project_cols.empty()); + ExpressionActionsPtr schema_project = generateProjectExpressionActions(pipeline.firstStream(), context, schema_project_cols); + assert(schema_project && !schema_project->getActions().empty()); + executeExpression(pipeline, schema_project, log, "remove useless column after join"); +} + +void PhysicalJoin::finalize(const Names & parent_require) +{ + // schema.size() >= parent_require.size() + FinalizeHelper::checkSchemaContainsParentRequire(schema, parent_require); +} + +const Block & PhysicalJoin::getSampleBlock() const +{ + return sample_block; +} +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalJoin.h b/dbms/src/Flash/Planner/plans/PhysicalJoin.h new file mode 100644 
index 00000000000..e2c30038e1a --- /dev/null +++ b/dbms/src/Flash/Planner/plans/PhysicalJoin.h @@ -0,0 +1,85 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +namespace DB +{ +class PhysicalJoin : public PhysicalBinary +{ +public: + static PhysicalPlanNodePtr build( + const Context & context, + const String & executor_id, + const LoggerPtr & log, + const tipb::Join & join, + const PhysicalPlanNodePtr & left, + const PhysicalPlanNodePtr & right); + + PhysicalJoin( + const String & executor_id_, + const NamesAndTypes & schema_, + const String & req_id, + const PhysicalPlanNodePtr & probe_, + const PhysicalPlanNodePtr & build_, + const JoinPtr & join_ptr_, + const NamesAndTypesList & columns_added_by_join_, + const ExpressionActionsPtr & probe_side_prepare_actions_, + const ExpressionActionsPtr & build_side_prepare_actions_, + bool has_non_joined_, + const Block & sample_block_) + : PhysicalBinary(executor_id_, PlanType::Join, schema_, req_id, probe_, build_) + , join_ptr(join_ptr_) + , columns_added_by_join(columns_added_by_join_) + , probe_side_prepare_actions(probe_side_prepare_actions_) + , build_side_prepare_actions(build_side_prepare_actions_) + , has_non_joined(has_non_joined_) + , sample_block(sample_block_) + {} + + void finalize(const Names & parent_require) override; + + const Block & getSampleBlock() const override; + +private: + void 
probeSideTransform(DAGPipeline & probe_pipeline, Context & context); + + void buildSideTransform(DAGPipeline & build_pipeline, Context & context, size_t max_streams); + + void doSchemaProject(DAGPipeline & pipeline, Context & context); + + void transformImpl(DAGPipeline & pipeline, Context & context, size_t max_streams) override; + + /// `left` holds the probe-side plan and `right` holds the build-side plan (the constructor forwards probe_/build_ in that order). + const PhysicalPlanNodePtr & probe() const { return left; } + const PhysicalPlanNodePtr & build() const { return right; } + +private: + JoinPtr join_ptr; + + NamesAndTypesList columns_added_by_join; + + ExpressionActionsPtr probe_side_prepare_actions; + ExpressionActionsPtr build_side_prepare_actions; + + bool has_non_joined; + + Block sample_block; +}; +} // namespace DB diff --git a/dbms/src/Flash/Planner/plans/PhysicalProjection.cpp b/dbms/src/Flash/Planner/plans/PhysicalProjection.cpp index 0835fd557b1..7b78f98b51a 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalProjection.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalProjection.cpp @@ -86,7 +86,7 @@ PhysicalPlanNodePtr PhysicalProjection::buildNonRootFinal( } auto physical_projection = std::make_shared( - "NonRootFinalProjection", + child->execId(), schema, log->identifier(), child, @@ -131,7 +131,7 @@ PhysicalPlanNodePtr PhysicalProjection::buildRootFinal( } auto physical_projection = std::make_shared( - "RootFinalProjection", + child->execId(), schema, log->identifier(), child, diff --git a/dbms/src/Flash/Planner/plans/PhysicalSource.cpp b/dbms/src/Flash/Planner/plans/PhysicalSource.cpp index b694622314a..970ce0fcb7a 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalSource.cpp +++ b/dbms/src/Flash/Planner/plans/PhysicalSource.cpp @@ -19,6 +19,7 @@ namespace DB { PhysicalPlanNodePtr PhysicalSource::build( + const String & executor_id, const BlockInputStreams & source_streams, const LoggerPtr & log) { @@ -27,7 +28,7 @@ PhysicalPlanNodePtr PhysicalSource::build( NamesAndTypes schema; for (const auto & col : sample_block) 
schema.emplace_back(col.name, col.type); - return std::make_shared("source", schema, log->identifier(), sample_block, source_streams); + return std::make_shared(executor_id, schema, log->identifier(), sample_block, source_streams); } void PhysicalSource::transformImpl(DAGPipeline & pipeline, Context & /*context*/, size_t /*max_streams*/) diff --git a/dbms/src/Flash/Planner/plans/PhysicalSource.h b/dbms/src/Flash/Planner/plans/PhysicalSource.h index eb178583c6a..592f121a9a7 100644 --- a/dbms/src/Flash/Planner/plans/PhysicalSource.h +++ b/dbms/src/Flash/Planner/plans/PhysicalSource.h @@ -23,6 +23,7 @@ class PhysicalSource : public PhysicalLeaf { public: static PhysicalPlanNodePtr build( + const String & executor_id, const BlockInputStreams & source_streams, const LoggerPtr & log); diff --git a/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp b/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp index 9c563a67b41..4b6f28642c2 100644 --- a/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp +++ b/dbms/src/Flash/Planner/tests/gtest_physical_plan.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#include +#include #include #include #include @@ -36,6 +38,15 @@ class PhysicalPlanTestRunner : public DB::tests::ExecutorTest {{"s1", TiDB::TP::TypeString}, {"s2", TiDB::TP::TypeString}}, {toNullableVec("s1", {"banana", {}, "banana"}), toNullableVec("s2", {"apple", {}, "banana"})}); + + context.addExchangeReceiver("exchange_r_table", + {{"s1", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}, + {toNullableVec("s1", {"banana", "banana"}), + toNullableVec("join_c", {"apple", "banana"})}); + context.addExchangeReceiver("exchange_l_table", + {{"s1", TiDB::TP::TypeString}, {"join_c", TiDB::TP::TypeString}}, + {toNullableVec("s1", {"banana", "banana"}), + toNullableVec("join_c", {"apple", "banana"})}); } void execute( @@ -61,9 +72,27 @@ class PhysicalPlanTestRunner : public DB::tests::ExecutorTest { DAGPipeline pipeline; physical_plan.transform(pipeline, context.context, max_streams); - // TODO support non-joined streams. - assert(pipeline.streams.size() == 1 && pipeline.streams_with_non_joined_data.empty()); - final_stream = pipeline.firstStream(); + if (pipeline.streams.size() == 1 && pipeline.streams_with_non_joined_data.empty() && !dag_context.hasSubquery()) + { + final_stream = pipeline.firstStream(); + } + else // for join + { + // for non joined probe streams. 
+ BlockInputStreams inputs{}; + inputs.insert(inputs.end(), pipeline.streams.cbegin(), pipeline.streams.cend()); + inputs.insert(inputs.end(), pipeline.streams_with_non_joined_data.cbegin(), pipeline.streams_with_non_joined_data.cend()); + auto probe_stream = std::make_shared(inputs, log->identifier()); + + // for join build side streams + assert(dag_context.hasSubquery()); + const Settings & settings = context.context.getSettingsRef(); + final_stream = std::make_shared( + probe_stream, + std::move(dag_context.moveSubqueries()), + SizeLimits(settings.max_rows_to_transfer, settings.max_bytes_to_transfer, settings.transfer_overflow_mode), + dag_context.log->identifier()); + } FmtBuffer fb; final_stream->dumpTree(fb); ASSERT_EQ(Poco::trim(expected_streams), Poco::trim(fb.toString())); @@ -238,5 +267,97 @@ MockTableScan)", } CATCH +TEST_F(PhysicalPlanTestRunner, Join) +try +{ + auto get_request = [&](const ASTTableJoin::Kind & kind) { + return context + .receive("exchange_l_table") + .join(context.receive("exchange_r_table"), {col("join_c"), col("join_c")}, kind) + .build(context); + }; + + auto request = get_request(ASTTableJoin::Kind::Inner); + execute( + request, + /*expected_physical_plan=*/R"( + | is_record_profile_streams: true, schema: , , , + | is_record_profile_streams: false, schema: , + | is_record_profile_streams: true, schema: , + | is_record_profile_streams: false, schema: , + | is_record_profile_streams: true, schema: , )", + /*expected_streams=*/R"( +CreatingSets + HashJoinBuildBlockInputStream: , join_kind = Inner + Expression: + Expression: + MockExchangeReceiver + Concat + Expression: + HashJoinProbe: + Expression: + Expression: + MockExchangeReceiver)", + {toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"}), + toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"})}); + + request = get_request(ASTTableJoin::Kind::Left); + execute( + request, + /*expected_physical_plan=*/R"( + | is_record_profile_streams: true, 
schema: , , , + | is_record_profile_streams: false, schema: , + | is_record_profile_streams: true, schema: , + | is_record_profile_streams: false, schema: , + | is_record_profile_streams: true, schema: , )", + /*expected_streams=*/R"( +CreatingSets + HashJoinBuildBlockInputStream: , join_kind = Left + Expression: + Expression: + MockExchangeReceiver + Concat + Expression: + HashJoinProbe: + Expression: + Expression: + MockExchangeReceiver)", + {toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"}), + toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"})}); + + request = get_request(ASTTableJoin::Kind::Right); + execute( + request, + /*expected_physical_plan=*/R"( + | is_record_profile_streams: true, schema: , , , + | is_record_profile_streams: false, schema: , + | is_record_profile_streams: true, schema: , + | is_record_profile_streams: false, schema: , + | is_record_profile_streams: true, schema: , )", + /*expected_streams=*/R"( +CreatingSets + HashJoinBuildBlockInputStream: , join_kind = Right + Expression: + Expression: + MockExchangeReceiver + Concat + Expression: + HashJoinProbe: + Expression: + Expression: + MockExchangeReceiver + Expression: + NonJoined: )", + {toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"}), + toNullableVec({"banana", "banana"}), + toNullableVec({"apple", "banana"})}); +} +CATCH + } // namespace tests } // namespace DB diff --git a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp index 5ab90ead996..1bebb52abf6 100644 --- a/dbms/src/Flash/tests/gtest_planner_interpreter.cpp +++ b/dbms/src/Flash/tests/gtest_planner_interpreter.cpp @@ -862,12 +862,14 @@ CreatingSets MockTableScan Union: Expression x 10: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - HashJoinProbe: - Expression: - MockTableScan)"; + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + 
Expression x 10: + Expression: + HashJoinProbe: + Expression: + MockTableScan)"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -891,15 +893,18 @@ CreatingSets MockTableScan Union: Expression x 10: - SharedQuery: - ParallelAggregating, max_threads: 10, final: true - Expression x 10: - HashJoinProbe: - Expression: - Expression: - MockTableScan - Expression x 10: - NonJoined: )"; + Expression: + SharedQuery: + ParallelAggregating, max_threads: 10, final: true + Expression x 10: + Expression: + HashJoinProbe: + Expression: + Expression: + MockTableScan + Expression x 10: + Expression: + NonJoined: )"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 10); } @@ -925,21 +930,23 @@ CreatingSets MockExchangeReceiver Union: MockExchangeSender x 20 - SharedQuery: - Limit, limit = 10 - Union: - Limit x 20, limit = 10 - Expression: - Expression: + Expression: + SharedQuery: + Limit, limit = 10 + Union: + Limit x 20, limit = 10 + Expression: SharedQuery: ParallelAggregating, max_threads: 20, final: true - Expression x 20: - HashJoinProbe: - Expression: - Expression: - MockExchangeReceiver - Expression x 20: - NonJoined: )"; + Expression x 20: + Expression: + HashJoinProbe: + Expression: + Expression: + MockExchangeReceiver + Expression x 20: + Expression: + NonJoined: )"; ASSERT_BLOCKINPUTSTREAM_EQAUL(expected, request, 20); } }