diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp index 1ad99630840..97048c0e1bc 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWriter.cpp @@ -158,7 +158,9 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu auto & minmax_indexs = single_file_stream->minmax_indexs; if (auto iter = minmax_indexs.find(stream_name); iter != minmax_indexs.end()) { - iter->second->addPack(column, del_mark); + // For EXTRA_HANDLE_COLUMN_ID, we ignore del_mark when add minmax index. + // Because we need all rows which satisfy a certain range when place delta index no matter whether the row is a delete row. + iter->second->addPack(column, col_id == EXTRA_HANDLE_COLUMN_ID ? nullptr : del_mark); } auto offset_in_compressed_block = single_file_stream->original_hashing.offset(); @@ -217,7 +219,11 @@ void DMFileWriter::writeColumn(ColId col_id, const IDataType & type, const IColu const auto name = DMFile::getFileNameBase(col_id, substream); auto & stream = column_streams.at(name); if (stream->minmaxes) - stream->minmaxes->addPack(column, del_mark); + { + // For EXTRA_HANDLE_COLUMN_ID, we ignore del_mark when add minmax index. + // Because we need all rows which satisfy a certain range when place delta index no matter whether the row is a delete row. + stream->minmaxes->addPack(column, col_id == EXTRA_HANDLE_COLUMN_ID ? nullptr : del_mark); + } /// There could already be enough data to compress into the new block. if (stream->original_hashing.offset() >= options.min_compress_block_size) diff --git a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h index 167dc5155ba..d7235fa2214 100644 --- a/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h +++ b/dbms/src/Storages/DeltaMerge/tests/dm_basic_include.h @@ -201,9 +201,10 @@ class DMTestEnv * @param ts_beg `timestamp`'s value begin * @param ts_end `timestamp`'s value end (not included) * @param reversed increasing/decreasing insert `timestamp`'s value + * @param deleted if deleted is false, set `tag` to 0; otherwise set `tag` to 1 * @return */ - static Block prepareBlockWithTso(Int64 pk, size_t ts_beg, size_t ts_end, bool reversed = false) + static Block prepareBlockWithTso(Int64 pk, size_t ts_beg, size_t ts_end, bool reversed = false, bool deleted = false) { Block block; const size_t num_rows = (ts_end - ts_beg); @@ -240,7 +241,7 @@ class DMTestEnv column_data.resize(num_rows); for (size_t i = 0; i < num_rows; ++i) { - column_data[i] = 0; + column_data[i] = deleted ? 1 : 0; } tag_col.column = std::move(m_col); } diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp index df2c29a924e..24180f42baf 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_segment.cpp @@ -212,6 +212,42 @@ try } CATCH +TEST_F(Segment_test, WriteRead2) +try +{ + const size_t num_rows_write = dmContext().stable_pack_rows; + { + // write a block with rows all deleted + Block block = DMTestEnv::prepareBlockWithTso(2, 100, 100 + num_rows_write, false, true); + segment->write(dmContext(), block); + // write not deleted rows with larger pk + Block block2 = DMTestEnv::prepareBlockWithTso(3, 100, 100 + num_rows_write, false, false); + segment->write(dmContext(), block2); + + // flush segment and make sure there is two packs in stable + segment = segment->mergeDelta(dmContext(), tableColumns()); + ASSERT_EQ(segment->getStable()->getPacks(), 2); + } + + { + Block block = DMTestEnv::prepareBlockWithTso(1, 100, 100 + num_rows_write, false, false); + segment->write(dmContext(), block); + } + + { + auto in = segment->getInputStream(dmContext(), *tableColumns(), {RowKeyRange::newAll(false, 1)}); + size_t num_rows_read = 0; + in->readPrefix(); + while (Block block = in->read()) + { + num_rows_read += block.rows(); + } + in->readSuffix(); + // only write two visible pks + ASSERT_EQ(num_rows_read, 2); + } +} +CATCH TEST_F(Segment_test, ReadWithMoreAdvacedDeltaIndex) try