Skip to content

Commit

Permalink
ignore delmark when add minmax for pk column (#4746) (#4758)
Browse files Browse the repository at this point in the history
close #4747
  • Loading branch information
ti-chi-bot authored Jun 21, 2022
1 parent a8d9c66 commit 65d8fd7
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 7 deletions.
16 changes: 11 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,17 +94,17 @@ void DMFileWriter::write(const Block & block, const BlockProperty & block_proper

auto del_mark_column = tryGetByColumnId(block, TAG_COLUMN_ID).column;

const ColumnVector<UInt8> * del_mark = !del_mark_column ? nullptr : (const ColumnVector<UInt8> *)del_mark_column.get();
const ColumnVector<UInt8> * del_mark = !del_mark_column ? nullptr : static_cast<const ColumnVector<UInt8> *>(del_mark_column.get());

for (auto & cd : write_columns)
{
auto & col = getByColumnId(block, cd.id).column;
const auto & col = getByColumnId(block, cd.id).column;
writeColumn(cd.id, *cd.type, *col, del_mark);

if (cd.id == VERSION_COLUMN_ID)
stat.first_version = col->get64(0);
else if (cd.id == TAG_COLUMN_ID)
stat.first_tag = (UInt8)(col->get64(0));
stat.first_tag = static_cast<UInt8>(col->get64(0));
}

if (!options.flags.isSingleFile())
Expand Down Expand Up @@ -158,7 +158,9 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu
auto & minmax_indexs = single_file_stream->minmax_indexs;
if (auto iter = minmax_indexs.find(stream_name); iter != minmax_indexs.end())
{
iter->second->addPack(column, del_mark);
// For EXTRA_HANDLE_COLUMN_ID, we ignore del_mark when adding a pack to the minmax index,
// because placing the delta index needs all rows that fall in a certain range, no matter whether a row is a delete row.
iter->second->addPack(column, col_id == EXTRA_HANDLE_COLUMN_ID ? nullptr : del_mark);
}

auto offset_in_compressed_block = single_file_stream->original_layer.offset();
Expand Down Expand Up @@ -217,7 +219,11 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu
const auto name = DMFile::getFileNameBase(col_id, substream);
auto & stream = column_streams.at(name);
if (stream->minmaxes)
stream->minmaxes->addPack(column, del_mark);
{
// For EXTRA_HANDLE_COLUMN_ID, we ignore del_mark when adding a pack to the minmax index,
// because placing the delta index needs all rows that fall in a certain range, no matter whether a row is a delete row.
stream->minmaxes->addPack(column, col_id == EXTRA_HANDLE_COLUMN_ID ? nullptr : del_mark);
}

/// There could already be enough data to compress into the new block.
if (stream->original_layer.offset() >= options.min_compress_block_size)
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h
Original file line number Diff line number Diff line change
Expand Up @@ -203,9 +203,10 @@ class DMTestEnv
* @param ts_beg `timestamp`'s value begin
* @param ts_end `timestamp`'s value end (not included)
* @param reversed increasing/decreasing insert `timestamp`'s value
* @param deleted if deleted is false, set `tag` to 0; otherwise set `tag` to 1
* @return
*/
static Block prepareBlockWithTso(Int64 pk, size_t ts_beg, size_t ts_end, bool reversed = false)
static Block prepareBlockWithTso(Int64 pk, size_t ts_beg, size_t ts_end, bool reversed = false, bool deleted = false)
{
Block block;
const size_t num_rows = (ts_end - ts_beg);
Expand Down Expand Up @@ -242,7 +243,7 @@ class DMTestEnv
column_data.resize(num_rows);
for (size_t i = 0; i < num_rows; ++i)
{
column_data[i] = 0;
column_data[i] = deleted ? 1 : 0;
}
tag_col.column = std::move(m_col);
}
Expand Down
36 changes: 36 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,42 @@ try
}
CATCH

// Writes one pack-sized block of deleted rows (pk 2) and one of live rows (pk 3),
// merges them, then writes live rows with a smaller pk (pk 1) and checks that a
// full-range read returns exactly the two visible pks.
TEST_F(Segment_test, WriteRead2)
try
{
    const size_t num_rows_write = dmContext().stable_pack_rows;
    // Every block below covers the same timestamp range [ts_begin, ts_end).
    const size_t ts_begin = 100;
    const size_t ts_end = ts_begin + num_rows_write;

    {
        // A block in which every row is marked as deleted.
        Block deleted_rows = DMTestEnv::prepareBlockWithTso(2, ts_begin, ts_end, false, true);
        segment->write(dmContext(), deleted_rows);

        // Rows that are not deleted, under a larger pk.
        Block live_rows = DMTestEnv::prepareBlockWithTso(3, ts_begin, ts_end, false, false);
        segment->write(dmContext(), live_rows);

        // Flush the segment and make sure there are two packs in stable.
        segment = segment->mergeDelta(dmContext(), tableColumns());
        ASSERT_EQ(segment->getStable()->getPacks(), 2);
    }

    {
        // Rows that are not deleted, under a pk smaller than both pks written above.
        Block smaller_pk_rows = DMTestEnv::prepareBlockWithTso(1, ts_begin, ts_end, false, false);
        segment->write(dmContext(), smaller_pk_rows);
    }

    {
        auto stream = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
        stream->readPrefix();

        // Drain the stream, counting the rows of every block it returns.
        size_t num_rows_read = 0;
        for (Block chunk = stream->read(); chunk; chunk = stream->read())
            num_rows_read += chunk.rows();

        stream->readSuffix();

        // Only two visible pks were written (pk 1 and pk 3).
        ASSERT_EQ(num_rows_read, 2);
    }
}
CATCH

TEST_F(Segment_test, ReadWithMoreAdvacedDeltaIndex)
try
Expand Down

0 comments on commit 65d8fd7

Please sign in to comment.