Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
flowbehappy committed Oct 18, 2019
1 parent ea526ab commit 0f776f1
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 68 deletions.
2 changes: 2 additions & 0 deletions dbms/src/Storages/DeltaMerge/DeltaMergeDefines.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,5 +131,7 @@ static_assert(static_cast<Int64>(static_cast<UInt64>(MAX_INT64)) == MAX_INT64, "

static constexpr bool DM_RUN_CHECK = true;

#define WARN_UNUSED_RESULT __attribute__((warn_unused_result))

} // namespace DM
} // namespace DB
24 changes: 11 additions & 13 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ void DeltaMergeStore::write(const Context & db_context, const DB::Settings & db_
action.task = action.segment->createAppendTask(op_context, wbs, action.update);
}

commitWrites(actions, wbs, dm_context, op_context, db_context, true);
commitWrites(actions, wbs, dm_context, op_context);
}


Expand Down Expand Up @@ -222,15 +222,13 @@ void DeltaMergeStore::deleteRange(const Context & db_context, const DB::Settings

// TODO: We need to do a delta merge after write a delete range, otherwise, the rows got deleted could never be acutally removed.

commitWrites(actions, wbs, dm_context, op_context, db_context, false);
commitWrites(actions, wbs, dm_context, op_context);
}

void DeltaMergeStore::commitWrites(const WriteActions & actions,
WriteBatches & wbs,
const DMContextPtr & dm_context,
OpContext & op_context,
const Context & /*db_context*/,
bool /*is_upsert*/)
OpContext & op_context)
{
// Save generated chunks to disk.
{
Expand Down Expand Up @@ -420,19 +418,19 @@ void DeltaMergeStore::checkSegmentUpdate(const DMContextPtr & dm_context, const

size_t delta_rows = segment->deltaRows(/* with_delta_cache */ by_write_thread);
size_t delta_updates = segment->updatesInDeltaTree();
size_t segment_rows = segment->getEstimatedRows();

bool should_split = segment_rows >= dm_context->segment_limit_rows * 2;
bool force_split = segment_rows >= dm_context->segment_limit_rows * 10;

bool should_merge = segment_rows < dm_context->segment_limit_rows / 4;

bool should_background_merge_delta = std::max(delta_rows, delta_updates) >= dm_context->delta_limit_rows;
bool should_foreground_merge_delta = std::max(delta_rows, delta_updates) >= dm_context->segment_limit_rows * 5;

if (by_write_thread)
{
// Only write thread will check split.
// Only write thread will check split & merge.
size_t segment_rows = segment->getEstimatedRows();

bool should_split = segment_rows >= dm_context->segment_limit_rows * 2;
bool force_split = segment_rows >= dm_context->segment_limit_rows * 10;

bool should_merge = segment_rows < dm_context->segment_limit_rows / 4;

if (force_split || (should_split && !segment->isBackgroundMergeDelta()))
{
Expand Down Expand Up @@ -632,7 +630,7 @@ void DeltaMergeStore::segmentMergeDelta(DMContext & dm_context,
LOG_DEBUG(log,
"Segment [" << segment->segmentId() << "] apply " << (is_foreground ? "foreground" : "background") << " merge delta");

auto new_segment = segment->applyMergeDelta(dm_context, segment_snap, storage_snap, wbs, new_stable);
auto new_segment = segment->applyMergeDelta(segment_snap, wbs, new_stable);

wbs.writeMeta(storage_pool);

Expand Down
7 changes: 1 addition & 6 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -207,12 +207,7 @@ class DeltaMergeStore : private boost::noncopyable
const OptionTableInfoConstRef table_info,
ColumnID & max_column_id_used);

void commitWrites(const WriteActions & actions,
WriteBatches & wbs,
const DMContextPtr & dm_context,
OpContext & op_context,
const Context & db_context,
bool is_upsert);
void commitWrites(const WriteActions & actions, WriteBatches & wbs, const DMContextPtr & dm_context, OpContext & op_context);

bool isSegmentValid(const SegmentPtr & segment);

Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/DeltaMerge/DiskValueSpace.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ struct WriteBatches
WriteBatch data;
WriteBatch meta;

Ids writtenLog;
Ids writtenData;
PageIds writtenLog;
PageIds writtenData;

WriteBatch removed_log;
WriteBatch removed_data;
Expand Down
29 changes: 3 additions & 26 deletions dbms/src/Storages/DeltaMerge/Segment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,25 +76,6 @@ Segment::Segment(UInt64 epoch_, const HandleRange & range_, PageId segment_id_,
{
}

//Segment::Segment(UInt64 epoch_,
// const HandleRange & range_,
// PageId segment_id_,
// PageId next_segment_id_,
// PageId delta_id,
// const Chunks & delta_chunks_,
// PageId stable_id,
// const Chunks & stable_chunks_)
// : epoch(epoch_),
// range(range_),
// segment_id(segment_id_),
// next_segment_id(next_segment_id_),
// delta(std::make_shared<DiskValueSpace>(true, delta_id, delta_chunks_)),
// stable(std::make_shared<DiskValueSpace>(false, stable_id, stable_chunks_)),
// delta_tree(std::make_shared<DefaultDeltaTree>()),
// log(&Logger::get("Segment"))
//{
//}

Segment::Segment(UInt64 epoch_, //
const HandleRange & range_,
PageId segment_id_,
Expand Down Expand Up @@ -346,7 +327,7 @@ BlockInputStreamPtr Segment::getInputStreamRaw(const DMContext & dm_contex
EMPTY_FILTER);
delta_stream = std::make_shared<DMHandleFilterBlockInputStream<false>>(delta_stream, range, 0);

BlockInputStreamPtr stable_stream = std::make_shared<ChunkBlockInputStream>(stable->getChunks(), //
BlockInputStreamPtr stable_stream = std::make_shared<ChunkBlockInputStream>(segment_snap.stable->getChunks(), //
new_columns_to_read,
storage_snap.data_reader,
EMPTY_FILTER);
Expand Down Expand Up @@ -489,11 +470,7 @@ DiskValueSpacePtr Segment::prepareMergeDelta(DMContext & dm_context,
return new_stable;
}

SegmentPtr Segment::applyMergeDelta(DMContext & /*dm_context*/,
const SegmentSnapshot & segment_snap,
const StorageSnapshot & /*storage_snap*/,
WriteBatches & wbs,
const DiskValueSpacePtr & new_stable) const
SegmentPtr Segment::applyMergeDelta(const SegmentSnapshot & segment_snap, WriteBatches & wbs, const DiskValueSpacePtr & new_stable) const
{
LOG_DEBUG(log, "Segment [" << DB::toString(segment_id) << "] apply merge delta start.");

Expand Down Expand Up @@ -525,7 +502,7 @@ SegmentPtr Segment::mergeDelta(DMContext & dm_context,
WriteBatches & wbs) const
{
auto new_stable = prepareMergeDelta(dm_context, segment_snap, storage_snap, wbs);
return applyMergeDelta(dm_context, segment_snap, storage_snap, wbs, new_stable);
return applyMergeDelta(segment_snap, wbs, new_stable);
}

SegmentPtr Segment::mergeDelta(DMContext & dm_context) const
Expand Down
24 changes: 3 additions & 21 deletions dbms/src/Storages/DeltaMerge/Segment.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,6 @@ struct DeltaValueSpace
}
}
rows = block.rows();

// Debug code !!!
// String s;
// for (size_t i = 0; i < rows; ++i)
// {
// s += DB::toString((*handle_column)[i]) + ",";
// }
// std::cout << s << std::endl;
}

void insertValue(IColumn & des, size_t column_index, UInt64 value_id) //
Expand Down Expand Up @@ -125,14 +117,6 @@ class Segment : private boost::noncopyable
PageId delta_id,
PageId stable_id);

// Segment(UInt64 epoch_, //
// const HandleRange & range_,
// PageId segment_id_,
// PageId next_segment_id_,
// PageId delta_id,
// const Chunks & delta_chunks_,
// PageId stable_id,
// const Chunks & stable_chunks_);
Segment(UInt64 epoch_, //
const HandleRange & range_,
PageId segment_id_,
Expand Down Expand Up @@ -196,17 +180,15 @@ class Segment : private boost::noncopyable
const SegmentSnapshot & segment_snap,
const StorageSnapshot & storage_snap,
WriteBatches & wbs) const;
SegmentPtr applyMergeDelta(DMContext & dm_context,
const SegmentSnapshot & segment_snap,
const StorageSnapshot & storage_snap,
WriteBatches & wbs,
const DiskValueSpacePtr & new_stable) const;
SegmentPtr applyMergeDelta(const SegmentSnapshot & segment_snap, WriteBatches & wbs, const DiskValueSpacePtr & new_stable) const;

/// Note that we should replace this object with return object, or we can not read latest data after `mergeDelta`.
WARN_UNUSED_RESULT
SegmentPtr mergeDelta(DMContext & dm_context,
const SegmentSnapshot & segment_snap,
const StorageSnapshot & storage_snap,
WriteBatches & wbs) const;
WARN_UNUSED_RESULT
SegmentPtr mergeDelta(DMContext & dm_context) const;

/// Flush delta's cache chunks.
Expand Down

0 comments on commit 0f776f1

Please sign in to comment.