Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPP: update the state of building a hash table when createOne throws exceptions #4202

Merged
merged 15 commits into from
Mar 14, 2022
Merged
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_formal_page_file_not_exists) \
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream) \
M(exception_when_read_from_log)
M(exception_when_read_from_log) \
M(exception_mpp_hash_build)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
Expand Down
43 changes: 26 additions & 17 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ namespace DB
namespace FailPoints
{
extern const char exception_in_creating_set_input_stream[];
}
extern const char exception_mpp_hash_build[];
} // namespace FailPoints
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
Expand Down Expand Up @@ -108,9 +109,10 @@ void CreatingSetsBlockInputStream::createAll()
for (auto & elem : subqueries_for_sets)
{
if (elem.second.join)
elem.second.join->setFinishBuildTable(false);
elem.second.join->setBuildTableState(Join::BuildTableState::WAITING);
}
}
Stopwatch watch;
auto thread_manager = newThreadManager();
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
Expand All @@ -129,27 +131,31 @@ void CreatingSetsBlockInputStream::createAll()
thread_manager->wait();

if (!exception_from_workers.empty())
{
LOG_FMT_ERROR(log, "Creating all tasks of {} takes {} sec with exception and rethrow the first of total {} exceptions", mpp_task_id.toString(), watch.elapsedSeconds(), exception_from_workers.size());
std::rethrow_exception(exception_from_workers.front());
}
LOG_FMT_DEBUG(log, "Creating all tasks of {} takes {} sec. ", mpp_task_id.toString(), watch.elapsedSeconds());

created = true;
}
}

void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
auto log_msg = fmt::format("{} for task {}",
subquery.set ? "Creating set. " : subquery.join ? "Creating join. "
: subquery.table ? "Filling temporary table. "
: "null subquery",
mpp_task_id.toString());
Stopwatch watch;
try
{
LOG_DEBUG(log,
(subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task "
<< mpp_task_id.toString());
Stopwatch watch;

LOG_FMT_DEBUG(log, "{}", log_msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

my suggestion is:

Suggested change
LOG_FMT_DEBUG(log, "{}", log_msg);
LOG_FMT_DEBUG(log, "{} for task {}", xxxx);

That way the message is formatted only when debug-level logging is enabled.

BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write({}, {});


bool done_with_set = !subquery.set;
bool done_with_join = !subquery.join;
bool done_with_table = !subquery.table;
Expand All @@ -164,7 +170,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
if (isCancelled())
{
LOG_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
LOG_FMT_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
return;
}

Expand Down Expand Up @@ -209,7 +215,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)


if (subquery.join)
subquery.join->setFinishBuildTable(true);
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build);
subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED);
}

if (table_out)
table_out->writeSuffix();
Expand Down Expand Up @@ -243,20 +252,20 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
msg << "In " << watch.elapsedSeconds() << " sec. ";
msg << "using " << std::to_string(subquery.join == nullptr ? 1 : subquery.join->getBuildConcurrency()) << " threads ";

if (log != nullptr)
LOG_DEBUG(log, msg.rdbuf());
else
LOG_DEBUG(log, msg.rdbuf());
LOG_FMT_DEBUG(log, "{}", msg.rdbuf()->str());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: use fmt for generating msg.

}
else
{
LOG_DEBUG(log, "Subquery has empty result for task " << mpp_task_id.toString() << ".");
LOG_FMT_DEBUG(log, "Subquery has empty result for task {}. ", mpp_task_id.toString());
}
}
catch (std::exception & e)
catch (...)
{
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not merge the two catch blocks? The only difference seems to be the error message, and you can use getCurrentExceptionMessage to get the error message for the current exception.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated

std::unique_lock<std::mutex> lock(exception_mutex);
exception_from_workers.push_back(std::current_exception());
if (subquery.join)
subquery.join->setBuildTableState(Join::BuildTableState::FAILED);
LOG_FMT_ERROR(log, "{} throw exception: {} In {} sec. ", log_msg, getCurrentExceptionMessage(false, true), watch.elapsedSeconds());
}
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/tests/exchange_perftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ struct ReceiverHelper
{
if (join_ptr)
{
join_ptr->setFinishBuildTable(true);
join_ptr->setBuildTableState(Join::BuildTableState::SUCCEED);
std::cout << fmt::format("Hash table size: {} bytes", join_ptr->getTotalByteCount()) << std::endl;
}
}
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
, other_condition_ptr(other_condition_ptr_)
, original_strictness(strictness)
, max_block_size_for_cross_join(max_block_size_)
, have_finish_build(true)
, build_table_state(BuildTableState::SUCCEED)
, log(getLogWithPrefix(log_, "Join"))
, limits(limits)
{
Expand All @@ -108,10 +108,10 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
throw Exception("Not supported: non right join with right conditions");
}

void Join::setFinishBuildTable(bool finish_)
void Join::setBuildTableState(BuildTableState state_)
{
std::lock_guard<std::mutex> lk(build_table_mutex);
have_finish_build = finish_;
build_table_state = state_;
build_table_cv.notify_all();
}

Expand Down Expand Up @@ -1771,7 +1771,9 @@ void Join::joinBlock(Block & block) const
{
std::unique_lock lk(build_table_mutex);

build_table_cv.wait(lk, [&]() { return have_finish_build; });
build_table_cv.wait(lk, [&]() { return build_table_state != BuildTableState::WAITING; });
if (build_table_state == BuildTableState::FAILED) /// throw this exception once failed to build the hash table
throw Exception("Build failed before join probe!");
}

std::shared_lock lock(rwlock);
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,13 @@ class Join
bool isBuildSetExceeded() const { return build_set_exceeded.load(); }
size_t getNotJoinedStreamConcurrency() const { return build_concurrency; };

void setFinishBuildTable(bool);
/// State of building this join's hash table; changed through setBuildTableState().
enum BuildTableState
{
/// The build has not finished yet; joinBlock() waits on build_table_cv while in this state.
WAITING,
/// The build threw an exception; joinBlock() throws instead of probing.
FAILED,
/// The build finished; this is also the state the constructor starts in.
SUCCEED
};
void setBuildTableState(BuildTableState state_);

/// Reference to the row in block.
struct RowRef
Expand Down Expand Up @@ -307,7 +313,7 @@ class Join

mutable std::mutex build_table_mutex;
mutable std::condition_variable build_table_cv;
bool have_finish_build;
BuildTableState build_table_state;

const LogWithPrefixPtr log;

Expand Down
32 changes: 32 additions & 0 deletions tests/fullstack-test/mpp/mpp_fail.test
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,37 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchang
=> DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run)
=> DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel)

## exception during mpp hash build
## desc format='brief' select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id;
## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+
## | id | estRows | task | access object | operator info |
## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+
## | Projection | 0.99 | root | | test.t.id |
## | └─TableReader | 0.99 | root | | data:ExchangeSender |
## | └─ExchangeSender | 0.99 | batchCop[tiflash] | | ExchangeType: PassThrough |
## | └─HashJoin | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] |
## | ├─HashJoin(Build) | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] |
## | │ ├─ExchangeReceiver(Build) | 1.00 | batchCop[tiflash] | | |
## | │ │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] |
## | │ │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) |
## | │ │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t1 | keep order:false, stats:pseudo |
## | │ └─ExchangeReceiver(Probe) | 1.00 | batchCop[tiflash] | | |
## | │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] |
## | │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) |
## | │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t2 | keep order:false, stats:pseudo |
## | └─Projection(Probe) | 2.40 | batchCop[tiflash] | | test.t.id |
## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, funcs:firstrow(test.t.id)->test.t.id |
## | └─ExchangeReceiver | 2.40 | batchCop[tiflash] | | |
## | └─ExchangeSender | 2.40 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] |
## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, |
## | └─Selection | 3.00 | batchCop[tiflash] | | not(isnull(test.t.id)) |
## | └─TableFullScan | 3.00 | batchCop[tiflash] | table:t | keep order:false, stats:pseudo |
## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+
## Ensure build1, build2-probe1 and probe2 are all in the CreatingSets. This tests the bug where build1 throws an exception but does not change the build state, which blocks build2-probe1, so the query eventually hangs.
=> DBGInvoke __enable_fail_point(exception_mpp_hash_build)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; set @@tidb_broadcast_join_threshold_count=0; set @@tidb_broadcast_join_threshold_size=0; select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would this test make TiFlash hang before this PR?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, the test procedure below guarantees this, and the integration test also ensures it.

ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_mpp_hash_build is triggered.
=> DBGInvoke __disable_fail_point(exception_mpp_hash_build)

# Clean up.
mysql> drop table if exists test.t