Skip to content

Commit

Permalink
storage: fix block rows not match when filter column is the first non…
Browse files Browse the repository at this point in the history
…-empty column in the block (#9483) (#9495)

ref #9472

storage: fix block rows not match when the filter column is the first non-empty column in the block

Signed-off-by: ti-chi-bot <[email protected]>
Signed-off-by: Lloyd-Pottiger <[email protected]>

Co-authored-by: Lloyd-Pottiger <[email protected]>
Co-authored-by: Lloyd-Pottiger <[email protected]>
  • Loading branch information
3 people authored Oct 8, 2024
1 parent 04697ee commit 5915c6b
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 23 deletions.
3 changes: 1 addition & 2 deletions dbms/src/Core/Block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -541,10 +541,8 @@ Block hstackBlocks(Blocks && blocks, const Block & header)
return {};

Block res = header.cloneEmpty();
size_t num_rows = blocks.front().rows();
for (const auto & block : blocks)
{
RUNTIME_CHECK_MSG(block.rows() == num_rows, "Cannot hstack blocks with different number of rows");
for (const auto & elem : block)
{
if (likely(res.has(elem.name)))
Expand All @@ -553,6 +551,7 @@ Block hstackBlocks(Blocks && blocks, const Block & header)
}
}
}
res.checkNumberOfRows();

return res;
}
Expand Down
13 changes: 8 additions & 5 deletions dbms/src/Core/Block.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,11 +175,14 @@ using BucketBlocksListMap = std::map<Int32, BlocksList>;
/// Join blocks by columns
/// The schema of the output block is the same as the header block.
/// The columns not in the header block will be ignored.
/// For example:
/// header: (a UInt32, b UInt32, c UInt32, d UInt32)
/// block1: (a UInt32, b UInt32, c UInt32, e UInt32), rows: 3
/// block2: (d UInt32), rows: 3
/// result: (a UInt32, b UInt32, c UInt32, d UInt32), rows: 3
/// NOTE: The input blocks can have columns with different sizes,
/// but the columns in the header block must have the same size,
/// Otherwise, an exception will be thrown.
/// Example:
/// header: (a UInt32, b UInt32, c UInt32, d UInt32)
/// block1: (a UInt32, b UInt32, c UInt32, e UInt32), rows: 3
/// block2: (d UInt32), rows: 3
/// result: (a UInt32, b UInt32, c UInt32, d UInt32), rows: 3
Block hstackBlocks(Blocks && blocks, const Block & header);

/// Join blocks by rows
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,32 @@
namespace DB::DM
{

namespace
{

void filterFilterColumnBlock(
const Block & header,
Block & block,
const IColumn::Filter & filter,
size_t passed_count,
const String & filter_column_name)
{
ColumnPtr filter_column;
for (auto & col : block)
{
if (col.name == filter_column_name)
{
filter_column = col.column;
continue;
}
col.column = col.column->filter(filter, passed_count);
}
if (header.has(filter_column_name))
filter_column = filter_column->filter(filter, passed_count);
}

} // namespace

LateMaterializationBlockInputStream::LateMaterializationBlockInputStream(
const ColumnDefines & columns_to_read,
const String & filter_column_name_,
Expand Down Expand Up @@ -69,10 +95,7 @@ Block LateMaterializationBlockInputStream::readImpl()
{
col.column = col.column->filter(col_filter, passed_count);
}
for (auto & col : filter_column_block)
{
col.column = col.column->filter(col_filter, passed_count);
}
filterFilterColumnBlock(header, filter_column_block, col_filter, passed_count, filter_column_name);
}
return hstackBlocks({std::move(filter_column_block), std::move(rest_column_block)}, header);
}
Expand Down Expand Up @@ -110,12 +133,7 @@ Block LateMaterializationBlockInputStream::readImpl()
// so only if the number of rows left after filtering out is large enough,
// we can skip some packs of the next block, call readWithFilter to get the next block.
rest_column_block = rest_column_stream->readWithFilter(*filter);
for (auto & col : filter_column_block)
{
if (col.name == filter_column_name)
continue;
col.column = col.column->filter(*filter, passed_count);
}
filterFilterColumnBlock(header, filter_column_block, *filter, passed_count, filter_column_name);
}
else if (filter_out_count > 0)
{
Expand All @@ -126,12 +144,7 @@ Block LateMaterializationBlockInputStream::readImpl()
{
col.column = col.column->filter(*filter, passed_count);
}
for (auto & col : filter_column_block)
{
if (col.name == filter_column_name)
continue;
col.column = col.column->filter(*filter, passed_count);
}
filterFilterColumnBlock(header, filter_column_block, *filter, passed_count, filter_column_name);
}
else
{
Expand Down

0 comments on commit 5915c6b

Please sign in to comment.