From 0f776f1e25d008ca6b4984d8631db82e178dc0a4 Mon Sep 17 00:00:00 2001 From: flow Date: Fri, 18 Oct 2019 18:24:33 +0800 Subject: [PATCH] address comments --- .../Storages/DeltaMerge/DeltaMergeDefines.h | 2 ++ .../Storages/DeltaMerge/DeltaMergeStore.cpp | 24 +++++++-------- .../src/Storages/DeltaMerge/DeltaMergeStore.h | 7 +---- dbms/src/Storages/DeltaMerge/DiskValueSpace.h | 4 +-- dbms/src/Storages/DeltaMerge/Segment.cpp | 29 ++----------------- dbms/src/Storages/DeltaMerge/Segment.h | 24 ++------------- 6 files changed, 22 insertions(+), 68 deletions(-) diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h index c6b6f5de29b..a113b560efb 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h @@ -131,5 +131,7 @@ static_assert(static_cast(static_cast(MAX_INT64)) == MAX_INT64, " static constexpr bool DM_RUN_CHECK = true; +#define WARN_UNUSED_RESULT __attribute__((warn_unused_result)) + } // namespace DM } // namespace DB \ No newline at end of file diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp index aa96a011a13..01086ad389f 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp @@ -178,7 +178,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_ action.task = action.segment->createAppendTask(op_context, wbs, action.update); } - commitWrites(actions, wbs, dm_context, op_context, db_context, true); + commitWrites(actions, wbs, dm_context, op_context); } @@ -222,15 +222,13 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings // TODO: We need to do a delta merge after write a delete range, otherwise, the rows got deleted could never be acutally removed. - commitWrites(actions, wbs, dm_context, op_context, db_context, false); + commitWrites(actions, wbs, dm_context, op_context); } void DeltaMergeStore::commitWrites(const WriteActions & actions, WriteBatches & wbs, const DMContextPtr & dm_context, - OpContext & op_context, - const Context & /*db_context*/, - bool /*is_upsert*/) + OpContext & op_context) { // Save generated chunks to disk. { @@ -420,19 +418,19 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const size_t delta_rows = segment->deltaRows(/* with_delta_cache */ by_write_thread); size_t delta_updates = segment->updatesInDeltaTree(); - size_t segment_rows = segment->getEstimatedRows(); - - bool should_split = segment_rows >= dm_context->segment_limit_rows * 2; - bool force_split = segment_rows >= dm_context->segment_limit_rows * 10; - - bool should_merge = segment_rows < dm_context->segment_limit_rows / 4; bool should_background_merge_delta = std::max(delta_rows, delta_updates) >= dm_context->delta_limit_rows; bool should_foreground_merge_delta = std::max(delta_rows, delta_updates) >= dm_context->segment_limit_rows * 5; if (by_write_thread) { - // Only write thread will check split. + // Only write thread will check split & merge. + size_t segment_rows = segment->getEstimatedRows(); + + bool should_split = segment_rows >= dm_context->segment_limit_rows * 2; + bool force_split = segment_rows >= dm_context->segment_limit_rows * 10; + + bool should_merge = segment_rows < dm_context->segment_limit_rows / 4; if (force_split || (should_split && !segment->isBackgroundMergeDelta())) { @@ -632,7 +630,7 @@ void DeltaMergeStore::segmentMergeDelta(DMContext & dm_context, LOG_DEBUG(log, "Segment [" << segment->segmentId() << "] apply " << (is_foreground ? "foreground" : "background") << " merge delta"); - auto new_segment = segment->applyMergeDelta(dm_context, segment_snap, storage_snap, wbs, new_stable); + auto new_segment = segment->applyMergeDelta(segment_snap, wbs, new_stable); wbs.writeMeta(storage_pool); diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h index f69f9d74e07..6fa5f8771cc 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore.h @@ -207,12 +207,7 @@ class DeltaMergeStore : private boost::noncopyable const OptionTableInfoConstRef table_info, ColumnID & max_column_id_used); - void commitWrites(const WriteActions & actions, - WriteBatches & wbs, - const DMContextPtr & dm_context, - OpContext & op_context, - const Context & db_context, - bool is_upsert); + void commitWrites(const WriteActions & actions, WriteBatches & wbs, const DMContextPtr & dm_context, OpContext & op_context); bool isSegmentValid(const SegmentPtr & segment); diff --git a/dbms/src/Storages/DeltaMerge/DiskValueSpace.h b/dbms/src/Storages/DeltaMerge/DiskValueSpace.h index 141b528ca46..6b8ddd5e48e 100644 --- a/dbms/src/Storages/DeltaMerge/DiskValueSpace.h +++ b/dbms/src/Storages/DeltaMerge/DiskValueSpace.h @@ -37,8 +37,8 @@ struct WriteBatches WriteBatch data; WriteBatch meta; - Ids writtenLog; - Ids writtenData; + PageIds writtenLog; + PageIds writtenData; WriteBatch removed_log; WriteBatch removed_data; diff --git a/dbms/src/Storages/DeltaMerge/Segment.cpp b/dbms/src/Storages/DeltaMerge/Segment.cpp index e376918b5e1..1862706eb81 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.cpp +++ b/dbms/src/Storages/DeltaMerge/Segment.cpp @@ -76,25 +76,6 @@ Segment::Segment(UInt64 epoch_, const HandleRange & range_, PageId segment_id_, { } -//Segment::Segment(UInt64 epoch_, -// const HandleRange & range_, -// PageId segment_id_, -// PageId next_segment_id_, -// PageId delta_id, -// const Chunks & delta_chunks_, -// PageId stable_id, -// const Chunks & stable_chunks_) -// : epoch(epoch_), -// range(range_), -// segment_id(segment_id_), -// next_segment_id(next_segment_id_), -// delta(std::make_shared(true, delta_id, delta_chunks_)), -// stable(std::make_shared(false, stable_id, stable_chunks_)), -// delta_tree(std::make_shared()), -// log(&Logger::get("Segment")) -//{ -//} - Segment::Segment(UInt64 epoch_, // const HandleRange & range_, PageId segment_id_, @@ -346,7 +327,7 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_contex EMPTY_FILTER); delta_stream = std::make_shared>(delta_stream, range, 0); - BlockInputStreamPtr stable_stream = std::make_shared(stable->getChunks(), // + BlockInputStreamPtr stable_stream = std::make_shared(segment_snap.stable->getChunks(), // new_columns_to_read, storage_snap.data_reader, EMPTY_FILTER); @@ -489,11 +470,7 @@ DiskValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_context, return new_stable; } -SegmentPtr Segment::applyMergeDelta(DMContext & /*dm_context*/, - const SegmentSnapshot & segment_snap, - const StorageSnapshot & /*storage_snap*/, - WriteBatches & wbs, - const DiskValueSpacePtr & new_stable) const +SegmentPtr Segment::applyMergeDelta(const SegmentSnapshot & segment_snap, WriteBatches & wbs, const DiskValueSpacePtr & new_stable) const { LOG_DEBUG(log, "Segment [" << DB::toString(segment_id) << "] apply merge delta start."); @@ -525,7 +502,7 @@ SegmentPtr Segment::mergeDelta(DMContext & dm_context, WriteBatches & wbs) const { auto new_stable = prepareMergeDelta(dm_context, segment_snap, storage_snap, wbs); - return applyMergeDelta(dm_context, segment_snap, storage_snap, wbs, new_stable); + return applyMergeDelta(segment_snap, wbs, new_stable); } SegmentPtr Segment::mergeDelta(DMContext & dm_context) const diff --git a/dbms/src/Storages/DeltaMerge/Segment.h b/dbms/src/Storages/DeltaMerge/Segment.h index fcb3970b3fd..7c575a1d713 100644 --- a/dbms/src/Storages/DeltaMerge/Segment.h +++ b/dbms/src/Storages/DeltaMerge/Segment.h @@ -44,14 +44,6 @@ struct DeltaValueSpace } } rows = block.rows(); - - // Debug code !!! - // String s; - // for (size_t i = 0; i < rows; ++i) - // { - // s += DB::toString((*handle_column)[i]) + ","; - // } - // std::cout << s << std::endl; } void insertValue(IColumn & des, size_t column_index, UInt64 value_id) // @@ -125,14 +117,6 @@ class Segment : private boost::noncopyable PageId delta_id, PageId stable_id); - // Segment(UInt64 epoch_, // - // const HandleRange & range_, - // PageId segment_id_, - // PageId next_segment_id_, - // PageId delta_id, - // const Chunks & delta_chunks_, - // PageId stable_id, - // const Chunks & stable_chunks_); Segment(UInt64 epoch_, // const HandleRange & range_, PageId segment_id_, @@ -196,17 +180,15 @@ class Segment : private boost::noncopyable const SegmentSnapshot & segment_snap, const StorageSnapshot & storage_snap, WriteBatches & wbs) const; - SegmentPtr applyMergeDelta(DMContext & dm_context, - const SegmentSnapshot & segment_snap, - const StorageSnapshot & storage_snap, - WriteBatches & wbs, - const DiskValueSpacePtr & new_stable) const; + SegmentPtr applyMergeDelta(const SegmentSnapshot & segment_snap, WriteBatches & wbs, const DiskValueSpacePtr & new_stable) const; /// Note that we should replace this object with return object, or we can not read latest data after `mergeDelta`. + WARN_UNUSED_RESULT SegmentPtr mergeDelta(DMContext & dm_context, const SegmentSnapshot & segment_snap, const StorageSnapshot & storage_snap, WriteBatches & wbs) const; + WARN_UNUSED_RESULT SegmentPtr mergeDelta(DMContext & dm_context) const; /// Flush delta's cache chunks.