From cdef326a57850c7264cf105c657307555afa0196 Mon Sep 17 00:00:00 2001
From: Lloyd-Pottiger <60744015+Lloyd-Pottiger@users.noreply.github.com>
Date: Thu, 25 May 2023 18:27:39 +0800
Subject: [PATCH] Add a config to force enable lm and add more tests for lm
 (#7399)

ref pingcap/tiflash#5829
---
 .../src/DataStreams/FilterTransformAction.cpp |   7 +-
 dbms/src/Flash/Coprocessor/DAGQueryInfo.h     |   2 +-
 .../Coprocessor/DAGStorageInterpreter.cpp     |   8 +-
 dbms/src/Interpreters/Settings.h              |   1 +
 .../Storages/DeltaMerge/DeltaMergeStore.cpp   |   4 +-
 .../src/Storages/DeltaMerge/DeltaMergeStore.h |   4 +-
 .../DeltaMerge/Filter/PushDownFilter.h        |  22 +-
 dbms/src/Storages/DeltaMerge/Segment.cpp      |  52 ++--
 dbms/src/Storages/StorageDeltaMerge.cpp       | 224 +++++++++---------
 dbms/src/Storages/StorageDeltaMerge.h         |   2 +-
 .../tests/gtests_parse_push_down_filter.cpp   | 104 ++++----
 .../config/tiflash_dt_force_enable_lm.toml    |  53 +++++
 tests/docker/tiflash-dt-force-enable-lm.yaml  |  43 ++++
 tests/tidb-ci/force_enable_lm                 |   1 +
 tests/tidb-ci/run.sh                          |   8 +
 tests/tidb-ci/tiflash-dt-force-enable-lm.yaml |   1 +
 16 files changed, 334 insertions(+), 202 deletions(-)
 create mode 100644 tests/docker/config/tiflash_dt_force_enable_lm.toml
 create mode 100644 tests/docker/tiflash-dt-force-enable-lm.yaml
 create mode 120000 tests/tidb-ci/force_enable_lm
 create mode 120000 tests/tidb-ci/tiflash-dt-force-enable-lm.yaml

diff --git a/dbms/src/DataStreams/FilterTransformAction.cpp b/dbms/src/DataStreams/FilterTransformAction.cpp
index b628dd43123..4bbd971d2d0 100644
--- a/dbms/src/DataStreams/FilterTransformAction.cpp
+++ b/dbms/src/DataStreams/FilterTransformAction.cpp
@@ -19,7 +19,6 @@
 #include <Common/typeid_cast.h>
 #include <DataStreams/FilterTransformAction.h>
 
-#include <algorithm>
 
 namespace DB
 {
@@ -51,7 +50,7 @@ FilterTransformAction::FilterTransformAction(
     {
         /// Replace the filter column to a constant with value 1.
         FilterDescription filter_description_check(*column_elem.column);
-        column_elem.column = column_elem.type->createColumnConst(header.rows(), UInt64(1));
+        column_elem.column = column_elem.type->createColumnConst(header.rows(), static_cast<UInt64>(1));
     }
 }
 
@@ -131,7 +130,7 @@ bool FilterTransformAction::transform(Block & block, FilterPtr & res_filter, boo
     {
         /// Replace the column with the filter by a constant.
         block.safeGetByPosition(filter_column).column
-            = block.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, UInt64(1));
+            = block.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, static_cast<UInt64>(1));
         /// No need to touch the rest of the columns.
         return true;
     }
@@ -148,7 +147,7 @@ bool FilterTransformAction::transform(Block & block, FilterPtr & res_filter, boo
             /// Example:
             ///  SELECT materialize(100) AS x WHERE x
             /// will work incorrectly.
-            current_column.column = current_column.type->createColumnConst(filtered_rows, UInt64(1));
+            current_column.column = current_column.type->createColumnConst(filtered_rows, static_cast<UInt64>(1));
             continue;
         }
 
diff --git a/dbms/src/Flash/Coprocessor/DAGQueryInfo.h b/dbms/src/Flash/Coprocessor/DAGQueryInfo.h
index bdea67838c0..3a68a58365f 100644
--- a/dbms/src/Flash/Coprocessor/DAGQueryInfo.h
+++ b/dbms/src/Flash/Coprocessor/DAGQueryInfo.h
@@ -19,7 +19,6 @@
 #include <google/protobuf/repeated_ptr_field.h>
 #include <tipb/expression.pb.h>
 
-#include <unordered_map>
 
 namespace DB
 {
@@ -37,6 +36,7 @@ struct DAGQueryInfo
         , pushed_down_filters(pushed_down_filters_)
         , timezone_info(timezone_info_){};
 
+    // A light copy of tipb::TableScan::columns from TiDB, some attributes are empty, like name.
     const ColumnInfos & source_columns;
     // filters in dag request
     const google::protobuf::RepeatedPtrField<tipb::Expr> & filters;
diff --git a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
index ffd9b18f091..489890bbab4 100644
--- a/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
+++ b/dbms/src/Flash/Coprocessor/DAGStorageInterpreter.cpp
@@ -454,7 +454,8 @@ void DAGStorageInterpreter::executeImpl(DAGPipeline & pipeline)
     recordProfileStreams(pipeline, table_scan.getTableScanExecutorID());
 
     /// handle filter conditions for local and remote table scan.
-    if (filter_conditions.hasValue())
+    /// If force_push_down_all_filters_to_scan is set, we will build all filter conditions in scan.
+    if (filter_conditions.hasValue() && likely(!context.getSettingsRef().force_push_down_all_filters_to_scan))
     {
         ::DB::executePushedDownFilter(remote_read_streams_start_index, filter_conditions, *analyzer, log, pipeline);
         recordProfileStreams(pipeline, filter_conditions.executor_id);
@@ -1312,6 +1313,11 @@ std::pair<Names, std::vector<UInt8>> DAGStorageInterpreter::getColumnsForTableSc
     std::unordered_set<ColumnID> filter_col_id_set;
     for (const auto & expr : table_scan.getPushedDownFilters())
         getColumnIDsFromExpr(expr, table_scan.getColumns(), filter_col_id_set);
+    if (unlikely(context.getSettingsRef().force_push_down_all_filters_to_scan))
+    {
+        for (const auto & expr : filter_conditions.conditions)
+            getColumnIDsFromExpr(expr, table_scan.getColumns(), filter_col_id_set);
+    }
     std::vector<UInt8> may_need_add_cast_column_tmp;
     may_need_add_cast_column_tmp.reserve(table_scan.getColumnSize());
     // If the column is not generated column, not in the filter columns and column id is not -1, then it may need cast.
diff --git a/dbms/src/Interpreters/Settings.h b/dbms/src/Interpreters/Settings.h
index c1da54cfb63..ea84c2d8b65 100644
--- a/dbms/src/Interpreters/Settings.h
+++ b/dbms/src/Interpreters/Settings.h
@@ -296,6 +296,7 @@ struct Settings
     M(SettingTaskQueueType, pipeline_cpu_task_thread_pool_queue_type, TaskQueueType::DEFAULT, "The task queue of cpu task thread pool")                                                                                                 \
     M(SettingTaskQueueType, pipeline_io_task_thread_pool_queue_type, TaskQueueType::DEFAULT, "The task queue of io task thread pool")                                                                                                   \
     M(SettingUInt64, local_tunnel_version, 2, "1: not refined, 2: refined")                                                                                                                                                             \
+    M(SettingBool, force_push_down_all_filters_to_scan, false, "Push down all filters to scan, only used for test")                                                                                                                     \
     M(SettingUInt64, async_recv_version, 1, "1: reactor mode, 2: no additional threads")                                                                                                                                                \
     M(SettingUInt64, recv_queue_size, 0, "size of ExchangeReceiver queue, 0 means the size is set to data_source_mpp_task_num * 50")                                                                                                    \
     M(SettingUInt64, shallow_copy_cross_probe_threshold, 0, "minimum right rows to use shallow copy probe mode for cross join, default is max(1, max_block_size/10)")
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
index a2bef901cbe..8b487941dd8 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
@@ -1692,7 +1692,7 @@ SortDescription DeltaMergeStore::getPrimarySortDescription() const
     return desc;
 }
 
-void DeltaMergeStore::restoreStableFilesFromLocal()
+void DeltaMergeStore::restoreStableFilesFromLocal() const
 {
     DMFile::ListOptions options;
     options.only_list_can_gc = false; // We need all files to restore the bytes on disk
@@ -1711,7 +1711,7 @@ void DeltaMergeStore::restoreStableFilesFromLocal()
     }
 }
 
-void DeltaMergeStore::restoreStableFiles()
+void DeltaMergeStore::restoreStableFiles() const
 {
     LOG_DEBUG(log, "Loading dt files");
 
diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
index 1582195e59d..6f590b475f1 100644
--- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
+++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
@@ -671,8 +671,8 @@ class DeltaMergeStore : private boost::noncopyable
 
     bool handleBackgroundTask(bool heavy);
 
-    void restoreStableFiles();
-    void restoreStableFilesFromLocal();
+    void restoreStableFiles() const;
+    void restoreStableFilesFromLocal() const;
 
     SegmentReadTasks getReadTasksByRanges(DMContext & dm_context,
                                           const RowKeyRanges & sorted_ranges,
diff --git a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
index fd910a9a6ba..db596221a02 100644
--- a/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
+++ b/dbms/src/Storages/DeltaMerge/Filter/PushDownFilter.h
@@ -29,14 +29,16 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
 public:
     PushDownFilter(const RSOperatorPtr & rs_operator_,
                    const ExpressionActionsPtr & beofre_where_,
-                   const ColumnDefines & filter_columns_,
+                   const ExpressionActionsPtr & project_after_where_,
+                   const ColumnDefinesPtr & filter_columns_,
                    const String filter_column_name_,
                    const ExpressionActionsPtr & extra_cast_,
                    const ColumnDefinesPtr & columns_after_cast_)
         : rs_operator(rs_operator_)
         , before_where(beofre_where_)
+        , project_after_where(project_after_where_)
         , filter_column_name(std::move(filter_column_name_))
-        , filter_columns(std::move(filter_columns_))
+        , filter_columns(filter_columns_)
         , extra_cast(extra_cast_)
         , columns_after_cast(columns_after_cast_)
     {}
@@ -46,17 +48,21 @@ class PushDownFilter : public std::enable_shared_from_this<PushDownFilter>
     {}
 
     // Rough set operator
-    RSOperatorPtr rs_operator;
+    const RSOperatorPtr rs_operator;
     // Filter expression actions and the name of the tmp filter column
     // Used construct the FilterBlockInputStream
-    ExpressionActionsPtr before_where;
-    String filter_column_name;
+    const ExpressionActionsPtr before_where;
+    // The projection after the filter, used to remove the tmp filter column
+    // Used to construct the ExpressionBlockInputStream
+    // Note: usually we will remove the tmp filter column in the LateMaterializationBlockInputStream, this is only used for unexpected cases
+    const ExpressionActionsPtr project_after_where;
+    const String filter_column_name;
     // The columns needed by the filter expression
-    ColumnDefines filter_columns;
+    const ColumnDefinesPtr filter_columns;
     // The expression actions used to cast the timestamp/datetime column
-    ExpressionActionsPtr extra_cast;
+    const ExpressionActionsPtr extra_cast;
     // If the extra_cast is not null, the types of the columns may be changed
-    ColumnDefinesPtr columns_after_cast;
+    const ColumnDefinesPtr columns_after_cast;
 };
 
 } // namespace DB::DM
diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp
index 7c3e97d7049..cf3eec371e7 100644
--- a/dbms/src/Storages/DeltaMerge/Segment.cpp
+++ b/dbms/src/Storages/DeltaMerge/Segment.cpp
@@ -2736,7 +2736,7 @@ BlockInputStreamPtr Segment::getLateMaterializationStream(BitmapFilterPtr && bit
     const auto & filter_columns = filter->filter_columns;
     SkippableBlockInputStreamPtr filter_column_stable_stream = segment_snap->stable->getInputStream(
         dm_context,
-        filter_columns,
+        *filter_columns,
         data_ranges,
         filter->rs_operator,
         max_version,
@@ -2744,16 +2744,36 @@ BlockInputStreamPtr Segment::getLateMaterializationStream(BitmapFilterPtr && bit
         enable_handle_clean_read,
         is_fast_scan,
         enable_del_clean_read);
-
-    auto filter_columns_to_read_ptr = std::make_shared<ColumnDefines>(filter_columns);
     SkippableBlockInputStreamPtr filter_column_delta_stream = std::make_shared<DeltaValueInputStream>(
         dm_context,
         segment_snap->delta,
-        filter_columns_to_read_ptr,
+        filter_columns,
         this->rowkey_range);
 
+    if (unlikely(filter_columns->size() == columns_to_read.size()))
+    {
+        LOG_ERROR(log, "Late materialization filter columns size equal to read columns size, which is not expected.");
+        BlockInputStreamPtr stream = std::make_shared<BitmapFilterBlockInputStream>(
+            *filter_columns,
+            filter_column_stable_stream,
+            filter_column_delta_stream,
+            segment_snap->stable->getDMFilesRows(),
+            bitmap_filter,
+            dm_context.tracing_id);
+        if (filter->extra_cast)
+        {
+            stream = std::make_shared<ExpressionBlockInputStream>(stream, filter->extra_cast, dm_context.tracing_id);
+            stream->setExtraInfo("cast after tableScan");
+        }
+        stream = std::make_shared<FilterBlockInputStream>(stream, filter->before_where, filter->filter_column_name, dm_context.tracing_id);
+        stream->setExtraInfo("push down filter");
+        stream = std::make_shared<ExpressionBlockInputStream>(stream, filter->project_after_where, dm_context.tracing_id);
+        stream->setExtraInfo("project after where");
+        return stream;
+    }
+
     BlockInputStreamPtr filter_column_stream = std::make_shared<RowKeyOrderedBlockInputStream>(
-        filter_columns,
+        *filter_columns,
         filter_column_stable_stream,
         filter_column_delta_stream,
         segment_snap->stable->getDMFilesRows(),
@@ -2769,24 +2789,19 @@ BlockInputStreamPtr Segment::getLateMaterializationStream(BitmapFilterPtr && bit
     // construct filter stream
     filter_column_stream = std::make_shared<FilterBlockInputStream>(filter_column_stream, filter->before_where, filter->filter_column_name, dm_context.tracing_id);
     filter_column_stream->setExtraInfo("push down filter");
-    if (filter_columns.size() == columns_to_read.size())
-    {
-        LOG_ERROR(log, "Late materialization filter columns size equal to read columns size, which is not expected.");
-        // no need to read columns again
-        return filter_column_stream;
-    }
 
-    ColumnDefines rest_columns_to_read{columns_to_read};
+    auto rest_columns_to_read = std::make_shared<ColumnDefines>(columns_to_read);
     // remove columns of pushed down filter
-    for (const auto & col : filter_columns)
+    for (const auto & col : *filter_columns)
     {
-        rest_columns_to_read.erase(std::remove_if(rest_columns_to_read.begin(), rest_columns_to_read.end(), [&](const ColumnDefine & c) { return c.id == col.id; }), rest_columns_to_read.end());
+        rest_columns_to_read->erase(std::remove_if(rest_columns_to_read->begin(), rest_columns_to_read->end(), [&](const ColumnDefine & c) { return c.id == col.id; }),
+                                    rest_columns_to_read->end());
     }
 
     // construct rest column stream
     SkippableBlockInputStreamPtr rest_column_stable_stream = segment_snap->stable->getInputStream(
         dm_context,
-        rest_columns_to_read,
+        *rest_columns_to_read,
         data_ranges,
         filter->rs_operator,
         max_version,
@@ -2794,16 +2809,13 @@ BlockInputStreamPtr Segment::getLateMaterializationStream(BitmapFilterPtr && bit
         enable_handle_clean_read,
         is_fast_scan,
         enable_del_clean_read);
-
-    auto rest_columns_to_read_ptr = std::make_shared<ColumnDefines>(rest_columns_to_read);
     SkippableBlockInputStreamPtr rest_column_delta_stream = std::make_shared<DeltaValueInputStream>(
         dm_context,
         segment_snap->delta,
-        rest_columns_to_read_ptr,
+        rest_columns_to_read,
         this->rowkey_range);
-
     SkippableBlockInputStreamPtr rest_column_stream = std::make_shared<RowKeyOrderedBlockInputStream>(
-        rest_columns_to_read,
+        *rest_columns_to_read,
         rest_column_stable_stream,
         rest_column_delta_stream,
         segment_snap->stable->getDMFilesRows(),
diff --git a/dbms/src/Storages/StorageDeltaMerge.cpp b/dbms/src/Storages/StorageDeltaMerge.cpp
index 30778bdbf5c..11029eba0df 100644
--- a/dbms/src/Storages/StorageDeltaMerge.cpp
+++ b/dbms/src/Storages/StorageDeltaMerge.cpp
@@ -706,32 +706,30 @@ DM::RowKeyRanges StorageDeltaMerge::parseMvccQueryInfo(
     return ranges;
 }
 
-DM::RSOperatorPtr StorageDeltaMerge::buildRSOperator(const SelectQueryInfo & query_info,
+DM::RSOperatorPtr StorageDeltaMerge::buildRSOperator(const std::unique_ptr<DAGQueryInfo> & dag_query,
                                                      const ColumnDefines & columns_to_read,
                                                      const Context & context,
                                                      const LoggerPtr & tracing_logger)
 {
+    RUNTIME_CHECK(dag_query != nullptr);
     // build rough set operator
     DM::RSOperatorPtr rs_operator = DM::EMPTY_RS_OPERATOR;
     const bool enable_rs_filter = context.getSettingsRef().dt_enable_rough_set_filter;
-    if (enable_rs_filter)
+    if (likely(enable_rs_filter))
     {
-        if (likely(query_info.dag_query))
-        {
-            /// Query from TiDB / TiSpark
-            auto create_attr_by_column_id = [this](ColumnID column_id) -> Attr {
-                const ColumnDefines & defines = this->getAndMaybeInitStore()->getTableColumns();
-                auto iter = std::find_if(
-                    defines.begin(),
-                    defines.end(),
-                    [column_id](const ColumnDefine & d) -> bool { return d.id == column_id; });
-                if (iter != defines.end())
-                    return Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type};
-                // Maybe throw an exception? Or check if `type` is nullptr before creating filter?
-                return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}};
-            };
-            rs_operator = FilterParser::parseDAGQuery(*query_info.dag_query, columns_to_read, std::move(create_attr_by_column_id), log);
-        }
+        /// Query from TiDB / TiSpark
+        auto create_attr_by_column_id = [this](ColumnID column_id) -> Attr {
+            const ColumnDefines & defines = this->getAndMaybeInitStore()->getTableColumns();
+            auto iter = std::find_if(
+                defines.begin(),
+                defines.end(),
+                [column_id](const ColumnDefine & d) -> bool { return d.id == column_id; });
+            if (iter != defines.end())
+                return Attr{.col_name = iter->name, .col_id = iter->id, .type = iter->type};
+            // Maybe throw an exception? Or check if `type` is nullptr before creating filter?
+            return Attr{.col_name = "", .col_id = column_id, .type = DataTypePtr{}};
+        };
+        rs_operator = FilterParser::parseDAGQuery(*dag_query, columns_to_read, std::move(create_attr_by_column_id), log);
         if (likely(rs_operator != DM::EMPTY_RS_OPERATOR))
             LOG_DEBUG(tracing_logger, "Rough set filter: {}", rs_operator->toDebugString());
     }
@@ -748,108 +746,105 @@ DM::PushDownFilterPtr StorageDeltaMerge::buildPushDownFilter(const RSOperatorPtr
                                                              const Context & context,
                                                              const LoggerPtr & tracing_logger)
 {
-    if (!pushed_down_filters.empty())
+    if (pushed_down_filters.empty())
     {
-        // Note: table_scan_column_info is a light copy of column_info from TiDB, so some attributes are missing, like name.
-        std::unordered_map<ColumnID, ColumnDefine> columns_to_read_map;
-        for (const auto & column : columns_to_read)
-            columns_to_read_map.emplace(column.id, column);
+        LOG_DEBUG(tracing_logger, "Push down filter is empty");
+        return std::make_shared<PushDownFilter>(rs_operator);
+    }
+    std::unordered_map<ColumnID, ColumnDefine> columns_to_read_map;
+    for (const auto & column : columns_to_read)
+        columns_to_read_map.emplace(column.id, column);
 
-        // The source_columns_of_analyzer should be the same as the size of table_scan_column_info
-        // The columns_to_read is a subset of table_scan_column_info, when there are generated columns and extra table id column.
-        NamesAndTypes source_columns_of_analyzer;
-        source_columns_of_analyzer.reserve(table_scan_column_info.size());
-        for (size_t i = 0; i < table_scan_column_info.size(); ++i)
+    // Get the columns of the filter, is a subset of columns_to_read
+    std::unordered_set<ColumnID> filter_col_id_set;
+    for (const auto & expr : pushed_down_filters)
+    {
+        getColumnIDsFromExpr(expr, table_scan_column_info, filter_col_id_set);
+    }
+    auto filter_columns = std::make_shared<DM::ColumnDefines>();
+    filter_columns->reserve(filter_col_id_set.size());
+    for (const auto & cid : filter_col_id_set)
+    {
+        RUNTIME_CHECK_MSG(columns_to_read_map.contains(cid), "Filter ColumnID({}) not found in columns_to_read_map", cid);
+        filter_columns->emplace_back(columns_to_read_map.at(cid));
+    }
+
+    // The source_columns_of_analyzer should be the same as the size of table_scan_column_info
+    // The columns_to_read is a subset of table_scan_column_info, when there are generated columns and extra table id column.
+    NamesAndTypes source_columns_of_analyzer;
+    source_columns_of_analyzer.reserve(table_scan_column_info.size());
+    for (size_t i = 0; i < table_scan_column_info.size(); ++i)
+    {
+        auto const & ci = table_scan_column_info[i];
+        const auto cid = ci.id;
+        if (ci.hasGeneratedColumnFlag())
         {
-            auto const & ci = table_scan_column_info[i];
-            const auto cid = ci.id;
-            if (ci.hasGeneratedColumnFlag())
-            {
-                const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i);
-                const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci);
-                source_columns_of_analyzer.emplace_back(col_name, data_type);
-                continue;
-            }
-            if (cid == EXTRA_TABLE_ID_COLUMN_ID)
-            {
-                source_columns_of_analyzer.emplace_back(EXTRA_TABLE_ID_COLUMN_NAME, EXTRA_TABLE_ID_COLUMN_TYPE);
-                continue;
-            }
-            RUNTIME_CHECK_MSG(columns_to_read_map.contains(cid), "ColumnID({}) not found in columns_to_read_map", cid);
-            source_columns_of_analyzer.emplace_back(columns_to_read_map.at(cid).name, columns_to_read_map.at(cid).type);
+            const auto & col_name = GeneratedColumnPlaceholderBlockInputStream::getColumnName(i);
+            const auto & data_type = getDataTypeByColumnInfoForComputingLayer(ci);
+            source_columns_of_analyzer.emplace_back(col_name, data_type);
+            continue;
         }
-        // Get the columns of the filter, is a subset of columns_to_read
-        std::unordered_set<ColumnID> filter_col_id_set;
-        for (const auto & expr : pushed_down_filters)
+        if (cid == EXTRA_TABLE_ID_COLUMN_ID)
         {
-            getColumnIDsFromExpr(expr, table_scan_column_info, filter_col_id_set);
+            source_columns_of_analyzer.emplace_back(EXTRA_TABLE_ID_COLUMN_NAME, EXTRA_TABLE_ID_COLUMN_TYPE);
+            continue;
         }
-        ColumnDefines filter_columns;
-        filter_columns.reserve(filter_col_id_set.size());
-        for (const auto & id : filter_col_id_set)
+        RUNTIME_CHECK_MSG(columns_to_read_map.contains(cid), "ColumnID({}) not found in columns_to_read_map", cid);
+        source_columns_of_analyzer.emplace_back(columns_to_read_map.at(cid).name, columns_to_read_map.at(cid).type);
+    }
+    std::unique_ptr<DAGExpressionAnalyzer> analyzer = std::make_unique<DAGExpressionAnalyzer>(source_columns_of_analyzer, context);
+
+    // Build the extra cast
+    ExpressionActionsPtr extra_cast = nullptr;
+    // need_cast_column should be the same size as table_scan_column_info and source_columns_of_analyzer
+    std::vector<UInt8> may_need_add_cast_column;
+    may_need_add_cast_column.reserve(table_scan_column_info.size());
+    for (const auto & col : table_scan_column_info)
+        may_need_add_cast_column.push_back(!col.hasGeneratedColumnFlag() && filter_col_id_set.contains(col.id) && col.id != -1);
+    ExpressionActionsChain chain;
+    auto & step = analyzer->initAndGetLastStep(chain);
+    auto & actions = step.actions;
+    if (auto [has_cast, casted_columns] = analyzer->buildExtraCastsAfterTS(actions, may_need_add_cast_column, table_scan_column_info); has_cast)
+    {
+        NamesWithAliases project_cols;
+        for (size_t i = 0; i < columns_to_read.size(); ++i)
         {
-            auto iter = std::find_if(
-                columns_to_read.begin(),
-                columns_to_read.end(),
-                [&id](const ColumnDefine & d) -> bool { return d.id == id; });
-            RUNTIME_CHECK(iter != columns_to_read.end());
-            filter_columns.push_back(*iter);
+            if (filter_col_id_set.contains(columns_to_read[i].id))
+                project_cols.emplace_back(casted_columns[i], columns_to_read[i].name);
         }
+        actions->add(ExpressionAction::project(project_cols));
 
-        // need_cast_column should be the same size as table_scan_column_info and source_columns_of_analyzer
-        std::vector<UInt8> may_need_add_cast_column;
-        may_need_add_cast_column.reserve(table_scan_column_info.size());
-        for (const auto & col : table_scan_column_info)
-            may_need_add_cast_column.push_back(!col.hasGeneratedColumnFlag() && filter_col_id_set.contains(col.id) && col.id != -1);
-
-        std::unique_ptr<DAGExpressionAnalyzer> analyzer = std::make_unique<DAGExpressionAnalyzer>(source_columns_of_analyzer, context);
-        ExpressionActionsChain chain;
-        auto & step = analyzer->initAndGetLastStep(chain);
-        auto & actions = step.actions;
-        ExpressionActionsPtr extra_cast = nullptr;
-        if (auto [has_cast, casted_columns] = analyzer->buildExtraCastsAfterTS(actions, may_need_add_cast_column, table_scan_column_info); has_cast)
-        {
-            NamesWithAliases project_cols;
-            for (size_t i = 0; i < columns_to_read.size(); ++i)
-            {
-                if (filter_col_id_set.contains(columns_to_read[i].id))
-                    project_cols.emplace_back(casted_columns[i], columns_to_read[i].name);
-            }
-            actions->add(ExpressionAction::project(project_cols));
-
-            for (auto & col : filter_columns)
-                step.required_output.push_back(col.name);
+        for (const auto & col : *filter_columns)
+            step.required_output.push_back(col.name);
 
-            extra_cast = chain.getLastActions();
-            assert(extra_cast);
-            chain.finalize();
-            chain.clear();
-            LOG_DEBUG(tracing_logger, "Extra cast: {}", extra_cast->dumpActions());
-        }
+        extra_cast = chain.getLastActions();
+        chain.finalize();
+        chain.clear();
+        LOG_DEBUG(tracing_logger, "Extra cast for filter columns: {}", extra_cast->dumpActions());
+    }
 
-        // build filter expression actions
-        auto [before_where, filter_column_name, _] = ::DB::buildPushDownFilter(pushed_down_filters, *analyzer);
-        LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions());
+    // build filter expression actions
+    auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(pushed_down_filters, *analyzer);
+    LOG_DEBUG(tracing_logger, "Push down filter: {}", before_where->dumpActions());
 
-        auto columns_after_cast = std::make_shared<ColumnDefines>();
-        if (extra_cast != nullptr)
+    // record current column defines
+    auto columns_after_cast = std::make_shared<ColumnDefines>();
+    if (extra_cast != nullptr)
+    {
+        columns_after_cast->reserve(columns_to_read.size());
+        const auto & current_names_and_types = analyzer->getCurrentInputColumns();
+        for (size_t i = 0; i < table_scan_column_info.size(); ++i)
         {
-            columns_after_cast->reserve(columns_to_read.size());
-            const auto & current_names_and_types = analyzer->getCurrentInputColumns();
-            for (size_t i = 0; i < table_scan_column_info.size(); ++i)
-            {
-                if (table_scan_column_info[i].hasGeneratedColumnFlag() || table_scan_column_info[i].id == EXTRA_TABLE_ID_COLUMN_ID)
-                    continue;
-                auto col = columns_to_read_map.at(table_scan_column_info[i].id);
-                RUNTIME_CHECK_MSG(col.name == current_names_and_types[i].name, "Column name mismatch, expect: {}, actual: {}", col.name, current_names_and_types[i].name);
-                columns_after_cast->push_back(col);
-                columns_after_cast->back().type = current_names_and_types[i].type;
-            }
+            if (table_scan_column_info[i].hasGeneratedColumnFlag() || table_scan_column_info[i].id == EXTRA_TABLE_ID_COLUMN_ID)
+                continue;
+            auto col = columns_to_read_map.at(table_scan_column_info[i].id);
+            RUNTIME_CHECK_MSG(col.name == current_names_and_types[i].name, "Column name mismatch, expect: {}, actual: {}", col.name, current_names_and_types[i].name);
+            columns_after_cast->push_back(col);
+            columns_after_cast->back().type = current_names_and_types[i].type;
         }
-        return std::make_shared<PushDownFilter>(rs_operator, before_where, filter_columns, filter_column_name, extra_cast, columns_after_cast);
     }
-    LOG_DEBUG(tracing_logger, "Push down filter is empty");
-    return std::make_shared<PushDownFilter>(rs_operator);
+
+    return std::make_shared<PushDownFilter>(rs_operator, before_where, project_after_where, filter_columns, filter_column_name, extra_cast, columns_after_cast);
 }
 
 DM::PushDownFilterPtr StorageDeltaMerge::parsePushDownFilter(const SelectQueryInfo & query_info,
@@ -857,14 +852,21 @@ DM::PushDownFilterPtr StorageDeltaMerge::parsePushDownFilter(const SelectQueryIn
                                                              const Context & context,
                                                              const LoggerPtr & tracing_logger)
 {
-    // build rough set operator
-    DM::RSOperatorPtr rs_operator = buildRSOperator(query_info, columns_to_read, context, tracing_logger);
+    const auto & dag_query = query_info.dag_query;
+    if (unlikely(dag_query == nullptr))
+        return EMPTY_FILTER;
 
+    // build rough set operator
+    const DM::RSOperatorPtr rs_operator = buildRSOperator(dag_query, columns_to_read, context, tracing_logger);
     // build push down filter
-    if (query_info.dag_query == nullptr)
-        return std::make_shared<PushDownFilter>(rs_operator);
-    const auto & pushed_down_filters = query_info.dag_query->pushed_down_filters;
-    const auto & columns_to_read_info = query_info.dag_query->source_columns;
+    const auto & columns_to_read_info = dag_query->source_columns;
+    const auto & pushed_down_filters = dag_query->pushed_down_filters;
+    if (unlikely(context.getSettingsRef().force_push_down_all_filters_to_scan) && !dag_query->filters.empty())
+    {
+        google::protobuf::RepeatedPtrField<tipb::Expr> merged_filters{pushed_down_filters.begin(), pushed_down_filters.end()};
+        merged_filters.MergeFrom(dag_query->filters);
+        return buildPushDownFilter(rs_operator, columns_to_read_info, merged_filters, columns_to_read, context, tracing_logger);
+    }
     return buildPushDownFilter(rs_operator, columns_to_read_info, pushed_down_filters, columns_to_read, context, tracing_logger);
 }
 
diff --git a/dbms/src/Storages/StorageDeltaMerge.h b/dbms/src/Storages/StorageDeltaMerge.h
index 485a134317b..c9f4f318e33 100644
--- a/dbms/src/Storages/StorageDeltaMerge.h
+++ b/dbms/src/Storages/StorageDeltaMerge.h
@@ -235,7 +235,7 @@ class StorageDeltaMerge
     bool dataDirExist();
     void shutdownImpl();
 
-    DM::RSOperatorPtr buildRSOperator(const SelectQueryInfo & query_info,
+    DM::RSOperatorPtr buildRSOperator(const std::unique_ptr<DAGQueryInfo> & dag_query,
                                       const DM::ColumnDefines & columns_to_read,
                                       const Context & context,
                                       const LoggerPtr & tracing_logger);
diff --git a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp
index 65c4f7fd1cb..db462f12434 100644
--- a/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp
+++ b/dbms/src/Storages/tests/gtests_parse_push_down_filter.cpp
@@ -126,7 +126,7 @@ try
     {
         // Equal between col and literal
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_2 = 666", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "equal");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -140,13 +140,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 1);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
         // Greater between col and literal
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_2 > 666", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "greater");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -160,13 +160,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 2);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
         // GreaterEqual between col and literal
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_2 >= 667", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "greater_equal");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -180,13 +180,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 2);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
         // Less between col and literal
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_2 < 777", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "less");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -200,13 +200,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 7);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
         // LessEqual between col and literal
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_2 <= 776", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "less_equal");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -220,7 +220,7 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 7);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 }
 CATCH
@@ -240,7 +240,7 @@ try
     {
         // Equal between literal and col (take care of direction)
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where 667 = col_2", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "equal");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -254,13 +254,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 1);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
         // NotEqual between literal and col (take care of direction)
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where 667 != col_2", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "not_equal");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -274,13 +274,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 7);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
         // Greater between literal and col (take care of direction)
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where 667 < col_2", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "greater");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -294,13 +294,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 1);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
         // GreaterEqual between literal and col (take care of direction)
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where 667 <= col_2", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "greater_equal");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -314,13 +314,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 2);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
         // Less between literal and col (take care of direction)
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where 777 > col_2", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "less");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -334,13 +334,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 7);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
         // LessEqual between literal and col (take care of direction)
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where 777 >= col_2", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "less_equal");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -354,7 +354,7 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 7);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 }
 CATCH
@@ -376,7 +376,7 @@ try
     {
         // Not
         auto filter = generatePushDownFilter(table_info_json, "select col_1, col_2 from default.t_111 where NOT col_2=666", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "not");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -392,13 +392,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 7);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
         // And
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_1 = 'test1' and col_2 = 666", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "and");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -415,13 +415,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 1);
-        EXPECT_EQ(filter->filter_columns.size(), 2);
+        EXPECT_EQ(filter->filter_columns->size(), 2);
     }
 
     {
         // OR
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_2 = 789 or col_2 = 777", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "or");
         EXPECT_EQ(rs_operator->getAttrs().size(), 2);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -439,14 +439,14 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 0);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     // More complicated
     {
         // And with "not supported"
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_1 = 'test1' and not col_2 = 666", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "and");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -463,13 +463,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 1);
-        EXPECT_EQ(filter->filter_columns.size(), 2);
+        EXPECT_EQ(filter->filter_columns->size(), 2);
     }
 
     {
         // And with not
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_2 = 789 and not col_3 = 666", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "and");
         EXPECT_EQ(rs_operator->getAttrs().size(), 2);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -487,13 +487,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 0);
-        EXPECT_EQ(filter->filter_columns.size(), 2);
+        EXPECT_EQ(filter->filter_columns->size(), 2);
     }
 
     {
         // And with or
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_2 = 789 and (col_3 = 666 or col_3 = 678)", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "and");
         EXPECT_EQ(rs_operator->getAttrs().size(), 3);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -513,13 +513,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 1);
-        EXPECT_EQ(filter->filter_columns.size(), 2);
+        EXPECT_EQ(filter->filter_columns->size(), 2);
     }
 
     {
         // Or with "not supported"
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_1 = 'test1' or col_2 = 666", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "or");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -536,13 +536,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 2);
-        EXPECT_EQ(filter->filter_columns.size(), 2);
+        EXPECT_EQ(filter->filter_columns->size(), 2);
     }
 
     {
         // Or with not
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_1 = 'test1' or not col_2 = 666", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "or");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_2");
@@ -559,13 +559,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 7);
-        EXPECT_EQ(filter->filter_columns.size(), 2);
+        EXPECT_EQ(filter->filter_columns->size(), 2);
     }
 
     {
         // And between col and literal (not supported since And only support when child is ColumnExpr)
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_2 and 1", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "and");
         EXPECT_EQ(rs_operator->toDebugString(), "{\"op\":\"and\",\"children\":[{\"op\":\"unsupported\",\"reason\":\"child of logical and is not function\",\"content\":\"tp: ColumnRef val: \"\\200\\000\\000\\000\\000\\000\\000\\001\" field_type { tp: 8 flag: 4097 flen: 0 decimal: 0 collate: 0 }\",\"is_not\":\"0\"},{\"op\":\"unsupported\",\"reason\":\"child of logical and is not function\",\"content\":\"tp: Uint64 val: \"\\000\\000\\000\\000\\000\\000\\000\\001\" field_type { tp: 1 flag: 4129 flen: 0 decimal: 0 collate: 0 }\",\"is_not\":\"0\"}]}");
 
@@ -578,14 +578,14 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 6);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     std::cout << " do query select * from default.t_111 where col_2 or 1 " << std::endl;
     {
         // Or between col and literal (not supported since Or only support when child is ColumnExpr)
         auto filter = generatePushDownFilter(table_info_json, "select * from default.t_111 where col_2 or 1", default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "or");
         EXPECT_EQ(rs_operator->toDebugString(), "{\"op\":\"or\",\"children\":[{\"op\":\"unsupported\",\"reason\":\"child of logical operator is not function\",\"content\":\"tp: ColumnRef val: \"\\200\\000\\000\\000\\000\\000\\000\\001\" field_type { tp: 8 flag: 4097 flen: 0 decimal: 0 collate: 0 }\",\"is_not\":\"0\"},{\"op\":\"unsupported\",\"reason\":\"child of logical operator is not function\",\"content\":\"tp: Uint64 val: \"\\000\\000\\000\\000\\000\\000\\000\\001\" field_type { tp: 1 flag: 4129 flen: 0 decimal: 0 collate: 0 }\",\"is_not\":\"0\"}]}");
 
@@ -597,7 +597,7 @@ try
         EXPECT_EQ(before_where_block.rows(), 8);
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         EXPECT_TRUE(col->isColumnConst()); // always true, so filter column is const column
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     // TODO: add is null and is not null test case
@@ -637,7 +637,7 @@ try
         // converted_time: 0
 
         auto filter = generatePushDownFilter(table_info_json, String("select * from default.t_111 where col_timestamp > cast_string_datetime('") + datetime + String("')"), timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "greater");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_timestamp");
@@ -654,7 +654,7 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 3);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
@@ -666,7 +666,7 @@ try
         // converted_time: 1802216518491045888
 
         auto filter = generatePushDownFilter(table_info_json, String("select * from default.t_111 where col_timestamp > cast_string_datetime('") + datetime + String("')"), timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "greater");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_timestamp");
@@ -683,7 +683,7 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 7);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
@@ -695,7 +695,7 @@ try
         // converted_time: 0
 
         auto filter = generatePushDownFilter(table_info_json, String("select * from default.t_111 where col_timestamp > cast_string_datetime('") + datetime + String("')"), timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "greater");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_timestamp");
@@ -712,13 +712,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 7);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
         // Greater between Datetime col and Datetime literal
         auto filter = generatePushDownFilter(table_info_json, String("select * from default.t_111 where col_datetime > cast_string_datetime('") + datetime + String("')"), default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "greater");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_datetime");
@@ -734,13 +734,13 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 4);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 
     {
         // Greater between Date col and Datetime literal
         auto filter = generatePushDownFilter(table_info_json, String("select * from default.t_111 where col_date > cast_string_datetime('") + datetime + String("')"), default_timezone_info);
-        auto & rs_operator = filter->rs_operator;
+        const auto & rs_operator = filter->rs_operator;
         EXPECT_EQ(rs_operator->name(), "greater");
         EXPECT_EQ(rs_operator->getAttrs().size(), 1);
         EXPECT_EQ(rs_operator->getAttrs()[0].col_name, "col_date");
@@ -756,7 +756,7 @@ try
         auto & col = before_where_block.getByName(filter->filter_column_name).column;
         const auto * concrete_column = typeid_cast<const ColumnUInt8 *>(&(*col));
         EXPECT_EQ(countBytesInFilter(concrete_column->getData()), 3);
-        EXPECT_EQ(filter->filter_columns.size(), 1);
+        EXPECT_EQ(filter->filter_columns->size(), 1);
     }
 }
 CATCH
diff --git a/tests/docker/config/tiflash_dt_force_enable_lm.toml b/tests/docker/config/tiflash_dt_force_enable_lm.toml
new file mode 100644
index 00000000000..c57b52f1a76
--- /dev/null
+++ b/tests/docker/config/tiflash_dt_force_enable_lm.toml
@@ -0,0 +1,53 @@
+# Copyright 2023 PingCAP, Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+tmp_path = "/tmp/tiflash/data/tmp"
+
+path = "/tmp/tiflash/data/db"
+capacity = "10737418240"
+
+mark_cache_size = 5368709120
+minmax_index_cache_size = 5368709120
+tcp_port = 9000
+http_port = 8123
+
+[flash]
+tidb_status_addr = "tidb0:10080"
+service_addr = "0.0.0.0:3930"
+[flash.flash_cluster]
+update_rule_interval = 5
+[flash.proxy]
+addr = "0.0.0.0:20170"
+advertise-addr = "tiflash0:20170"
+data-dir = "/data"
+config = "/proxy.toml"
+log-file = "/log/proxy.log"
+engine-addr = "tiflash0:3930"
+status-addr = "0.0.0.0:20181"
+advertise-status-addr = "tiflash0:20181"
+
+[logger]
+count = 10
+errorlog = "/tmp/tiflash/log/error.log"
+size = "1000M"
+log = "/tmp/tiflash/log/server.log"
+level = "trace"
+
+[raft]
+pd_addr = "pd0:2379"
+ignore_databases = "system,default"
+
+[profiles]
+[profiles.default]
+force_push_down_all_filters_to_scan = 1
diff --git a/tests/docker/tiflash-dt-force-enable-lm.yaml b/tests/docker/tiflash-dt-force-enable-lm.yaml
new file mode 100644
index 00000000000..3da53716dba
--- /dev/null
+++ b/tests/docker/tiflash-dt-force-enable-lm.yaml
@@ -0,0 +1,43 @@
+# Copyright 2023 PingCAP, Ltd.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+version: '2.3'
+
+services:
+  # for tests under fullstack-test directory
+  # (engine DeltaTree)
+  tiflash0:
+    image: hub.pingcap.net/tiflash/tiflash-ci-base
+    volumes:
+      - ./config/tiflash_dt_force_enable_lm.toml:/config.toml:ro
+      - ./data/tiflash:/tmp/tiflash/data
+      - ./log/tiflash:/tmp/tiflash/log
+      - ..:/tests
+      - ../docker/_env.sh:/tests/_env.sh
+      - ./log/tiflash-cluster-manager:/tmp/tiflash/data/tmp
+      - ./config/proxy.toml:/proxy.toml:ro
+      - ./config/cipher-file-256:/cipher-file-256:ro
+      - ./data/proxy:/data
+      - ./log/proxy:/log
+      - ../.build/tiflash:/tiflash
+    entrypoint:
+      - /tiflash/tiflash
+      - server
+      - --config-file
+      - /config.toml
+    restart: on-failure
+    depends_on:
+      - "pd0"
+      - "tikv0"
+
diff --git a/tests/tidb-ci/force_enable_lm b/tests/tidb-ci/force_enable_lm
new file mode 120000
index 00000000000..9ad77ff12a6
--- /dev/null
+++ b/tests/tidb-ci/force_enable_lm
@@ -0,0 +1 @@
+../fullstack-test/expr/
\ No newline at end of file
diff --git a/tests/tidb-ci/run.sh b/tests/tidb-ci/run.sh
index bb7a5f5283d..b411aae7504 100755
--- a/tests/tidb-ci/run.sh
+++ b/tests/tidb-ci/run.sh
@@ -82,3 +82,11 @@ docker-compose -f cluster_disable_new_collation.yaml -f tiflash-dt.yaml exec -T
 
 docker-compose -f cluster_disable_new_collation.yaml -f tiflash-dt.yaml down
 clean_data_log
+
+# run force_enable_lm tests
+docker-compose -f cluster.yaml -f tiflash-dt-force-enable-lm.yaml up -d
+wait_env
+docker-compose -f cluster.yaml -f tiflash-dt-force-enable-lm.yaml exec -T tiflash0 bash -c 'cd /tests ; ./run-test.sh tidb-ci/force_enable_lm'
+
+docker-compose -f cluster.yaml -f tiflash-dt-force-enable-lm.yaml down
+clean_data_log
diff --git a/tests/tidb-ci/tiflash-dt-force-enable-lm.yaml b/tests/tidb-ci/tiflash-dt-force-enable-lm.yaml
new file mode 120000
index 00000000000..e5537d33ad5
--- /dev/null
+++ b/tests/tidb-ci/tiflash-dt-force-enable-lm.yaml
@@ -0,0 +1 @@
+../docker/tiflash-dt-force-enable-lm.yaml
\ No newline at end of file