Skip to content

Commit

Permalink
MPP: update the state of building a hash table when createOnce throw …
Browse files Browse the repository at this point in the history
…exceptions (#4202) (#4270)

close #4195
  • Loading branch information
ti-chi-bot authored Apr 25, 2022
1 parent 428e6c5 commit ef69b08
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 50 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_formal_page_file_not_exists) \
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream) \
M(exception_when_read_from_log)
M(exception_when_read_from_log) \
M(exception_mpp_hash_build)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
Expand Down
43 changes: 26 additions & 17 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ namespace DB
namespace FailPoints
{
extern const char exception_in_creating_set_input_stream[];
}
extern const char exception_mpp_hash_build[];
} // namespace FailPoints
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
Expand Down Expand Up @@ -108,9 +109,10 @@ void CreatingSetsBlockInputStream::createAll()
for (auto & elem : subqueries_for_sets)
{
if (elem.second.join)
elem.second.join->setFinishBuildTable(false);
elem.second.join->setBuildTableState(Join::BuildTableState::WAITING);
}
}
Stopwatch watch;
auto thread_manager = newThreadManager();
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
Expand All @@ -129,27 +131,31 @@ void CreatingSetsBlockInputStream::createAll()
thread_manager->wait();

if (!exception_from_workers.empty())
{
LOG_FMT_ERROR(log, "Creating all tasks of {} takes {} sec with exception and rethrow the first of total {} exceptions", mpp_task_id.toString(), watch.elapsedSeconds(), exception_from_workers.size());
std::rethrow_exception(exception_from_workers.front());
}
LOG_FMT_DEBUG(log, "Creating all tasks of {} takes {} sec. ", mpp_task_id.toString(), watch.elapsedSeconds());

created = true;
}
}

void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
auto log_msg = fmt::format("{} for task {}",
subquery.set ? "Creating set. " : subquery.join ? "Creating join. "
: subquery.table ? "Filling temporary table. "
: "null subquery",
mpp_task_id.toString());
Stopwatch watch;
try
{
LOG_DEBUG(log,
(subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task "
<< mpp_task_id.toString());
Stopwatch watch;

LOG_FMT_DEBUG(log, "{}", log_msg);
BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write({}, {});


bool done_with_set = !subquery.set;
bool done_with_join = !subquery.join;
bool done_with_table = !subquery.table;
Expand All @@ -164,7 +170,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
if (isCancelled())
{
LOG_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
LOG_FMT_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
return;
}

Expand Down Expand Up @@ -209,7 +215,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)


if (subquery.join)
subquery.join->setFinishBuildTable(true);
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build);
subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED);
}

if (table_out)
table_out->writeSuffix();
Expand Down Expand Up @@ -243,20 +252,20 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
msg << "In " << watch.elapsedSeconds() << " sec. ";
msg << "using " << std::to_string(subquery.join == nullptr ? 1 : subquery.join->getBuildConcurrency()) << " threads ";

if (log != nullptr)
LOG_DEBUG(log, msg.rdbuf());
else
LOG_DEBUG(log, msg.rdbuf());
LOG_FMT_DEBUG(log, "{}", msg.rdbuf()->str());
}
else
{
LOG_DEBUG(log, "Subquery has empty result for task " << mpp_task_id.toString() << ".");
LOG_FMT_DEBUG(log, "Subquery has empty result for task {}. ", mpp_task_id.toString());
}
}
catch (std::exception & e)
catch (...)
{
std::unique_lock<std::mutex> lock(exception_mutex);
exception_from_workers.push_back(std::current_exception());
if (subquery.join)
subquery.join->setBuildTableState(Join::BuildTableState::FAILED);
LOG_FMT_ERROR(log, "{} throw exception: {} In {} sec. ", log_msg, getCurrentExceptionMessage(false, true), watch.elapsedSeconds());
}
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/tests/exchange_perftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ struct ReceiverHelper
{
if (join_ptr)
{
join_ptr->setFinishBuildTable(true);
join_ptr->setBuildTableState(Join::BuildTableState::SUCCEED);
std::cout << fmt::format("Hash table size: {} bytes", join_ptr->getTotalByteCount()) << std::endl;
}
}
Expand Down
58 changes: 29 additions & 29 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
, other_condition_ptr(other_condition_ptr_)
, original_strictness(strictness)
, max_block_size_for_cross_join(max_block_size_)
, have_finish_build(true)
, build_table_state(BuildTableState::SUCCEED)
, log(getLogWithPrefix(log_, "Join"))
, limits(limits)
{
Expand All @@ -96,10 +96,10 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
throw Exception("Not supported: non right join with right conditions");
}

void Join::setFinishBuildTable(bool finish_)
void Join::setBuildTableState(BuildTableState state_)
{
std::lock_guard<std::mutex> lk(build_table_mutex);
have_finish_build = finish_;
build_table_state = state_;
build_table_cv.notify_all();
}

Expand Down Expand Up @@ -344,7 +344,7 @@ size_t Join::getTotalByteCount() const
res += getTotalByteCountImpl(maps_all, type);
res += getTotalByteCountImpl(maps_any_full, type);
res += getTotalByteCountImpl(maps_all_full, type);
for (auto & pool : pools)
for (const auto & pool : pools)
{
/// note the return value might not be accurate since it does not use lock, but should be enough for current usage
res += pool->size();
Expand Down Expand Up @@ -421,7 +421,7 @@ namespace
{
void insertRowToList(Join::RowRefList * list, Join::RowRefList * elem, Block * stored_block, size_t index)
{
elem->next = list->next;
elem->next = list->next; // NOLINT(clang-analyzer-core.NullDereference)
list->next = elem;
elem->block = stored_block;
elem->row_num = index;
Expand Down Expand Up @@ -493,7 +493,7 @@ void NO_INLINE insertFromBlockImplTypeCase(
if (rows_not_inserted_to_map)
{
/// for right/full out join, need to record the rows not inserted to map
auto elem = reinterpret_cast<Join::RowRefList *>(pool.alloc(sizeof(Join::RowRefList)));
auto * elem = reinterpret_cast<Join::RowRefList *>(pool.alloc(sizeof(Join::RowRefList)));
insertRowToList(rows_not_inserted_to_map, elem, stored_block, i);
}
continue;
Expand Down Expand Up @@ -536,9 +536,9 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock(
segment_index_info.resize(segment_size);
}
size_t rows_per_seg = rows / segment_index_info.size();
for (size_t i = 0; i < segment_index_info.size(); i++)
for (auto & segment_index : segment_index_info)
{
segment_index_info[i].reserve(rows_per_seg);
segment_index.reserve(rows_per_seg);
}
for (size_t i = 0; i < rows; i++)
{
Expand Down Expand Up @@ -567,16 +567,16 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock(
{
/// null value
/// here ignore mutex because rows_not_inserted_to_map is privately owned by each stream thread
for (size_t i = 0; i < segment_index_info[segment_index].size(); i++)
for (auto index : segment_index_info[segment_index])
{
/// for right/full out join, need to record the rows not inserted to map
auto elem = reinterpret_cast<Join::RowRefList *>(pool.alloc(sizeof(Join::RowRefList)));
insertRowToList(rows_not_inserted_to_map, elem, stored_block, segment_index_info[segment_index][i]);
auto * elem = reinterpret_cast<Join::RowRefList *>(pool.alloc(sizeof(Join::RowRefList)));
insertRowToList(rows_not_inserted_to_map, elem, stored_block, index);
}
}
else
{
std::lock_guard<std::mutex> lk(map.getSegmentMutex(segment_index));
std::lock_guard lk(map.getSegmentMutex(segment_index));
for (size_t i = 0; i < segment_index_info[segment_index].size(); i++)
{
Inserter<STRICTNESS, typename Map::SegmentType::HashTable, KeyGetter>::insert(map.getSegmentTable(segment_index), key_getter, stored_block, segment_index_info[segment_index][i], pool, sort_key_containers);
Expand Down Expand Up @@ -705,7 +705,7 @@ void recordFilteredRows(const Block & block, const String & filter_column, Colum
MutableColumnPtr mutable_null_map_holder = (*std::move(null_map_holder)).mutate();
PaddedPODArray<UInt8> & mutable_null_map = static_cast<ColumnUInt8 &>(*mutable_null_map_holder).getData();

auto & nested_column = column->isColumnNullable() ? static_cast<const ColumnNullable &>(*column).getNestedColumnPtr() : column;
const auto & nested_column = column->isColumnNullable() ? static_cast<const ColumnNullable &>(*column).getNestedColumnPtr() : column;
for (size_t i = 0, size = nested_column->size(); i < size; ++i)
mutable_null_map[i] |= (!nested_column->getInt(i));

Expand All @@ -732,7 +732,7 @@ void Join::insertFromBlock(const Block & block, size_t stream_index)
std::shared_lock lock(rwlock);
Block * stored_block = nullptr;
{
std::lock_guard<std::mutex> lk(blocks_lock);
std::lock_guard lk(blocks_lock);
blocks.push_back(block);
stored_block = &blocks.back();
original_blocks.push_back(block);
Expand All @@ -753,6 +753,7 @@ bool Join::insertFromBlockInternal(Block * stored_block, size_t stream_index)
const Block & block = *stored_block;

/// Rare case, when keys are constant. To avoid code bloat, simply materialize them.
/// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function.
Columns materialized_columns;

/// Memoize key columns to work.
Expand Down Expand Up @@ -981,11 +982,11 @@ void NO_INLINE joinBlockImplTypeCase(
hash_value = map.hash(key);
segment_index = hash_value % map.getSegmentSize();
}
auto & internalMap = map.getSegmentTable(segment_index);
auto & internal_map = map.getSegmentTable(segment_index);
/// do not require segment lock because in join, the hash table can not be changed in probe stage.
auto it = map.getSegmentSize() > 0 ? internalMap.find(key, hash_value) : internalMap.find(key);
auto it = map.getSegmentSize() > 0 ? internal_map.find(key, hash_value) : internal_map.find(key);

if (it != internalMap.end())
if (it != internal_map.end())
{
it->getMapped().setUsed();
Adder<KIND, STRICTNESS, Map>::addFound(
Expand Down Expand Up @@ -1061,8 +1062,8 @@ void mergeNullAndFilterResult(Block & block, ColumnVector<UInt8>::Container & fi
orig_filter_column = orig_filter_column->convertToFullColumnIfConst();
if (orig_filter_column->isColumnNullable())
{
auto * nullable_column = checkAndGetColumn<ColumnNullable>(orig_filter_column.get());
auto & nested_column_data = static_cast<const ColumnVector<UInt8> *>(nullable_column->getNestedColumnPtr().get())->getData();
const auto * nullable_column = checkAndGetColumn<ColumnNullable>(orig_filter_column.get());
const auto & nested_column_data = static_cast<const ColumnVector<UInt8> *>(nullable_column->getNestedColumnPtr().get())->getData();
for (size_t i = 0; i < nullable_column->size(); i++)
{
if (filter_column[i] == 0)
Expand All @@ -1075,8 +1076,8 @@ void mergeNullAndFilterResult(Block & block, ColumnVector<UInt8>::Container & fi
}
else
{
auto * other_filter_column = checkAndGetColumn<ColumnVector<UInt8>>(orig_filter_column.get());
auto & other_filter_column_data = static_cast<const ColumnVector<UInt8> *>(other_filter_column)->getData();
const auto * other_filter_column = checkAndGetColumn<ColumnVector<UInt8>>(orig_filter_column.get());
const auto & other_filter_column_data = static_cast<const ColumnVector<UInt8> *>(other_filter_column)->getData();
for (size_t i = 0; i < other_filter_column->size(); i++)
filter_column[i] = filter_column[i] && other_filter_column_data[i];
}
Expand All @@ -1100,7 +1101,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>

auto filter_column = ColumnUInt8::create();
auto & filter = filter_column->getData();
filter.assign(block.rows(), (UInt8)1);
filter.assign(block.rows(), static_cast<UInt8>(1));
if (!other_filter_column.empty())
{
mergeNullAndFilterResult(block, filter, other_filter_column, false);
Expand Down Expand Up @@ -1179,9 +1180,9 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
if (isLeftJoin(kind))
{
/// for left join, convert right column to null if not joined
for (size_t i = 0; i < right_table_columns.size(); i++)
for (size_t right_table_column : right_table_columns)
{
auto & column = block.getByPosition(right_table_columns[i]);
auto & column = block.getByPosition(right_table_column);
auto full_column = column.column->isColumnConst() ? column.column->convertToFullColumnIfConst() : column.column;
if (!full_column->isColumnNullable())
{
Expand Down Expand Up @@ -1561,9 +1562,8 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
{
dst_columns[i] = sample_block.getByPosition(i).column->cloneEmpty();
}
for (size_t i = 0; i < result_blocks.size(); i++)
for (auto & current_block : result_blocks)
{
auto & current_block = result_blocks[i];
if (current_block.rows() > 0)
{
for (size_t column = 0; column < current_block.columns(); column++)
Expand Down Expand Up @@ -1613,7 +1613,6 @@ void Join::checkTypesOfKeys(const Block & block_left, const Block & block_right)
}
}


void Join::joinBlock(Block & block) const
{
// std::cerr << "joinBlock: " << block.dumpStructure() << "\n";
Expand All @@ -1622,7 +1621,9 @@ void Join::joinBlock(Block & block) const
{
std::unique_lock lk(build_table_mutex);

build_table_cv.wait(lk, [&]() { return have_finish_build; });
build_table_cv.wait(lk, [&]() { return build_table_state != BuildTableState::WAITING; });
if (build_table_state == BuildTableState::FAILED) /// throw this exception once failed to build the hash table
throw Exception("Build failed before join probe!");
}

std::shared_lock lock(rwlock);
Expand Down Expand Up @@ -1967,5 +1968,4 @@ BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & left_sampl
return std::make_shared<NonJoinedBlockInputStream>(*this, left_sample_block, index, step, max_block_size);
}


} // namespace DB
10 changes: 8 additions & 2 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,13 @@ class Join
bool isBuildSetExceeded() const { return build_set_exceeded.load(); }
size_t getNotJoinedStreamConcurrency() const { return build_concurrency; };

void setFinishBuildTable(bool);
enum BuildTableState
{
WAITING,
FAILED,
SUCCEED
};
void setBuildTableState(BuildTableState state_);

/// Reference to the row in block.
struct RowRef
Expand Down Expand Up @@ -300,7 +306,7 @@ class Join

mutable std::mutex build_table_mutex;
mutable std::condition_variable build_table_cv;
bool have_finish_build;
BuildTableState build_table_state;

const LogWithPrefixPtr log;

Expand Down
Loading

0 comments on commit ef69b08

Please sign in to comment.