diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index 4e2d3b9ea81..42b111c53ab 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -48,7 +48,8 @@ std::unordered_map> FailPointHelper::f M(force_formal_page_file_not_exists) \ M(force_legacy_or_checkpoint_page_file_exists) \ M(exception_in_creating_set_input_stream) \ - M(exception_when_read_from_log) + M(exception_when_read_from_log) \ + M(exception_mpp_hash_build) #define APPLY_FOR_FAILPOINTS(M) \ M(force_set_page_file_write_errno) \ diff --git a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp index 8bf8ca56142..450b43f6344 100644 --- a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp +++ b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp @@ -17,7 +17,8 @@ namespace DB namespace FailPoints { extern const char exception_in_creating_set_input_stream[]; -} +extern const char exception_mpp_hash_build[]; +} // namespace FailPoints namespace ErrorCodes { extern const int SET_SIZE_LIMIT_EXCEEDED; @@ -108,9 +109,10 @@ void CreatingSetsBlockInputStream::createAll() for (auto & elem : subqueries_for_sets) { if (elem.second.join) - elem.second.join->setFinishBuildTable(false); + elem.second.join->setBuildTableState(Join::BuildTableState::WAITING); } } + Stopwatch watch; auto thread_manager = newThreadManager(); for (auto & subqueries_for_sets : subqueries_for_sets_list) { @@ -129,7 +131,11 @@ void CreatingSetsBlockInputStream::createAll() thread_manager->wait(); if (!exception_from_workers.empty()) + { + LOG_FMT_ERROR(log, "Creating all tasks of {} takes {} sec with exception and rethrow the first of total {} exceptions", mpp_task_id.toString(), watch.elapsedSeconds(), exception_from_workers.size()); std::rethrow_exception(exception_from_workers.front()); + } + LOG_FMT_DEBUG(log, "Creating all tasks of {} takes {} sec. ", mpp_task_id.toString(), watch.elapsedSeconds()); created = true; } @@ -137,19 +143,19 @@ void CreatingSetsBlockInputStream::createAll() void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) { + auto log_msg = fmt::format("{} for task {}", + subquery.set ? "Creating set. " : subquery.join ? "Creating join. " + : subquery.table ? "Filling temporary table. " + : "null subquery", + mpp_task_id.toString()); + Stopwatch watch; try { - LOG_DEBUG(log, - (subquery.set ? "Creating set. " : "") - << (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task " - << mpp_task_id.toString()); - Stopwatch watch; - + LOG_FMT_DEBUG(log, "{}", log_msg); BlockOutputStreamPtr table_out; if (subquery.table) table_out = subquery.table->write({}, {}); - bool done_with_set = !subquery.set; bool done_with_join = !subquery.join; bool done_with_table = !subquery.table; @@ -164,7 +170,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) { if (isCancelled()) { - LOG_DEBUG(log, "Query was cancelled during set / join or temporary table creation."); + LOG_FMT_DEBUG(log, "Query was cancelled during set / join or temporary table creation."); return; } @@ -209,7 +215,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) if (subquery.join) - subquery.join->setFinishBuildTable(true); + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build); + subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED); + } if (table_out) table_out->writeSuffix(); @@ -243,20 +252,20 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) msg << "In " << watch.elapsedSeconds() << " sec. "; msg << "using " << std::to_string(subquery.join == nullptr ? 1 : subquery.join->getBuildConcurrency()) << " threads "; - if (log != nullptr) - LOG_DEBUG(log, msg.rdbuf()); - else - LOG_DEBUG(log, msg.rdbuf()); + LOG_FMT_DEBUG(log, "{}", msg.rdbuf()->str()); } else { - LOG_DEBUG(log, "Subquery has empty result for task " << mpp_task_id.toString() << "."); + LOG_FMT_DEBUG(log, "Subquery has empty result for task {}. ", mpp_task_id.toString()); } } - catch (std::exception & e) + catch (...) { std::unique_lock lock(exception_mutex); exception_from_workers.push_back(std::current_exception()); + if (subquery.join) + subquery.join->setBuildTableState(Join::BuildTableState::FAILED); + LOG_FMT_ERROR(log, "{} throw exception: {} In {} sec. ", log_msg, getCurrentExceptionMessage(false, true), watch.elapsedSeconds()); } } diff --git a/dbms/src/Flash/tests/exchange_perftest.cpp b/dbms/src/Flash/tests/exchange_perftest.cpp index d627fa3f6e3..c9cbb257e13 100644 --- a/dbms/src/Flash/tests/exchange_perftest.cpp +++ b/dbms/src/Flash/tests/exchange_perftest.cpp @@ -458,7 +458,7 @@ struct ReceiverHelper { if (join_ptr) { - join_ptr->setFinishBuildTable(true); + join_ptr->setBuildTableState(Join::BuildTableState::SUCCEED); std::cout << fmt::format("Hash table size: {} bytes", join_ptr->getTotalByteCount()) << std::endl; } } diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 474527a32c1..7bd9097b934 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -70,7 +70,7 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u , other_condition_ptr(other_condition_ptr_) , original_strictness(strictness) , max_block_size_for_cross_join(max_block_size_) - , have_finish_build(true) + , build_table_state(BuildTableState::SUCCEED) , log(getLogWithPrefix(log_, "Join")) , limits(limits) { @@ -96,10 +96,10 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u throw Exception("Not supported: non right join with right conditions"); } -void Join::setFinishBuildTable(bool finish_) +void Join::setBuildTableState(BuildTableState state_) { std::lock_guard lk(build_table_mutex); - have_finish_build = finish_; + build_table_state = state_; build_table_cv.notify_all(); } @@ -344,7 +344,7 @@ size_t Join::getTotalByteCount() const res += getTotalByteCountImpl(maps_all, type); res += getTotalByteCountImpl(maps_any_full, type); res += getTotalByteCountImpl(maps_all_full, type); - for (auto & pool : pools) + for (const auto & pool : pools) { /// note the return value might not be accurate since it does not use lock, but should be enough for current usage res += pool->size(); @@ -421,7 +421,7 @@ namespace { void insertRowToList(Join::RowRefList * list, Join::RowRefList * elem, Block * stored_block, size_t index) { - elem->next = list->next; + elem->next = list->next; // NOLINT(clang-analyzer-core.NullDereference) list->next = elem; elem->block = stored_block; elem->row_num = index; @@ -493,7 +493,7 @@ void NO_INLINE insertFromBlockImplTypeCase( if (rows_not_inserted_to_map) { /// for right/full out join, need to record the rows not inserted to map - auto elem = reinterpret_cast(pool.alloc(sizeof(Join::RowRefList))); + auto * elem = reinterpret_cast(pool.alloc(sizeof(Join::RowRefList))); insertRowToList(rows_not_inserted_to_map, elem, stored_block, i); } continue; @@ -536,9 +536,9 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock( segment_index_info.resize(segment_size); } size_t rows_per_seg = rows / segment_index_info.size(); - for (size_t i = 0; i < segment_index_info.size(); i++) + for (auto & segment_index : segment_index_info) { - segment_index_info[i].reserve(rows_per_seg); + segment_index.reserve(rows_per_seg); } for (size_t i = 0; i < rows; i++) { @@ -567,16 +567,16 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock( { /// null value /// here ignore mutex because rows_not_inserted_to_map is privately owned by each stream thread - for (size_t i = 0; i < segment_index_info[segment_index].size(); i++) + for (auto index : segment_index_info[segment_index]) { /// for right/full out join, need to record the rows not inserted to map - auto elem = reinterpret_cast(pool.alloc(sizeof(Join::RowRefList))); - insertRowToList(rows_not_inserted_to_map, elem, stored_block, segment_index_info[segment_index][i]); + auto * elem = reinterpret_cast(pool.alloc(sizeof(Join::RowRefList))); + insertRowToList(rows_not_inserted_to_map, elem, stored_block, index); } } else { - std::lock_guard lk(map.getSegmentMutex(segment_index)); + std::lock_guard lk(map.getSegmentMutex(segment_index)); for (size_t i = 0; i < segment_index_info[segment_index].size(); i++) { Inserter::insert(map.getSegmentTable(segment_index), key_getter, stored_block, segment_index_info[segment_index][i], pool, sort_key_containers); @@ -705,7 +705,7 @@ void recordFilteredRows(const Block & block, const String & filter_column, Colum MutableColumnPtr mutable_null_map_holder = (*std::move(null_map_holder)).mutate(); PaddedPODArray & mutable_null_map = static_cast(*mutable_null_map_holder).getData(); - auto & nested_column = column->isColumnNullable() ? static_cast(*column).getNestedColumnPtr() : column; + const auto & nested_column = column->isColumnNullable() ? static_cast(*column).getNestedColumnPtr() : column; for (size_t i = 0, size = nested_column->size(); i < size; ++i) mutable_null_map[i] |= (!nested_column->getInt(i)); @@ -732,7 +732,7 @@ void Join::insertFromBlock(const Block & block, size_t stream_index) std::shared_lock lock(rwlock); Block * stored_block = nullptr; { - std::lock_guard lk(blocks_lock); + std::lock_guard lk(blocks_lock); blocks.push_back(block); stored_block = &blocks.back(); original_blocks.push_back(block); @@ -753,6 +753,7 @@ bool Join::insertFromBlockInternal(Block * stored_block, size_t stream_index) const Block & block = *stored_block; /// Rare case, when keys are constant. To avoid code bloat, simply materialize them. + /// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function. Columns materialized_columns; /// Memoize key columns to work. @@ -981,11 +982,11 @@ void NO_INLINE joinBlockImplTypeCase( hash_value = map.hash(key); segment_index = hash_value % map.getSegmentSize(); } - auto & internalMap = map.getSegmentTable(segment_index); + auto & internal_map = map.getSegmentTable(segment_index); /// do not require segment lock because in join, the hash table can not be changed in probe stage. - auto it = map.getSegmentSize() > 0 ? internalMap.find(key, hash_value) : internalMap.find(key); + auto it = map.getSegmentSize() > 0 ? internal_map.find(key, hash_value) : internal_map.find(key); - if (it != internalMap.end()) + if (it != internal_map.end()) { it->getMapped().setUsed(); Adder::addFound( @@ -1061,8 +1062,8 @@ void mergeNullAndFilterResult(Block & block, ColumnVector::Container & fi orig_filter_column = orig_filter_column->convertToFullColumnIfConst(); if (orig_filter_column->isColumnNullable()) { - auto * nullable_column = checkAndGetColumn(orig_filter_column.get()); - auto & nested_column_data = static_cast *>(nullable_column->getNestedColumnPtr().get())->getData(); + const auto * nullable_column = checkAndGetColumn(orig_filter_column.get()); + const auto & nested_column_data = static_cast *>(nullable_column->getNestedColumnPtr().get())->getData(); for (size_t i = 0; i < nullable_column->size(); i++) { if (filter_column[i] == 0) @@ -1075,8 +1076,8 @@ void mergeNullAndFilterResult(Block & block, ColumnVector::Container & fi } else { - auto * other_filter_column = checkAndGetColumn>(orig_filter_column.get()); - auto & other_filter_column_data = static_cast *>(other_filter_column)->getData(); + const auto * other_filter_column = checkAndGetColumn>(orig_filter_column.get()); + const auto & other_filter_column_data = static_cast *>(other_filter_column)->getData(); for (size_t i = 0; i < other_filter_column->size(); i++) filter_column[i] = filter_column[i] && other_filter_column_data[i]; } @@ -1100,7 +1101,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr auto filter_column = ColumnUInt8::create(); auto & filter = filter_column->getData(); - filter.assign(block.rows(), (UInt8)1); + filter.assign(block.rows(), static_cast(1)); if (!other_filter_column.empty()) { mergeNullAndFilterResult(block, filter, other_filter_column, false); @@ -1179,9 +1180,9 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr if (isLeftJoin(kind)) { /// for left join, convert right column to null if not joined - for (size_t i = 0; i < right_table_columns.size(); i++) + for (size_t right_table_column : right_table_columns) { - auto & column = block.getByPosition(right_table_columns[i]); + auto & column = block.getByPosition(right_table_column); auto full_column = column.column->isColumnConst() ? column.column->convertToFullColumnIfConst() : column.column; if (!full_column->isColumnNullable()) { @@ -1561,9 +1562,8 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[ { dst_columns[i] = sample_block.getByPosition(i).column->cloneEmpty(); } - for (size_t i = 0; i < result_blocks.size(); i++) + for (auto & current_block : result_blocks) { - auto & current_block = result_blocks[i]; if (current_block.rows() > 0) { for (size_t column = 0; column < current_block.columns(); column++) @@ -1613,7 +1613,6 @@ void Join::checkTypesOfKeys(const Block & block_left, const Block & block_right) } } - void Join::joinBlock(Block & block) const { // std::cerr << "joinBlock: " << block.dumpStructure() << "\n"; @@ -1622,7 +1621,9 @@ void Join::joinBlock(Block & block) const { std::unique_lock lk(build_table_mutex); - build_table_cv.wait(lk, [&]() { return have_finish_build; }); + build_table_cv.wait(lk, [&]() { return build_table_state != BuildTableState::WAITING; }); + if (build_table_state == BuildTableState::FAILED) /// throw this exception once failed to build the hash table + throw Exception("Build failed before join probe!"); } std::shared_lock lock(rwlock); @@ -1967,5 +1968,4 @@ BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & left_sampl return std::make_shared(*this, left_sample_block, index, step, max_block_size); } - } // namespace DB diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 714a671ab26..0572038a550 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -142,7 +142,13 @@ class Join bool isBuildSetExceeded() const { return build_set_exceeded.load(); } size_t getNotJoinedStreamConcurrency() const { return build_concurrency; }; - void setFinishBuildTable(bool); + enum BuildTableState + { + WAITING, + FAILED, + SUCCEED + }; + void setBuildTableState(BuildTableState state_); /// Reference to the row in block. struct RowRef @@ -300,7 +306,7 @@ class Join mutable std::mutex build_table_mutex; mutable std::condition_variable build_table_cv; - bool have_finish_build; + BuildTableState build_table_state; const LogWithPrefixPtr log; diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test index 006d2abf77d..ebf09408e06 100644 --- a/tests/fullstack-test/mpp/mpp_fail.test +++ b/tests/fullstack-test/mpp/mpp_fail.test @@ -81,5 +81,37 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchang => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel) +## exception during mpp hash build +## desc format='brief' select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id; +## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+ +## | id | estRows | task | access object | operator info | +## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+ +## | Projection | 0.99 | root | | test.t.id | +## | └─TableReader | 0.99 | root | | data:ExchangeSender | +## | └─ExchangeSender | 0.99 | batchCop[tiflash] | | ExchangeType: PassThrough | +## | └─HashJoin | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] | +## | ├─HashJoin(Build) | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] | +## | │ ├─ExchangeReceiver(Build) | 1.00 | batchCop[tiflash] | | | +## | │ │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] | +## | │ │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) | +## | │ │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t1 | keep order:false, stats:pseudo | +## | │ └─ExchangeReceiver(Probe) | 1.00 | batchCop[tiflash] | | | +## | │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] | +## | │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) | +## | │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t2 | keep order:false, stats:pseudo | +## | └─Projection(Probe) | 2.40 | batchCop[tiflash] | | test.t.id | +## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, funcs:firstrow(test.t.id)->test.t.id | +## | └─ExchangeReceiver | 2.40 | batchCop[tiflash] | | | +## | └─ExchangeSender | 2.40 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] | +## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, | +## | └─Selection | 3.00 | batchCop[tiflash] | | not(isnull(test.t.id)) | +## | └─TableFullScan | 3.00 | batchCop[tiflash] | table:t | keep order:false, stats:pseudo | +## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+ +## ensure build1, build2-probe1, probe2 in the CreatingSets, test the bug where build1 throw exception but not change the build state, thus block the build2-probe1, at last this query hangs. +=> DBGInvoke __enable_fail_point(exception_mpp_hash_build) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; set @@tidb_broadcast_join_threshold_count=0; set @@tidb_broadcast_join_threshold_size=0; select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id; +ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_mpp_hash_build is triggered. +=> DBGInvoke __disable_fail_point(exception_mpp_hash_build) + # Clean up. mysql> drop table if exists test.t