Skip to content

Commit

Permalink
Add a config to force enable lm and add more tests for lm (#7399)
Browse files Browse the repository at this point in the history
ref #5829
  • Loading branch information
Lloyd-Pottiger authored May 25, 2023
1 parent efb62b9 commit cdef326
Show file tree
Hide file tree
Showing 16 changed files with 334 additions and 202 deletions.
7 changes: 3 additions & 4 deletions dbms/src/DataStreams/FilterTransformAction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <Common/typeid_cast.h>
#include <DataStreams/FilterTransformAction.h>

#include <algorithm>

namespace DB
{
Expand Down Expand Up @@ -51,7 +50,7 @@ FilterTransformAction::FilterTransformAction(
{
/// Replace the filter column with a constant of value 1.
FilterDescription filter_description_check(*column_elem.column);
column_elem.column = column_elem.type->createColumnConst(header.rows(), UInt64(1));
column_elem.column = column_elem.type->createColumnConst(header.rows(), static_cast<UInt64>(1));
}
}

Expand Down Expand Up @@ -131,7 +130,7 @@ bool FilterTransformAction::transform(Block & block, FilterPtr & res_filter, boo
{
/// Replace the column with the filter by a constant.
block.safeGetByPosition(filter_column).column
= block.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, UInt64(1));
= block.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, static_cast<UInt64>(1));
/// No need to touch the rest of the columns.
return true;
}
Expand All @@ -148,7 +147,7 @@ bool FilterTransformAction::transform(Block & block, FilterPtr & res_filter, boo
/// Example:
/// SELECT materialize(100) AS x WHERE x
/// will work incorrectly.
current_column.column = current_column.type->createColumnConst(filtered_rows, UInt64(1));
current_column.column = current_column.type->createColumnConst(filtered_rows, static_cast<UInt64>(1));
continue;
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/Coprocessor/DAGQueryInfo.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <google/protobuf/repeated_ptr_field.h>
#include <tipb/expression.pb.h>

#include <unordered_map>

namespace DB
{
Expand All @@ -37,6 +36,7 @@ struct DAGQueryInfo
, pushed_down_filters(pushed_down_filters_)
, timezone_info(timezone_info_){};

// A light copy of tipb::TableScan::columns from TiDB, some attributes are empty, like name.
const ColumnInfos & source_columns;
// filters in dag request
const google::protobuf::RepeatedPtrField<tipb::Expr> & filters;
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());

/// handle filter conditions for local and remote table scan.
if (filter_conditions.hasValue())
/// If force_push_down_all_filters_to_scan is set, we will build all filter conditions in scan.
if (filter_conditions.hasValue() && likely(!context.getSettingsRef().force_push_down_all_filters_to_scan))
{
::DB::executePushedDownFilter(remote_read_streams_start_index, filter_conditions, *analyzer, log, pipeline);
recordProfileStreams(pipeline, filter_conditions.executor_id);
Expand Down Expand Up @@ -1312,6 +1313,11 @@ std::pair<Names, std::vector<UInt8>> DAGStorageInterpreter::getColumnsForTableSc
std::unordered_set<ColumnID> filter_col_id_set;
for (const auto & expr : table_scan.getPushedDownFilters())
getColumnIDsFromExpr(expr, table_scan.getColumns(), filter_col_id_set);
if (unlikely(context.getSettingsRef().force_push_down_all_filters_to_scan))
{
for (const auto & expr : filter_conditions.conditions)
getColumnIDsFromExpr(expr, table_scan.getColumns(), filter_col_id_set);
}
std::vector<UInt8> may_need_add_cast_column_tmp;
may_need_add_cast_column_tmp.reserve(table_scan.getColumnSize());
// If the column is not generated column, not in the filter columns and column id is not -1, then it may need cast.
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,7 @@ struct Settings
M(SettingTaskQueueType, pipeline_cpu_task_thread_pool_queue_type, TaskQueueType::DEFAULT, "The task queue of cpu task thread pool") \
M(SettingTaskQueueType, pipeline_io_task_thread_pool_queue_type, TaskQueueType::DEFAULT, "The task queue of io task thread pool") \
M(SettingUInt64, local_tunnel_version, 2, "1: not refined, 2: refined") \
M(SettingBool, force_push_down_all_filters_to_scan, false, "Push down all filters to scan, only used for test") \
M(SettingUInt64, async_recv_version, 1, "1: reactor mode, 2: no additional threads") \
M(SettingUInt64, recv_queue_size, 0, "size of ExchangeReceiver queue, 0 means the size is set to data_source_mpp_task_num * 50") \
M(SettingUInt64, shallow_copy_cross_probe_threshold, 0, "minimum right rows to use shallow copy probe mode for cross join, default is max(1, max_block_size/10)")
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1692,7 +1692,7 @@ SortDescription DeltaMergeStore::getPrimarySortDescription() const
return desc;
}

void DeltaMergeStore::restoreStableFilesFromLocal()
void DeltaMergeStore::restoreStableFilesFromLocal() const
{
DMFile::ListOptions options;
options.only_list_can_gc = false; // We need all files to restore the bytes on disk
Expand All @@ -1711,7 +1711,7 @@ void DeltaMergeStore::restoreStableFilesFromLocal()
}
}

void DeltaMergeStore::restoreStableFiles()
void DeltaMergeStore::restoreStableFiles() const
{
LOG_DEBUG(log, "Loading dt files");

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -671,8 +671,8 @@ class DeltaMergeStore : private boost::noncopyable

bool handleBackgroundTask(bool heavy);

void restoreStableFiles();
void restoreStableFilesFromLocal();
void restoreStableFiles() const;
void restoreStableFilesFromLocal() const;

SegmentReadTasks getReadTasksByRanges(DMContext & dm_context,
const RowKeyRanges & sorted_ranges,
Expand Down
22 changes: 14 additions & 8 deletions dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
public:
PushDownFilter(const RSOperatorPtr & rs_operator_,
const ExpressionActionsPtr & beofre_where_,
const ColumnDefines & filter_columns_,
const ExpressionActionsPtr & project_after_where_,
const ColumnDefinesPtr & filter_columns_,
const String filter_column_name_,
const ExpressionActionsPtr & extra_cast_,
const ColumnDefinesPtr & columns_after_cast_)
: rs_operator(rs_operator_)
, before_where(beofre_where_)
, project_after_where(project_after_where_)
, filter_column_name(std::move(filter_column_name_))
, filter_columns(std::move(filter_columns_))
, filter_columns(filter_columns_)
, extra_cast(extra_cast_)
, columns_after_cast(columns_after_cast_)
{}
Expand All @@ -46,17 +48,21 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
{}

// Rough set operator
RSOperatorPtr rs_operator;
const RSOperatorPtr rs_operator;
// Filter expression actions and the name of the tmp filter column
// Used to construct the FilterBlockInputStream
ExpressionActionsPtr before_where;
String filter_column_name;
const ExpressionActionsPtr before_where;
// The projection after the filter, used to remove the tmp filter column
// Used to construct the ExpressionBlockInputStream
// Note: usually the tmp filter column is removed in the LateMaterializationBlockInputStream; this is only used for unexpected cases
const ExpressionActionsPtr project_after_where;
const String filter_column_name;
// The columns needed by the filter expression
ColumnDefines filter_columns;
const ColumnDefinesPtr filter_columns;
// The expression actions used to cast the timestamp/datetime column
ExpressionActionsPtr extra_cast;
const ExpressionActionsPtr extra_cast;
// If the extra_cast is not null, the types of the columns may be changed
ColumnDefinesPtr columns_after_cast;
const ColumnDefinesPtr columns_after_cast;
};

} // namespace DB::DM
52 changes: 32 additions & 20 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2736,24 +2736,44 @@ BlockInputStreamPtr Segment::getLateMaterializationStream(BitmapFilterPtr && bit
const auto & filter_columns = filter->filter_columns;
SkippableBlockInputStreamPtr filter_column_stable_stream = segment_snap->stable->getInputStream(
dm_context,
filter_columns,
*filter_columns,
data_ranges,
filter->rs_operator,
max_version,
expected_block_size,
enable_handle_clean_read,
is_fast_scan,
enable_del_clean_read);

auto filter_columns_to_read_ptr = std::make_shared<ColumnDefines>(filter_columns);
SkippableBlockInputStreamPtr filter_column_delta_stream = std::make_shared<DeltaValueInputStream>(
dm_context,
segment_snap->delta,
filter_columns_to_read_ptr,
filter_columns,
this->rowkey_range);

if (unlikely(filter_columns->size() == columns_to_read.size()))
{
LOG_ERROR(log, "Late materialization filter columns size equal to read columns size, which is not expected.");
BlockInputStreamPtr stream = std::make_shared<BitmapFilterBlockInputStream>(
*filter_columns,
filter_column_stable_stream,
filter_column_delta_stream,
segment_snap->stable->getDMFilesRows(),
bitmap_filter,
dm_context.tracing_id);
if (filter->extra_cast)
{
stream = std::make_shared<ExpressionBlockInputStream>(stream, filter->extra_cast, dm_context.tracing_id);
stream->setExtraInfo("cast after tableScan");
}
stream = std::make_shared<FilterBlockInputStream>(stream, filter->before_where, filter->filter_column_name, dm_context.tracing_id);
stream->setExtraInfo("push down filter");
stream = std::make_shared<ExpressionBlockInputStream>(stream, filter->project_after_where, dm_context.tracing_id);
stream->setExtraInfo("project after where");
return stream;
}

BlockInputStreamPtr filter_column_stream = std::make_shared<RowKeyOrderedBlockInputStream>(
filter_columns,
*filter_columns,
filter_column_stable_stream,
filter_column_delta_stream,
segment_snap->stable->getDMFilesRows(),
Expand All @@ -2769,41 +2789,33 @@ BlockInputStreamPtr Segment::getLateMaterializationStream(BitmapFilterPtr && bit
// construct filter stream
filter_column_stream = std::make_shared<FilterBlockInputStream>(filter_column_stream, filter->before_where, filter->filter_column_name, dm_context.tracing_id);
filter_column_stream->setExtraInfo("push down filter");
if (filter_columns.size() == columns_to_read.size())
{
LOG_ERROR(log, "Late materialization filter columns size equal to read columns size, which is not expected.");
// no need to read columns again
return filter_column_stream;
}

ColumnDefines rest_columns_to_read{columns_to_read};
auto rest_columns_to_read = std::make_shared<ColumnDefines>(columns_to_read);
// remove columns of pushed down filter
for (const auto & col : filter_columns)
for (const auto & col : *filter_columns)
{
rest_columns_to_read.erase(std::remove_if(rest_columns_to_read.begin(), rest_columns_to_read.end(), [&](const ColumnDefine & c) { return c.id == col.id; }), rest_columns_to_read.end());
rest_columns_to_read->erase(std::remove_if(rest_columns_to_read->begin(), rest_columns_to_read->end(), [&](const ColumnDefine & c) { return c.id == col.id; }),
rest_columns_to_read->end());
}

// construct rest column stream
SkippableBlockInputStreamPtr rest_column_stable_stream = segment_snap->stable->getInputStream(
dm_context,
rest_columns_to_read,
*rest_columns_to_read,
data_ranges,
filter->rs_operator,
max_version,
expected_block_size,
enable_handle_clean_read,
is_fast_scan,
enable_del_clean_read);

auto rest_columns_to_read_ptr = std::make_shared<ColumnDefines>(rest_columns_to_read);
SkippableBlockInputStreamPtr rest_column_delta_stream = std::make_shared<DeltaValueInputStream>(
dm_context,
segment_snap->delta,
rest_columns_to_read_ptr,
rest_columns_to_read,
this->rowkey_range);

SkippableBlockInputStreamPtr rest_column_stream = std::make_shared<RowKeyOrderedBlockInputStream>(
rest_columns_to_read,
*rest_columns_to_read,
rest_column_stable_stream,
rest_column_delta_stream,
segment_snap->stable->getDMFilesRows(),
Expand Down
Loading

0 comments on commit cdef326

Please sign in to comment.