Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[enhance](paimon) opt count pushdown for paimon and refactor be logic #46911

Open
wants to merge 20 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 8 additions & 35 deletions be/src/vec/exec/format/table/iceberg_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,13 +77,8 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache,
io::IOContext* io_ctx)
: TableFormatReader(std::move(file_format_reader)),
_profile(profile),
_state(state),
_params(params),
_range(range),
_kv_cache(kv_cache),
_io_ctx(io_ctx) {
: TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx),
_kv_cache(kv_cache) {
static const char* iceberg_profile = "IcebergProfile";
ADD_TIMER(_profile, iceberg_profile);
_iceberg_profile.num_delete_files =
Expand All @@ -94,31 +89,9 @@ IcebergTableReader::IcebergTableReader(std::unique_ptr<GenericReader> file_forma
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", iceberg_profile);
_iceberg_profile.delete_rows_sort_time =
ADD_CHILD_TIMER(_profile, "DeleteRowsSortTime", iceberg_profile);
if (range.table_format_params.iceberg_params.__isset.row_count) {
_remaining_table_level_row_count = range.table_format_params.iceberg_params.row_count;
} else {
_remaining_table_level_row_count = -1;
}
}

Status IcebergTableReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
// already get rows from be
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
auto rows = std::min(_remaining_table_level_row_count,
(int64_t)_state->query_options().batch_size);
_remaining_table_level_row_count -= rows;
auto mutate_columns = block->mutate_columns();
for (auto& col : mutate_columns) {
col->resize(rows);
}
block->set_columns(std::move(mutate_columns));
*read_rows = rows;
if (_remaining_table_level_row_count == 0) {
*eof = true;
}

return Status::OK();
}
Status IcebergTableReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
RETURN_IF_ERROR(_expand_block_if_need(block));

// To support iceberg schema evolution. We change the column name in block to
Expand Down Expand Up @@ -161,13 +134,13 @@ Status IcebergTableReader::get_columns(
return _file_format_reader->get_columns(name_to_type, missing_cols);
}

Status IcebergTableReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
Status IcebergTableReader::init_row_filters() {
// We get the count value by doris's be, so we don't need to read the delete file
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count > 0) {
if (_push_down_agg_type == TPushAggOp::type::COUNT && _table_level_row_count > 0) {
return Status::OK();
}

const auto& table_desc = range.table_format_params.iceberg_params;
const auto& table_desc = _range.table_format_params.iceberg_params;
const auto& version = table_desc.format_version;
if (version < MIN_SUPPORT_DELETE_FILES_VERSION) {
return Status::OK();
Expand Down Expand Up @@ -545,7 +518,7 @@ Status IcebergParquetReader::init_reader(
_gen_new_colname_to_value_range();
parquet_reader->set_table_to_file_col_map(_table_col_to_file_col);
parquet_reader->iceberg_sanitize(_all_required_col_names);
RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
RETURN_IF_ERROR(init_row_filters());
return parquet_reader->init_reader(
_all_required_col_names, _not_in_file_col_names, &_new_colname_to_value_range,
conjuncts, tuple_descriptor, row_descriptor, colname_to_slot_id,
Expand Down Expand Up @@ -617,7 +590,7 @@ Status IcebergOrcReader::init_reader(
_gen_file_col_names();
_gen_new_colname_to_value_range();
orc_reader->set_table_col_to_file_col(_table_col_to_file_col);
RETURN_IF_ERROR(init_row_filters(_range, _io_ctx));
RETURN_IF_ERROR(init_row_filters());
return orc_reader->init_reader(&_all_required_col_names, &_new_colname_to_value_range,
conjuncts, false, tuple_descriptor, row_descriptor,
not_single_slot_filter_conjuncts, slot_id_to_filter_conjuncts);
Expand Down
24 changes: 7 additions & 17 deletions be/src/vec/exec/format/table/iceberg_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@
#include "exec/olap_common.h"
#include "runtime/define_primitive_type.h"
#include "runtime/primitive_type.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "table_format_reader.h"
#include "util/runtime_profile.h"
#include "vec/columns/column_dictionary.h"
#include "vec/exec/format/orc/vorc_reader.h"
#include "vec/exec/format/parquet/vparquet_reader.h"
Expand Down Expand Up @@ -80,9 +78,9 @@ class IcebergTableReader : public TableFormatReader {
io::IOContext* io_ctx);
~IcebergTableReader() override = default;

Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final;
Status init_row_filters() final;

Status get_next_block(Block* block, size_t* read_rows, bool* eof) final;
Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;

Status get_columns(std::unordered_map<std::string, TypeDescriptor>* name_to_type,
std::unordered_set<std::string>* missing_cols) final;
Expand Down Expand Up @@ -135,10 +133,6 @@ class IcebergTableReader : public TableFormatReader {
// Remove the added delete columns
Status _shrink_block_if_need(Block* block);

RuntimeProfile* _profile;
RuntimeState* _state;
const TFileScanRangeParams& _params;
const TFileRangeDesc& _range;
// owned by scan node
ShardedKVCache* _kv_cache;
IcebergProfile _iceberg_profile;
Expand All @@ -162,13 +156,9 @@ class IcebergTableReader : public TableFormatReader {
std::vector<std::string> _expand_col_names;
std::vector<ColumnWithTypeAndName> _expand_columns;

io::IOContext* _io_ctx;
bool _has_schema_change = false;
bool _has_iceberg_schema = false;

// the table level row count for optimizing query like:
// select count(*) from table;
int64_t _remaining_table_level_row_count;
Fileformat _file_format = Fileformat::NONE;

const int64_t MIN_SUPPORT_DELETE_FILES_VERSION = 2;
Expand Down Expand Up @@ -212,9 +202,9 @@ class IcebergParquetReader final : public IcebergTableReader {
const std::unordered_map<int, VExprContextSPtrs>* slot_id_to_filter_conjuncts);

Status _read_position_delete_file(const TFileRangeDesc* delete_range,
DeleteFile* position_delete) override;
DeleteFile* position_delete) final;

void set_delete_rows() override {
void set_delete_rows() final {
auto* parquet_reader = (ParquetReader*)(_file_format_reader.get());
parquet_reader->set_delete_rows(&_iceberg_delete_rows);
}
Expand All @@ -223,7 +213,7 @@ class IcebergParquetReader final : public IcebergTableReader {

protected:
std::unique_ptr<GenericReader> _create_equality_reader(
const TFileRangeDesc& delete_desc) override {
const TFileRangeDesc& delete_desc) final {
return ParquetReader::create_unique(
_profile, _params, delete_desc, READ_DELETE_FILE_BATCH_SIZE,
const_cast<cctz::time_zone*>(&_state->timezone_obj()), _io_ctx, _state);
Expand All @@ -234,15 +224,15 @@ class IcebergOrcReader final : public IcebergTableReader {
ENABLE_FACTORY_CREATOR(IcebergOrcReader);

Status _read_position_delete_file(const TFileRangeDesc* delete_range,
DeleteFile* position_delete) override;
DeleteFile* position_delete) final;

IcebergOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, ShardedKVCache* kv_cache, io::IOContext* io_ctx)
: IcebergTableReader(std::move(file_format_reader), profile, state, params, range,
kv_cache, io_ctx) {}

void set_delete_rows() override {
void set_delete_rows() final {
auto* orc_reader = (OrcReader*)_file_format_reader.get();
orc_reader->set_position_delete_rowids(&_iceberg_delete_rows);
}
Expand Down
23 changes: 22 additions & 1 deletion be/src/vec/exec/format/table/paimon_jni_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,9 @@
#include <map>

#include "runtime/descriptors.h"
#include "runtime/runtime_state.h"
#include "runtime/types.h"
#include "vec/core/types.h"

namespace doris {
class RuntimeProfile;
class RuntimeState;
Expand Down Expand Up @@ -64,6 +64,11 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
if (range_params->__isset.serialized_table) {
params["serialized_table"] = range_params->serialized_table;
}
if (range.table_format_params.paimon_params.__isset.row_count) {
_remaining_table_level_row_count = range.table_format_params.paimon_params.row_count;
} else {
_remaining_table_level_row_count = -1;
}

// Used to create paimon option
for (const auto& kv : range.table_format_params.paimon_params.paimon_options) {
Expand All @@ -79,6 +84,22 @@ PaimonJniReader::PaimonJniReader(const std::vector<SlotDescriptor*>& file_slot_d
}

Status PaimonJniReader::get_next_block(Block* block, size_t* read_rows, bool* eof) {
if (_push_down_agg_type == TPushAggOp::type::COUNT && _remaining_table_level_row_count >= 0) {
auto rows = std::min(_remaining_table_level_row_count,
(int64_t)_state->query_options().batch_size);
_remaining_table_level_row_count -= rows;
auto mutate_columns = block->mutate_columns();
for (auto& col : mutate_columns) {
col->resize(rows);
}
block->set_columns(std::move(mutate_columns));
*read_rows = rows;
if (_remaining_table_level_row_count == 0) {
*eof = true;
}

return Status::OK();
}
return _jni_connector->get_next_block(block, read_rows, eof);
}

Expand Down
1 change: 1 addition & 0 deletions be/src/vec/exec/format/table/paimon_jni_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ class PaimonJniReader : public JniReader {

private:
std::unordered_map<std::string, ColumnValueRangeType>* _colname_to_value_range;
int64_t _remaining_table_level_row_count;
};

#include "common/compile_check_end.h"
Expand Down
28 changes: 19 additions & 9 deletions be/src/vec/exec/format/table/paimon_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,16 @@
#include <vector>

#include "common/status.h"
#include "runtime/runtime_state.h"
#include "util/deletion_vector.h"

namespace doris::vectorized {
#include "common/compile_check_begin.h"
PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
RuntimeProfile* profile, const TFileScanRangeParams& params)
: TableFormatReader(std::move(file_format_reader)), _profile(profile), _params(params) {
RuntimeProfile* profile, RuntimeState* state,
const TFileScanRangeParams& params, const TFileRangeDesc& range,
io::IOContext* io_ctx)
: TableFormatReader(std::move(file_format_reader), state, profile, params, range, io_ctx) {
static const char* paimon_profile = "PaimonProfile";
ADD_TIMER(_profile, paimon_profile);
_paimon_profile.num_delete_rows =
Expand All @@ -35,25 +38,28 @@ PaimonReader::PaimonReader(std::unique_ptr<GenericReader> file_format_reader,
ADD_CHILD_TIMER(_profile, "DeleteFileReadTime", paimon_profile);
}

Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) {
const auto& table_desc = range.table_format_params.paimon_params;
Status PaimonReader::init_row_filters() {
const auto& table_desc = _range.table_format_params.paimon_params;
if (!table_desc.__isset.deletion_file) {
return Status::OK();
}

// set push down agg type to NONE because we can not do count push down opt
// if there are delete files.
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
if (!_range.table_format_params.paimon_params.__isset.row_count) {
_file_format_reader->set_push_down_agg_type(TPushAggOp::NONE);
}

const auto& deletion_file = table_desc.deletion_file;
io::FileSystemProperties properties = {
.system_type = _params.file_type,
.properties = _params.properties,
.hdfs_params = _params.hdfs_params,
.broker_addresses {},
};
if (range.__isset.file_type) {
if (_range.__isset.file_type) {
// for compatibility
properties.system_type = range.file_type;
properties.system_type = _range.file_type;
}
if (_params.__isset.broker_addresses) {
properties.broker_addresses.assign(_params.broker_addresses.begin(),
Expand All @@ -64,7 +70,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
.path = deletion_file.path,
.file_size = -1,
.mtime = 0,
.fs_name = range.fs_name,
.fs_name = _range.fs_name,
};

// TODO: cache the file in local
Expand All @@ -78,7 +84,7 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
{
SCOPED_TIMER(_paimon_profile.delete_files_read_time);
RETURN_IF_ERROR(
delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, io_ctx));
delete_file_reader->read_at(deletion_file.offset, result, &bytes_read, _io_ctx));
}
if (bytes_read != deletion_file.length + 4) {
return Status::IOError(
Expand All @@ -99,5 +105,9 @@ Status PaimonReader::init_row_filters(const TFileRangeDesc& range, io::IOContext
}
return Status::OK();
}

Status PaimonReader::get_next_block_inner(Block* block, size_t* read_rows, bool* eof) {
return _file_format_reader->get_next_block(block, read_rows, eof);
}
#include "common/compile_check_end.h"
} // namespace doris::vectorized
26 changes: 14 additions & 12 deletions be/src/vec/exec/format/table/paimon_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,34 +29,35 @@ namespace doris::vectorized {
class PaimonReader : public TableFormatReader {
public:
PaimonReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
const TFileScanRangeParams& params);
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, io::IOContext* io_ctx);
~PaimonReader() override = default;

Status init_row_filters(const TFileRangeDesc& range, io::IOContext* io_ctx) final;
Status init_row_filters() final;

Status get_next_block_inner(Block* block, size_t* read_rows, bool* eof) final;

protected:
struct PaimonProfile {
RuntimeProfile::Counter* num_delete_rows;
RuntimeProfile::Counter* delete_files_read_time;
};
std::vector<int64_t> _delete_rows;
RuntimeProfile* _profile;
PaimonProfile _paimon_profile;
virtual void set_delete_rows() = 0;

private:
const TFileScanRangeParams& _params;
virtual void set_delete_rows() = 0;
};

class PaimonOrcReader final : public PaimonReader {
public:
ENABLE_FACTORY_CREATOR(PaimonOrcReader);
PaimonOrcReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
const TFileScanRangeParams& params)
: PaimonReader(std::move(file_format_reader), profile, params) {};
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, io::IOContext* io_ctx)
: PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {};
~PaimonOrcReader() final = default;

void set_delete_rows() override {
void set_delete_rows() final {
(reinterpret_cast<OrcReader*>(_file_format_reader.get()))
->set_position_delete_rowids(&_delete_rows);
}
Expand All @@ -66,11 +67,12 @@ class PaimonParquetReader final : public PaimonReader {
public:
ENABLE_FACTORY_CREATOR(PaimonParquetReader);
PaimonParquetReader(std::unique_ptr<GenericReader> file_format_reader, RuntimeProfile* profile,
const TFileScanRangeParams& params)
: PaimonReader(std::move(file_format_reader), profile, params) {};
RuntimeState* state, const TFileScanRangeParams& params,
const TFileRangeDesc& range, io::IOContext* io_ctx)
: PaimonReader(std::move(file_format_reader), profile, state, params, range, io_ctx) {};
~PaimonParquetReader() final = default;

void set_delete_rows() override {
void set_delete_rows() final {
(reinterpret_cast<ParquetReader*>(_file_format_reader.get()))
->set_delete_rows(&_delete_rows);
}
Expand Down
Loading