Skip to content

Commit

Permalink
Support late materialization (#6966)
Browse files Browse the repository at this point in the history
ref #5829
  • Loading branch information
Lloyd-Pottiger authored Mar 16, 2023
1 parent c58c1fc commit b82b0ba
Show file tree
Hide file tree
Showing 48 changed files with 1,751 additions and 454 deletions.
2 changes: 1 addition & 1 deletion contrib/tipb
10 changes: 6 additions & 4 deletions dbms/src/Core/Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -530,14 +530,16 @@ Block hstackBlocks(Blocks && blocks, const Block & header)
return {};

Block res = header.cloneEmpty();

size_t num_rows = blocks.front().rows();
for (const auto & block : blocks)
{
RUNTIME_CHECK_MSG(block.rows() == num_rows, "Cannot hstack blocks with different number of rows");
for (const auto & elem : block)
{
res.getByName(elem.name).column = std::move(elem.column);
if (likely(res.has(elem.name)))
{
res.getByName(elem.name).column = std::move(elem.column);
}
}
}

Expand Down Expand Up @@ -662,12 +664,12 @@ void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out
WriteBufferFromString lhs_diff_writer(out_lhs_diff);
WriteBufferFromString rhs_diff_writer(out_rhs_diff);

for (auto it = left_columns.rbegin(); it != left_columns.rend(); ++it)
for (auto it = left_columns.rbegin(); it != left_columns.rend(); ++it) // NOLINT
{
lhs_diff_writer << it->dumpStructure();
lhs_diff_writer << ", position: " << lhs.getPositionByName(it->name) << '\n';
}
for (auto it = right_columns.rbegin(); it != right_columns.rend(); ++it)
for (auto it = right_columns.rbegin(); it != right_columns.rend(); ++it) // NOLINT
{
rhs_diff_writer << it->dumpStructure();
rhs_diff_writer << ", position: " << rhs.getPositionByName(it->name) << '\n';
Expand Down
15 changes: 13 additions & 2 deletions dbms/src/Core/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -172,10 +172,21 @@ using Blocks = std::vector<Block>;
using BlocksList = std::list<Block>;
using BucketBlocksListMap = std::map<Int32, BlocksList>;

/// join blocks by columns
/// Join blocks by columns
/// The schema of the output block is the same as the header block.
/// The columns not in the header block will be ignored.
/// For example:
/// header: (a UInt32, b UInt32, c UInt32, d UInt32)
/// block1: (a UInt32, b UInt32, c UInt32, e UInt32), rows: 3
/// block2: (d UInt32), rows: 3
/// result: (a UInt32, b UInt32, c UInt32, d UInt32), rows: 3
Block hstackBlocks(Blocks && blocks, const Block & header);

/// join blocks by rows
/// Join blocks by rows
/// For example:
/// block1: (a UInt32, b UInt32, c UInt32), rows: 2
/// block2: (a UInt32, b UInt32, c UInt32), rows: 3
/// result: (a UInt32, b UInt32, c UInt32), rows: 5
Block vstackBlocks(Blocks && blocks);

Block popBlocksListFront(BlocksList & blocks);
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/DataStreams/FilterBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ class FilterBlockInputStream : public IProfilingBlockInputStream
return readImpl(filter_ignored, false);
}

// Note: When return_filter is true, res_filter will point to the filter column of the returned block.
// If res_filter is nullptr, it means the filter conditions are always true.
Block readImpl(FilterPtr & res_filter, bool return_filter) override;

private:
Expand Down
12 changes: 6 additions & 6 deletions dbms/src/DataStreams/FilterTransformAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,18 +114,18 @@ bool FilterTransformAction::transform(Block & block, FilterPtr & res_filter, boo
filter_holder = filter_and_holder.data_holder;
}

size_t filtered_rows = countBytesInFilter(*filter);

/// If the current block is completely filtered out, let's move on to the next one.
if (filtered_rows == 0)
return false;

if (return_filter)
{
res_filter = filter;
return true;
}

size_t filtered_rows = countBytesInFilter(*filter);

/// If the current block is completely filtered out, let's move on to the next one.
if (filtered_rows == 0)
return false;

/// If all the rows pass through the filter.
if (filtered_rows == rows)
{
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/DataStreams/FilterTransformAction.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ struct FilterTransformAction

bool alwaysFalse() const;
// Returns false if all rows are filtered out.
// if return_filter is true and all rows are passed, set res_filter = nullptr.
// When return_filter is true, res_filter is set to the filter column and the function always returns true;
// if the filter conditions are always true, res_filter is set to nullptr instead.
bool transform(Block & block, FilterPtr & res_filter, bool return_filter);
Block getHeader() const;
ExpressionActionsPtr getExperssion() const;
Expand Down
5 changes: 2 additions & 3 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,6 @@ std::tuple<StorageDeltaMergePtr, Names, SelectQueryInfo> MockStorage::prepareFor
column_names.push_back(column_info.name);

auto scan_context = std::make_shared<DM::ScanContext>();

SelectQueryInfo query_info;
query_info.query = std::make_shared<ASTSelectQuery>();
query_info.keep_order = false;
Expand All @@ -170,11 +169,11 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
google::protobuf::RepeatedPtrField<tipb::Expr>{}, // Not care now
analyzer->getPreparedSets(),
analyzer->getCurrentInputColumns(),
context.getTimezoneInfo());
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(*filter_conditions, *analyzer);

auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(filter_conditions->conditions, *analyzer);
BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); // TODO: Support config max_block_size and num_streams
// TODO: set num_streams, then ins.size() != 1
BlockInputStreamPtr in = ins[0];
Expand Down
40 changes: 27 additions & 13 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -898,13 +898,14 @@ String DAGExpressionAnalyzer::appendTimeZoneCast(
return cast_expr_name;
}

bool DAGExpressionAnalyzer::buildExtraCastsAfterTS(
std::pair<bool, std::vector<String>> DAGExpressionAnalyzer::buildExtraCastsAfterTS(
const ExpressionActionsPtr & actions,
const std::vector<ExtraCastAfterTSMode> & need_cast_column,
const ColumnInfos & table_scan_columns)
{
bool has_cast = false;

std::vector<String> casted_columns;
casted_columns.reserve(need_cast_column.size());
// For TimeZone
tipb::Expr tz_expr = constructTZExpr(context.getTimezoneInfo());
String tz_col = getActions(tz_expr, actions);
Expand All @@ -917,10 +918,10 @@ bool DAGExpressionAnalyzer::buildExtraCastsAfterTS(
static const String dur_func_name = "FunctionConvertDurationFromNanos";
for (size_t i = 0; i < need_cast_column.size(); ++i)
{
String casted_name = source_columns[i].name;
if (!context.getTimezoneInfo().is_utc_timezone && need_cast_column[i] == ExtraCastAfterTSMode::AppendTimeZoneCast)
{
String casted_name = appendTimeZoneCast(tz_col, source_columns[i].name, timezone_func_name, actions);
source_columns[i].name = casted_name;
casted_name = appendTimeZoneCast(tz_col, source_columns[i].name, timezone_func_name, actions);
has_cast = true;
}

Expand All @@ -931,18 +932,17 @@ bool DAGExpressionAnalyzer::buildExtraCastsAfterTS(
const auto fsp = table_scan_columns[i].decimal < 0 ? 0 : table_scan_columns[i].decimal;
tipb::Expr fsp_expr = constructInt64LiteralTiExpr(fsp);
fsp_col = getActions(fsp_expr, actions);
String casted_name = appendDurationCast(fsp_col, source_columns[i].name, dur_func_name, actions);
source_columns[i].name = casted_name;
casted_name = appendDurationCast(fsp_col, source_columns[i].name, dur_func_name, actions);
// We will replace source_columns[i] with the casted column later,
// so we need to update the type of source_columns[i] here.
source_columns[i].type = actions->getSampleBlock().getByName(casted_name).type;
has_cast = true;
}

casted_columns.emplace_back(std::move(casted_name));
}
NamesWithAliases project_cols;
for (auto & col : source_columns)
project_cols.emplace_back(col.name, col.name);
actions->add(ExpressionAction::project(project_cols));

return has_cast;
return {has_cast, casted_columns};
}

bool DAGExpressionAnalyzer::appendExtraCastsAfterTS(
Expand All @@ -951,13 +951,27 @@ bool DAGExpressionAnalyzer::appendExtraCastsAfterTS(
const TiDBTableScan & table_scan)
{
auto & step = initAndGetLastStep(chain);
auto & actions = step.actions;

bool has_cast = buildExtraCastsAfterTS(step.actions, need_cast_column, table_scan.getColumns());
auto [has_cast, casted_columns] = buildExtraCastsAfterTS(actions, need_cast_column, table_scan.getColumns());

if (!has_cast)
return false;

// Add a projection to replace the original columns with the casted columns.
// For example:
// we have a block with columns (a int64, b float, c int64)
// after the cast, the block will be (a int64, b float, c int64, casted_c MyDuration)
// After this projection, the block will be (a int64, b float, c MyDuration)
NamesWithAliases project_cols;
for (size_t i = 0; i < need_cast_column.size(); ++i)
project_cols.emplace_back(casted_columns[i], source_columns[i].name);
actions->add(ExpressionAction::project(project_cols));

for (auto & col : source_columns)
step.required_output.push_back(col.name);

return has_cast;
return true;
}

String DAGExpressionAnalyzer::appendDurationCast(
Expand Down
10 changes: 5 additions & 5 deletions dbms/src/Flash/Coprocessor/DAGExpressionAnalyzer.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,11 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const ExpressionActionsPtr & actions,
const tipb::Aggregation & agg);

std::pair<bool, std::vector<String>> buildExtraCastsAfterTS(
const ExpressionActionsPtr & actions,
const std::vector<ExtraCastAfterTSMode> & need_cast_column,
const ColumnInfos & table_scan_columns);

#ifndef DBMS_PUBLIC_GTEST
private:
#endif
Expand Down Expand Up @@ -285,11 +290,6 @@ class DAGExpressionAnalyzer : private boost::noncopyable
const String & expr_name,
bool force_uint8);

bool buildExtraCastsAfterTS(
const ExpressionActionsPtr & actions,
const std::vector<ExtraCastAfterTSMode> & need_cast_column,
const ColumnInfos & table_scan_columns);

/// @ret: if some new expression actions are added.
/// @key_names: column names of keys.
/// @original_key_names: original column names of keys.(only used for null-aware semi join)
Expand Down
11 changes: 8 additions & 3 deletions dbms/src/Flash/Coprocessor/DAGQueryInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,24 @@ struct DAGQueryInfo
{
DAGQueryInfo(
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters_,
const google::protobuf::RepeatedPtrField<tipb::Expr> & pushed_down_filters_,
DAGPreparedSets dag_sets_,
const NamesAndTypes & source_columns_,
const TimezoneInfo & timezone_info_)
: filters(filters_)
: source_columns(source_columns_)
, filters(filters_)
, pushed_down_filters(pushed_down_filters_)
, dag_sets(std::move(dag_sets_))
, source_columns(source_columns_)
, timezone_info(timezone_info_){};

const NamesAndTypes & source_columns;
// filters in dag request
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters;
// filters in the dag request that have been pushed down to the storage engine
const google::protobuf::RepeatedPtrField<tipb::Expr> & pushed_down_filters;
// Prepared sets extracted from dag request, which are used for indices
// by storage engine.
DAGPreparedSets dag_sets;
const NamesAndTypes & source_columns;

const TimezoneInfo & timezone_info;
};
Expand Down
60 changes: 29 additions & 31 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ bool hasRegionToRead(const DAGContext & dag_context, const TiDBTableScan & table

// Add timezone cast for timestamp type; this is used to support session-level timezones.
// Returns <has_cast, extra_cast>.
std::tuple<bool, ExpressionActionsPtr, ExpressionActionsPtr> addExtraCastsAfterTs(
std::pair<bool, ExpressionActionsPtr> addExtraCastsAfterTs(
DAGExpressionAnalyzer & analyzer,
const std::vector<ExtraCastAfterTSMode> & need_cast_column,
const TiDBTableScan & table_scan)
Expand All @@ -184,35 +184,21 @@ std::tuple<bool, ExpressionActionsPtr, ExpressionActionsPtr> addExtraCastsAfterT
for (auto b : need_cast_column)
has_need_cast_column |= (b != ExtraCastAfterTSMode::None);
if (!has_need_cast_column)
return {false, nullptr, nullptr};
return {false, nullptr};

ExpressionActionsChain chain;
analyzer.initChain(chain);
auto original_source_columns = analyzer.getCurrentInputColumns();
// execute timezone cast or duration cast if needed for local table scan
if (analyzer.appendExtraCastsAfterTS(chain, need_cast_column, table_scan))
{
ExpressionActionsPtr extra_cast = chain.getLastActions();
assert(extra_cast);
chain.finalize();
chain.clear();

// After `analyzer.appendExtraCastsAfterTS`, analyzer.getCurrentInputColumns() has been modified.
// For remote read, `timezone cast and duration cast` had been pushed down, don't need to execute cast expressions.
// To keep the schema of local read streams and remote read streams the same, do project action for remote read streams.
NamesWithAliases project_for_remote_read;
const auto & after_cast_source_columns = analyzer.getCurrentInputColumns();
for (size_t i = 0; i < after_cast_source_columns.size(); ++i)
project_for_remote_read.emplace_back(original_source_columns[i].name, after_cast_source_columns[i].name);
assert(!project_for_remote_read.empty());
ExpressionActionsPtr project_for_cop_read = std::make_shared<ExpressionActions>(original_source_columns);
project_for_cop_read->add(ExpressionAction::project(project_for_remote_read));

return {true, extra_cast, project_for_cop_read};
return {true, extra_cast};
}
else
{
return {false, nullptr, nullptr};
return {false, nullptr};
}
}

Expand Down Expand Up @@ -412,7 +398,7 @@ void DAGStorageInterpreter::executeCastAfterTableScan(
DAGPipeline & pipeline)
{
// execute timezone cast or duration cast if needed for local table scan
auto [has_cast, extra_cast, project_for_cop_read] = addExtraCastsAfterTs(*analyzer, is_need_add_cast_column, table_scan);
auto [has_cast, extra_cast] = addExtraCastsAfterTs(*analyzer, is_need_add_cast_column, table_scan);
if (has_cast)
{
assert(remote_read_streams_start_index <= pipeline.streams.size());
Expand All @@ -424,13 +410,6 @@ void DAGStorageInterpreter::executeCastAfterTableScan(
stream = std::make_shared<ExpressionBlockInputStream>(stream, extra_cast, log->identifier());
stream->setExtraInfo("cast after local tableScan");
}
// remote streams
while (i < pipeline.streams.size())
{
auto & stream = pipeline.streams[i++];
stream = std::make_shared<ExpressionBlockInputStream>(stream, project_for_cop_read, log->identifier());
stream->setExtraInfo("cast after remote tableScan");
}
}
}

Expand Down Expand Up @@ -609,6 +588,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
query_info.query = dagContext().dummy_ast;
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions.conditions,
table_scan.getPushedDownFilters(),
analyzer->getPreparedSets(),
analyzer->getCurrentInputColumns(),
context.getTimezoneInfo());
Expand Down Expand Up @@ -1003,7 +983,9 @@ std::unordered_map<TableID, DAGStorageInterpreter::StorageWithStructureLock> DAG
std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageInterpreter::getColumnsForTableScan()
{
Names required_columns_tmp;
required_columns_tmp.reserve(table_scan.getColumnSize());
NamesAndTypes source_columns_tmp;
source_columns_tmp.reserve(table_scan.getColumnSize());
std::vector<ExtraCastAfterTSMode> need_cast_column;
need_cast_column.reserve(table_scan.getColumnSize());
String handle_column_name = MutableSupport::tidb_pk_column_name;
Expand Down Expand Up @@ -1043,12 +1025,28 @@ std::tuple<Names, NamesAndTypes, std::vector<ExtraCastAfterTSMode>> DAGStorageIn
source_columns_tmp.emplace_back(std::move(pair));
}
required_columns_tmp.emplace_back(std::move(name));
if (cid != -1 && ci.tp == TiDB::TypeTimestamp)
need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
else if (cid != -1 && ci.tp == TiDB::TypeTime)
need_cast_column.push_back(ExtraCastAfterTSMode::AppendDurationCast);
else
}

std::unordered_set<String> col_name_set;
for (const auto & expr : table_scan.getPushedDownFilters())
{
getColumnNamesFromExpr(expr, source_columns_tmp, col_name_set);
}
for (const auto & col : table_scan.getColumns())
{
if (col_name_set.contains(col.name))
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}
else
{
if (col.id != -1 && col.tp == TiDB::TypeTimestamp)
need_cast_column.push_back(ExtraCastAfterTSMode::AppendTimeZoneCast);
else if (col.id != -1 && col.tp == TiDB::TypeTime)
need_cast_column.push_back(ExtraCastAfterTSMode::AppendDurationCast);
else
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}
}

return {required_columns_tmp, source_columns_tmp, need_cast_column};
Expand Down
Loading

0 comments on commit b82b0ba

Please sign in to comment.