From d80443b6f18769dfd36cd947069eeb63b0efde3c Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Tue, 26 Apr 2022 10:18:09 +0800 Subject: [PATCH] ignore delmark when add minmax for pk column (#4746) (#4760) * ignore delmark when add minmax for pk column * remove extra line * fix static analysis Co-authored-by: lidezhu Co-authored-by: lidezhu <47731263+lidezhu@users.noreply.github.com> --- .../Storages/DeltaMerge/File/DMFileWriter.cpp | 16 +++++--- .../DeltaMerge/tests/dm_basic_include.h | 5 ++- .../DeltaMerge/tests/gtest_dm_segment.cpp | 37 +++++++++++++++++++ 3 files changed, 51 insertions(+), 7 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index f1a8b64d3af..cb70204bc33 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -114,17 +114,17 @@ void DMFileWriter::write(const Block & block, const BlockProperty & block_proper auto del_mark_column = tryGetByColumnId(block, TAG_COLUMN_ID).column; - const ColumnVector * del_mark = !del_mark_column ? nullptr : (const ColumnVector *)del_mark_column.get(); + const ColumnVector * del_mark = !del_mark_column ? 
nullptr : static_cast<const ColumnVector *>(del_mark_column.get()); for (auto & cd : write_columns) { - auto & col = getByColumnId(block, cd.id).column; + const auto & col = getByColumnId(block, cd.id).column; writeColumn(cd.id, *cd.type, *col, del_mark); if (cd.id == VERSION_COLUMN_ID) stat.first_version = col->get64(0); else if (cd.id == TAG_COLUMN_ID) - stat.first_tag = (UInt8)(col->get64(0)); + stat.first_tag = static_cast<UInt8>(col->get64(0)); } if (!options.flags.isSingleFile()) @@ -178,7 +178,9 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu auto & minmax_indexs = single_file_stream->minmax_indexs; if (auto iter = minmax_indexs.find(stream_name); iter != minmax_indexs.end()) { - iter->second->addPack(column, del_mark); + // For EXTRA_HANDLE_COLUMN_ID, ignore del_mark when adding to the minmax index, + // because placing the delta index needs all rows within a given range, regardless of whether a row is a delete row. + iter->second->addPack(column, col_id == EXTRA_HANDLE_COLUMN_ID ? nullptr : del_mark); } auto offset_in_compressed_block = single_file_stream->original_layer.offset(); @@ -240,7 +242,11 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu const auto name = DMFile::getFileNameBase(col_id, substream); auto & stream = column_streams.at(name); if (stream->minmaxes) - stream->minmaxes->addPack(column, del_mark); + { + // For EXTRA_HANDLE_COLUMN_ID, ignore del_mark when adding to the minmax index, + // because placing the delta index needs all rows within a given range, regardless of whether a row is a delete row. + stream->minmaxes->addPack(column, col_id == EXTRA_HANDLE_COLUMN_ID ? nullptr : del_mark); + } /// There could already be enough data to compress into the new block. 
if (stream->compressed_buf->offset() >= options.min_compress_block_size) diff --git a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h index 31e9eb8cb56..a71ee0c7dc4 100644 --- a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h +++ b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h @@ -227,9 +227,10 @@ class DMTestEnv * @param ts_beg `timestamp`'s value begin * @param ts_end `timestamp`'s value end (not included) * @param reversed increasing/decreasing insert `timestamp`'s value + * @param deleted if deleted is false, set `tag` to 0; otherwise set `tag` to 1 * @return */ - static Block prepareBlockWithTso(Int64 pk, size_t ts_beg, size_t ts_end, bool reversed = false) + static Block prepareBlockWithTso(Int64 pk, size_t ts_beg, size_t ts_end, bool reversed = false, bool deleted = false) { Block block; const size_t num_rows = (ts_end - ts_beg); @@ -245,7 +246,7 @@ class DMTestEnv VERSION_COLUMN_ID)); // tag_col block.insert(DB::tests::createColumn( - std::vector(num_rows, 0), + std::vector(num_rows, deleted ? 
1 : 0), TAG_COLUMN_NAME, TAG_COLUMN_ID)); return block; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index 57ea73d0512..682710f738d 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -198,6 +198,43 @@ try } CATCH +TEST_F(Segment_test, WriteRead2) +try +{ + const size_t num_rows_write = dmContext().stable_pack_rows; + { + // write a block whose rows are all deleted + Block block = DMTestEnv::prepareBlockWithTso(2, 100, 100 + num_rows_write, false, true); + segment->write(dmContext(), block); + // write non-deleted rows with a larger pk + Block block2 = DMTestEnv::prepareBlockWithTso(3, 100, 100 + num_rows_write, false, false); + segment->write(dmContext(), block2); + + // flush segment and make sure there are two packs in stable + segment = segment->mergeDelta(dmContext(), tableColumns()); + ASSERT_EQ(segment->getStable()->getPacks(), 2); + } + + { + Block block = DMTestEnv::prepareBlockWithTso(1, 100, 100 + num_rows_write, false, false); + segment->write(dmContext(), block); + } + + { + auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + } + in->readSuffix(); + // only two visible pks were written + ASSERT_EQ(num_rows_read, 2); + } +} +CATCH + TEST_F(Segment_test, WriteReadMultiRange) try {