Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Planner: support window #5363

Merged
merged 15 commits into from
Jul 26, 2022
2 changes: 0 additions & 2 deletions dbms/src/Flash/Coprocessor/DAGContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@ extern const int DIVIDED_BY_ZERO;
extern const int INVALID_TIME;
} // namespace ErrorCodes

const String enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle";

bool strictSqlMode(UInt64 sql_mode)
{
return sql_mode & TiDBSQLMode::STRICT_ALL_TABLES || sql_mode & TiDBSQLMode::STRICT_TRANS_TABLES;
Expand Down
8 changes: 1 addition & 7 deletions dbms/src/Flash/Coprocessor/DAGContext.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <Common/Logger.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/TablesRegionsInfo.h>
#include <Flash/Mpp/MPPTaskId.h>
#include <Interpreters/SubqueryForSet.h>
Expand Down Expand Up @@ -117,13 +118,6 @@ constexpr UInt64 NO_ENGINE_SUBSTITUTION = 1ul << 30ul;
constexpr UInt64 ALLOW_INVALID_DATES = 1ul << 32ul;
} // namespace TiDBSQLMode

inline bool enableFineGrainedShuffle(uint64_t stream_count)
{
return stream_count > 0;
}

extern const String enableFineGrainedShuffleExtraInfo;

/// A context used to track the information that needs to be passed around during DAG planning.
class DAGContext
{
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Flash/Coprocessor/DAGQueryBlockInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/DAGUtils.h>
#include <Flash/Coprocessor/ExchangeSenderInterpreterHelper.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Coprocessor/JoinInterpreterHelper.h>
Expand Down
20 changes: 20 additions & 0 deletions dbms/src/Flash/Coprocessor/FineGrainedShuffle.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Coprocessor/FineGrainedShuffle.h>

namespace DB
{
const String enableFineGrainedShuffleExtraInfo = "enable fine grained shuffle";
}
45 changes: 45 additions & 0 deletions dbms/src/Flash/Coprocessor/FineGrainedShuffle.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Flash/Coprocessor/DAGContext.h>
#include <common/types.h>
#include <tipb/executor.pb.h>

namespace DB
{
extern const String enableFineGrainedShuffleExtraInfo;

inline bool enableFineGrainedShuffle(uint64_t stream_count)
{
return stream_count > 0;
}

struct FineGrainedShuffle
{
explicit FineGrainedShuffle(const tipb::Executor * executor)
: stream_count(executor ? executor->fine_grained_shuffle_stream_count() : 0)
, batch_size(executor ? executor->fine_grained_shuffle_batch_size() : 0)
{}

bool enable() const
{
return enableFineGrainedShuffle(stream_count);
}

const UInt64 stream_count;
const UInt64 batch_size;
};
} // namespace DB
1 change: 1 addition & 0 deletions dbms/src/Flash/Mpp/ExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <Common/ThreadFactory.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Coprocessor/CoprocessorReader.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Mpp/MPPTunnel.h>
#include <fmt/core.h>
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Planner/ExecutorIdGenerator.h>
#include <Flash/Planner/PhysicalPlan.h>
#include <Flash/Planner/PhysicalPlanVisitor.h>
Expand All @@ -30,6 +31,8 @@
#include <Flash/Planner/plans/PhysicalSource.h>
#include <Flash/Planner/plans/PhysicalTableScan.h>
#include <Flash/Planner/plans/PhysicalTopN.h>
#include <Flash/Planner/plans/PhysicalWindow.h>
#include <Flash/Planner/plans/PhysicalWindowSort.h>
#include <Flash/Statistics/traverseExecutors.h>
#include <Interpreters/Context.h>

Expand Down Expand Up @@ -93,7 +96,7 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec
if (unlikely(dagContext().isTest()))
pushBack(PhysicalMockExchangeSender::build(executor_id, log, popBack()));
else
pushBack(PhysicalExchangeSender::build(executor_id, log, executor->exchange_sender(), popBack()));
pushBack(PhysicalExchangeSender::build(executor_id, log, executor->exchange_sender(), FineGrainedShuffle(executor), popBack()));
break;
}
case tipb::ExecType::TypeExchangeReceiver:
Expand All @@ -107,6 +110,12 @@ void PhysicalPlan::build(const String & executor_id, const tipb::Executor * exec
case tipb::ExecType::TypeProjection:
pushBack(PhysicalProjection::build(context, executor_id, log, executor->projection(), popBack()));
break;
case tipb::ExecType::TypeWindow:
pushBack(PhysicalWindow::build(context, executor_id, log, executor->window(), FineGrainedShuffle(executor), popBack()));
break;
case tipb::ExecType::TypeSort:
pushBack(PhysicalWindowSort::build(context, executor_id, log, executor->sort(), FineGrainedShuffle(executor), popBack()));
break;
case tipb::ExecType::TypeTableScan:
case tipb::ExecType::TypePartitionTableScan:
{
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Flash/Planner/PhysicalPlanNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,10 @@ void PhysicalPlanNode::transform(DAGPipeline & pipeline, Context & context, size
transformImpl(pipeline, context, max_streams);
if (is_record_profile_streams)
recordProfileStreams(pipeline, context);
// todo modify logic after supporting window function.
context.getDAGContext()->updateFinalConcurrency(pipeline.streams.size(), max_streams);
restoreConcurrency(pipeline, context.getDAGContext()->final_concurrency, log);
if (is_restore_concurrency)
{
context.getDAGContext()->updateFinalConcurrency(pipeline.streams.size(), max_streams);
restoreConcurrency(pipeline, context.getDAGContext()->final_concurrency, log);
}
}
} // namespace DB
4 changes: 4 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanNode.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class PhysicalPlanNode

void disableRecordProfileStreams() { is_record_profile_streams = false; }

void disableRestoreConcurrency() { is_restore_concurrency = false; }

String toString();

protected:
Expand All @@ -74,7 +76,9 @@ class PhysicalPlanNode
String executor_id;
PlanType type;
NamesAndTypes schema;

bool is_record_profile_streams = true;
bool is_restore_concurrency = true;

LoggerPtr log;
};
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Flash/Planner/PlanType.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ String PlanType::toString() const
return "MockExchangeReceiver";
case Projection:
return "Projection";
case Window:
return "Window";
case WindowSort:
return "WindowSort";
case TableScan:
return "TableScan";
case MockTableScan:
Expand Down
8 changes: 5 additions & 3 deletions dbms/src/Flash/Planner/PlanType.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,11 @@ struct PlanType
ExchangeReceiver = 7,
MockExchangeReceiver = 8,
Projection = 9,
TableScan = 10,
MockTableScan = 11,
Join = 12,
Window = 10,
WindowSort = 11,
TableScan = 12,
MockTableScan = 13,
Join = 14,
};
PlanTypeEnum enum_value;

Expand Down
16 changes: 10 additions & 6 deletions dbms/src/Flash/Planner/Planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,19 +85,16 @@ BlockInputStreams Planner::execute()

bool Planner::isSupported(const DAGQueryBlock & query_block)
{
/// todo support fine grained shuffle
static auto disable_fine_frained_shuffle = [](const DAGQueryBlock & query_block) {
return !enableFineGrainedShuffle(query_block.source->fine_grained_shuffle_stream_count())
&& (!query_block.exchange_sender || !enableFineGrainedShuffle(query_block.exchange_sender->fine_grained_shuffle_stream_count()));
};
static auto has_supported_source = [](const DAGQueryBlock & query_block) {
return query_block.source
&& (query_block.source->tp() == tipb::ExecType::TypeProjection
|| query_block.source->tp() == tipb::ExecType::TypeExchangeReceiver
|| query_block.source->tp() == tipb::ExecType::TypeWindow
|| (query_block.source->tp() == tipb::ExecType::TypeSort && query_block.source->sort().ispartialsort())
|| query_block.source->tp() == tipb::ExecType::TypeJoin
|| query_block.isTableScanSource());
};
return has_supported_source(query_block) && disable_fine_frained_shuffle(query_block);
return has_supported_source(query_block);
}

DAGContext & Planner::dagContext() const
Expand Down Expand Up @@ -127,5 +124,12 @@ void Planner::executeImpl(DAGPipeline & pipeline)
physical_plan.outputAndOptimize();

physical_plan.transform(pipeline, context, max_streams);

// TODO Now both PhysicalWindow and PhysicalSort are disabled restoreConcurrency.
// After DAGQueryBlock removed, we can only disable restoreConcurrency for
// the PhysicalWindow and PhysicalSort below PhysicalWindow and remove this line.
// PhysicalWindow <-- PhysicalWindow/PhysicalSort.
if (query_block.source->tp() == tipb::ExecType::TypeWindow)
restorePipelineConcurrency(pipeline);
}
} // namespace DB
22 changes: 17 additions & 5 deletions dbms/src/Flash/Planner/plans/PhysicalExchangeReceiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/TiRemoteBlockInputStream.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Coprocessor/FineGrainedShuffle.h>
#include <Flash/Coprocessor/GenSchemaAndColumn.h>
#include <Flash/Mpp/ExchangeReceiver.h>
#include <Flash/Planner/FinalizeHelper.h>
Expand Down Expand Up @@ -48,8 +49,6 @@ PhysicalPlanNodePtr PhysicalExchangeReceiver::build(
throw TiFlashException(
fmt::format("Can not find exchange receiver for {}", executor_id),
Errors::Planner::Internal);
/// todo support fine grained shuffle
assert(!enableFineGrainedShuffle(mpp_exchange_receiver->getFineGrainedShuffleStreamCount()));

NamesAndTypes schema = toNamesAndTypes(mpp_exchange_receiver->getOutputSchema());
auto physical_exchange_receiver = std::make_shared<PhysicalExchangeReceiver>(
Expand All @@ -66,12 +65,25 @@ void PhysicalExchangeReceiver::transformImpl(DAGPipeline & pipeline, Context & c
auto & dag_context = *context.getDAGContext();
// todo choose a more reasonable stream number
auto & exchange_receiver_io_input_streams = dag_context.getInBoundIOInputStreamsMap()[executor_id];
for (size_t i = 0; i < max_streams; ++i)

const bool enable_fine_grained_shuffle = enableFineGrainedShuffle(mpp_exchange_receiver->getFineGrainedShuffleStreamCount());
String extra_info = "squashing after exchange receiver";
size_t stream_count = max_streams;
if (enable_fine_grained_shuffle)
{
extra_info += ", " + enableFineGrainedShuffleExtraInfo;
stream_count = std::min(max_streams, mpp_exchange_receiver->getFineGrainedShuffleStreamCount());
}

for (size_t i = 0; i < stream_count; ++i)
{
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(mpp_exchange_receiver, log->identifier(), executor_id, /*stream_id=*/0);
BlockInputStreamPtr stream = std::make_shared<ExchangeReceiverInputStream>(mpp_exchange_receiver,
log->identifier(),
execId(),
/*stream_id=*/enable_fine_grained_shuffle ? i : 0);
exchange_receiver_io_input_streams.push_back(stream);
stream = std::make_shared<SquashingBlockInputStream>(stream, 8192, 0, log->identifier());
stream->setExtraInfo("squashing after exchange receiver");
stream->setExtraInfo(extra_info);
pipeline.streams.push_back(stream);
}
}
Expand Down
59 changes: 42 additions & 17 deletions dbms/src/Flash/Planner/plans/PhysicalExchangeSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ PhysicalPlanNodePtr PhysicalExchangeSender::build(
const String & executor_id,
const LoggerPtr & log,
const tipb::ExchangeSender & exchange_sender,
const FineGrainedShuffle & fine_grained_shuffle,
const PhysicalPlanNodePtr & child)
{
assert(child);
Expand All @@ -42,7 +43,8 @@ PhysicalPlanNodePtr PhysicalExchangeSender::build(
child,
partition_col_ids,
partition_col_collators,
exchange_sender.tp());
exchange_sender.tp(),
fine_grained_shuffle);
return physical_exchange_sender;
}

Expand All @@ -55,23 +57,46 @@ void PhysicalExchangeSender::transformImpl(DAGPipeline & pipeline, Context & con

RUNTIME_ASSERT(dag_context.isMPPTask() && dag_context.tunnel_set != nullptr, log, "exchange_sender only run in MPP");

/// todo support fine grained shuffle
int stream_id = 0;
pipeline.transform([&](auto & stream) {
// construct writer
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr, false>>(
dag_context.tunnel_set,
partition_col_ids,
partition_col_collators,
exchange_type,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response
dag_context,
0,
0);
stream = std::make_shared<ExchangeSenderBlockInputStream>(stream, std::move(response_writer), log->identifier());
});

if (fine_grained_shuffle.enable())
{
pipeline.transform([&](auto & stream) {
// construct writer
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr, true>>(
dag_context.tunnel_set,
partition_col_ids,
partition_col_collators,
exchange_type,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response
dag_context,
fine_grained_shuffle.stream_count,
fine_grained_shuffle.batch_size);
stream = std::make_shared<ExchangeSenderBlockInputStream>(stream, std::move(response_writer), log->identifier());
stream->setExtraInfo(enableFineGrainedShuffleExtraInfo);
});
RUNTIME_CHECK(exchange_type == tipb::ExchangeType::Hash, Exception, "exchange_sender has to be hash partition when fine grained shuffle is enabled");
RUNTIME_CHECK(fine_grained_shuffle.stream_count <= 1024, Exception, "fine_grained_shuffle_stream_count should not be greater than 1024");
}
else
{
pipeline.transform([&](auto & stream) {
std::unique_ptr<DAGResponseWriter> response_writer = std::make_unique<StreamingDAGResponseWriter<MPPTunnelSetPtr, false>>(
dag_context.tunnel_set,
partition_col_ids,
partition_col_collators,
exchange_type,
context.getSettingsRef().dag_records_per_chunk,
context.getSettingsRef().batch_send_min_limit,
stream_id++ == 0, /// only one stream needs to sending execution summaries for the last response
dag_context,
fine_grained_shuffle.stream_count,
fine_grained_shuffle.batch_size);
stream = std::make_shared<ExchangeSenderBlockInputStream>(stream, std::move(response_writer), log->identifier());
});
}
}

void PhysicalExchangeSender::finalize(const Names & parent_require)
Expand Down
Loading