Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPP: update the state of building a hash table when createOnce throw exceptions (#4202) #4268

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b630775
This is an automated cherry-pick of #4202
fzhedu Mar 14, 2022
4883671
Update FailPoint.cpp
fzhedu Apr 12, 2022
817fcef
Update CreatingSetsBlockInputStream.cpp
fzhedu Apr 12, 2022
ab2ceb8
Update CreatingSetsBlockInputStream.cpp
fzhedu Apr 12, 2022
babbd56
Update Join.cpp
fzhedu Apr 12, 2022
0be8466
Delete exchange_perftest.cpp
fzhedu Apr 12, 2022
023b35c
Update Join.cpp
fzhedu Apr 12, 2022
7983965
Update FailPoint.cpp
fzhedu Apr 12, 2022
ca83277
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
fzhedu Apr 12, 2022
c76a744
cherry pick
fzhedu Apr 12, 2022
b7860d2
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
fzhedu Apr 12, 2022
1229701
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 12, 2022
cb75051
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 12, 2022
bb3728d
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 12, 2022
848f614
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 13, 2022
b5cf616
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 13, 2022
7800469
formated
fzhedu Apr 13, 2022
6ee1397
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
fzhedu Apr 13, 2022
ff1fe87
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 13, 2022
11d27a1
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
fzhedu Apr 14, 2022
f96e2bd
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 14, 2022
972828f
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 14, 2022
3b99bc8
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 14, 2022
358084b
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 14, 2022
64e7a3c
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 14, 2022
67a330b
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 14, 2022
2f5f7f6
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 14, 2022
4dfd33c
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 14, 2022
c781370
Merge branch 'release-5.2' into cherry-pick-4202-to-release-5.2
ti-chi-bot Apr 14, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(segment_merge_after_ingest_packs) \
M(force_formal_page_file_not_exists) \
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream)
M(exception_in_creating_set_input_stream) \
M(exception_mpp_hash_build)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
Expand Down
32 changes: 24 additions & 8 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ namespace DB
namespace FailPoints
{
extern const char exception_in_creating_set_input_stream[];
}
extern const char exception_mpp_hash_build[];
} // namespace FailPoints
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
Expand Down Expand Up @@ -94,9 +95,10 @@ void CreatingSetsBlockInputStream::createAll()
for (auto &elem : subqueries_for_sets)
{
if (elem.second.join)
elem.second.join->setFinishBuildTable(false);
elem.second.join->setBuildTableState(Join::BuildTableState::WAITING);
}
}
Stopwatch watch;
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
for (auto & elem : subqueries_for_sets)
Expand All @@ -116,27 +118,33 @@ void CreatingSetsBlockInputStream::createAll()
}

if (!exception_from_workers.empty())
{
LOG_ERROR(log,
"Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds())
<< " sec with exception and rethrow the first of total " << exception_from_workers.size()
<< " exceptions.");
std::rethrow_exception(exception_from_workers.front());

}
LOG_DEBUG(
log, "Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds()) << "sec. ");
created = true;
}
}

void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
Stopwatch watch;
try
{
LOG_DEBUG(log,
(subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task "
<< std::to_string(mpp_task_id));
Stopwatch watch;

BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write({}, {});


bool done_with_set = !subquery.set;
bool done_with_join = !subquery.join;
bool done_with_table = !subquery.table;
Expand Down Expand Up @@ -193,7 +201,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)


if (subquery.join)
subquery.join->setFinishBuildTable(true);
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build);
subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED);
}

if (table_out)
table_out->writeSuffix();
Expand Down Expand Up @@ -231,13 +242,18 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
}
else
{
LOG_DEBUG(log, "Subquery has empty result for task " << std::to_string(mpp_task_id) << ".");
LOG_DEBUG(log, "Subquery has empty result for task" << std::to_string(mpp_task_id));
}
}
catch (std::exception & e)
catch (...)
{
std::unique_lock<std::mutex> lock(exception_mutex);
exception_from_workers.push_back(std::current_exception());
if (subquery.join)
subquery.join->setBuildTableState(Join::BuildTableState::FAILED);
LOG_ERROR(log,
"task" << std::to_string(mpp_task_id) << " throw exception: " << getCurrentExceptionMessage(false, true) << " In "
<< std::to_string(watch.elapsedSeconds()) << " sec. ");
}
}

Expand Down
Loading