Skip to content

Commit

Permalink
Eliminate some code duplication in MergeHelper (#12121)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #12121

The patch eliminates some code duplication by unifying the two sets of `MergeHelper::TimedFullMerge` overloads using variadic templates. It also brings the order of parameters into sync when it comes to the various `TimedFullMerge*` methods.

Reviewed By: jaykorean

Differential Revision: D51862483

fbshipit-source-id: e3f832a6ff89ba34591451655cf11025d0a0d018
  • Loading branch information
ltamasi authored and facebook-github-bot committed Dec 5, 2023
1 parent 2045fe4 commit 0ebe161
Show file tree
Hide file tree
Showing 8 changed files with 103 additions and 240 deletions.
12 changes: 6 additions & 6 deletions db/db_iter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1293,8 +1293,8 @@ bool DBIter::MergeWithNoBaseValue(const Slice& user_key) {
const Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key, MergeHelper::kNoBaseValue,
merge_context_.GetOperands(), logger_, statistics_, clock_,
/* update_num_ops_stats */ true, &saved_value_, &pinned_value_,
&result_type, /* op_failure_scope */ nullptr);
/* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
&saved_value_, &pinned_value_, &result_type);
return SetValueAndColumnsFromMergeResult(s, result_type);
}

Expand All @@ -1306,8 +1306,8 @@ bool DBIter::MergeWithPlainBaseValue(const Slice& value,
const Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key, MergeHelper::kPlainBaseValue, value,
merge_context_.GetOperands(), logger_, statistics_, clock_,
/* update_num_ops_stats */ true, &saved_value_, &pinned_value_,
&result_type, /* op_failure_scope */ nullptr);
/* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
&saved_value_, &pinned_value_, &result_type);
return SetValueAndColumnsFromMergeResult(s, result_type);
}

Expand All @@ -1319,8 +1319,8 @@ bool DBIter::MergeWithWideColumnBaseValue(const Slice& entity,
const Status s = MergeHelper::TimedFullMerge(
merge_operator_, user_key, MergeHelper::kWideBaseValue, entity,
merge_context_.GetOperands(), logger_, statistics_, clock_,
/* update_num_ops_stats */ true, &saved_value_, &pinned_value_,
&result_type, /* op_failure_scope */ nullptr);
/* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
&saved_value_, &pinned_value_, &result_type);
return SetValueAndColumnsFromMergeResult(s, result_type);
}

Expand Down
16 changes: 8 additions & 8 deletions db/memtable.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1061,8 +1061,8 @@ static bool SaveValue(void* arg, const char* entry) {
merge_operator, s->key->user_key(),
MergeHelper::kPlainBaseValue, v, merge_context->GetOperands(),
s->logger, s->statistics, s->clock,
/* update_num_ops_stats */ true, s->value, s->columns,
/* op_failure_scope */ nullptr);
/* update_num_ops_stats */ true, /* op_failure_scope */ nullptr,
s->value, s->columns);
}
} else if (s->value) {
s->value->assign(v.data(), v.size());
Expand Down Expand Up @@ -1114,8 +1114,8 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), MergeHelper::kWideBaseValue,
v, merge_context->GetOperands(), s->logger, s->statistics,
s->clock, /* update_num_ops_stats */ true, s->value, s->columns,
/* op_failure_scope */ nullptr);
s->clock, /* update_num_ops_stats */ true,
/* op_failure_scope */ nullptr, s->value, s->columns);
}
} else if (s->value) {
Slice value_of_default;
Expand Down Expand Up @@ -1152,8 +1152,8 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), MergeHelper::kNoBaseValue,
merge_context->GetOperands(), s->logger, s->statistics,
s->clock, /* update_num_ops_stats */ true, s->value, s->columns,
/* op_failure_scope */ nullptr);
s->clock, /* update_num_ops_stats */ true,
/* op_failure_scope */ nullptr, s->value, s->columns);
} else {
// We have found a final value (a base deletion) and have newer
// merge operands that we do not intend to merge. Nothing remains
Expand Down Expand Up @@ -1192,8 +1192,8 @@ static bool SaveValue(void* arg, const char* entry) {
*(s->status) = MergeHelper::TimedFullMerge(
merge_operator, s->key->user_key(), MergeHelper::kNoBaseValue,
merge_context->GetOperands(), s->logger, s->statistics,
s->clock, /* update_num_ops_stats */ true, s->value, s->columns,
/* op_failure_scope */ nullptr);
s->clock, /* update_num_ops_stats */ true,
/* op_failure_scope */ nullptr, s->value, s->columns);
}

*(s->found_final_value) = true;
Expand Down
174 changes: 19 additions & 155 deletions db/merge_helper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
#include "db/blob/prefetch_buffer_collection.h"
#include "db/compaction/compaction_iteration_stats.h"
#include "db/dbformat.h"
#include "db/wide/wide_column_serialization.h"
#include "db/wide/wide_columns_helper.h"
#include "logging/logging.h"
#include "monitoring/perf_context_imp.h"
Expand Down Expand Up @@ -111,9 +110,9 @@ Status MergeHelper::TimedFullMergeImpl(
const MergeOperator* merge_operator, const Slice& key,
MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
const std::vector<Slice>& operands, Logger* logger, Statistics* statistics,
SystemClock* clock, bool update_num_ops_stats, std::string* result,
Slice* result_operand, ValueType* result_type,
MergeOperator::OpFailureScope* op_failure_scope) {
SystemClock* clock, bool update_num_ops_stats,
MergeOperator::OpFailureScope* op_failure_scope, std::string* result,
Slice* result_operand, ValueType* result_type) {
assert(result);
assert(result_type);

Expand Down Expand Up @@ -173,9 +172,9 @@ Status MergeHelper::TimedFullMergeImpl(
const MergeOperator* merge_operator, const Slice& key,
MergeOperator::MergeOperationInputV3::ExistingValue&& existing_value,
const std::vector<Slice>& operands, Logger* logger, Statistics* statistics,
SystemClock* clock, bool update_num_ops_stats, std::string* result_value,
PinnableWideColumns* result_entity,
MergeOperator::OpFailureScope* op_failure_scope) {
SystemClock* clock, bool update_num_ops_stats,
MergeOperator::OpFailureScope* op_failure_scope, std::string* result_value,
PinnableWideColumns* result_entity) {
assert(result_value || result_entity);
assert(!result_value || !result_entity);

Expand Down Expand Up @@ -245,141 +244,6 @@ Status MergeHelper::TimedFullMergeImpl(
op_failure_scope, std::move(visitor));
}

Status MergeHelper::TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, NoBaseValueTag,
const std::vector<Slice>& operands, Logger* logger, Statistics* statistics,
SystemClock* clock, bool update_num_ops_stats, std::string* result,
Slice* result_operand, ValueType* result_type,
MergeOperator::OpFailureScope* op_failure_scope) {
MergeOperator::MergeOperationInputV3::ExistingValue existing_value;

return TimedFullMergeImpl(merge_operator, key, std::move(existing_value),
operands, logger, statistics, clock,
update_num_ops_stats, result, result_operand,
result_type, op_failure_scope);
}

Status MergeHelper::TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, PlainBaseValueTag,
const Slice& value, const std::vector<Slice>& operands, Logger* logger,
Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
std::string* result, Slice* result_operand, ValueType* result_type,
MergeOperator::OpFailureScope* op_failure_scope) {
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(value);

return TimedFullMergeImpl(merge_operator, key, std::move(existing_value),
operands, logger, statistics, clock,
update_num_ops_stats, result, result_operand,
result_type, op_failure_scope);
}

Status MergeHelper::TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag,
const Slice& entity, const std::vector<Slice>& operands, Logger* logger,
Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
std::string* result, Slice* result_operand, ValueType* result_type,
MergeOperator::OpFailureScope* op_failure_scope) {
MergeOperator::MergeOperationInputV3::ExistingValue existing_value;

Slice entity_copy(entity);
WideColumns existing_columns;

const Status s =
WideColumnSerialization::Deserialize(entity_copy, existing_columns);
if (!s.ok()) {
return s;
}

existing_value = std::move(existing_columns);

return TimedFullMergeImpl(merge_operator, key, std::move(existing_value),
operands, logger, statistics, clock,
update_num_ops_stats, result, result_operand,
result_type, op_failure_scope);
}

Status MergeHelper::TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag,
const WideColumns& columns, const std::vector<Slice>& operands,
Logger* logger, Statistics* statistics, SystemClock* clock,
bool update_num_ops_stats, std::string* result, Slice* result_operand,
ValueType* result_type, MergeOperator::OpFailureScope* op_failure_scope) {
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(columns);

return TimedFullMergeImpl(merge_operator, key, std::move(existing_value),
operands, logger, statistics, clock,
update_num_ops_stats, result, result_operand,
result_type, op_failure_scope);
}

Status MergeHelper::TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, NoBaseValueTag,
const std::vector<Slice>& operands, Logger* logger, Statistics* statistics,
SystemClock* clock, bool update_num_ops_stats, std::string* result_value,
PinnableWideColumns* result_entity,
MergeOperator::OpFailureScope* op_failure_scope) {
MergeOperator::MergeOperationInputV3::ExistingValue existing_value;

return TimedFullMergeImpl(merge_operator, key, std::move(existing_value),
operands, logger, statistics, clock,
update_num_ops_stats, result_value, result_entity,
op_failure_scope);
}

Status MergeHelper::TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, PlainBaseValueTag,
const Slice& value, const std::vector<Slice>& operands, Logger* logger,
Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
std::string* result_value, PinnableWideColumns* result_entity,
MergeOperator::OpFailureScope* op_failure_scope) {
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(value);

return TimedFullMergeImpl(merge_operator, key, std::move(existing_value),
operands, logger, statistics, clock,
update_num_ops_stats, result_value, result_entity,
op_failure_scope);
}

Status MergeHelper::TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag,
const Slice& entity, const std::vector<Slice>& operands, Logger* logger,
Statistics* statistics, SystemClock* clock, bool update_num_ops_stats,
std::string* result_value, PinnableWideColumns* result_entity,
MergeOperator::OpFailureScope* op_failure_scope) {
MergeOperator::MergeOperationInputV3::ExistingValue existing_value;

Slice entity_copy(entity);
WideColumns existing_columns;

const Status s =
WideColumnSerialization::Deserialize(entity_copy, existing_columns);
if (!s.ok()) {
return s;
}

existing_value = std::move(existing_columns);

return TimedFullMergeImpl(merge_operator, key, std::move(existing_value),
operands, logger, statistics, clock,
update_num_ops_stats, result_value, result_entity,
op_failure_scope);
}

Status MergeHelper::TimedFullMerge(
const MergeOperator* merge_operator, const Slice& key, WideBaseValueTag,
const WideColumns& columns, const std::vector<Slice>& operands,
Logger* logger, Statistics* statistics, SystemClock* clock,
bool update_num_ops_stats, std::string* result_value,
PinnableWideColumns* result_entity,
MergeOperator::OpFailureScope* op_failure_scope) {
MergeOperator::MergeOperationInputV3::ExistingValue existing_value(columns);

return TimedFullMergeImpl(merge_operator, key, std::move(existing_value),
operands, logger, statistics, clock,
update_num_ops_stats, result_value, result_entity,
op_failure_scope);
}

// PRE: iter points to the first merge type entry
// POST: iter points to the first entry beyond the merge process (or the end)
// keys_, operands_ are updated to reflect the merge result.
Expand Down Expand Up @@ -519,14 +383,14 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
s = TimedFullMerge(user_merge_operator_, ikey.user_key, kNoBaseValue,
merge_context_.GetOperands(), logger_, stats_,
clock_, /* update_num_ops_stats */ false,
&merge_result, /* result_operand */ nullptr,
&merge_result_type, &op_failure_scope);
&op_failure_scope, &merge_result,
/* result_operand */ nullptr, &merge_result_type);
} else if (ikey.type == kTypeValue) {
s = TimedFullMerge(user_merge_operator_, ikey.user_key, kPlainBaseValue,
iter->value(), merge_context_.GetOperands(), logger_,
stats_, clock_, /* update_num_ops_stats */ false,
&merge_result, /* result_operand */ nullptr,
&merge_result_type, &op_failure_scope);
&op_failure_scope, &merge_result,
/* result_operand */ nullptr, &merge_result_type);
} else if (ikey.type == kTypeBlobIndex) {
BlobIndex blob_index;

Expand Down Expand Up @@ -559,20 +423,20 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
s = TimedFullMerge(user_merge_operator_, ikey.user_key, kPlainBaseValue,
blob_value, merge_context_.GetOperands(), logger_,
stats_, clock_, /* update_num_ops_stats */ false,
&merge_result, /* result_operand */ nullptr,
&merge_result_type, &op_failure_scope);
&op_failure_scope, &merge_result,
/* result_operand */ nullptr, &merge_result_type);
} else if (ikey.type == kTypeWideColumnEntity) {
s = TimedFullMerge(user_merge_operator_, ikey.user_key, kWideBaseValue,
iter->value(), merge_context_.GetOperands(), logger_,
stats_, clock_, /* update_num_ops_stats */ false,
&merge_result, /* result_operand */ nullptr,
&merge_result_type, &op_failure_scope);
&op_failure_scope, &merge_result,
/* result_operand */ nullptr, &merge_result_type);
} else {
s = TimedFullMerge(user_merge_operator_, ikey.user_key, kNoBaseValue,
merge_context_.GetOperands(), logger_, stats_,
clock_, /* update_num_ops_stats */ false,
&merge_result, /* result_operand */ nullptr,
&merge_result_type, &op_failure_scope);
&op_failure_scope, &merge_result,
/* result_operand */ nullptr, &merge_result_type);
}

// We store the result in keys_.back() and operands_.back()
Expand Down Expand Up @@ -714,9 +578,9 @@ Status MergeHelper::MergeUntil(InternalIterator* iter,
MergeOperator::OpFailureScope op_failure_scope;
s = TimedFullMerge(user_merge_operator_, orig_ikey.user_key, kNoBaseValue,
merge_context_.GetOperands(), logger_, stats_, clock_,
/* update_num_ops_stats */ false, &merge_result,
/* result_operand */ nullptr, &merge_result_type,
&op_failure_scope);
/* update_num_ops_stats */ false, &op_failure_scope,
&merge_result,
/* result_operand */ nullptr, &merge_result_type);
if (s.ok()) {
// The original key encountered
// We are certain that keys_ is not empty here (see assertions couple of
Expand Down
Loading

0 comments on commit 0ebe161

Please sign in to comment.