Skip to content

Commit

Permalink
storage: Introduce AddExtraTableIDColumnTransformOp (#7545)
Browse files Browse the repository at this point in the history
ref #6827
  • Loading branch information
breezewish authored May 25, 2023
1 parent 4882fc4 commit 2513bd4
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 52 deletions.
7 changes: 4 additions & 3 deletions dbms/src/DataStreams/AddExtraTableIDColumnInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@ namespace DB
AddExtraTableIDColumnInputStream::AddExtraTableIDColumnInputStream(
BlockInputStreamPtr input,
int extra_table_id_index,
TableID physical_table_id)
: action(input->getHeader(), extra_table_id_index, physical_table_id)
TableID physical_table_id_)
: physical_table_id(physical_table_id_)
, action(input->getHeader(), extra_table_id_index)
{
children.push_back(input);
}
Expand All @@ -32,7 +33,7 @@ Block AddExtraTableIDColumnInputStream::readImpl()
if (!res)
return res;

auto ok = action.transform(res);
auto ok = action.transform(res, physical_table_id);
if (!ok)
return {};

Expand Down
1 change: 1 addition & 0 deletions dbms/src/DataStreams/AddExtraTableIDColumnInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class AddExtraTableIDColumnInputStream : public IProfilingBlockInputStream
Block readImpl() override;

private:
const TableID physical_table_id;
AddExtraTableIDColumnTransformAction action;
};

Expand Down
10 changes: 3 additions & 7 deletions dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,21 +47,17 @@ Block AddExtraTableIDColumnTransformAction::buildHeader(

AddExtraTableIDColumnTransformAction::AddExtraTableIDColumnTransformAction(
const Block & inner_header_,
int extra_table_id_index_,
TableID physical_table_id_)
int extra_table_id_index_)
: header(buildHeader(inner_header_, extra_table_id_index_))
, extra_table_id_index(extra_table_id_index_)
, physical_table_id(physical_table_id_)
{
}

AddExtraTableIDColumnTransformAction::AddExtraTableIDColumnTransformAction(
const DM::ColumnDefines & columns_to_read_,
int extra_table_id_index_,
TableID physical_table_id_)
int extra_table_id_index_)
: header(buildHeader(columns_to_read_, extra_table_id_index_))
, extra_table_id_index(extra_table_id_index_)
, physical_table_id(physical_table_id_)
{
}

Expand All @@ -70,7 +66,7 @@ Block AddExtraTableIDColumnTransformAction::getHeader() const
return header;
}

bool AddExtraTableIDColumnTransformAction::transform(Block & block)
bool AddExtraTableIDColumnTransformAction::transform(Block & block, TableID physical_table_id)
{
if (unlikely(!block))
return true;
Expand Down
10 changes: 3 additions & 7 deletions dbms/src/DataStreams/AddExtraTableIDColumnTransformAction.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,13 @@ struct AddExtraTableIDColumnTransformAction

AddExtraTableIDColumnTransformAction(
const Block & inner_header_,
int extra_table_id_index_,
TableID physical_table_id_);
int extra_table_id_index_);

AddExtraTableIDColumnTransformAction(
const DM::ColumnDefines & columns_to_read_,
int extra_table_id_index_,
TableID physical_table_id_);
int extra_table_id_index_);

bool transform(Block & block);
bool transform(Block & block, TableID physical_table_id);

Block getHeader() const;

Expand All @@ -48,12 +46,10 @@ struct AddExtraTableIDColumnTransformAction
return total_rows;
}


private:
Block header;
// position of the ExtraPhysTblID column in column_names parameter in the StorageDeltaMerge::read function.
const int extra_table_id_index;
const TableID physical_table_id;

size_t total_rows = 0;
};
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/GenSchemaAndColumn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ NamesAndTypes genNamesAndTypes(const TiDBTableScan & table_scan, const StringRef
return genNamesAndTypes(table_scan.getColumns(), column_prefix);
}

std::tuple<DM::ColumnDefinesPtr, size_t> genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan)
std::tuple<DM::ColumnDefinesPtr, int> genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan)
{
auto column_defines = std::make_shared<DM::ColumnDefines>();
size_t extra_table_id_index = InvalidColumnID;
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/GenSchemaAndColumn.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ using ColumnDefinesPtr = std::shared_ptr<std::vector<ColumnDefine>>;
} // namespace DM

// The column defines and `extra table id index`
std::tuple<DM::ColumnDefinesPtr, size_t>
std::tuple<DM::ColumnDefinesPtr, int>
genColumnDefinesForDisaggregatedRead(const TiDBTableScan & table_scan);

} // namespace DB
32 changes: 32 additions & 0 deletions dbms/src/Operators/AddExtraTableIDColumnTransformOp.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Operators/AddExtraTableIDColumnTransformOp.h>

namespace DB
{

/// Runs the wrapped action on `block` in place, using this operator's physical table ID.
/// If the action reports failure, the block is replaced with an empty Block.
/// HAS_OUTPUT is returned in both cases.
OperatorStatus AddExtraTableIDColumnTransformOp::transformImpl(Block & block)
{
    const bool transformed = action.transform(block, physical_table_id);
    if (!transformed)
    {
        // Same convention as AddExtraTableIDColumnInputStream::readImpl:
        // a failed transform yields an empty block.
        block = {};
    }
    return OperatorStatus::HAS_OUTPUT;
}

/// Overwrites `header_` with the header held by the wrapped action.
/// The incoming value of `header_` is not read; the action built its header at
/// construction time from the column defines and extra table id index it was given.
void AddExtraTableIDColumnTransformOp::transformHeaderImpl(Block & header_)
{
header_ = action.getHeader();
}

} // namespace DB
52 changes: 52 additions & 0 deletions dbms/src/Operators/AddExtraTableIDColumnTransformOp.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <DataStreams/AddExtraTableIDColumnTransformAction.h>
#include <Operators/Operator.h>

namespace DB
{

/// Transform operator that applies an AddExtraTableIDColumnTransformAction to each
/// block passing through it, supplying a fixed physical table ID on every call.
/// NOTE(review): judging by the names, the action presumably fills in the extra
/// physical-table-ID column; the action's body is not visible here, so confirm there.
class AddExtraTableIDColumnTransformOp : public TransformOp
{
public:
/// @param exec_status_ forwarded to the TransformOp base class.
/// @param req_id forwarded to the TransformOp base class.
/// @param columns_to_read forwarded to the action, which builds its header from it.
/// @param extra_table_id_index forwarded to the action together with columns_to_read.
/// @param physical_table_id_ stored and passed to action.transform() for each block.
AddExtraTableIDColumnTransformOp(
PipelineExecutorStatus & exec_status_,
const String & req_id,
const DM::ColumnDefines & columns_to_read,
int extra_table_id_index,
TableID physical_table_id_)
: TransformOp(exec_status_, req_id)
, action(columns_to_read, extra_table_id_index)
, physical_table_id(physical_table_id_)
{}

/// Returns the fixed operator name "AddExtraTableIDColumnTransformOp".
String getName() const override
{
return "AddExtraTableIDColumnTransformOp";
}

protected:
/// Applies the action to `block` in place; defined in the .cpp file.
OperatorStatus transformImpl(Block & block) override;

/// Replaces `header_` with the action's header; defined in the .cpp file.
void transformHeaderImpl(Block & header_) override;

private:
// Members are declared in the same order as the constructor's initializer list
// (action first, then physical_table_id).
AddExtraTableIDColumnTransformAction action;
// Table ID handed to action.transform() on every call; never changes after construction.
const TableID physical_table_id;
};

} // namespace DB
12 changes: 3 additions & 9 deletions dbms/src/Operators/DMSegmentThreadSourceOp.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,6 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp(
UInt64 max_version_,
size_t expected_block_size_,
DM::ReadMode read_mode_,
const int extra_table_id_index,
const TableID physical_table_id,
const String & req_id)
: SourceOp(exec_status_, req_id)
, dm_context(dm_context_)
Expand All @@ -46,9 +44,8 @@ DMSegmentThreadSourceOp::DMSegmentThreadSourceOp(
, max_version(max_version_)
, expected_block_size(expected_block_size_)
, read_mode(read_mode_)
, action(columns_to_read_, extra_table_id_index, physical_table_id)
{
setHeader(action.getHeader());
setHeader(toEmptyBlock(columns_to_read));
}

String DMSegmentThreadSourceOp::getName() const
Expand All @@ -58,7 +55,7 @@ String DMSegmentThreadSourceOp::getName() const

void DMSegmentThreadSourceOp::operateSuffix()
{
LOG_DEBUG(log, "Finish read {} rows from storage", action.totalRows());
LOG_DEBUG(log, "Finish read {} rows from storage", total_rows);
}

OperatorStatus DMSegmentThreadSourceOp::readImpl(Block & block)
Expand All @@ -72,10 +69,7 @@ OperatorStatus DMSegmentThreadSourceOp::readImpl(Block & block)
{
std::swap(block, t_block.value());
t_block.reset();
if (action.transform(block))
{
return OperatorStatus::HAS_OUTPUT;
}
total_rows += block.rows();
return OperatorStatus::HAS_OUTPUT;
}
return OperatorStatus::IO;
Expand Down
9 changes: 2 additions & 7 deletions dbms/src/Operators/DMSegmentThreadSourceOp.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#pragma once

#include <DataStreams/AddExtraTableIDColumnTransformAction.h>
#include <Operators/Operator.h>
#include <Storages/DeltaMerge/DMContext.h>
#include <Storages/DeltaMerge/Segment.h>
Expand All @@ -41,8 +40,6 @@ class DMSegmentThreadSourceOp : public SourceOp
UInt64 max_version_,
size_t expected_block_size_,
DM::ReadMode read_mode_,
int extra_table_id_index,
TableID physical_table_id,
const String & req_id);

String getName() const override;
Expand Down Expand Up @@ -70,12 +67,10 @@ class DMSegmentThreadSourceOp : public SourceOp

DM::SegmentPtr cur_segment;

// TODO: Remove this action from this operator.
// Instead use AddExtraTableIDColumnTransformOp in the outside.
AddExtraTableIDColumnTransformAction action;

FilterPtr filter_ignored = nullptr;
std::optional<Block> t_block;

size_t total_rows = 0;
};

} // namespace DB
44 changes: 27 additions & 17 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
#include <Interpreters/sortBlock.h>
#include <Operators/AddExtraTableIDColumnTransformOp.h>
#include <Operators/DMSegmentThreadSourceOp.h>
#include <Operators/UnorderedSourceOp.h>
#include <Poco/Exception.h>
Expand Down Expand Up @@ -1185,9 +1186,9 @@ void DeltaMergeStore::read(
enable_read_thread,
final_num_stream);

for (size_t i = 0; i < final_num_stream; ++i)
if (enable_read_thread)
{
if (enable_read_thread)
for (size_t i = 0; i < final_num_stream; ++i)
{
group_builder.addConcurrency(
std::make_unique<UnorderedSourceOp>(
Expand All @@ -1197,24 +1198,33 @@ void DeltaMergeStore::read(
extra_table_id_index,
log_tracing_id));
}
else
}
else
{
for (size_t i = 0; i < final_num_stream; ++i)
{
group_builder.addConcurrency(
std::make_unique<DMSegmentThreadSourceOp>(
exec_status,
dm_context,
read_task_pool,
after_segment_read,
columns_to_read,
filter,
max_version,
expected_block_size,
/* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal,
extra_table_id_index,
physical_table_id,
log_tracing_id));
group_builder.addConcurrency(std::make_unique<DMSegmentThreadSourceOp>(
exec_status,
dm_context,
read_task_pool,
after_segment_read,
columns_to_read,
filter,
max_version,
expected_block_size,
/* read_mode = */ is_fast_scan ? ReadMode::Fast : ReadMode::Normal,
log_tracing_id));
}
group_builder.transform([&](auto & builder) {
builder.appendTransformOp(std::make_unique<AddExtraTableIDColumnTransformOp>(
exec_status,
log_tracing_id,
columns_to_read,
extra_table_id_index,
physical_table_id));
});
}

LOG_DEBUG(tracing_logger, "Read create PipelineExec done");
}

Expand Down

0 comments on commit 2513bd4

Please sign in to comment.