Skip to content

Commit

Permalink
Introduce physical plan and add switch (#4820)
Browse files Browse the repository at this point in the history
ref #4739
  • Loading branch information
SeaRise authored Jun 13, 2022
1 parent 44c4400 commit aabe213
Show file tree
Hide file tree
Showing 22 changed files with 1,814 additions and 18 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/license-checker.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ name: License checker
on:
push:
branches:
- master
- planner_refactory
pull_request:
branches:
- master
- planner_refactory

jobs:
check-license:
Expand Down
17 changes: 17 additions & 0 deletions dbms/src/Common/TiFlashException.h
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,23 @@ namespace DB
"This error usually occurs when the TiFlash server is busy or the TiFlash node is down.\n", \
""); \
) \
C(Planner, \
E(BadRequest, "Bad TiDB DAGRequest.", \
"This error is usually caused by incorrect TiDB DAGRequest. \n" \
"Please contact with developer, \n" \
"better providing information about your cluster(log, topology information etc.).", \
""); \
E(Unimplemented, "Some features are unimplemented.", \
"This error may caused by unmatched TiDB and TiFlash versions, \n" \
"and should not occur in common case. \n" \
"Please contact with developer, \n" \
"better providing information about your cluster(log, topology information etc.).", \
""); \
E(Internal, "TiFlash Planner internal error.", \
"Please contact with developer, \n" \
"better providing information about your cluster(log, topology information etc.).", \
""); \
) \
C(Table, \
E(SchemaVersionError, "Schema version of target table in TiFlash is different from that in query.", \
"TiFlash will sync the newest schema from TiDB before processing every query. \n" \
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ include(${TiFlash_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(flash_service .)
add_headers_and_sources(flash_service ./Coprocessor)
add_headers_and_sources(flash_service ./Mpp)
add_headers_and_sources(flash_service ./Planner)
add_headers_and_sources(flash_service ./Planner/plans)
add_headers_and_sources(flash_service ./Statistics)
add_headers_and_sources(flash_service ./Management)

Expand Down
25 changes: 19 additions & 6 deletions dbms/src/Flash/Coprocessor/InterpreterDAG.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Flash/Coprocessor/DAGQueryBlockInterpreter.h>
#include <Flash/Coprocessor/InterpreterDAG.h>
#include <Flash/Coprocessor/InterpreterUtils.h>
#include <Flash/Planner/Planner.h>
#include <Interpreters/Context.h>

namespace DB
Expand Down Expand Up @@ -65,12 +66,24 @@ BlockInputStreams InterpreterDAG::executeQueryBlock(DAGQueryBlock & query_block)
BlockInputStreams child_streams = executeQueryBlock(*child);
input_streams_vec.push_back(child_streams);
}
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams);
return query_block_interpreter.execute();
if (context.getSettingsRef().enable_planner && Planner::isSupported(query_block))
{
Planner planner(
context,
input_streams_vec,
query_block,
max_streams);
return planner.execute();
}
else
{
DAGQueryBlockInterpreter query_block_interpreter(
context,
input_streams_vec,
query_block,
max_streams);
return query_block_interpreter.execute();
}
}

BlockIO InterpreterDAG::execute()
Expand Down
73 changes: 73 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlan.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/FmtUtils.h>
#include <Flash/Coprocessor/DAGContext.h>
#include <Flash/Coprocessor/DAGPipeline.h>
#include <Flash/Planner/PhysicalPlan.h>
#include <Flash/Planner/PhysicalPlanHelper.h>
#include <Interpreters/Context.h>

namespace DB
{
PhysicalPlan::PhysicalPlan(
const String & executor_id_,
const PlanType & type_,
const NamesAndTypes & schema_,
const String & req_id)
: executor_id(executor_id_)
, type(type_)
, schema(schema_)
, log(Logger::get(type_.toString(), req_id))
{}

String PhysicalPlan::toString()
{
auto schema_to_string = [&]() {
FmtBuffer buffer;
buffer.joinStr(
schema.cbegin(),
schema.cend(),
[](const auto & item, FmtBuffer & buf) { buf.fmtAppend("<{}, {}>", item.name, item.type->getName()); },
", ");
return buffer.toString();
};
return fmt::format(
"type: {}, executor_id: {}, is_record_profile_streams: {}, schema: {}",
type.toString(),
executor_id,
is_record_profile_streams,
schema_to_string());
}

void PhysicalPlan::finalize()
{
finalize(PhysicalPlanHelper::schemaToNames(schema));
}

void PhysicalPlan::recordProfileStreams(DAGPipeline & pipeline, const Context & context)
{
if (is_record_profile_streams)
{
auto & profile_streams = context.getDAGContext()->getProfileStreamsMap()[executor_id];
pipeline.transform([&profile_streams](auto & stream) { profile_streams.push_back(stream); });
}
}

void PhysicalPlan::transform(DAGPipeline & pipeline, Context & context, size_t max_streams)
{
transformImpl(pipeline, context, max_streams);
recordProfileStreams(pipeline, context);
}
} // namespace DB
83 changes: 83 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlan.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Logger.h>
#include <Core/Block.h>
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <Flash/Planner/PlanType.h>

#include <memory>

namespace DB
{
struct DAGPipeline;
class Context;
class DAGContext;

class PhysicalPlan;
using PhysicalPlanPtr = std::shared_ptr<PhysicalPlan>;

class PhysicalPlan
{
public:
PhysicalPlan(
const String & executor_id_,
const PlanType & type_,
const NamesAndTypes & schema_,
const String & req_id);

virtual ~PhysicalPlan() = default;

virtual PhysicalPlanPtr children(size_t /*i*/) const = 0;

virtual void setChild(size_t /*i*/, const PhysicalPlanPtr & /*new_child*/) = 0;

const PlanType & tp() const { return type; }

const String & execId() const { return executor_id; }

const NamesAndTypes & getSchema() const { return schema; }

virtual void appendChild(const PhysicalPlanPtr & /*new_child*/) = 0;

virtual size_t childrenSize() const = 0;

virtual void transform(DAGPipeline & pipeline, Context & context, size_t max_streams);

virtual void finalize(const Names & parent_require) = 0;
void finalize();

/// Obtain a sample block that contains the names and types of result columns.
virtual const Block & getSampleBlock() const = 0;

void disableRecordProfileStreams() { is_record_profile_streams = false; }

String toString();

protected:
virtual void transformImpl(DAGPipeline & /*pipeline*/, Context & /*context*/, size_t /*max_streams*/){};

void recordProfileStreams(DAGPipeline & pipeline, const Context & context);

String executor_id;
PlanType type;
NamesAndTypes schema;
bool is_record_profile_streams = true;

LoggerPtr log;
};
} // namespace DB
24 changes: 24 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanBuilder.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Planner/PhysicalPlanBuilder.h>
#include <Flash/Planner/plans/PhysicalSource.h>

namespace DB
{
void PhysicalPlanBuilder::buildSource(const Block & sample_block)
{
cur_plans.push_back(PhysicalSource::build(sample_block, log->identifier()));
}
} // namespace DB
47 changes: 47 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanBuilder.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Common/Exception.h>
#include <Common/Logger.h>
#include <Flash/Planner/PhysicalPlan.h>
#include <common/logger_useful.h>

namespace DB
{
class PhysicalPlanBuilder
{
public:
explicit PhysicalPlanBuilder(Context & context_, const String & req_id)
: context(context_)
, log(Logger::get("PhysicalPlanBuilder", req_id))
{}

void buildSource(const Block & sample_block);

PhysicalPlanPtr getResult() const
{
RUNTIME_ASSERT(cur_plans.size() == 1, log, "There can only be one plan output, but here are {}", cur_plans.size());
return cur_plans.back();
}

private:
std::vector<PhysicalPlanPtr> cur_plans;

[[maybe_unused]] Context & context;

LoggerPtr log;
};
} // namespace DB
27 changes: 27 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanHelper.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Flash/Planner/PhysicalPlanHelper.h>

namespace DB::PhysicalPlanHelper
{
Names schemaToNames(const NamesAndTypes & schema)
{
Names names;
names.reserve(schema.size());
for (const auto & column : schema)
names.push_back(column.name);
return names;
}
} // namespace DB::PhysicalPlanHelper
22 changes: 22 additions & 0 deletions dbms/src/Flash/Planner/PhysicalPlanHelper.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Core/NamesAndTypes.h>

namespace DB::PhysicalPlanHelper
{
Names schemaToNames(const NamesAndTypes & schema);
} // namespace DB::PhysicalPlanHelper
30 changes: 30 additions & 0 deletions dbms/src/Flash/Planner/PlanType.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2022 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/TiFlashException.h>
#include <Flash/Planner/PlanType.h>

namespace DB
{
String PlanType::toString() const
{
switch (enum_value)
{
case Source:
return "Source";
default:
throw TiFlashException("Unknown PlanType", Errors::Planner::Internal);
}
}
} // namespace DB
Loading

0 comments on commit aabe213

Please sign in to comment.