From 79c0df6fd4d992c6db2b3751b2b2ffadc6ef50a7 Mon Sep 17 00:00:00 2001 From: Zhuhe Fang Date: Mon, 14 Mar 2022 22:51:51 +0800 Subject: [PATCH] This is an automated cherry-pick of #4202 Signed-off-by: ti-chi-bot --- dbms/src/Common/FailPoint.cpp | 6 +++ .../CreatingSetsBlockInputStream.cpp | 46 +++++++++++++++---- dbms/src/Flash/tests/exchange_perftest.cpp | 2 +- dbms/src/Interpreters/Join.cpp | 10 ++-- dbms/src/Interpreters/Join.h | 10 +++- tests/fullstack-test/mpp/mpp_fail.test | 32 +++++++++++++ 6 files changed, 89 insertions(+), 17 deletions(-) diff --git a/dbms/src/Common/FailPoint.cpp b/dbms/src/Common/FailPoint.cpp index a989c51d03f..5c35d547ac1 100644 --- a/dbms/src/Common/FailPoint.cpp +++ b/dbms/src/Common/FailPoint.cpp @@ -47,7 +47,13 @@ std::unordered_map> FailPointHelper::f M(segment_merge_after_ingest_packs) \ M(force_formal_page_file_not_exists) \ M(force_legacy_or_checkpoint_page_file_exists) \ +<<<<<<< HEAD M(exception_in_creating_set_input_stream) +======= + M(exception_in_creating_set_input_stream) \ + M(exception_when_read_from_log) \ + M(exception_mpp_hash_build) +>>>>>>> cc8a5c51b7 (MPP: update the state of building a hash table when createOnce throw exceptions (#4202)) #define APPLY_FOR_FAILPOINTS(M) \ M(force_set_page_file_write_errno) \ diff --git a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp index 52f6433ce07..92571875d20 100644 --- a/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp +++ b/dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp @@ -16,7 +16,8 @@ namespace DB namespace FailPoints { extern const char exception_in_creating_set_input_stream[]; -} +extern const char exception_mpp_hash_build[]; +} // namespace FailPoints namespace ErrorCodes { extern const int SET_SIZE_LIMIT_EXCEEDED; @@ -107,9 +108,14 @@ void CreatingSetsBlockInputStream::createAll() for (auto & elem : subqueries_for_sets) { if (elem.second.join) - elem.second.join->setFinishBuildTable(false); + elem.second.join->setBuildTableState(Join::BuildTableState::WAITING); } } +<<<<<<< HEAD +======= + Stopwatch watch; + auto thread_manager = newThreadManager(); +>>>>>>> cc8a5c51b7 (MPP: update the state of building a hash table when createOnce throw exceptions (#4202)) for (auto & subqueries_for_sets : subqueries_for_sets_list) { for (auto & elem : subqueries_for_sets) @@ -129,7 +135,11 @@ void CreatingSetsBlockInputStream::createAll() } if (!exception_from_workers.empty()) + { + LOG_FMT_ERROR(log, "Creating all tasks of {} takes {} sec with exception and rethrow the first of total {} exceptions", mpp_task_id.toString(), watch.elapsedSeconds(), exception_from_workers.size()); std::rethrow_exception(exception_from_workers.front()); + } + LOG_FMT_DEBUG(log, "Creating all tasks of {} takes {} sec. ", mpp_task_id.toString(), watch.elapsedSeconds()); created = true; } @@ -137,19 +147,28 @@ void CreatingSetsBlockInputStream::createAll() void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) { + auto log_msg = fmt::format("{} for task {}", + subquery.set ? "Creating set. " : subquery.join ? "Creating join. " + : subquery.table ? "Filling temporary table. " + : "null subquery", + mpp_task_id.toString()); + Stopwatch watch; try { +<<<<<<< HEAD LOG_DEBUG(log, (subquery.set ? "Creating set. " : "") << (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task " << std::to_string(mpp_task_id)); Stopwatch watch; +======= + LOG_FMT_DEBUG(log, "{}", log_msg); +>>>>>>> cc8a5c51b7 (MPP: update the state of building a hash table when createOnce throw exceptions (#4202)) BlockOutputStreamPtr table_out; if (subquery.table) table_out = subquery.table->write({}, {}); - bool done_with_set = !subquery.set; bool done_with_join = !subquery.join; bool done_with_table = !subquery.table; @@ -164,7 +183,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) { if (isCancelled()) { - LOG_DEBUG(log, "Query was cancelled during set / join or temporary table creation."); + LOG_FMT_DEBUG(log, "Query was cancelled during set / join or temporary table creation."); return; } @@ -209,7 +228,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) if (subquery.join) - subquery.join->setFinishBuildTable(true); + { + FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build); + subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED); + } if (table_out) table_out->writeSuffix(); @@ -243,20 +265,24 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery) msg << "In " << watch.elapsedSeconds() << " sec. "; msg << "using " << std::to_string(subquery.join == nullptr ? 1 : subquery.join->getBuildConcurrency()) << " threads "; - if (log != nullptr) - LOG_DEBUG(log, msg.rdbuf()); - else - LOG_DEBUG(log, msg.rdbuf()); + LOG_FMT_DEBUG(log, "{}", msg.rdbuf()->str()); } else { +<<<<<<< HEAD LOG_DEBUG(log, "Subquery has empty result for task " << std::to_string(mpp_task_id) << "."); +======= + LOG_FMT_DEBUG(log, "Subquery has empty result for task {}. ", mpp_task_id.toString()); +>>>>>>> cc8a5c51b7 (MPP: update the state of building a hash table when createOnce throw exceptions (#4202)) } } - catch (std::exception & e) + catch (...) { std::unique_lock lock(exception_mutex); exception_from_workers.push_back(std::current_exception()); + if (subquery.join) + subquery.join->setBuildTableState(Join::BuildTableState::FAILED); + LOG_FMT_ERROR(log, "{} throw exception: {} In {} sec. ", log_msg, getCurrentExceptionMessage(false, true), watch.elapsedSeconds()); } } diff --git a/dbms/src/Flash/tests/exchange_perftest.cpp b/dbms/src/Flash/tests/exchange_perftest.cpp index 7fb1559c675..3b92b460655 100644 --- a/dbms/src/Flash/tests/exchange_perftest.cpp +++ b/dbms/src/Flash/tests/exchange_perftest.cpp @@ -457,7 +457,7 @@ struct ReceiverHelper { if (join_ptr) { - join_ptr->setFinishBuildTable(true); + join_ptr->setBuildTableState(Join::BuildTableState::SUCCEED); std::cout << fmt::format("Hash table size: {} bytes", join_ptr->getTotalByteCount()) << std::endl; } } diff --git a/dbms/src/Interpreters/Join.cpp b/dbms/src/Interpreters/Join.cpp index 474527a32c1..197bc754df5 100644 --- a/dbms/src/Interpreters/Join.cpp +++ b/dbms/src/Interpreters/Join.cpp @@ -70,7 +70,7 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u , other_condition_ptr(other_condition_ptr_) , original_strictness(strictness) , max_block_size_for_cross_join(max_block_size_) - , have_finish_build(true) + , build_table_state(BuildTableState::SUCCEED) , log(getLogWithPrefix(log_, "Join")) , limits(limits) { @@ -96,10 +96,10 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u throw Exception("Not supported: non right join with right conditions"); } -void Join::setFinishBuildTable(bool finish_) +void Join::setBuildTableState(BuildTableState state_) { std::lock_guard lk(build_table_mutex); - have_finish_build = finish_; + build_table_state = state_; build_table_cv.notify_all(); } @@ -1622,7 +1622,9 @@ void Join::joinBlock(Block & block) const { std::unique_lock lk(build_table_mutex); - build_table_cv.wait(lk, [&]() { return have_finish_build; }); + build_table_cv.wait(lk, [&]() { return build_table_state != BuildTableState::WAITING; }); + if (build_table_state == BuildTableState::FAILED) /// throw this exception once failed to build the hash table + throw Exception("Build failed before join probe!"); } std::shared_lock lock(rwlock); diff --git a/dbms/src/Interpreters/Join.h b/dbms/src/Interpreters/Join.h index 714a671ab26..0572038a550 100644 --- a/dbms/src/Interpreters/Join.h +++ b/dbms/src/Interpreters/Join.h @@ -142,7 +142,13 @@ class Join bool isBuildSetExceeded() const { return build_set_exceeded.load(); } size_t getNotJoinedStreamConcurrency() const { return build_concurrency; }; - void setFinishBuildTable(bool); + enum BuildTableState + { + WAITING, + FAILED, + SUCCEED + }; + void setBuildTableState(BuildTableState state_); /// Reference to the row in block. struct RowRef @@ -300,7 +306,7 @@ class Join mutable std::mutex build_table_mutex; mutable std::condition_variable build_table_cv; - bool have_finish_build; + BuildTableState build_table_state; const LogWithPrefixPtr log; diff --git a/tests/fullstack-test/mpp/mpp_fail.test b/tests/fullstack-test/mpp/mpp_fail.test index 006d2abf77d..ebf09408e06 100644 --- a/tests/fullstack-test/mpp/mpp_fail.test +++ b/tests/fullstack-test/mpp/mpp_fail.test @@ -81,5 +81,37 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchang => DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run) => DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel) +## exception during mpp hash build +## desc format='brief' select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id; +## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+ +## | id | estRows | task | access object | operator info | +## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+ +## | Projection | 0.99 | root | | test.t.id | +## | └─TableReader | 0.99 | root | | data:ExchangeSender | +## | └─ExchangeSender | 0.99 | batchCop[tiflash] | | ExchangeType: PassThrough | +## | └─HashJoin | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] | +## | ├─HashJoin(Build) | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] | +## | │ ├─ExchangeReceiver(Build) | 1.00 | batchCop[tiflash] | | | +## | │ │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] | +## | │ │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) | +## | │ │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t1 | keep order:false, stats:pseudo | +## | │ └─ExchangeReceiver(Probe) | 1.00 | batchCop[tiflash] | | | +## | │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] | +## | │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) | +## | │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t2 | keep order:false, stats:pseudo | +## | └─Projection(Probe) | 2.40 | batchCop[tiflash] | | test.t.id | +## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, funcs:firstrow(test.t.id)->test.t.id | +## | └─ExchangeReceiver | 2.40 | batchCop[tiflash] | | | +## | └─ExchangeSender | 2.40 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] | +## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, | +## | └─Selection | 3.00 | batchCop[tiflash] | | not(isnull(test.t.id)) | +## | └─TableFullScan | 3.00 | batchCop[tiflash] | table:t | keep order:false, stats:pseudo | +## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+ +## ensure build1, build2-probe1, probe2 in the CreatingSets, test the bug where build1 throw exception but not change the build state, thus block the build2-probe1, at last this query hangs. +=> DBGInvoke __enable_fail_point(exception_mpp_hash_build) +mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; set @@tidb_broadcast_join_threshold_count=0; set @@tidb_broadcast_join_threshold_size=0; select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id; +ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_mpp_hash_build is triggered. +=> DBGInvoke __disable_fail_point(exception_mpp_hash_build) + # Clean up. mysql> drop table if exists test.t