diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index d936920e422..59785233f2b 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -37,6 +37,8 @@ using LoggerPtr = std::shared_ptr; namespace DM { +class DMFile; +using DMFilePtr = std::shared_ptr; class Segment; using SegmentPtr = std::shared_ptr; using SegmentPair = std::pair; @@ -495,6 +497,24 @@ class DeltaMergeStore : private boost::noncopyable MergeDeltaReason reason, SegmentSnapshotPtr segment_snap = nullptr); + /** + * Discard all data in the segment, and use the specified DMFile as the stable instead. + * The specified DMFile is safe to be shared by multiple segments. + * + * Note 1: This function will not enable GC for `data_file` for you, because you may want to share the same + * stable file among multiple segments. It is your own duty to enable GC later. + * + * Note 2: You must ensure that `data_file` is already managed by the storage pool, and has been written + * to the PageStorage's data. Otherwise there will be exceptions. + * + * Note 3: This API is subject to change in the future, as it relies on the knowledge that all current data + * in this segment is useless, which is a pretty tough requirement. 
+ */ + SegmentPtr segmentDangerouslyReplaceData( + DMContext & dm_context, + const SegmentPtr & segment, + const DMFilePtr & data_file); + // isSegmentValid should be protected by lock on `read_write_mutex` inline bool isSegmentValid(const std::shared_lock &, const SegmentPtr & segment) { diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index 1a0da119a7c..b16667ecd90 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -487,6 +487,48 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta( return new_segment; } +SegmentPtr DeltaMergeStore::segmentDangerouslyReplaceData( + DMContext & dm_context, + const SegmentPtr & segment, + const DMFilePtr & data_file) +{ + LOG_FMT_INFO(log, "ReplaceData - Begin, segment={} data_file={}", segment->info(), data_file->path()); + + WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter()); + + SegmentPtr new_segment; + { + std::unique_lock lock(read_write_mutex); + if (!isSegmentValid(lock, segment)) + { + LOG_FMT_DEBUG(log, "ReplaceData - Give up segment replace data because segment not valid, segment={} data_file={}", segment->simpleInfo(), data_file->path()); + return {}; + } + + auto segment_lock = segment->mustGetUpdateLock(); + new_segment = segment->dangerouslyReplaceData(segment_lock, dm_context, data_file, wbs); + + RUNTIME_CHECK(compare(segment->getRowKeyRange().getEnd(), new_segment->getRowKeyRange().getEnd()) == 0, segment->info(), new_segment->info()); + RUNTIME_CHECK(segment->segmentId() == new_segment->segmentId(), segment->info(), new_segment->info()); + + wbs.writeLogAndData(); + wbs.writeMeta(); + + segment->abandon(dm_context); + segments[segment->getRowKeyRange().getEnd()] = new_segment; + id_to_segment[segment->segmentId()] = new_segment; + + LOG_FMT_INFO(log, "ReplaceData - Finish, old_segment={} new_segment={}", 
segment->info(), new_segment->info()); + } + + wbs.writeRemoves(); + + if constexpr (DM_RUN_CHECK) + check(dm_context.db_context); + + return new_segment; +} + bool DeltaMergeStore::doIsSegmentValid(const SegmentPtr & segment) { if (segment->hasAbandoned()) diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index 48d4071d595..eb34d31feb9 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -268,11 +268,16 @@ class DMFile : private boost::noncopyable bool isColumnExist(ColId col_id) const { return column_stats.find(col_id) != column_stats.end(); } bool isSingleFileMode() const { return mode == Mode::SINGLE_FILE; } - String toString() const - { - return "{DMFile, packs: " + DB::toString(getPacks()) + ", rows: " + DB::toString(getRows()) + ", bytes: " + DB::toString(getBytes()) - + ", file size: " + DB::toString(getBytesOnDisk()) + "}"; - } + /* + * TODO: This function is currently unused. We could use it when: + * 1. The content is polished (e.g. including at least file ID, and use a format easy for grep). + * 2. Unify the place where we are currently printing out DMFile's `path` or `file_id`. 
+ */ + // String toString() const + // { + // return "{DMFile, packs: " + DB::toString(getPacks()) + ", rows: " + DB::toString(getRows()) + ", bytes: " + DB::toString(getBytes()) + // + ", file size: " + DB::toString(getBytesOnDisk()) + "}"; + // } DMConfigurationOpt & getConfiguration() { return configuration; } diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index ca702dc38f9..86dbec61db0 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -706,6 +706,65 @@ SegmentPtr Segment::applyMergeDelta(const Segment::Lock &, // return new_me; } +SegmentPtr Segment::dangerouslyReplaceDataForTest(DMContext & dm_context, // + const DMFilePtr & data_file) const +{ + WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter()); + + auto lock = mustGetUpdateLock(); + auto new_segment = dangerouslyReplaceData(lock, dm_context, data_file, wbs); + + wbs.writeAll(); + return new_segment; +} + +SegmentPtr Segment::dangerouslyReplaceData(const Segment::Lock &, // + DMContext & dm_context, + const DMFilePtr & data_file, + WriteBatches & wbs) const +{ + LOG_FMT_DEBUG(log, "ReplaceData - Begin, data_file={}", data_file->path()); + + auto & storage_pool = dm_context.storage_pool; + auto delegate = dm_context.path_pool.getStableDiskDelegator(); + + RUNTIME_CHECK(delegate.getDTFilePath(data_file->fileId()) == data_file->parentPath()); + + // Always create a ref to the file to allow `data_file` being shared. + auto new_page_id = storage_pool.newDataPageIdForDTFile(delegate, __PRETTY_FUNCTION__); + // TODO: We could allow assigning multiple DMFiles in future. 
+ auto ref_file = DMFile::restore( + dm_context.db_context.getFileProvider(), + data_file->fileId(), + new_page_id, + data_file->parentPath(), + DMFile::ReadMetaMode::all()); + wbs.data.putRefPage(new_page_id, data_file->pageId()); + + auto new_stable = std::make_shared(stable->getId()); + new_stable->setFiles({ref_file}, rowkey_range, &dm_context); + new_stable->saveMeta(wbs.meta); + + // Empty new delta + auto new_delta = std::make_shared(delta->getId()); + new_delta->saveMeta(wbs); + + auto new_me = std::make_shared(epoch + 1, // + rowkey_range, + segment_id, + next_segment_id, + new_delta, + new_stable); + new_me->serialize(wbs.meta); + + delta->recordRemoveColumnFilesPages(wbs); + stable->recordRemovePacksPages(wbs); + + LOG_FMT_DEBUG(log, "ReplaceData - Finish, old_me={} new_me={}", info(), new_me->info()); + + return new_me; +} + SegmentPair Segment::split(DMContext & dm_context, const ColumnDefinesPtr & schema_snap, std::optional opt_split_at, SplitMode opt_split_mode) const { WriteBatches wbs(dm_context.storage_pool, dm_context.getWriteLimiter()); diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index 318a5150068..1bda20c8bf4 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -333,6 +333,31 @@ class Segment : private boost::noncopyable WriteBatches & wbs, const StableValueSpacePtr & new_stable) const; + /** + * Only used in tests as a shortcut. + * Normally you should use `dangerouslyReplaceData`. + */ + [[nodiscard]] SegmentPtr dangerouslyReplaceDataForTest(DMContext & dm_context, const DMFilePtr & data_file) const; + + /** + * Discard all data in the current delta and stable layer, and use the specified DMFile as the stable instead. + * This API does not have a prepare & apply pair, as it should be quick enough. The specified DMFile is safe + * to be shared for multiple segments. 
+ * + * Note 1: Should be protected behind the Segment update lock to ensure no new data will be appended to this + * segment during the function call. Otherwise the new data will be lost in the new segment. + * + * Note 2: This function will not enable GC for `data_file` for you, because you may want to share the same + * stable file among multiple segments. It is your own duty to enable GC later. + * + * Note 3: You must ensure that `data_file` is already managed by the storage pool, and has been written + * to the PageStorage's data. Otherwise there will be exceptions. + * + * Note 4: This API is subject to change in the future, as it relies on the knowledge that all current data + * in this segment is useless, which is a pretty tough requirement. + */ + [[nodiscard]] SegmentPtr dangerouslyReplaceData(const Lock &, DMContext & dm_context, const DMFilePtr & data_file, WriteBatches & wbs) const; + [[nodiscard]] SegmentPtr dropNextSegment(WriteBatches & wbs, const RowKeyRange & next_segment_range); /// Flush delta's cache packs. @@ -468,7 +493,7 @@ class Segment : private boost::noncopyable bool relevant_place) const; private: - /// The version of this segment. After split / merge / merge delta, epoch got increased by 1. + /// The version of this segment. After split / merge / mergeDelta / dangerouslyReplaceData, epoch got increased by 1. 
const UInt64 epoch; RowKeyRange rowkey_range; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp index 814e5443258..7c18b32a795 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment.cpp @@ -15,6 +15,7 @@ #include #include #include +#include #include #include #include @@ -46,6 +47,121 @@ bool shouldCompactStableWithTooMuchDataOutOfSegmentRange(const DMContext & conte } namespace tests { + +class SegmentFrameworkTest : public SegmentTestBasic +{ +}; + +TEST_F(SegmentFrameworkTest, PrepareWriteBlock) +try +{ + reloadWithOptions(SegmentTestOptions{.is_common_handle = false}); + + auto s1_id = splitSegmentAt(DELTA_MERGE_FIRST_SEGMENT_ID, 10); + ASSERT_TRUE(s1_id.has_value()); + auto s2_id = splitSegmentAt(*s1_id, 20); + ASSERT_TRUE(s2_id.has_value()); + + // s1 has range [10, 20) + { + auto [begin, end] = getSegmentKeyRange(*s1_id); + ASSERT_EQ(10, begin); + ASSERT_EQ(20, end); + } + + { + // write_rows == segment_rows, start_key not specified + auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 10); + ASSERT_EQ(1, blocks.size()); + auto handle_column = blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column; + const auto & handle_data = typeid_cast &>(*handle_column).getData(); + ASSERT_EQ(PaddedPODArray({10, 11, 12, 13, 14, 15, 16, 17, 18, 19}), handle_data); + } + { + // write_rows > segment_rows, start_key not specified + auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 13); + ASSERT_EQ(2, blocks.size()); + { + auto handle_column = blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column; + const auto & handle_data = typeid_cast &>(*handle_column).getData(); + ASSERT_EQ(PaddedPODArray({10, 11, 12, 13, 14, 15, 16, 17, 18, 19}), handle_data); + } + { + auto handle_column = blocks[1].getByName(EXTRA_HANDLE_COLUMN_NAME).column; + const auto & handle_data = typeid_cast &>(*handle_column).getData(); + ASSERT_EQ(PaddedPODArray({10, 11, 
12}), handle_data); + } + } + { + // start_key specified, end_key - start_key < write_rows + auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 2, /* at */ 16); + ASSERT_EQ(1, blocks.size()); + const auto & handle_column = blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column; + const auto & handle_data = typeid_cast &>(*handle_column).getData(); + ASSERT_EQ(PaddedPODArray({16, 17}), handle_data); + } + { + auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 4, /* at */ 16); + ASSERT_EQ(1, blocks.size()); + const auto & handle_column = blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column; + const auto & handle_data = typeid_cast &>(*handle_column).getData(); + ASSERT_EQ(PaddedPODArray({16, 17, 18, 19}), handle_data); + } + { + auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 5, /* at */ 16); + ASSERT_EQ(2, blocks.size()); + { + const auto & handle_column = blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column; + const auto & handle_data = typeid_cast &>(*handle_column).getData(); + ASSERT_EQ(PaddedPODArray({16, 17, 18, 19}), handle_data); + } + { + const auto & handle_column = blocks[1].getByName(EXTRA_HANDLE_COLUMN_NAME).column; + const auto & handle_data = typeid_cast &>(*handle_column).getData(); + ASSERT_EQ(PaddedPODArray({16}), handle_data); + } + } + { + auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 10, /* at */ 16); + ASSERT_EQ(3, blocks.size()); + { + const auto & handle_column = blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column; + const auto & handle_data = typeid_cast &>(*handle_column).getData(); + ASSERT_EQ(PaddedPODArray({16, 17, 18, 19}), handle_data); + } + { + const auto & handle_column = blocks[1].getByName(EXTRA_HANDLE_COLUMN_NAME).column; + const auto & handle_data = typeid_cast &>(*handle_column).getData(); + ASSERT_EQ(PaddedPODArray({16, 17, 18, 19}), handle_data); + } + { + const auto & handle_column = blocks[2].getByName(EXTRA_HANDLE_COLUMN_NAME).column; + const auto & handle_data = typeid_cast 
&>(*handle_column).getData(); + ASSERT_EQ(PaddedPODArray({16, 17}), handle_data); + } + } + { + // write rows < segment rows, start key not specified, should choose a random start. + auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 3); + ASSERT_EQ(1, blocks.size()); + ASSERT_EQ(3, blocks[0].rows()); + } + { + // Let's check whether the generated handles will be starting from 12, for at least once. + auto start_from_12 = 0; + for (size_t i = 0; i < 100; i++) + { + auto blocks = prepareWriteBlocksInSegmentRange(*s1_id, 3); + if (blocks[0].getByName(EXTRA_HANDLE_COLUMN_NAME).column->getInt(0) == 12) + start_from_12++; + } + ASSERT_TRUE(start_from_12 > 0); // We should hit at least 1 times in 100 iters. + ASSERT_TRUE(start_from_12 < 50); // We should not hit 50 times in 100 iters :) + } +} +CATCH + + class SegmentOperationTest : public SegmentTestBasic { protected: diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp new file mode 100644 index 00000000000..8a09cc87594 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_replace_data.cpp @@ -0,0 +1,490 @@ +// Copyright 2022 PingCAP, Ltd. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +namespace DB +{ + +namespace DM +{ + +extern DMFilePtr writeIntoNewDMFile(DMContext & dm_context, + const ColumnDefinesPtr & schema_snap, + const BlockInputStreamPtr & input_stream, + UInt64 file_id, + const String & parent_path, + DMFileBlockOutputStream::Flags flags); + +namespace tests +{ + +class SegmentReplaceDataTest : public SegmentTestBasic + , public testing::WithParamInterface +{ +public: + SegmentReplaceDataTest() + { + replace_to_rows = GetParam(); + } + +protected: + UInt64 replace_to_rows{}; +}; + +INSTANTIATE_TEST_CASE_P( + ReplaceToNRows, + SegmentReplaceDataTest, + testing::Values(0, 37)); // Note: some tests rely on the exact value of 37. Adding arbitrary values may break test. + +class SegmentReplaceDataBasicTest : public SegmentTestBasic +{ +}; + +TEST_F(SegmentReplaceDataBasicTest, ThrowWhenDMFileNotInDelegator) +try +{ + auto delegator = storage_path_pool->getStableDiskDelegator(); + auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto input_stream = std::make_shared(Block{}); + auto dm_file = writeIntoNewDMFile( + *dm_context, + table_columns, + input_stream, + file_id, + delegator.choosePath(), + DMFileBlockOutputStream::Flags{}); + + ASSERT_THROW({ + replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, dm_file); + }, + DB::Exception); +} +CATCH + + +TEST_F(SegmentReplaceDataBasicTest, ThrowWhenDMFileNotInPS) +try +{ + auto delegator = storage_path_pool->getStableDiskDelegator(); + auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto input_stream = std::make_shared(Block{}); + auto dm_file = writeIntoNewDMFile( + *dm_context, + table_columns, + input_stream, + file_id, + delegator.choosePath(), + DMFileBlockOutputStream::Flags{}); + + delegator.addDTFile(file_id, dm_file->getBytesOnDisk(), dm_file->parentPath()); + + ASSERT_THROW({ + 
replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, dm_file); + }, + DB::Exception); +} +CATCH + + +TEST_P(SegmentReplaceDataTest, Basic) +try +{ + // Data in memtable should be discarded after replaceData + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + ASSERT_EQ(100, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + { + auto replace_block = prepareWriteBlock(/* from */ 0, /* to */ replace_to_rows); + replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, replace_block); + } + ASSERT_EQ(replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + + // Even flush will not "rescue" these memtable data. + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_EQ(replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + + ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr); + storage_pool->log_storage_v3->gc(/* not_skip */ true); + storage_pool->data_storage_v3->gc(/* not_skip */ true); + ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0); + ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 2); // 1 DMFile, 1 Ref + PageId replaced_stable_id{}; + { + auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_EQ(1, stable_page_ids.size()); + replaced_stable_id = *stable_page_ids.begin(); + } + + // Write some data and create a new stable. 
+ writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 47, /* at */ replace_to_rows + 100); + ASSERT_EQ(47 + replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_EQ(47 + replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_EQ(47 + replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + + storage_pool->log_storage_v3->gc(/* not_skip */ true); + storage_pool->data_storage_v3->gc(/* not_skip */ true); + ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0); + ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 1); + + auto const stable_files = segments[DELTA_MERGE_FIRST_SEGMENT_ID]->getStable()->getDMFiles(); + { + // Only the new stable DMFile is alive (and we should have a different DMFile). + auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_EQ(1, stable_page_ids.size()); + ASSERT_TRUE(stable_page_ids.count(stable_files[0]->fileId())); + ASSERT_FALSE(stable_page_ids.count(replaced_stable_id)); + } + + // Now let's replace data again. Everything in the current stable will be discarded. + { + auto replace_block = prepareWriteBlock(/* from */ 0, /* to */ replace_to_rows); + replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, replace_block); + } + ASSERT_EQ(replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + { + storage_pool->data_storage_v3->gc(/* not_skip */ true); + auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_EQ(1, stable_page_ids.size()); + // The stable before replaceData should be not alive anymore. 
+ ASSERT_FALSE(stable_page_ids.count(stable_files[0]->fileId())); + } +} +CATCH + +TEST_P(SegmentReplaceDataTest, WriteAfterReplace) +try +{ + if (replace_to_rows == 0) + { + return; + } + + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 100); + ASSERT_EQ(100, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + { + auto replace_block = prepareWriteBlock(/* from */ 0, /* to */ replace_to_rows); + replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, replace_block); + } + ASSERT_EQ(replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 47, /* at */ replace_to_rows - 10); // 10 rows will be overlapped + ASSERT_EQ(37 + replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + ASSERT_EQ(37 + replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_EQ(47 + replace_to_rows, getSegmentRowNumWithoutMVCC(DELTA_MERGE_FIRST_SEGMENT_ID)); +} +CATCH + + +/** + * This test verify that, the DMFile will never be marked as GCable, during different segment operations. + * Otherwise, the DMFile will be unsafe to be used in another replaceData. 
+ */ +TEST_F(SegmentReplaceDataBasicTest, DMFileGCIsUnchanged) +try +{ + WriteBatches ingest_wbs(dm_context->storage_pool, dm_context->getWriteLimiter()); + + auto delegator = storage_path_pool->getStableDiskDelegator(); + auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto input_stream = std::make_shared(Block{}); + auto dm_file = writeIntoNewDMFile( + *dm_context, + table_columns, + input_stream, + file_id, + delegator.choosePath(), + DMFileBlockOutputStream::Flags{}); + + ingest_wbs.data.putExternal(file_id, /* tag */ 0); + ingest_wbs.writeLogAndData(); + delegator.addDTFile(file_id, dm_file->getBytesOnDisk(), dm_file->parentPath()); + + replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, dm_file); + ASSERT_EQ(0, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + + ingest_wbs.rollbackWrittenLogAndData(); + + // Note: we have not yet enabled GC for the dmfile here. + ASSERT_FALSE(dm_file->canGC()); + { + auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_TRUE(stable_page_ids.count(dm_file->fileId())); + } + + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 47); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + // Even when the stable is replaced, the DMFile should not be marked as GCable. + ASSERT_FALSE(dm_file->canGC()); + { + storage_pool->data_storage_v3->gc(/* not_skip */ true); + auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_EQ(1, stable_page_ids.size()); + ASSERT_FALSE(stable_page_ids.count(dm_file->fileId())); + } + + // TODO: May be check split and merge as well. 
+ + dm_file->enableGC(); +} +CATCH + + +TEST_P(SegmentReplaceDataTest, MultipleSegmentsSharingDMFile) +try +{ + std::optional seg_right_id; + Block block{}; + + if (replace_to_rows == 0) + { + seg_right_id = splitSegmentAt(DELTA_MERGE_FIRST_SEGMENT_ID, 0); + // block is empty, split point doesn't matter. + } + else + { + seg_right_id = splitSegmentAt(DELTA_MERGE_FIRST_SEGMENT_ID, replace_to_rows - 10); /* right seg should contain 10 rows after replacing data */ + block = prepareWriteBlock(0, replace_to_rows); + } + + ASSERT_TRUE(seg_right_id.has_value()); + replaceSegmentData({*seg_right_id, DELTA_MERGE_FIRST_SEGMENT_ID}, block); + ASSERT_TRUE(areSegmentsSharingStable({*seg_right_id, DELTA_MERGE_FIRST_SEGMENT_ID})); + + UInt64 expected_left_rows, expected_right_rows; + if (replace_to_rows == 0) + { + expected_left_rows = 0; + expected_right_rows = 0; + } + else + { + expected_left_rows = replace_to_rows - 10; + expected_right_rows = 10; + } + ASSERT_EQ(expected_left_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_EQ(expected_right_rows, getSegmentRowNum(*seg_right_id)); + + // Now let's write something and perform merge delta for the right seg + writeSegment(*seg_right_id, 151); + expected_right_rows += 151; + ASSERT_EQ(expected_right_rows, getSegmentRowNumWithoutMVCC(*seg_right_id)); + flushSegmentCache(*seg_right_id); + mergeSegmentDelta(*seg_right_id); + ASSERT_EQ(expected_right_rows, getSegmentRowNumWithoutMVCC(*seg_right_id)); + // Left is not affected + ASSERT_EQ(expected_left_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_FALSE(areSegmentsSharingStable({*seg_right_id, DELTA_MERGE_FIRST_SEGMENT_ID})); + + ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr); + storage_pool->data_storage_v3->gc(/* not_skip */ true); + auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_EQ(2, stable_page_ids.size()); + + mergeSegment({DELTA_MERGE_FIRST_SEGMENT_ID, *seg_right_id}); + 
storage_pool->data_storage_v3->gc(/* not_skip */ true); + stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_EQ(1, stable_page_ids.size()); +} +CATCH + + +TEST_F(SegmentReplaceDataBasicTest, ReplaceMultipleTimes) +try +{ + for (size_t i = 0; i < 20; ++i) + { + auto rows = std::uniform_int_distribution<>(1, 100)(random); + auto block = prepareWriteBlock(0, rows); + replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, block); + ASSERT_EQ(rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + + // Write some rows doesn't affect our next replaceData + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + } + + ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr); + storage_pool->data_storage_v3->gc(/* not_skip */ true); + auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_EQ(1, stable_page_ids.size()); +} +CATCH + + +TEST_P(SegmentReplaceDataTest, ReplaceSameDMFileMultipleTimes) +try +{ + auto block = prepareWriteBlock(0, replace_to_rows); + + WriteBatches ingest_wbs(dm_context->storage_pool, dm_context->getWriteLimiter()); + + auto delegator = storage_path_pool->getStableDiskDelegator(); + auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto input_stream = std::make_shared(block); + auto dm_file = writeIntoNewDMFile( + *dm_context, + table_columns, + input_stream, + file_id, + delegator.choosePath(), + DMFileBlockOutputStream::Flags{}); + + ingest_wbs.data.putExternal(file_id, /* tag */ 0); + ingest_wbs.writeLogAndData(); + delegator.addDTFile(file_id, dm_file->getBytesOnDisk(), dm_file->parentPath()); + + for (size_t i = 0; i < 20; ++i) + { + replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, block); + ASSERT_EQ(replace_to_rows, getSegmentRowNum(DELTA_MERGE_FIRST_SEGMENT_ID)); + // Write some rows doesn't affect our next replaceData + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID); + } + + dm_file->enableGC(); + 
ingest_wbs.rollbackWrittenLogAndData(); + + ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr); + storage_pool->data_storage_v3->gc(/* not_skip */ true); + auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_EQ(1, stable_page_ids.size()); +} +CATCH + + +/** + * The out of bound data introduced by replaceData should not be seen after the merge. + */ +TEST_F(SegmentReplaceDataBasicTest, ReplaceOutOfBoundAndMerge) +try +{ + auto seg_right_id = splitSegmentAt(DELTA_MERGE_FIRST_SEGMENT_ID, 100, Segment::SplitMode::Physical); + ASSERT_TRUE(seg_right_id.has_value()); + + writeSegment(*seg_right_id, 10); + ASSERT_EQ(0, getSegmentRowNumWithoutMVCC(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_EQ(10, getSegmentRowNumWithoutMVCC(*seg_right_id)); + + auto block = prepareWriteBlock(0, 300); + // Only replace this block to the left seg, whose range is [-∞, 100). + replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, block); + ASSERT_EQ(100, getSegmentRowNumWithoutMVCC(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_EQ(10, getSegmentRowNumWithoutMVCC(*seg_right_id)); + + mergeSegment({DELTA_MERGE_FIRST_SEGMENT_ID, *seg_right_id}); + ASSERT_EQ(110, getSegmentRowNumWithoutMVCC(DELTA_MERGE_FIRST_SEGMENT_ID)); + + ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr); + storage_pool->log_storage_v3->gc(/* not_skip */ true); + storage_pool->data_storage_v3->gc(/* not_skip */ true); + ASSERT_EQ(storage_pool->log_storage_v3->getNumberOfPages(), 0); + ASSERT_EQ(storage_pool->data_storage_v3->getNumberOfPages(), 1); + auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_EQ(1, stable_page_ids.size()); +} +CATCH + + +TEST_F(SegmentReplaceDataBasicTest, ReleaseExistingSharedDMFile) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 500, /* at */ 0); + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + + // Use logical split to create two segments sharing 
the same stable. + auto seg_right_id = splitSegmentAt(DELTA_MERGE_FIRST_SEGMENT_ID, 100, Segment::SplitMode::Logical); + ASSERT_TRUE(seg_right_id.has_value()); + ASSERT_TRUE(areSegmentsSharingStable({DELTA_MERGE_FIRST_SEGMENT_ID, *seg_right_id})); + + ASSERT_EQ(100, getSegmentRowNumWithoutMVCC(DELTA_MERGE_FIRST_SEGMENT_ID)); + ASSERT_EQ(400, getSegmentRowNumWithoutMVCC(*seg_right_id)); + + auto shared_dm_files = segments[*seg_right_id]->getStable()->getDMFiles(); + + // As stable is shared in logical split, we should only have 1 alive external file. + ASSERT_TRUE(storage_pool->log_storage_v3 != nullptr); + storage_pool->data_storage_v3->gc(/* not_skip */ true); + auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + + // Now let's replace one segment. + auto block = prepareWriteBlock(0, 300); + replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, block); + + ASSERT_EQ(100, getSegmentRowNumWithoutMVCC(DELTA_MERGE_FIRST_SEGMENT_ID)); // We should only see [0, 100) + ASSERT_EQ(400, getSegmentRowNumWithoutMVCC(*seg_right_id)); + + // The previously-shared stable should be still valid. + storage_pool->data_storage_v3->gc(/* not_skip */ true); + stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_EQ(2, stable_page_ids.size()); + ASSERT_TRUE(stable_page_ids.count(shared_dm_files[0]->fileId())); +} +CATCH + + +TEST_F(SegmentReplaceDataBasicTest, ReadSnapshotBeforeReplace) +try +{ + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 400); // 400 in stable + flushSegmentCache(DELTA_MERGE_FIRST_SEGMENT_ID); + mergeSegmentDelta(DELTA_MERGE_FIRST_SEGMENT_ID); + writeSegment(DELTA_MERGE_FIRST_SEGMENT_ID, 41); // 41 in memtable + + auto segment = segments[DELTA_MERGE_FIRST_SEGMENT_ID]; + auto in_stream = segment->getInputStreamRaw(*dm_context, *tableColumns()); + + // Now let's replace data. 
+ auto block = prepareWriteBlock(0, 233); + replaceSegmentData({DELTA_MERGE_FIRST_SEGMENT_ID}, block); + + // There is a snapshot alive, so we should have 2 stables. + storage_pool->data_storage_v3->gc(/* not_skip */ true); + auto stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_EQ(2, stable_page_ids.size()); + + // Continue the read + auto n_rows = DB::tests::getInputStreamNRows(in_stream); + ASSERT_EQ(441, n_rows); + + ASSERT_EQ(233, getSegmentRowNumWithoutMVCC(DELTA_MERGE_FIRST_SEGMENT_ID)); + + // Snapshot is dropped. + in_stream = {}; + storage_pool->data_storage_v3->gc(/* not_skip */ true); + stable_page_ids = storage_pool->data_storage_v3->getAliveExternalPageIds(NAMESPACE_ID); + ASSERT_EQ(1, stable_page_ids.size()); +} +CATCH + + +} // namespace tests +} // namespace DM +} // namespace DB diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 084a88de0f7..0860059af1d 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -62,6 +62,7 @@ void SegmentTestBasic::reloadWithOptions(SegmentTestOptions config) size_t SegmentTestBasic::getSegmentRowNumWithoutMVCC(PageId segment_id) { + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); auto segment = segments[segment_id]; auto in = segment->getInputStreamRaw(*dm_context, *tableColumns()); return getInputStreamNRows(in); @@ -69,6 +70,7 @@ size_t SegmentTestBasic::getSegmentRowNumWithoutMVCC(PageId segment_id) size_t SegmentTestBasic::getSegmentRowNum(PageId segment_id) { + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); auto segment = segments[segment_id]; auto in = segment->getInputStream(*dm_context, *tableColumns(), {segment->getRowKeyRange()}); return getInputStreamNRows(in); @@ -78,6 +80,7 @@ std::optional SegmentTestBasic::splitSegment(PageId segment_id, 
Segment: { LOG_FMT_INFO(logger_op, "splitSegment, segment_id={} split_mode={}", segment_id, magic_enum::enum_name(split_mode)); + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); auto origin_segment = segments[segment_id]; size_t origin_segment_row_num = getSegmentRowNum(segment_id); @@ -124,6 +127,7 @@ std::optional SegmentTestBasic::splitSegmentAt(PageId segment_id, Int64 split_at_key = RowKeyValue::fromHandle(split_at); } + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); auto origin_segment = segments[segment_id]; size_t origin_segment_row_num = getSegmentRowNum(segment_id); @@ -167,9 +171,8 @@ void SegmentTestBasic::mergeSegment(const std::vector & segments_id, boo for (const auto segment_id : segments_id) { - auto it = segments.find(segment_id); - RUNTIME_CHECK(it != segments.end(), segment_id); - segments_to_merge.emplace_back(it->second); + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + segments_to_merge.emplace_back(segments[segment_id]); auto rows = getSegmentRowNum(segment_id); segments_rows.emplace_back(rows); @@ -203,6 +206,7 @@ void SegmentTestBasic::mergeSegmentDelta(PageId segment_id, bool check_rows) { LOG_FMT_INFO(logger_op, "mergeSegmentDelta, segment_id={}", segment_id); + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); auto segment = segments[segment_id]; size_t segment_row_num = getSegmentRowNum(segment_id); SegmentPtr merged_segment = segment->mergeDelta(*dm_context, tableColumns()); @@ -218,6 +222,7 @@ void SegmentTestBasic::flushSegmentCache(PageId segment_id) { LOG_FMT_INFO(logger_op, "flushSegmentCache, segment_id={}", segment_id); + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); auto segment = segments[segment_id]; size_t segment_row_num = getSegmentRowNum(segment_id); segment->flushCache(*dm_context); @@ -227,9 +232,8 @@ void SegmentTestBasic::flushSegmentCache(PageId segment_id) std::pair SegmentTestBasic::getSegmentKeyRange(PageId segment_id) { - auto segment_it = 
segments.find(segment_id); - EXPECT_TRUE(segment_it != segments.end()); - const auto & segment = segment_it->second; + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + const auto & segment = segments[segment_id]; Int64 start_key, end_key; if (!options.is_common_handle) @@ -263,80 +267,125 @@ std::pair SegmentTestBasic::getSegmentKeyRange(PageId segment_id) return {start_key, end_key}; } -void SegmentTestBasic::writeSegment(PageId segment_id, UInt64 write_rows, std::optional begin_key) +Block SegmentTestBasic::prepareWriteBlock(Int64 start_key, Int64 end_key, bool is_deleted) { - LOG_FMT_INFO(logger_op, "writeSegment, segment_id={} rows={}", segment_id, write_rows); + RUNTIME_CHECK(start_key <= end_key); + if (end_key == start_key) + return Block{}; + version++; + return DMTestEnv::prepareSimpleWriteBlock( + start_key, // + end_key, + false, + version, + DMTestEnv::pk_name, + EXTRA_HANDLE_COLUMN_ID, + options.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, + options.is_common_handle, + 1, + true, + is_deleted); +} - if (write_rows == 0) - return; +std::vector SegmentTestBasic::prepareWriteBlocksInSegmentRange(PageId segment_id, UInt64 total_write_rows, std::optional write_start_key, bool is_deleted) +{ + RUNTIME_CHECK(total_write_rows < std::numeric_limits::max()); - RUNTIME_CHECK(write_rows > 0); - RUNTIME_CHECK(write_rows < std::numeric_limits::max()); + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + auto [segment_start_key, segment_end_key] = getSegmentKeyRange(segment_id); + auto segment_max_rows = static_cast(segment_end_key - segment_start_key); - auto segment = segments[segment_id]; - size_t segment_row_num = getSegmentRowNumWithoutMVCC(segment_id); - auto [start_key, end_key] = getSegmentKeyRange(segment_id); + if (segment_max_rows == 0) + return {}; - LOG_FMT_DEBUG(logger, "write to segment, segment={} segment_rows={} start_key={} end_key={}", segment->info(), segment_row_num, start_key, 
end_key); + if (write_start_key.has_value()) + { + // When write start key is specified, the caller must know exactly the segment range. + RUNTIME_CHECK(*write_start_key >= segment_start_key); + RUNTIME_CHECK(static_cast(segment_end_key - *write_start_key) > 0); + } - auto segment_max_rows = static_cast(end_key - start_key); - if (segment_max_rows == 0) - return; - // If the length of segment key range is larger than `write_rows`, then - // write the new data with the same tso in one block. - // Otherwise create multiple block with increasing tso until the `remain_row_num` - // down to 0. - UInt64 remain_row_num = 0; - if (segment_max_rows > write_rows) + if (!write_start_key.has_value()) { - if (begin_key.has_value()) + // When write start key is unspecified, we will: + // A. If the segment is large enough, we randomly pick a write start key in the range. + // B. If the segment is small, we write from the beginning. + if (segment_max_rows > total_write_rows) { - RUNTIME_CHECK(begin_key >= start_key, *begin_key, start_key); - RUNTIME_CHECK(begin_key < end_key, *begin_key, end_key); - start_key = *begin_key; + write_start_key = std::uniform_int_distribution{segment_start_key, segment_end_key - static_cast(total_write_rows)}(random); } else { - // The segment range is large enough, let's randomly pick a start key: - // Suppose we have segment range = [0, 11), which could contain at most 11 rows. - // Now we want to write 10 rows -- The write start key could be randomized in [0, 1]. 
- start_key = std::uniform_int_distribution{start_key, end_key - static_cast(write_rows)}(random); + write_start_key = segment_start_key; } - end_key = start_key + write_rows; - } - else - { - remain_row_num = write_rows - segment_max_rows; - RUNTIME_CHECK(!begin_key.has_value()); // Currently we don't support specifying start key when segment is small } + + auto max_write_rows_each_round = static_cast(segment_end_key - *write_start_key); + RUNTIME_CHECK(max_write_rows_each_round > 0); + RUNTIME_CHECK(*write_start_key >= segment_start_key); + + std::vector blocks; + + // If the length of segment key range is larger than `write_rows`, then + // write the new data with the same tso in one block. + // Otherwise create multiple block with increasing tso until the `remain_row_num` + // down to 0. + UInt64 remaining_rows = total_write_rows; + while (remaining_rows > 0) { - // write to segment and not flush - LOG_FMT_DEBUG(logger, "write block to segment, block_range=[{}, {})", start_key, end_key); - Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, end_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? 
EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle); - segment->write(*dm_context, std::move(block), false); - version++; + UInt64 write_rows_this_round = std::min(remaining_rows, max_write_rows_each_round); + RUNTIME_CHECK(write_rows_this_round > 0); + Int64 write_end_key_this_round = *write_start_key + static_cast(write_rows_this_round); + RUNTIME_CHECK(write_end_key_this_round <= segment_end_key); + + Block block = prepareWriteBlock(*write_start_key, write_end_key_this_round, is_deleted); + blocks.emplace_back(block); + remaining_rows -= write_rows_this_round; + + LOG_FMT_DEBUG(logger, "Prepared block for write, block_range=[{}, {}) (rows={}), total_rows_to_write={} remain_rows={}", // + *write_start_key, + write_end_key_this_round, + write_rows_this_round, + total_write_rows, + remaining_rows); } - while (remain_row_num > 0) + + return blocks; +} + +void SegmentTestBasic::writeSegment(PageId segment_id, UInt64 write_rows, std::optional start_at) +{ + LOG_FMT_INFO(logger_op, "writeSegment, segment_id={} write_rows={}", segment_id, write_rows); + + if (write_rows == 0) + return; + + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + auto segment = segments[segment_id]; + size_t segment_row_num = getSegmentRowNumWithoutMVCC(segment_id); + auto [start_key, end_key] = getSegmentKeyRange(segment_id); + LOG_FMT_DEBUG(logger, "write to segment, segment={} segment_rows={} start_key={} end_key={}", segment->info(), segment_row_num, start_key, end_key); + + auto blocks = prepareWriteBlocksInSegmentRange(segment_id, write_rows, start_at, /* is_deleted */ false); + for (const auto & block : blocks) { - UInt64 write_num = std::min(remain_row_num, segment_max_rows); - LOG_FMT_DEBUG(logger, "write block to segment, block_range=[{}, {})", start_key, write_num + start_key); - Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, write_num + start_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, 
options.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle); - segment->write(*dm_context, std::move(block), false); - remain_row_num -= write_num; - version++; + segment->write(*dm_context, block, false); } + EXPECT_EQ(getSegmentRowNumWithoutMVCC(segment_id), segment_row_num + write_rows); operation_statistics["write"]++; } -void SegmentTestBasic::ingestDTFileIntoSegment(PageId segment_id, UInt64 write_rows) +void SegmentTestBasic::ingestDTFileIntoSegment(PageId segment_id, UInt64 write_rows, std::optional start_at) { - LOG_FMT_INFO(logger_op, "ingestDTFileIntoSegment, segment_id={} rows={}", segment_id, write_rows); + LOG_FMT_INFO(logger_op, "ingestDTFileIntoSegment, segment_id={} write_rows={}", segment_id, write_rows); if (write_rows == 0) return; - auto write_data = [&](SegmentPtr segment, const Block & block) { + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + + auto ingest_data = [&](SegmentPtr segment, const Block & block) { WriteBatches ingest_wbs(dm_context->storage_pool, dm_context->getWriteLimiter()); auto delegator = storage_path_pool->getStableDiskDelegator(); auto parent_path = delegator.choosePath(); @@ -370,85 +419,37 @@ void SegmentTestBasic::ingestDTFileIntoSegment(PageId segment_id, UInt64 write_r auto segment = segments[segment_id]; size_t segment_row_num = getSegmentRowNumWithoutMVCC(segment_id); auto [start_key, end_key] = getSegmentKeyRange(segment_id); + LOG_FMT_DEBUG(logger, "ingest to segment, segment={} segment_rows={} start_key={} end_key={}", segment->info(), segment_row_num, start_key, end_key); - auto segment_max_rows = static_cast(end_key - start_key); - if (segment_max_rows == 0) - return; - // If the length of segment key range is larger than `write_rows`, then - // write the new data with the same tso in one block. - // Otherwise create multiple block with increasing tso until the `remain_row_num` - // down to 0. 
- UInt64 remain_row_num = 0; - if (segment_max_rows > write_rows) - { - start_key = std::uniform_int_distribution{start_key, end_key - static_cast(write_rows)}(random); - end_key = start_key + write_rows; - } - else + auto blocks = prepareWriteBlocksInSegmentRange(segment_id, write_rows, start_at, /* is_deleted */ false); + for (const auto & block : blocks) { - remain_row_num = write_rows - segment_max_rows; - } - { - // write to segment and not flush - LOG_FMT_DEBUG(logger, "ingest block to segment, block_range=[{}, {})", start_key, end_key); - Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, end_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle); - write_data(segment, block); - version++; - } - while (remain_row_num > 0) - { - UInt64 write_num = std::min(remain_row_num, segment_max_rows); - LOG_FMT_DEBUG(logger, "ingest block to segment, block_range=[{}, {})", start_key, write_num + start_key); - Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, write_num + start_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? 
EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle); - write_data(segment, block); - remain_row_num -= write_num; - version++; + ingest_data(segment, block); } + EXPECT_EQ(getSegmentRowNumWithoutMVCC(segment_id), segment_row_num + write_rows); operation_statistics["ingest"]++; } -void SegmentTestBasic::writeSegmentWithDeletedPack(PageId segment_id, UInt64 write_rows) +void SegmentTestBasic::writeSegmentWithDeletedPack(PageId segment_id, UInt64 write_rows, std::optional start_at) { - LOG_FMT_INFO(logger_op, "writeSegmentWithDeletedPack, segment_id={}", segment_id); + LOG_FMT_INFO(logger_op, "writeSegmentWithDeletedPack, segment_id={} write_rows={}", segment_id, write_rows); + if (write_rows == 0) + return; + + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); auto segment = segments[segment_id]; size_t segment_row_num = getSegmentRowNumWithoutMVCC(segment_id); auto [start_key, end_key] = getSegmentKeyRange(segment_id); + LOG_FMT_DEBUG(logger, "write deleted pack to segment, segment={} segment_rows={} start_key={} end_key={}", segment->info(), segment_row_num, start_key, end_key); - auto segment_max_rows = static_cast(end_key - start_key); - if (segment_max_rows == 0) - return; - // If the length of segment key range is larger than `write_rows`, then - // write the new data with the same tso in one block. - // Otherwise create multiple block with increasing tso until the `remain_row_num` - // down to 0. 
- UInt64 remain_row_num = 0; - if (segment_max_rows > write_rows) - { - start_key = std::uniform_int_distribution{start_key, end_key - static_cast(write_rows)}(random); - end_key = start_key + write_rows; - } - else + auto blocks = prepareWriteBlocksInSegmentRange(segment_id, write_rows, start_at, /* is_deleted */ true); + for (const auto & block : blocks) { - remain_row_num = write_rows - segment_max_rows; - } - { - // write to segment and not flush - LOG_FMT_DEBUG(logger, "write block to segment, block_range=[{}, {})", start_key, end_key); - Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, end_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle, 1, true, true); - segment->write(*dm_context, std::move(block), true); - version++; - } - while (remain_row_num > 0) - { - UInt64 write_num = std::min(remain_row_num, segment_max_rows); - LOG_FMT_DEBUG(logger, "write block to segment, block_range=[{}, {})", start_key, write_num + start_key); - Block block = DMTestEnv::prepareSimpleWriteBlock(start_key, write_num + start_key, false, version, DMTestEnv::pk_name, EXTRA_HANDLE_COLUMN_ID, options.is_common_handle ? 
EXTRA_HANDLE_COLUMN_STRING_TYPE : EXTRA_HANDLE_COLUMN_INT_TYPE, options.is_common_handle, 1, true, true); - segment->write(*dm_context, std::move(block), true); - remain_row_num -= write_num; - version++; + segment->write(*dm_context, block, false); } + EXPECT_EQ(getSegmentRowNumWithoutMVCC(segment_id), segment_row_num + write_rows); operation_statistics["writeDelete"]++; } @@ -457,15 +458,63 @@ void SegmentTestBasic::deleteRangeSegment(PageId segment_id) { LOG_FMT_INFO(logger_op, "deleteRangeSegment, segment_id={}", segment_id); + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); auto segment = segments[segment_id]; segment->write(*dm_context, /*delete_range*/ segment->getRowKeyRange()); EXPECT_EQ(getSegmentRowNum(segment_id), 0); - operation_statistics["deleteRange"]++; +} + +void SegmentTestBasic::replaceSegmentData(const std::vector & segments_id, const Block & block) +{ + LOG_FMT_DEBUG(logger, "replace segment data using block, segments_id={} block_rows={}", fmt::join(segments_id, ","), block.rows()); + + auto delegator = storage_path_pool->getStableDiskDelegator(); + auto parent_path = delegator.choosePath(); + auto file_provider = db_context->getFileProvider(); + + WriteBatches ingest_wbs(dm_context->storage_pool, dm_context->getWriteLimiter()); + + auto file_id = storage_pool->newDataPageIdForDTFile(delegator, __PRETTY_FUNCTION__); + auto input_stream = std::make_shared(block); + auto dm_file = writeIntoNewDMFile( + *dm_context, + table_columns, + input_stream, + file_id, + parent_path, + {}); + + ingest_wbs.data.putExternal(file_id, /* tag */ 0); + ingest_wbs.writeLogAndData(); + delegator.addDTFile(file_id, dm_file->getBytesOnDisk(), parent_path); + + replaceSegmentData(segments_id, dm_file); + + dm_file->enableGC(); + ingest_wbs.rollbackWrittenLogAndData(); +} + +void SegmentTestBasic::replaceSegmentData(const std::vector & segments_id, const DMFilePtr & file) +{ + LOG_FMT_INFO(logger_op, "replaceSegmentData, segments_id={} file_rows={} 
file={}", fmt::join(segments_id, ","), file->getRows(), file->path()); + + for (const auto segment_id : segments_id) + { + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + auto segment = segments[segment_id]; + auto new_segment = segment->dangerouslyReplaceDataForTest(*dm_context, file); + ASSERT_TRUE(new_segment != nullptr); + segments[new_segment->segmentId()] = new_segment; + } + operation_statistics["replaceData"]++; } bool SegmentTestBasic::areSegmentsSharingStable(const std::vector & segments_id) { RUNTIME_CHECK(segments_id.size() >= 2); + for (auto segment_id : segments_id) + RUNTIME_CHECK(segments.find(segment_id) != segments.end()); + auto base_stable = segments[segments_id[0]]->getStable()->getDMFilesString(); for (size_t i = 1; i < segments_id.size(); i++) { @@ -492,7 +541,7 @@ SegmentPtr SegmentTestBasic::reload(bool is_common_handle, const ColumnDefinesPt { TiFlashStorageTestBasic::reload(std::move(db_settings)); storage_path_pool = std::make_unique(db_context->getPathPool().withTable("test", "t1", false)); - storage_pool = std::make_unique(*db_context, /*ns_id*/ 100, *storage_path_pool, "test.t1"); + storage_pool = std::make_unique(*db_context, NAMESPACE_ID, *storage_path_pool, "test.t1"); storage_pool->restore(); ColumnDefinesPtr cols = (!pre_define_columns) ? DMTestEnv::getDefaultColumns(is_common_handle ? DMTestEnv::PkType::CommonHandle : DMTestEnv::PkType::HiddenTiDBRowID) : pre_define_columns; setColumns(cols); diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h index 0f79e1e6985..51d7605684d 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.h @@ -62,11 +62,20 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic * When begin_key is specified, new rows will be written from specified key. 
Otherwise, new rows may be * written randomly in the segment range. */ - void writeSegment(PageId segment_id, UInt64 write_rows = 100, std::optional begin_key = std::nullopt); - void ingestDTFileIntoSegment(PageId segment_id, UInt64 write_rows = 100); - void writeSegmentWithDeletedPack(PageId segment_id, UInt64 write_rows = 100); + void writeSegment(PageId segment_id, UInt64 write_rows = 100, std::optional start_at = std::nullopt); + void ingestDTFileIntoSegment(PageId segment_id, UInt64 write_rows = 100, std::optional start_at = std::nullopt); + void writeSegmentWithDeletedPack(PageId segment_id, UInt64 write_rows = 100, std::optional start_at = std::nullopt); void deleteRangeSegment(PageId segment_id); + /** + * This function does not check rows. + */ + void replaceSegmentData(const std::vector & segments_id, const DMFilePtr & file); + void replaceSegmentData(const std::vector & segments_id, const Block & block); + + Block prepareWriteBlock(Int64 start_key, Int64 end_key, bool is_deleted = false); + std::vector prepareWriteBlocksInSegmentRange(PageId segment_id, UInt64 total_write_rows, std::optional write_start_key = std::nullopt, bool is_deleted = false); + size_t getSegmentRowNumWithoutMVCC(PageId segment_id); size_t getSegmentRowNum(PageId segment_id); @@ -75,7 +84,7 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic /** * You must pass at least 2 segments. Checks whether all segments passed in are sharing the same stable. 
*/ - bool areSegmentsSharingStable(const std::vector & segments_id); + [[nodiscard]] bool areSegmentsSharingStable(const std::vector & segments_id); std::pair getSegmentKeyRange(PageId segment_id); @@ -104,6 +113,8 @@ class SegmentTestBasic : public DB::base::TiFlashStorageTestBasic void reloadDMContext(); protected: + inline static constexpr PageId NAMESPACE_ID = 100; + /// all these var lives as ref in dm_context std::unique_ptr storage_path_pool; std::unique_ptr storage_pool; diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_randomized.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_randomized.cpp index 4b4cf19bf82..90826480422 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_randomized.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_randomized.cpp @@ -71,9 +71,10 @@ class SegmentRandomizedTest : public SegmentTestBasic {0.1, &SegmentRandomizedTest::deleteRangeRandomSegment}, {1.0, &SegmentRandomizedTest::splitRandomSegment}, {1.0, &SegmentRandomizedTest::splitAtRandomSegment}, - {0.25, &SegmentRandomizedTest::mergeRandomSegment}, + {0.25, &SegmentRandomizedTest::mergeRandomSegments}, {1.0, &SegmentRandomizedTest::mergeDeltaRandomSegment}, {1.0, &SegmentRandomizedTest::flushCacheRandomSegment}, + {0.5, &SegmentRandomizedTest::replaceRandomSegmentsData}, {0.25, &SegmentRandomizedTest::writeRandomSegmentWithDeletedPack}}; /** @@ -139,7 +140,7 @@ class SegmentRandomizedTest : public SegmentTestBasic splitSegmentAt(segment_id, split_at, split_mode); } - void mergeRandomSegment() + void mergeRandomSegments() { if (segments.size() < 2) return; @@ -166,6 +167,68 @@ class SegmentRandomizedTest : public SegmentTestBasic flushSegmentCache(random_segment_id); } + void replaceRandomSegmentsData() + { + if (segments.empty()) + return; + + auto segments_to_pick = std::uniform_int_distribution{1, 5}(random); + std::vector segments_list; + std::map expected_data_each_segment; + for (size_t i = 0; i < 
segments_to_pick; ++i) + { + auto id = getRandomSegmentId(); // allow duplicate + segments_list.emplace_back(id); + expected_data_each_segment[id] = 0; + } + + auto [min_key, max_key] = getSegmentKeyRange(segments_list[0]); + for (size_t i = 1; i < segments_to_pick; ++i) + { + auto [new_min_key, new_max_key] = getSegmentKeyRange(segments_list[i]); + if (new_min_key < min_key) + min_key = new_min_key; + if (new_max_key > max_key) + max_key = new_max_key; + } + + Block block{}; + if (max_key > min_key) + { + // Now let's generate some data. + std::vector n_rows_collection{0, 10, 50, 1000}; + auto block_rows = n_rows_collection[std::uniform_int_distribution{0, n_rows_collection.size() - 1}(random)]; + if (block_rows > 0) + { + auto block_start_key = std::uniform_int_distribution{min_key, max_key - 1}(random); + auto block_end_key = block_start_key + static_cast(block_rows); + block = prepareWriteBlock(block_start_key, block_end_key); + + // How many data will we have for each segment after replacing data? It should be BlockRange ∩ SegmentRange. + for (auto segment_id : segments_list) + { + auto [seg_min_key, seg_max_key] = getSegmentKeyRange(segment_id); + auto intersect_min = std::max(seg_min_key, block_start_key); + auto intersect_max = std::min(seg_max_key, block_end_key); + if (intersect_min <= intersect_max) + { + // There is an intersection + expected_data_each_segment[segment_id] = static_cast(intersect_max - intersect_min); + } + } + } + } + + LOG_FMT_DEBUG(logger, "start random replace segment data, segments_id={} block_rows={} all_segments={}", fmt::join(segments_list, ","), block.rows(), segments.size()); + replaceSegmentData({segments_list}, block); + + // Verify rows. + for (auto segment_id : segments_list) + { + EXPECT_EQ(getSegmentRowNum(segment_id), expected_data_each_segment[segment_id]); + } + } + Segment::SplitMode getRandomSplitMode() { int mode = std::uniform_int_distribution{1, 2}(random);