From c1f6ab07b9bcec6eee14df7d4f63438f620fb3b2 Mon Sep 17 00:00:00 2001 From: Ti Chi Robot Date: Fri, 15 Apr 2022 00:52:36 +0800 Subject: [PATCH] MPP: update the state of building a hash table when createOnce throw exceptions (#4202) (#4268) close pingcap/tiflash#4195 --- dbms/src/Common/FailPoint.cpp | 3 +- .../CreatingSetsBlockInputStream.cpp | 59 ++-- dbms/src/Interpreters/Join.cpp | 307 ++++++++---------- dbms/src/Interpreters/Join.h | 38 +-- tests/fullstack-test/mpp/mpp_fail.test | 32 ++ 5 files changed, 216 insertions(+), 223 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index a989c51d03f..87ae6cda5e4 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -47,7 +47,8 @@ std::unordered_map> FailPointHelper::f M(segment_merge_after_ingest_packs) \ M(force_formal_page_file_not_exists) \ M(force_legacy_or_checkpoint_page_file_exists) \ - M(exception_in_creating_set_input_stream) + M(exception_in_creating_set_input_stream) \ + M(exception_mpp_hash_build) #define APPLY_FOR_FAILPOINTS(M) \ M(force_set_page_file_write_errno) \ diff --git a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp index 52f6433ce07..d964c6ddc20 100644 --- a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp +++ b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp @@ -3,7 +3,6 @@ #include #include #include -#include #include #include #include @@ -13,25 +12,24 @@ namespace DB { + namespace FailPoints { extern const char exception_in_creating_set_input_stream[]; -} +extern const char exception_mpp_hash_build[]; +} // namespace FailPoints namespace ErrorCodes { extern const int SET_SIZE_LIMIT_EXCEEDED; } -CreatingSetsBlockInputStream::CreatingSetsBlockInputStream( - const BlockInputStreamPtr & input, - std::vector && subqueries_for_sets_list_, - const SizeLimits & network_transfer_limits, - Int64 mpp_task_id_, - const LogWithPrefixPtr & log_) +CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(const BlockInputStreamPtr & input, + std::vector && subqueries_for_sets_list_, + const SizeLimits & network_transfer_limits, + Int64 mpp_task_id_) : subqueries_for_sets_list(std::move(subqueries_for_sets_list_)) , network_transfer_limits(network_transfer_limits) , mpp_task_id(mpp_task_id_) - , log(getMPPTaskLog(log_, getName(), mpp_task_id)) { init(input); } @@ -39,10 +37,8 @@ CreatingSetsBlockInputStream::CreatingSetsBlockInputStream( CreatingSetsBlockInputStream::CreatingSetsBlockInputStream( const BlockInputStreamPtr & input, const SubqueriesForSets & subqueries_for_sets, - const SizeLimits & network_transfer_limits, - const LogWithPrefixPtr & log_) + const SizeLimits & network_transfer_limits) : network_transfer_limits(network_transfer_limits) - , log(getMPPTaskLog(log_, getName(), mpp_task_id)) { subqueries_for_sets_list.push_back(subqueries_for_sets); init(input); @@ -107,9 +103,10 @@ void CreatingSetsBlockInputStream::createAll() for (auto & elem : subqueries_for_sets) { if (elem.second.join) - elem.second.join->setFinishBuildTable(false); + elem.second.join->setBuildTableState(Join::BuildTableState::WAITING); } } + Stopwatch watch; for (auto & subqueries_for_sets : subqueries_for_sets_list) { for (auto & elem : subqueries_for_sets) @@ -118,7 +115,7 @@ void CreatingSetsBlockInputStream::createAll() { if (isCancelledOrThrowIfKilled()) return; - workers.emplace_back(ThreadFactory(true, "CreatingSets").newThread([this, &subquery = elem.second] { createOne(subquery); })); + workers.emplace_back(ThreadFactory().newThread([this, &subquery = elem.second] { createOne(subquery); })); FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_in_creating_set_input_stream); } } @@ -129,27 +126,34 @@ void CreatingSetsBlockInputStream::createAll() } if (!exception_from_workers.empty()) + { + LOG_ERROR(log, + "Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds()) + << " sec with exception and rethrow the first of total " << exception_from_workers.size() + << " exceptions."); std::rethrow_exception(exception_from_workers.front()); - + } + LOG_DEBUG( + log, + "Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds()) << "sec. "); created = true; } } void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) { + Stopwatch watch; try { LOG_DEBUG(log, (subquery.set ? "Creating set. " : "") << (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task " << std::to_string(mpp_task_id)); - Stopwatch watch; BlockOutputStreamPtr table_out; if (subquery.table) table_out = subquery.table->write({}, {}); - bool done_with_set = !subquery.set; bool done_with_join = !subquery.join; bool done_with_table = !subquery.table; @@ -209,7 +213,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) if (subquery.join) - subquery.join->setFinishBuildTable(true); + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build); + subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED); + } if (table_out) table_out->writeSuffix(); @@ -242,21 +249,23 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) msg << "In " << watch.elapsedSeconds() << " sec. "; msg << "using " << std::to_string(subquery.join == nullptr ? 1 : subquery.join->getBuildConcurrency()) << " threads "; - - if (log != nullptr) - LOG_DEBUG(log, msg.rdbuf()); - else - LOG_DEBUG(log, msg.rdbuf()); + msg << "for task " << std::to_string(mpp_task_id) << "."; + LOG_DEBUG(log, msg.rdbuf()); } else { - LOG_DEBUG(log, "Subquery has empty result for task " << std::to_string(mpp_task_id) << "."); + LOG_DEBUG(log, "Subquery has empty result for task" << std::to_string(mpp_task_id)); } } - catch (std::exception & e) + catch (...) { std::unique_lock lock(exception_mutex); exception_from_workers.push_back(std::current_exception()); + if (subquery.join) + subquery.join->setBuildTableState(Join::BuildTableState::FAILED); + LOG_ERROR(log, + "task" << std::to_string(mpp_task_id) << " throw exception: " << getCurrentExceptionMessage(false, true) << " In " + << std::to_string(watch.elapsedSeconds()) << " sec. "); } } diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 474527a32c1..3b5bf2a509c 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -18,6 +18,7 @@ namespace DB { + namespace ErrorCodes { extern const int UNKNOWN_SET_DATA_VARIANT; @@ -50,12 +51,12 @@ static bool isAntiJoin(ASTTableJoin::Kind kind) } static bool isCrossJoin(ASTTableJoin::Kind kind) { - return kind == ASTTableJoin::Kind::Cross || kind == ASTTableJoin::Kind::Cross_Left - || kind == ASTTableJoin::Kind::Cross_Right || kind == ASTTableJoin::Kind::Cross_Anti; + return kind == ASTTableJoin::Kind::Cross || kind == ASTTableJoin::Kind::Cross_Left || kind == ASTTableJoin::Kind::Cross_Right + || kind == ASTTableJoin::Kind::Cross_Anti; } -Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool use_nulls_, const SizeLimits & limits, ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_, size_t build_concurrency_, const TiDB::TiDBCollators & collators_, const String & left_filter_column_, const String & right_filter_column_, const String & other_filter_column_, const String & other_eq_filter_from_in_column_, ExpressionActionsPtr other_condition_ptr_, size_t max_block_size_, const LogWithPrefixPtr & log_) +Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool use_nulls_, const SizeLimits & limits, ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_, size_t build_concurrency_, const TiDB::TiDBCollators & collators_, const String & left_filter_column_, const String & right_filter_column_, const String & other_filter_column_, const String & other_eq_filter_from_in_column_, ExpressionActionsPtr other_condition_ptr_, size_t max_block_size_) : kind(kind_) , strictness(strictness_) , key_names_left(key_names_left_) @@ -70,8 +71,8 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u , other_condition_ptr(other_condition_ptr_) , original_strictness(strictness) , max_block_size_for_cross_join(max_block_size_) - , have_finish_build(true) - , log(getLogWithPrefix(log_, "Join")) + , build_table_state(BuildTableState::SUCCEED) + , log(&Logger::get("Join")) , limits(limits) { build_set_exceeded.store(false); @@ -96,10 +97,10 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u throw Exception("Not supported: non right join with right conditions"); } -void Join::setFinishBuildTable(bool finish_) +void Join::setBuildTableState(BuildTableState state_) { std::lock_guard lk(build_table_mutex); - have_finish_build = finish_; + build_table_state = state_; build_table_cv.notify_all(); } @@ -151,7 +152,8 @@ Join::Type Join::chooseMethod(const ColumnRawPtrs & key_columns, Sizes & key_siz /// If there is single string key, use hash table of it's values. if (keys_size == 1 && (typeid_cast(key_columns[0]) - || (key_columns[0]->isColumnConst() && typeid_cast(&static_cast(key_columns[0])->getDataColumn())))) + || (key_columns[0]->isColumnConst() + && typeid_cast(&static_cast(key_columns[0])->getDataColumn())))) return Type::key_string; if (keys_size == 1 && typeid_cast(key_columns[0])) @@ -437,7 +439,13 @@ struct Inserter template struct Inserter { - static void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, std::vector & sort_key_container) + static void insert( + Map & map, + KeyGetter & key_getter, + Block * stored_block, + size_t i, + Arena & pool, + std::vector & sort_key_container) { auto emplace_result = key_getter.emplaceKey(map, i, pool, sort_key_container); @@ -450,7 +458,13 @@ template struct Inserter { using MappedType = typename Map::mapped_type; - static void insert(Map & map, KeyGetter & key_getter, Block * stored_block, size_t i, Arena & pool, std::vector & sort_key_container) + static void insert( + Map & map, + KeyGetter & key_getter, + Block * stored_block, + size_t i, + Arena & pool, + std::vector & sort_key_container) { auto emplace_result = key_getter.emplaceKey(map, i, pool, sort_key_container); @@ -470,17 +484,7 @@ struct Inserter template -void NO_INLINE insertFromBlockImplTypeCase( - Map & map, - size_t rows, - const ColumnRawPtrs & key_columns, - const Sizes & key_sizes, - const TiDB::TiDBCollators & collators, - Block * stored_block, - ConstNullMapPtr null_map, - Join::RowRefList * rows_not_inserted_to_map, - size_t, - Arena & pool) +void NO_INLINE insertFromBlockImplTypeCase(Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, const TiDB::TiDBCollators & collators, Block * stored_block, ConstNullMapPtr null_map, Join::RowRefList * rows_not_inserted_to_map, size_t, Arena & pool) { KeyGetter key_getter(key_columns, key_sizes, collators); std::vector sort_key_containers; @@ -499,22 +503,18 @@ void NO_INLINE insertFromBlockImplTypeCase( continue; } - Inserter::insert(map.getSegmentTable(0), key_getter, stored_block, i, pool, sort_key_containers); + Inserter::insert( + map.getSegmentTable(0), + key_getter, + stored_block, + i, + pool, + sort_key_containers); } } template -void NO_INLINE insertFromBlockImplTypeCaseWithLock( - Map & map, - size_t rows, - const ColumnRawPtrs & key_columns, - const Sizes & key_sizes, - const TiDB::TiDBCollators & collators, - Block * stored_block, - ConstNullMapPtr null_map, - Join::RowRefList * rows_not_inserted_to_map, - size_t stream_index, - Arena & pool) +void NO_INLINE insertFromBlockImplTypeCaseWithLock(Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, const TiDB::TiDBCollators & collators, Block * stored_block, ConstNullMapPtr null_map, Join::RowRefList * rows_not_inserted_to_map, size_t stream_index, Arena & pool) { KeyGetter key_getter(key_columns, key_sizes, collators); std::vector sort_key_containers; @@ -579,7 +579,12 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock( std::lock_guard lk(map.getSegmentMutex(segment_index)); for (size_t i = 0; i < segment_index_info[segment_index].size(); i++) { - Inserter::insert(map.getSegmentTable(segment_index), key_getter, stored_block, segment_index_info[segment_index][i], pool, sort_key_containers); + Inserter::insert(map.getSegmentTable(segment_index), + key_getter, + stored_block, + segment_index_info[segment_index][i], + pool, + sort_key_containers); } } } @@ -587,58 +592,75 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock( template -void insertFromBlockImplType( - Map & map, - size_t rows, - const ColumnRawPtrs & key_columns, - const Sizes & key_sizes, - const TiDB::TiDBCollators & collators, - Block * stored_block, - ConstNullMapPtr null_map, - Join::RowRefList * rows_not_inserted_to_map, - size_t stream_index, - size_t insert_concurrency, - Arena & pool) +void insertFromBlockImplType(Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, const TiDB::TiDBCollators & collators, Block * stored_block, ConstNullMapPtr null_map, Join::RowRefList * rows_not_inserted_to_map, size_t stream_index, size_t insert_concurrency, Arena & pool) { if (null_map) { if (insert_concurrency > 1) { - insertFromBlockImplTypeCaseWithLock(map, rows, key_columns, key_sizes, collators, stored_block, null_map, rows_not_inserted_to_map, stream_index, pool); + insertFromBlockImplTypeCaseWithLock( + map, + rows, + key_columns, + key_sizes, + collators, + stored_block, + null_map, + rows_not_inserted_to_map, + stream_index, + pool); } else { - insertFromBlockImplTypeCase(map, rows, key_columns, key_sizes, collators, stored_block, null_map, rows_not_inserted_to_map, stream_index, pool); + insertFromBlockImplTypeCase( + map, + rows, + key_columns, + key_sizes, + collators, + stored_block, + null_map, + rows_not_inserted_to_map, + stream_index, + pool); } } else { if (insert_concurrency > 1) { - insertFromBlockImplTypeCaseWithLock(map, rows, key_columns, key_sizes, collators, stored_block, null_map, rows_not_inserted_to_map, stream_index, pool); + insertFromBlockImplTypeCaseWithLock( + map, + rows, + key_columns, + key_sizes, + collators, + stored_block, + null_map, + rows_not_inserted_to_map, + stream_index, + pool); } else { - insertFromBlockImplTypeCase(map, rows, key_columns, key_sizes, collators, stored_block, null_map, rows_not_inserted_to_map, stream_index, pool); + insertFromBlockImplTypeCase( + map, + rows, + key_columns, + key_sizes, + collators, + stored_block, + null_map, + rows_not_inserted_to_map, + stream_index, + pool); } } } template -void insertFromBlockImpl( - Join::Type type, - Maps & maps, - size_t rows, - const ColumnRawPtrs & key_columns, - const Sizes & key_sizes, - const TiDB::TiDBCollators & collators, - Block * stored_block, - ConstNullMapPtr null_map, - Join::RowRefList * rows_not_inserted_to_map, - size_t stream_index, - size_t insert_concurrency, - Arena & pool) +void insertFromBlockImpl(Join::Type type, Maps & maps, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, const TiDB::TiDBCollators & collators, Block * stored_block, ConstNullMapPtr null_map, Join::RowRefList * rows_not_inserted_to_map, size_t stream_index, size_t insert_concurrency, Arena & pool) { switch (type) { @@ -647,20 +669,10 @@ void insertFromBlockImpl( case Join::Type::CROSS: break; /// Do nothing. We have already saved block, and it is enough. -#define M(TYPE) \ - case Join::Type::TYPE: \ - insertFromBlockImplType>::Type>( \ - *maps.TYPE, \ - rows, \ - key_columns, \ - key_sizes, \ - collators, \ - stored_block, \ - null_map, \ - rows_not_inserted_to_map, \ - stream_index, \ - insert_concurrency, \ - pool); \ +#define M(TYPE) \ + case Join::Type::TYPE: \ + insertFromBlockImplType>::Type>(*maps.TYPE, rows, key_columns, key_sizes, collators, stored_block, null_map, rows_not_inserted_to_map, stream_index, insert_concurrency, pool); \ break; APPLY_FOR_JOIN_VARIANTS(M) #undef M @@ -901,7 +913,8 @@ struct Adder static void addFound(const typename Map::SegmentType::HashTable::ConstLookupResult & it, size_t num_columns_to_add, MutableColumns & added_columns, size_t i, IColumn::Filter * filter, IColumn::Offset & current_offset, IColumn::Offsets * offsets, const std::vector & right_indexes) { size_t rows_joined = 0; - for (auto current = &static_cast(it->getMapped()); current != nullptr; current = current->next) + for (auto current = &static_cast(it->getMapped()); current != nullptr; + current = current->next) { for (size_t j = 0; j < num_columns_to_add; ++j) added_columns[j]->insertFrom(*current->block->getByPosition(right_indexes[j]).column.get(), current->row_num); @@ -938,18 +951,7 @@ struct Adder }; template -void NO_INLINE joinBlockImplTypeCase( - const Map & map, - size_t rows, - const ColumnRawPtrs & key_columns, - const Sizes & key_sizes, - MutableColumns & added_columns, - ConstNullMapPtr null_map, - std::unique_ptr & filter, - IColumn::Offset & current_offset, - std::unique_ptr & offsets_to_replicate, - const std::vector & right_indexes, - const TiDB::TiDBCollators & collators) +void NO_INLINE joinBlockImplTypeCase(const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, MutableColumns & added_columns, ConstNullMapPtr null_map, std::unique_ptr & filter, IColumn::Offset & current_offset, std::unique_ptr & offsets_to_replicate, const std::vector & right_indexes, const TiDB::TiDBCollators & collators) { size_t num_columns_to_add = right_indexes.size(); @@ -1012,49 +1014,20 @@ void NO_INLINE joinBlockImplTypeCase( } template -void joinBlockImplType( - const Map & map, - size_t rows, - const ColumnRawPtrs & key_columns, - const Sizes & key_sizes, - MutableColumns & added_columns, - ConstNullMapPtr null_map, - std::unique_ptr & filter, - IColumn::Offset & current_offset, - std::unique_ptr & offsets_to_replicate, - const std::vector & right_indexes, - const TiDB::TiDBCollators & collators) +void joinBlockImplType(const Map & map, size_t rows, const ColumnRawPtrs & key_columns, const Sizes & key_sizes, MutableColumns & added_columns, ConstNullMapPtr null_map, std::unique_ptr & filter, IColumn::Offset & current_offset, std::unique_ptr & offsets_to_replicate, const std::vector & right_indexes, const TiDB::TiDBCollators & collators) { if (null_map) - joinBlockImplTypeCase( - map, - rows, - key_columns, - key_sizes, - added_columns, - null_map, - filter, - current_offset, - offsets_to_replicate, - right_indexes, - collators); + joinBlockImplTypeCase(map, rows, key_columns, key_sizes, added_columns, null_map, filter, current_offset, offsets_to_replicate, right_indexes, collators); else - joinBlockImplTypeCase( - map, - rows, - key_columns, - key_sizes, - added_columns, - null_map, - filter, - current_offset, - offsets_to_replicate, - right_indexes, - collators); + joinBlockImplTypeCase(map, rows, key_columns, key_sizes, added_columns, null_map, filter, current_offset, offsets_to_replicate, right_indexes, collators); } } // namespace -void mergeNullAndFilterResult(Block & block, ColumnVector::Container & filter_column, const String & filter_column_name, bool null_as_true) +void mergeNullAndFilterResult( + Block & block, + ColumnVector::Container & filter_column, + const String & filter_column_name, + bool null_as_true) { auto orig_filter_column = block.getByName(filter_column_name).column; if (orig_filter_column->isColumnConst()) @@ -1312,20 +1285,9 @@ void Join::joinBlockImpl(Block & block, const Maps & maps) const switch (type) { -#define M(TYPE) \ - case Join::Type::TYPE: \ - joinBlockImplType>::Type>( \ - *maps.TYPE, \ - rows, \ - key_columns, \ - key_sizes, \ - added_columns, \ - null_map, \ - filter, \ - current_offset, \ - offsets_to_replicate, \ - right_indexes, \ - collators); \ +#define M(TYPE) \ + case Join::Type::TYPE: \ + joinBlockImplType>::Type>(*maps.TYPE, rows, key_columns, key_sizes, added_columns, null_map, filter, current_offset, offsets_to_replicate, right_indexes, collators); \ break; APPLY_FOR_JOIN_VARIANTS(M) #undef M @@ -1406,10 +1368,7 @@ struct CrossJoinAdder (*is_row_matched)[i - start_offset] = 0; (*expanded_row_size_after_join)[i - start_offset] = current_offset; } - static bool allRightRowsMaybeAdded() - { - return STRICTNESS == ASTTableJoin::Strictness::All; - } + static bool allRightRowsMaybeAdded() { return STRICTNESS == ASTTableJoin::Strictness::All; } }; template struct CrossJoinAdder @@ -1429,10 +1388,7 @@ struct CrossJoinAdder for (size_t col_num = 0; col_num < num_columns_to_add; ++col_num) dst_columns[num_existing_columns + col_num]->insertDefault(); } - static bool allRightRowsMaybeAdded() - { - return STRICTNESS == ASTTableJoin::Strictness::All; - } + static bool allRightRowsMaybeAdded() { return STRICTNESS == ASTTableJoin::Strictness::All; } }; template <> struct CrossJoinAdder @@ -1446,10 +1402,7 @@ struct CrossJoinAdder::addNotFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start_offset, i, is_row_matched, current_offset, expanded_row_size_after_join); } - static bool allRightRowsMaybeAdded() - { - return false; - } + static bool allRightRowsMaybeAdded() { return false; } }; template <> struct CrossJoinAdder @@ -1462,10 +1415,7 @@ struct CrossJoinAdder::addNotFound(dst_columns, num_existing_columns, src_left_columns, num_columns_to_add, start_offset, i, is_row_matched, current_offset, expanded_row_size_after_join); } - static bool allRightRowsMaybeAdded() - { - return true; - } + static bool allRightRowsMaybeAdded() { return true; } }; } // namespace @@ -1606,8 +1556,7 @@ void Join::checkTypesOfKeys(const Block & block_left, const Block & block_right) DataTypePtr right_type = removeNullable(block_right.getByName(key_names_right[i]).type); if (!left_type->equals(*right_type)) - throw Exception("Type mismatch of columns to JOIN by: " - + key_names_left[i] + " " + left_type->getName() + " at left, " + throw Exception("Type mismatch of columns to JOIN by: " + key_names_left[i] + " " + left_type->getName() + " at left, " + key_names_right[i] + " " + right_type->getName() + " at right", ErrorCodes::TYPE_MISMATCH); } @@ -1622,7 +1571,9 @@ void Join::joinBlock(Block & block) const { std::unique_lock lk(build_table_mutex); - build_table_cv.wait(lk, [&]() { return have_finish_build; }); + build_table_cv.wait(lk, [&]() { return build_table_state != BuildTableState::WAITING; }); + if (build_table_state == BuildTableState::FAILED) /// throw this exception once failed to build the hash table + throw Exception("Build failed before join probe!"); } std::shared_lock lock(rwlock); @@ -1685,9 +1636,7 @@ void Join::joinTotals(Block & block) const for (size_t i = 0; i < sample_block_with_columns_to_add.columns(); ++i) { const auto & col = sample_block_with_columns_to_add.getByPosition(i); - block.insert({col.type->createColumnConstWithDefaultValue(1)->convertToFullColumnIfConst(), - col.type, - col.name}); + block.insert({col.type->createColumnConstWithDefaultValue(1)->convertToFullColumnIfConst(), col.type, col.name}); } } } @@ -1890,11 +1839,7 @@ class NonJoinedBlockInputStream : public IProfilingBlockInputStream template - size_t fillColumns(const Map & map, - size_t num_columns_left, - MutableColumns & mutable_columns_left, - size_t num_columns_right, - MutableColumns & mutable_columns_right) + size_t fillColumns(const Map & map, size_t num_columns_left, MutableColumns & mutable_columns_left, size_t num_columns_right, MutableColumns & mutable_columns_right) { size_t rows_added = 0; size_t key_num = parent.key_names_right.size(); @@ -1905,8 +1850,9 @@ class NonJoinedBlockInputStream : public IProfilingBlockInputStream mutable_columns_left[j]->insertDefault(); for (size_t j = 0; j < num_columns_right; ++j) - mutable_columns_right[j]->insertFrom(*current_not_mapped_row->block->getByPosition(key_num + j).column.get(), - current_not_mapped_row->row_num); + mutable_columns_right[j]->insertFrom( + *current_not_mapped_row->block->getByPosition(key_num + j).column.get(), + current_not_mapped_row->row_num); current_not_mapped_row = current_not_mapped_row->next; setNextCurrentNotMappedRow(); @@ -1936,10 +1882,9 @@ class NonJoinedBlockInputStream : public IProfilingBlockInputStream do { current_segment += step; - position = decltype(position)( - static_cast(new typename Map::SegmentType::HashTable::const_iterator( - map.getSegmentTable(current_segment).begin())), - [](void * ptr) { delete reinterpret_cast(ptr); }); + position = decltype(position)(static_cast(new typename Map::SegmentType::HashTable::const_iterator( + map.getSegmentTable(current_segment).begin())), + [](void * ptr) { delete reinterpret_cast(ptr); }); it = reinterpret_cast(position.get()); end = map.getSegmentTable(current_segment).end(); } while (*it == end && current_segment < map.getSegmentSize() - step); @@ -1949,7 +1894,13 @@ class NonJoinedBlockInputStream : public IProfilingBlockInputStream if ((*it)->getMapped().getUsed()) continue; - rows_added += AdderNonJoined::add((*it)->getMapped(), key_num, num_columns_left, mutable_columns_left, num_columns_right, mutable_columns_right); + rows_added += AdderNonJoined::add( + (*it)->getMapped(), + key_num, + num_columns_left, + mutable_columns_left, + num_columns_right, + mutable_columns_right); if (rows_added >= max_block_size) { @@ -1962,7 +1913,11 @@ class NonJoinedBlockInputStream : public IProfilingBlockInputStream }; -BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & left_sample_block, size_t index, size_t step, size_t max_block_size) const +BlockInputStreamPtr Join::createStreamWithNonJoinedRows( + const Block & left_sample_block, + size_t index, + size_t step, + size_t max_block_size) const { return std::make_shared(*this, left_sample_block, index, step, max_block_size); } diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 714a671ab26..c02747ddd18 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -5,7 +5,6 @@ #include #include #include -#include #include #include #include @@ -19,6 +18,7 @@ namespace DB { + /** Data structure for implementation of JOIN. * It is just a hash table: keys -> rows of joined ("right") table. * Additionally, CROSS JOIN is supported: instead of hash table, it use just set of blocks without keys. @@ -78,21 +78,7 @@ namespace DB class Join { public: - Join(const Names & key_names_left_, - const Names & key_names_right_, - bool use_nulls_, - const SizeLimits & limits, - ASTTableJoin::Kind kind_, - ASTTableJoin::Strictness strictness_, - size_t build_concurrency = 1, - const TiDB::TiDBCollators & collators_ = TiDB::dummy_collators, - const String & left_filter_column = "", - const String & right_filter_column = "", - const String & other_filter_column = "", - const String & other_eq_filter_from_in_column = "", - ExpressionActionsPtr other_condition_ptr = nullptr, - size_t max_block_size = 0, - const LogWithPrefixPtr & log_ = nullptr); + Join(const Names & key_names_left_, const Names & key_names_right_, bool use_nulls_, const SizeLimits & limits, ASTTableJoin::Kind kind_, ASTTableJoin::Strictness strictness_, size_t build_concurrency = 1, const TiDB::TiDBCollators & collators_ = TiDB::dummy_collators, const String & left_filter_column = "", const String & right_filter_column = "", const String & other_filter_column = "", const String & other_eq_filter_from_in_column = "", ExpressionActionsPtr other_condition_ptr = nullptr, size_t max_block_size = 0); bool empty() { return type == Type::EMPTY; } @@ -118,7 +104,7 @@ class Join /** Keep "totals" (separate part of dataset, see WITH TOTALS) to use later. */ void setTotals(const Block & block) { totals = block; } - bool hasTotals() const { return static_cast(totals); }; + bool hasTotals() const { return totals; }; void joinTotals(Block & block) const; @@ -127,7 +113,11 @@ class Join * Use only after all calls to joinBlock was done. * left_sample_block is passed without account of 'use_nulls' setting (columns will be converted to Nullable inside). */ - BlockInputStreamPtr createStreamWithNonJoinedRows(const Block & left_sample_block, size_t index, size_t step, size_t max_block_size) const; + BlockInputStreamPtr createStreamWithNonJoinedRows( + const Block & left_sample_block, + size_t index, + size_t step, + size_t max_block_size) const; /// Number of keys in all built JOIN maps. size_t getTotalRowCount() const; @@ -142,7 +132,13 @@ class Join bool isBuildSetExceeded() const { return build_set_exceeded.load(); } size_t getNotJoinedStreamConcurrency() const { return build_concurrency; }; - void setFinishBuildTable(bool); + enum BuildTableState + { + WAITING, + FAILED, + SUCCEED + }; + void setBuildTableState(BuildTableState state_); /// Reference to the row in block. struct RowRef @@ -300,9 +296,9 @@ class Join mutable std::mutex build_table_mutex; mutable std::condition_variable build_table_cv; - bool have_finish_build; + BuildTableState build_table_state; - const LogWithPrefixPtr log; + Poco::Logger * log; /// Limits for maximum map size. SizeLimits limits; diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test index 006d2abf77d..ebf09408e06 100644 --- a/tests/fullstack-test/mpp/mpp_fail.test +++ b/tests/fullstack-test/mpp/mpp_fail.test @@ -81,5 +81,37 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchang => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel) +## exception during mpp hash build +## desc format='brief' select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id; +## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+ +## | id | estRows | task | access object | operator info | +## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+ +## | Projection | 0.99 | root | | test.t.id | +## | └─TableReader | 0.99 | root | | data:ExchangeSender | +## | └─ExchangeSender | 0.99 | batchCop[tiflash] | | ExchangeType: PassThrough | +## | └─HashJoin | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] | +## | ├─HashJoin(Build) | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] | +## | │ ├─ExchangeReceiver(Build) | 1.00 | batchCop[tiflash] | | | +## | │ │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] | +## | │ │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) | +## | │ │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t1 | keep order:false, stats:pseudo | +## | │ └─ExchangeReceiver(Probe) | 1.00 | batchCop[tiflash] | | | +## | │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] | +## | │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) | +## | │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t2 | keep order:false, stats:pseudo | +## | └─Projection(Probe) | 2.40 | batchCop[tiflash] | | test.t.id | +## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, funcs:firstrow(test.t.id)->test.t.id | +## | └─ExchangeReceiver | 2.40 | batchCop[tiflash] | | | +## | └─ExchangeSender | 2.40 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] | +## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, | +## | └─Selection | 3.00 | batchCop[tiflash] | | not(isnull(test.t.id)) | +## | └─TableFullScan | 3.00 | batchCop[tiflash] | table:t | keep order:false, stats:pseudo | +## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+ +## ensure build1, build2-probe1, probe2 in the CreatingSets, test the bug where build1 throw exception but not change the build state, thus block the build2-probe1, at last this query hangs. +=> DBGInvoke __enable_fail_point(exception_mpp_hash_build) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; set @@tidb_broadcast_join_threshold_count=0; set @@tidb_broadcast_join_threshold_size=0; select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id; +ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_mpp_hash_build is triggered. +=> DBGInvoke __disable_fail_point(exception_mpp_hash_build) + # Clean up. mysql> drop table if exists test.t