Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Storage: refine some code in lm #7313

Merged
merged 3 commits into from
Apr 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -145,10 +145,11 @@ std::tuple<StorageDeltaMergePtr, Names, SelectQueryInfo> MockStorage::prepareFor
{
assert(tableExistsForDeltaMerge(table_id));
auto storage = storage_delta_merge_map[table_id];
auto column_infos = table_schema_for_delta_merge[table_id];
auto & column_infos = table_schema_for_delta_merge[table_id];
assert(storage);
assert(!column_infos.empty());
Names column_names;
column_names.reserve(column_infos.size());
for (const auto & column_info : column_infos)
column_names.push_back(column_info.name);

Expand All @@ -171,8 +172,7 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions->conditions,
pushed_down_filters, // Not care now
analyzer->getPreparedSets(),
analyzer->getCurrentInputColumns(),
mockColumnInfosToTiDBColumnInfos(table_schema_for_delta_merge[table_id]),
context.getTimezoneInfo());
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(filter_conditions->conditions, *analyzer);
BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); // TODO: Support config max_block_size and num_streams
Expand Down
16 changes: 6 additions & 10 deletions dbms/src/Flash/Coprocessor/DAGQueryInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,10 @@

#pragma once

#include <Core/NamesAndTypes.h>
#include <Flash/Coprocessor/DAGExpressionAnalyzer.h>
#include <Flash/Coprocessor/DAGQuerySource.h>
#include <Interpreters/TimezoneInfo.h>
#include <Storages/Transaction/DecodingStorageSchemaSnapshot.h>
#include <google/protobuf/repeated_ptr_field.h>
#include <tipb/expression.pb.h>

#include <unordered_map>

Expand All @@ -29,23 +30,18 @@ struct DAGQueryInfo
DAGQueryInfo(
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters_,
const google::protobuf::RepeatedPtrField<tipb::Expr> & pushed_down_filters_,
DAGPreparedSets dag_sets_,
const NamesAndTypes & source_columns_,
const ColumnInfos & source_columns_,
const TimezoneInfo & timezone_info_)
: source_columns(source_columns_)
, filters(filters_)
, pushed_down_filters(pushed_down_filters_)
, dag_sets(std::move(dag_sets_))
, timezone_info(timezone_info_){};

const NamesAndTypes & source_columns;
const ColumnInfos & source_columns;
// filters in dag request
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters;
// filters have been push down to storage engine in dag request
const google::protobuf::RepeatedPtrField<tipb::Expr> & pushed_down_filters;
// Prepared sets extracted from dag request, which are used for indices
// by storage engine.
DAGPreparedSets dag_sets;

const TimezoneInfo & timezone_info;
};
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -599,8 +599,7 @@ std::unordered_map<TableID, SelectQueryInfo> DAGStorageInterpreter::generateSele
query_info.dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions.conditions,
table_scan.getPushedDownFilters(),
analyzer->getPreparedSets(),
analyzer->getCurrentInputColumns(),
table_scan.getColumns(),
context.getTimezoneInfo());
query_info.req_id = fmt::format("{} table_id={}", log->identifier(), table_id);
query_info.keep_order = table_scan.keepOrder();
Expand Down
24 changes: 3 additions & 21 deletions dbms/src/Flash/Coprocessor/DAGUtils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1094,7 +1094,7 @@ Field decodeLiteral(const tipb::Expr & expr)
case tipb::ExprType::Uint64:
return decodeDAGUInt64(expr.val());
case tipb::ExprType::Float32:
return Float64(decodeDAGFloat32(expr.val()));
return static_cast<Float64>(decodeDAGFloat32(expr.val()));
case tipb::ExprType::Float64:
return decodeDAGFloat64(expr.val());
case tipb::ExprType::String:
Expand Down Expand Up @@ -1167,24 +1167,6 @@ void getColumnIDsFromExpr(const tipb::Expr & expr, const std::vector<ColumnInfo>
}
}

// Recursively collect the names of every column-reference expression that
// appears anywhere inside `expr` into `col_id_set`.
//
// Parameters:
//   expr       - the (possibly nested) tipb expression tree to scan.
//   input_col  - the columns of the input schema; used to resolve a column
//                expression to its name via getColumnNameForColumnExpr.
//   col_id_set - out-parameter accumulating the resolved column names.
//                NOTE(review): despite the name, this set holds column
//                *names* (String), not column IDs — the value inserted is
//                the result of getColumnNameForColumnExpr.
void getColumnNamesFromExpr(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col, std::unordered_set<String> & col_id_set)
{
if (expr.children_size() == 0)
{
// Leaf node: record it only when it is actually a column reference;
// literals and other leaf kinds are ignored.
if (isColumnExpr(expr))
{
col_id_set.insert(getColumnNameForColumnExpr(expr, input_col));
}
}
else
{
// Internal node: recurse into each child expression so that columns
// referenced at any depth are collected.
for (const auto & child : expr.children())
{
getColumnNamesFromExpr(child, input_col, col_id_set);
}
}
}

NameAndTypePair getColumnNameAndTypeForColumnExpr(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col)
{
auto column_index = decodeDAGInt64(expr.val());
Expand All @@ -1204,8 +1186,8 @@ NameAndTypePair getColumnNameAndTypeForColumnExpr(const tipb::Expr & expr, const
bool exprHasValidFieldType(const tipb::Expr & expr)
{
return expr.has_field_type()
&& !(expr.field_type().tp() == TiDB::TP::TypeNewDecimal
&& (expr.field_type().decimal() == -1 || expr.field_type().flen() == 0 || expr.field_type().flen() == -1));
&& (expr.field_type().tp() != TiDB::TP::TypeNewDecimal
|| (expr.field_type().decimal() != -1 && expr.field_type().flen() != 0 && expr.field_type().flen() != -1));
}

bool isUnsupportedEncodeType(const std::vector<tipb::FieldType> & types, tipb::EncodeType encode_type)
Expand Down
1 change: 0 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGUtils.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ String getJoinExecTypeName(const tipb::JoinExecType & tp);
bool isColumnExpr(const tipb::Expr & expr);
String getColumnNameForColumnExpr(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col);
void getColumnIDsFromExpr(const tipb::Expr & expr, const std::vector<ColumnInfo> & input_col, std::unordered_set<ColumnID> & col_id_set);
void getColumnNamesFromExpr(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col, std::unordered_set<String> & col_id_set);
NameAndTypePair getColumnNameAndTypeForColumnExpr(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col);
const String & getTypeName(const tipb::Expr & expr);
String exprToString(const tipb::Expr & expr, const std::vector<NameAndTypePair> & input_col);
Expand Down
51 changes: 11 additions & 40 deletions dbms/src/Storages/StorageDeltaMerge.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -740,7 +740,7 @@ DM::RSOperatorPtr StorageDeltaMerge::buildRSOperator(const SelectQueryInfo & que
}

DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr & rs_operator,
const ColumnInfos & table_infos,
const ColumnInfos & table_scan_column_info,
const google::protobuf::RepeatedPtrField<tipb::Expr> & pushed_down_filters,
const ColumnDefines & columns_to_read,
const Context & context,
Expand All @@ -753,58 +753,28 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr
{
columns_to_read_name_and_type.emplace_back(col.name, col.type);
}

std::unordered_set<String> filter_column_names;
for (const auto & filter : pushed_down_filters)
std::unordered_set<ColumnID> filter_col_id_set;
for (const auto & expr : pushed_down_filters)
{
DB::getColumnNamesFromExpr(filter, columns_to_read_name_and_type, filter_column_names);
getColumnIDsFromExpr(expr, table_scan_column_info, filter_col_id_set);
}
ColumnDefines filter_columns;
filter_columns.reserve(filter_column_names.size());
for (const auto & name : filter_column_names)
filter_columns.reserve(filter_col_id_set.size());
for (const auto & id : filter_col_id_set)
{
auto iter = std::find_if(
columns_to_read.begin(),
columns_to_read.end(),
[&name](const ColumnDefine & d) -> bool { return d.name == name; });
[&id](const ColumnDefine & d) -> bool { return d.id == id; });
RUNTIME_CHECK(iter != columns_to_read.end());
filter_columns.push_back(*iter);
}

ColumnInfos table_scan_column_info;
table_scan_column_info.reserve(columns_to_read.size());
for (const auto & col : columns_to_read)
{
// table_infos does not contain EXTRA_HANDLE_COLUMN and EXTRA_TABLE_ID_COLUMN
if (col.id == EXTRA_HANDLE_COLUMN_ID)
{
auto handle = ColumnInfo();
handle.id = EXTRA_HANDLE_COLUMN_ID;
handle.name = EXTRA_HANDLE_COLUMN_NAME;
table_scan_column_info.push_back(handle);
continue;
}
else if (col.id == ExtraTableIDColumnID)
{
auto col = ColumnInfo();
col.id = ExtraTableIDColumnID;
col.name = EXTRA_TABLE_ID_COLUMN_NAME;
table_scan_column_info.push_back(col);
continue;
}
auto iter = std::find_if(
table_infos.begin(),
table_infos.end(),
[col](const ColumnInfo & c) -> bool { return c.id == col.id; });
RUNTIME_CHECK_MSG(iter != table_infos.end(), "column: [id: {}, name: {}] not found in table info", col.id, col.name);
table_scan_column_info.push_back(*iter);
}

std::vector<ExtraCastAfterTSMode> need_cast_column;
need_cast_column.reserve(columns_to_read.size());
for (const auto & col : table_scan_column_info)
{
if (!filter_column_names.contains(col.name))
if (!filter_col_id_set.contains(col.id))
{
need_cast_column.push_back(ExtraCastAfterTSMode::None);
}
Expand All @@ -829,7 +799,7 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr
NamesWithAliases project_cols;
for (size_t i = 0; i < columns_to_read.size(); ++i)
{
if (filter_column_names.contains(columns_to_read[i].name))
if (filter_col_id_set.contains(columns_to_read[i].id))
{
project_cols.emplace_back(casted_columns[i], columns_to_read[i].name);
}
Expand Down Expand Up @@ -866,7 +836,8 @@ DM::PushDownFilterPtr StorageDeltaMerge::parsePushDownFilter(const SelectQueryIn

// build push down filter
const auto & pushed_down_filters = query_info.dag_query != nullptr ? query_info.dag_query->pushed_down_filters : google::protobuf::RepeatedPtrField<tipb::Expr>{};
return buildPushDownFilter(rs_operator, tidb_table_info.columns, pushed_down_filters, columns_to_read, context, tracing_logger);
const auto & columns_to_read_info = query_info.dag_query != nullptr ? query_info.dag_query->source_columns : ColumnInfos{};
return buildPushDownFilter(rs_operator, columns_to_read_info, pushed_down_filters, columns_to_read, context, tracing_logger);
}

BlockInputStreams StorageDeltaMerge::read(
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Storages/StorageDeltaMerge.h
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ class StorageDeltaMerge
}

static DM::PushDownFilterPtr buildPushDownFilter(const DM::RSOperatorPtr & rs_operator,
const ColumnInfos & table_infos,
const ColumnInfos & table_scan_column_info,
const google::protobuf::RepeatedPtrField<tipb::Expr> & pushed_down_filters,
const DM::ColumnDefines & columns_to_read,
const Context & context,
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/StorageDisaggregatedRemote.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
#include <Storages/SelectQueryInfo.h>
#include <Storages/StorageDeltaMerge.h>
#include <Storages/StorageDisaggregated.h>
#include <Storages/Transaction/DecodingStorageSchemaSnapshot.h>
#include <Storages/Transaction/TMTContext.h>
#include <Storages/Transaction/TiDB.h>
#include <Storages/Transaction/Types.h>
Expand Down Expand Up @@ -433,8 +434,7 @@ DM::RSOperatorPtr StorageDisaggregated::buildRSOperator(
auto dag_query = std::make_unique<DAGQueryInfo>(
filter_conditions.conditions,
table_scan.getPushedDownFilters(),
DAGPreparedSets{}, // Not care now
NamesAndTypes{}, // Not care now
table_scan.getColumns(),
db_context.getTimezoneInfo());
auto create_attr_by_column_id = [defines = columns_to_read](ColumnID column_id) -> DM::Attr {
auto iter = std::find_if(
Expand Down
16 changes: 7 additions & 9 deletions dbms/src/Storages/tests/gtest_filter_parser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,20 +94,18 @@ DM::RSOperatorPtr FilterParserTest::generateRsOperator(const String table_info_j

std::unique_ptr<DAGQueryInfo> dag_query;
DM::ColumnDefines columns_to_read;
columns_to_read.reserve(table_info.columns.size());
{
NamesAndTypes source_columns;
std::tie(source_columns, std::ignore) = parseColumnsFromTableInfo(table_info);
const google::protobuf::RepeatedPtrField<tipb::Expr> pushed_down_filters;
dag_query = std::make_unique<DAGQueryInfo>(
conditions,
google::protobuf::RepeatedPtrField<tipb::Expr>{}, // don't care pushed down filters
DAGPreparedSets(),
source_columns,
timezone_info);
for (const auto & column : table_info.columns)
{
columns_to_read.push_back(DM::ColumnDefine(column.id, column.name, getDataTypeByColumnInfo(column)));
}
const google::protobuf::RepeatedPtrField<tipb::Expr> pushed_down_filters; // don't care pushed down filters
dag_query = std::make_unique<DAGQueryInfo>(
conditions,
pushed_down_filters,
table_info.columns,
timezone_info);
}
auto create_attr_by_column_id = [&columns_to_read](ColumnID column_id) -> DM::Attr {
auto iter = std::find_if(
Expand Down
Loading