Skip to content

Commit

Permalink
MPP: update the state of building a hash table when createOnce throw …
Browse files Browse the repository at this point in the history
…exceptions (pingcap#4202) (pingcap#4268)

close pingcap#4195
  • Loading branch information
ti-chi-bot authored and fzhedu committed Jun 16, 2022
1 parent dae392f commit c1f6ab0
Show file tree
Hide file tree
Showing 5 changed files with 216 additions and 223 deletions.
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(segment_merge_after_ingest_packs) \
M(force_formal_page_file_not_exists) \
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream)
M(exception_in_creating_set_input_stream) \
M(exception_mpp_hash_build)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
Expand Down
59 changes: 34 additions & 25 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/materializeBlock.h>
#include <Flash/Mpp/getMPPTaskLog.h>
#include <Interpreters/Join.h>
#include <Interpreters/Set.h>
#include <Storages/IStorage.h>
Expand All @@ -13,36 +12,33 @@

namespace DB
{

namespace FailPoints
{
extern const char exception_in_creating_set_input_stream[];
}
extern const char exception_mpp_hash_build[];
} // namespace FailPoints
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
}

CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(
const BlockInputStreamPtr & input,
std::vector<SubqueriesForSets> && subqueries_for_sets_list_,
const SizeLimits & network_transfer_limits,
Int64 mpp_task_id_,
const LogWithPrefixPtr & log_)
CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(const BlockInputStreamPtr & input,
std::vector<SubqueriesForSets> && subqueries_for_sets_list_,
const SizeLimits & network_transfer_limits,
Int64 mpp_task_id_)
: subqueries_for_sets_list(std::move(subqueries_for_sets_list_))
, network_transfer_limits(network_transfer_limits)
, mpp_task_id(mpp_task_id_)
, log(getMPPTaskLog(log_, getName(), mpp_task_id))
{
init(input);
}

CreatingSetsBlockInputStream::CreatingSetsBlockInputStream(
const BlockInputStreamPtr & input,
const SubqueriesForSets & subqueries_for_sets,
const SizeLimits & network_transfer_limits,
const LogWithPrefixPtr & log_)
const SizeLimits & network_transfer_limits)
: network_transfer_limits(network_transfer_limits)
, log(getMPPTaskLog(log_, getName(), mpp_task_id))
{
subqueries_for_sets_list.push_back(subqueries_for_sets);
init(input);
Expand Down Expand Up @@ -107,9 +103,10 @@ void CreatingSetsBlockInputStream::createAll()
for (auto & elem : subqueries_for_sets)
{
if (elem.second.join)
elem.second.join->setFinishBuildTable(false);
elem.second.join->setBuildTableState(Join::BuildTableState::WAITING);
}
}
Stopwatch watch;
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
for (auto & elem : subqueries_for_sets)
Expand All @@ -118,7 +115,7 @@ void CreatingSetsBlockInputStream::createAll()
{
if (isCancelledOrThrowIfKilled())
return;
workers.emplace_back(ThreadFactory(true, "CreatingSets").newThread([this, &subquery = elem.second] { createOne(subquery); }));
workers.emplace_back(ThreadFactory().newThread([this, &subquery = elem.second] { createOne(subquery); }));
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_in_creating_set_input_stream);
}
}
Expand All @@ -129,27 +126,34 @@ void CreatingSetsBlockInputStream::createAll()
}

if (!exception_from_workers.empty())
{
LOG_ERROR(log,
"Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds())
<< " sec with exception and rethrow the first of total " << exception_from_workers.size()
<< " exceptions.");
std::rethrow_exception(exception_from_workers.front());

}
LOG_DEBUG(
log,
"Creating all tasks of " << std::to_string(mpp_task_id) << " takes " << std::to_string(watch.elapsedSeconds()) << "sec. ");
created = true;
}
}

void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
Stopwatch watch;
try
{
LOG_DEBUG(log,
(subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task "
<< std::to_string(mpp_task_id));
Stopwatch watch;

BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write({}, {});


bool done_with_set = !subquery.set;
bool done_with_join = !subquery.join;
bool done_with_table = !subquery.table;
Expand Down Expand Up @@ -209,7 +213,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)


if (subquery.join)
subquery.join->setFinishBuildTable(true);
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build);
subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED);
}

if (table_out)
table_out->writeSuffix();
Expand Down Expand Up @@ -242,21 +249,23 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)

msg << "In " << watch.elapsedSeconds() << " sec. ";
msg << "using " << std::to_string(subquery.join == nullptr ? 1 : subquery.join->getBuildConcurrency()) << " threads ";

if (log != nullptr)
LOG_DEBUG(log, msg.rdbuf());
else
LOG_DEBUG(log, msg.rdbuf());
msg << "for task " << std::to_string(mpp_task_id) << ".";
LOG_DEBUG(log, msg.rdbuf());
}
else
{
LOG_DEBUG(log, "Subquery has empty result for task " << std::to_string(mpp_task_id) << ".");
LOG_DEBUG(log, "Subquery has empty result for task" << std::to_string(mpp_task_id));
}
}
catch (std::exception & e)
catch (...)
{
std::unique_lock<std::mutex> lock(exception_mutex);
exception_from_workers.push_back(std::current_exception());
if (subquery.join)
subquery.join->setBuildTableState(Join::BuildTableState::FAILED);
LOG_ERROR(log,
"task" << std::to_string(mpp_task_id) << " throw exception: " << getCurrentExceptionMessage(false, true) << " In "
<< std::to_string(watch.elapsedSeconds()) << " sec. ");
}
}

Expand Down
Loading

0 comments on commit c1f6ab0

Please sign in to comment.