From 2513bd4702337f66466725679656b8eb319cce96 Mon Sep 17 00:00:00 2001 From: Wenxuan Date: Thu, 25 May 2023 16:25:39 +0800 Subject: [PATCH] storage: Introduce AddExtraTableIDColumnTransformOp (#7545) ref pingcap/tiflash#6827 --- .../AddExtraTableIDColumnInputStream.cpp | 7 +-- .../AddExtraTableIDColumnInputStream.h | 1 + .../AddExtraTableIDColumnTransformAction.cpp | 10 ++-- .../AddExtraTableIDColumnTransformAction.h | 10 ++-- .../Flash/Coprocessor/GenSchemaAndColumn.cpp | 2 +- .../Flash/Coprocessor/GenSchemaAndColumn.h | 2 +- .../AddExtraTableIDColumnTransformOp.cpp | 32 ++++++++++++ .../AddExtraTableIDColumnTransformOp.h | 52 +++++++++++++++++++ .../src/Operators/DMSegmentThreadSourceOp.cpp | 12 ++--- dbms/src/Operators/DMSegmentThreadSourceOp.h | 9 +--- .../Storages/DeltaMerge/DeltaMergeStore.cpp | 44 ++++++++++------ 11 files changed, 129 insertions(+), 52 deletions(-) create mode 100644 dbms/src/Operators/AddExtraTableIDColumnTransformOp.cpp create mode 100644 dbms/src/Operators/AddExtraTableIDColumnTransformOp.h diff --git a/dbms/src/DataStreams/AddExtraTableIDColumnInputStream.cpp b/dbms/src/DataStreams/AddExtraTableIDColumnInputStream.cpp index 1b6be4bef9d..037ff600524 100644 --- a/dbms/src/DataStreams/AddExtraTableIDColumnInputStream.cpp +++ b/dbms/src/DataStreams/AddExtraTableIDColumnInputStream.cpp @@ -20,8 +20,9 @@ namespace DB AddExtraTableIDColumnInputStream::AddExtraTableIDColumnInputStream( BlockInputStreamPtr input, int extra_table_id_index, - TableID physical_table_id) - : action(input->getHeader(), extra_table_id_index, physical_table_id) + TableID physical_table_id_) + : physical_table_id(physical_table_id_) + , action(input->getHeader(), extra_table_id_index) { children.push_back(input); } @@ -32,7 +33,7 @@ Block AddExtraTableIDColumnInputStream::readImpl() if (!res) return res; - auto ok = action.transform(res); + auto ok = action.transform(res, physical_table_id); if (!ok) return {}; diff --git a/dbms/src/DataStreams/AddExtraTableIDColumnInputStream.h b/dbms/src/DataStreams/AddExtraTableIDColumnInputStream.h index 4f3ccb44389..b09c407d24f 100644 --- a/dbms/src/DataStreams/AddExtraTableIDColumnInputStream.h +++ b/dbms/src/DataStreams/AddExtraTableIDColumnInputStream.h @@ -42,6 +42,7 @@ class AddExtraTableIDColumnInputStream : public IProfilingBlockInputStream Block readImpl() override; private: + const TableID physical_table_id; AddExtraTableIDColumnTransformAction action; }; diff --git a/dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.cpp b/dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.cpp index 47687a7d344..49196785155 100644 --- a/dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.cpp +++ b/dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.cpp @@ -47,21 +47,17 @@ Block AddExtraTableIDColumnTransformAction::buildHeader( AddExtraTableIDColumnTransformAction::AddExtraTableIDColumnTransformAction( const Block & inner_header_, - int extra_table_id_index_, - TableID physical_table_id_) + int extra_table_id_index_) : header(buildHeader(inner_header_, extra_table_id_index_)) , extra_table_id_index(extra_table_id_index_) - , physical_table_id(physical_table_id_) { } AddExtraTableIDColumnTransformAction::AddExtraTableIDColumnTransformAction( const DM::ColumnDefines & columns_to_read_, - int extra_table_id_index_, - TableID physical_table_id_) + int extra_table_id_index_) : header(buildHeader(columns_to_read_, extra_table_id_index_)) , extra_table_id_index(extra_table_id_index_) - , physical_table_id(physical_table_id_) { } @@ -70,7 +66,7 @@ Block AddExtraTableIDColumnTransformAction::getHeader() const return header; } -bool AddExtraTableIDColumnTransformAction::transform(Block & block) +bool AddExtraTableIDColumnTransformAction::transform(Block & block, TableID physical_table_id) { if (unlikely(!block)) return true; diff --git a/dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.h b/dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.h index 3a79d114532..6d19621e106 100644 --- a/dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.h +++ b/dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.h @@ -31,15 +31,13 @@ struct AddExtraTableIDColumnTransformAction AddExtraTableIDColumnTransformAction( const Block & inner_header_, - int extra_table_id_index_, - TableID physical_table_id_); + int extra_table_id_index_); AddExtraTableIDColumnTransformAction( const DM::ColumnDefines & columns_to_read_, - int extra_table_id_index_, - TableID physical_table_id_); + int extra_table_id_index_); - bool transform(Block & block); + bool transform(Block & block, TableID physical_table_id); Block getHeader() const; @@ -48,12 +46,10 @@ struct AddExtraTableIDColumnTransformAction return total_rows; } - private: Block header; // position of the ExtraPhysTblID column in column_names parameter in the StorageDeltaMerge::read function. const int extra_table_id_index; - const TableID physical_table_id; size_t total_rows = 0; }; diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp index da5f8ed22ff..84840cd178b 100644 --- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp +++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp @@ -83,7 +83,7 @@ NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef return genNamesAndTypes(table_scan.getColumns(), column_prefix); } -std::tuple genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan) +std::tuple genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan) { auto column_defines = std::make_shared(); size_t extra_table_id_index = InvalidColumnID; diff --git a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h index d865c8010c8..49ec3796cfd 100644 --- a/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h +++ b/dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h @@ -39,7 +39,7 @@ using ColumnDefinesPtr = std::shared_ptr>; } // namespace DM // The column defines and `extra table id index` -std::tuple +std::tuple genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan); } // namespace DB diff --git a/dbms/src/Operators/AddExtraTableIDColumnTransformOp.cpp b/dbms/src/Operators/AddExtraTableIDColumnTransformOp.cpp new file mode 100644 index 00000000000..80c534b61f2 --- /dev/null +++ b/dbms/src/Operators/AddExtraTableIDColumnTransformOp.cpp @@ -0,0 +1,32 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +namespace DB +{ + +OperatorStatus AddExtraTableIDColumnTransformOp::transformImpl(Block & block) +{ + if (!action.transform(block, physical_table_id)) + block = {}; + return OperatorStatus::HAS_OUTPUT; +} + +void AddExtraTableIDColumnTransformOp::transformHeaderImpl(Block & header_) +{ + header_ = action.getHeader(); +} + +} // namespace DB diff --git a/dbms/src/Operators/AddExtraTableIDColumnTransformOp.h b/dbms/src/Operators/AddExtraTableIDColumnTransformOp.h new file mode 100644 index 00000000000..92de3563474 --- /dev/null +++ b/dbms/src/Operators/AddExtraTableIDColumnTransformOp.h @@ -0,0 +1,52 @@ +// Copyright 2023 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include + +namespace DB +{ + +class AddExtraTableIDColumnTransformOp : public TransformOp +{ +public: + AddExtraTableIDColumnTransformOp( + PipelineExecutorStatus & exec_status_, + const String & req_id, + const DM::ColumnDefines & columns_to_read, + int extra_table_id_index, + TableID physical_table_id_) + : TransformOp(exec_status_, req_id) + , action(columns_to_read, extra_table_id_index) + , physical_table_id(physical_table_id_) + {} + + String getName() const override + { + return "AddExtraTableIDColumnTransformOp"; + } + +protected: + OperatorStatus transformImpl(Block & block) override; + + void transformHeaderImpl(Block & header_) override; + +private: + AddExtraTableIDColumnTransformAction action; + const TableID physical_table_id; +}; + +} // namespace DB diff --git a/dbms/src/Operators/DMSegmentThreadSourceOp.cpp b/dbms/src/Operators/DMSegmentThreadSourceOp.cpp index 049a085d7f3..ba9d2ac5407 100644 --- a/dbms/src/Operators/DMSegmentThreadSourceOp.cpp +++ b/dbms/src/Operators/DMSegmentThreadSourceOp.cpp @@ -34,8 +34,6 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp( UInt64 max_version_, size_t expected_block_size_, DM::ReadMode read_mode_, - const int extra_table_id_index, - const TableID physical_table_id, const String & req_id) : SourceOp(exec_status_, req_id) , dm_context(dm_context_) @@ -46,9 +44,8 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp( , max_version(max_version_) , expected_block_size(expected_block_size_) , read_mode(read_mode_) - , action(columns_to_read_, extra_table_id_index, physical_table_id) { - setHeader(action.getHeader()); + setHeader(toEmptyBlock(columns_to_read)); } String DMSegmentThreadSourceOp::getName() const @@ -58,7 +55,7 @@ String DMSegmentThreadSourceOp::getName() const void DMSegmentThreadSourceOp::operateSuffix() { - LOG_DEBUG(log, "Finish read {} rows from storage", action.totalRows()); + LOG_DEBUG(log, "Finish read {} rows from storage", total_rows); } OperatorStatus DMSegmentThreadSourceOp::readImpl(Block & block) @@ -72,10 +69,7 @@ OperatorStatus DMSegmentThreadSourceOp::readImpl(Block & block) { std::swap(block, t_block.value()); t_block.reset(); - if (action.transform(block)) - { - return OperatorStatus::HAS_OUTPUT; - } + total_rows += block.rows(); return OperatorStatus::HAS_OUTPUT; } return OperatorStatus::IO; diff --git a/dbms/src/Operators/DMSegmentThreadSourceOp.h b/dbms/src/Operators/DMSegmentThreadSourceOp.h index 4e1f24e9eef..2c0188cafcc 100644 --- a/dbms/src/Operators/DMSegmentThreadSourceOp.h +++ b/dbms/src/Operators/DMSegmentThreadSourceOp.h @@ -14,7 +14,6 @@ #pragma once -#include #include #include #include @@ -41,8 +40,6 @@ class DMSegmentThreadSourceOp : public SourceOp UInt64 max_version_, size_t expected_block_size_, DM::ReadMode read_mode_, - int extra_table_id_index, - TableID physical_table_id, const String & req_id); String getName() const override; @@ -70,12 +67,10 @@ class DMSegmentThreadSourceOp : public SourceOp DM::SegmentPtr cur_segment; - // TODO: Remove this action from this operator. - // Instead use AddExtraTableIDColumnTransformOp in the outside. - AddExtraTableIDColumnTransformAction action; - FilterPtr filter_ignored = nullptr; std::optional t_block; + + size_t total_rows = 0; }; } // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index 9c7cdcd8e31..a2bef901cbe 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -27,6 +27,7 @@ #include #include #include +#include #include #include #include @@ -1185,9 +1186,9 @@ void DeltaMergeStore::read( enable_read_thread, final_num_stream); - for (size_t i = 0; i < final_num_stream; ++i) + if (enable_read_thread) { - if (enable_read_thread) + for (size_t i = 0; i < final_num_stream; ++i) { group_builder.addConcurrency( std::make_unique( @@ -1197,24 +1198,33 @@ void DeltaMergeStore::read( extra_table_id_index, log_tracing_id)); } - else + } + else + { + for (size_t i = 0; i < final_num_stream; ++i) { - group_builder.addConcurrency( - std::make_unique( - exec_status, - dm_context, - read_task_pool, - after_segment_read, - columns_to_read, - filter, - max_version, - expected_block_size, - /* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal, - extra_table_id_index, - physical_table_id, - log_tracing_id)); + group_builder.addConcurrency(std::make_unique( + exec_status, + dm_context, + read_task_pool, + after_segment_read, + columns_to_read, + filter, + max_version, + expected_block_size, + /* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal, + log_tracing_id)); } + group_builder.transform([&](auto & builder) { + builder.appendTransformOp(std::make_unique( + exec_status, + log_tracing_id, + columns_to_read, + extra_table_id_index, + physical_table_id)); + }); } + LOG_DEBUG(tracing_logger, "Read create PipelineExec done"); }