ignore delmark when add minmax for pk column (#4746) (#4760)
* ignore delmark when add minmax for pk column

* remove extra line

* fix static analysis

Co-authored-by: lidezhu <[email protected]>
3 people authored Apr 26, 2022
1 parent ef69b08 commit d80443b
Showing 3 changed files with 51 additions and 7 deletions.
16 changes: 11 additions & 5 deletions dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp
@@ -114,17 +114,17 @@ void DMFileWriter::write(const Block & block, const BlockProperty & block_proper
 
     auto del_mark_column = tryGetByColumnId(block, TAG_COLUMN_ID).column;
 
-    const ColumnVector<UInt8> * del_mark = !del_mark_column ? nullptr : (const ColumnVector<UInt8> *)del_mark_column.get();
+    const ColumnVector<UInt8> * del_mark = !del_mark_column ? nullptr : static_cast<const ColumnVector<UInt8> *>(del_mark_column.get());
 
     for (auto & cd : write_columns)
     {
-        auto & col = getByColumnId(block, cd.id).column;
+        const auto & col = getByColumnId(block, cd.id).column;
         writeColumn(cd.id, *cd.type, *col, del_mark);
 
         if (cd.id == VERSION_COLUMN_ID)
             stat.first_version = col->get64(0);
         else if (cd.id == TAG_COLUMN_ID)
-            stat.first_tag = (UInt8)(col->get64(0));
+            stat.first_tag = static_cast<UInt8>(col->get64(0));
     }
 
     if (!options.flags.isSingleFile())
@@ -178,7 +178,9 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu
     auto & minmax_indexs = single_file_stream->minmax_indexs;
     if (auto iter = minmax_indexs.find(stream_name); iter != minmax_indexs.end())
     {
-        iter->second->addPack(column, del_mark);
+        // For EXTRA_HANDLE_COLUMN_ID, we ignore del_mark when adding the minmax index,
+        // because placing the delta index needs every row in a given range, no matter whether the row is a delete row.
+        iter->second->addPack(column, col_id == EXTRA_HANDLE_COLUMN_ID ? nullptr : del_mark);
     }
 
     auto offset_in_compressed_block = single_file_stream->original_layer.offset();
@@ -240,7 +242,11 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu
     const auto name = DMFile::getFileNameBase(col_id, substream);
     auto & stream = column_streams.at(name);
     if (stream->minmaxes)
-        stream->minmaxes->addPack(column, del_mark);
+    {
+        // For EXTRA_HANDLE_COLUMN_ID, we ignore del_mark when adding the minmax index,
+        // because placing the delta index needs every row in a given range, no matter whether the row is a delete row.
+        stream->minmaxes->addPack(column, col_id == EXTRA_HANDLE_COLUMN_ID ? nullptr : del_mark);
+    }
 
     /// There could already be enough data to compress into the new block.
     if (stream->compressed_buf->offset() >= options.min_compress_block_size)
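To make the effect of passing nullptr for the handle column concrete, here is a minimal, self-contained C++ sketch. It is not the real DeltaMerge MinMaxIndex; the MinMaxSketch type, the plain Int64 column, and the tiny main() are assumptions made purely for illustration. The point it demonstrates: when a delete-mark vector is applied, a pack whose rows are all deleted contributes nothing to the min/max range, so a later range lookup (such as placing the delta index) could wrongly skip that pack; ignoring del_mark for the pk column keeps the range intact.

#include <algorithm>
#include <cstdint>
#include <limits>
#include <vector>

// Illustrative stand-in for a per-pack minmax index over an Int64 (handle) column.
struct MinMaxSketch
{
    int64_t min_value = std::numeric_limits<int64_t>::max();
    int64_t max_value = std::numeric_limits<int64_t>::min();

    // When del_mark is non-null, rows flagged as deleted are excluded from the range.
    void addPack(const std::vector<int64_t> & column, const std::vector<uint8_t> * del_mark)
    {
        for (size_t i = 0; i < column.size(); ++i)
        {
            if (del_mark != nullptr && (*del_mark)[i] != 0)
                continue;
            min_value = std::min(min_value, column[i]);
            max_value = std::max(max_value, column[i]);
        }
    }
};

int main()
{
    std::vector<int64_t> pk_column{2, 2, 2};  // a pack whose rows all share pk = 2
    std::vector<uint8_t> del_mark{1, 1, 1};   // every row carries the delete mark

    MinMaxSketch with_mark;
    with_mark.addPack(pk_column, &del_mark);  // range stays empty, so the pack could be pruned for pk = 2

    MinMaxSketch without_mark;
    without_mark.addPack(pk_column, nullptr); // range becomes [2, 2], so the pack stays visible to range scans
    return 0;
}

The new test in gtest_dm_segment.cpp (below) builds exactly that situation: a stable pack for pk = 2 whose rows are all delete-marked, followed by visible rows for other pks.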
5 changes: 3 additions & 2 deletions dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h
@@ -227,9 +227,10 @@ class DMTestEnv
      * @param ts_beg `timestamp`'s value begin
      * @param ts_end `timestamp`'s value end (not included)
      * @param reversed increasing/decreasing insert `timestamp`'s value
+     * @param deleted if deleted is false, set `tag` to 0; otherwise set `tag` to 1
      * @return
      */
-    static Block prepareBlockWithTso(Int64 pk, size_t ts_beg, size_t ts_end, bool reversed = false)
+    static Block prepareBlockWithTso(Int64 pk, size_t ts_beg, size_t ts_end, bool reversed = false, bool deleted = false)
     {
         Block block;
         const size_t num_rows = (ts_end - ts_beg);
@@ -245,7 +246,7 @@ class DMTestEnv
             VERSION_COLUMN_ID));
         // tag_col
         block.insert(DB::tests::createColumn<UInt8>(
-            std::vector<UInt64>(num_rows, 0),
+            std::vector<UInt64>(num_rows, deleted ? 1 : 0),
             TAG_COLUMN_NAME,
             TAG_COLUMN_ID));
         return block;
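The new deleted flag is exercised by the test added below; as a quick usage illustration (these calls mirror the test, where num_rows_write comes from dmContext().stable_pack_rows):

// a pack of rows for pk = 2 whose tag column is all 1, i.e. delete rows
Block deleted_rows = DMTestEnv::prepareBlockWithTso(2, 100, 100 + num_rows_write, /*reversed=*/false, /*deleted=*/true);
// a pack of visible rows for pk = 3 (tag column all 0)
Block visible_rows = DMTestEnv::prepareBlockWithTso(3, 100, 100 + num_rows_write, /*reversed=*/false, /*deleted=*/false);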
37 changes: 37 additions & 0 deletions dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp
@@ -198,6 +198,43 @@ try
 }
 CATCH
 
+TEST_F(Segment_test, WriteRead2)
+try
+{
+    const size_t num_rows_write = dmContext().stable_pack_rows;
+    {
+        // write a block whose rows are all deleted
+        Block block = DMTestEnv::prepareBlockWithTso(2, 100, 100 + num_rows_write, false, true);
+        segment->write(dmContext(), block);
+        // write non-deleted rows with a larger pk
+        Block block2 = DMTestEnv::prepareBlockWithTso(3, 100, 100 + num_rows_write, false, false);
+        segment->write(dmContext(), block2);
+
+        // flush the segment and make sure there are two packs in the stable
+        segment = segment->mergeDelta(dmContext(), tableColumns());
+        ASSERT_EQ(segment->getStable()->getPacks(), 2);
+    }
+
+    {
+        Block block = DMTestEnv::prepareBlockWithTso(1, 100, 100 + num_rows_write, false, false);
+        segment->write(dmContext(), block);
+    }
+
+    {
+        auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)});
+        size_t num_rows_read = 0;
+        in->readPrefix();
+        while (Block block = in->read())
+        {
+            num_rows_read += block.rows();
+        }
+        in->readSuffix();
+        // only two visible pks were written
+        ASSERT_EQ(num_rows_read, 2);
+    }
+}
+CATCH
+
 TEST_F(Segment_test, WriteReadMultiRange)
 try
 {
