Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MPP: update the state of building a hash table when createOne throws exceptions (#4202) #4270

Merged
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_formal_page_file_not_exists) \
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream) \
M(exception_when_read_from_log)
M(exception_when_read_from_log) \
M(exception_mpp_hash_build)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
Expand Down
43 changes: 26 additions & 17 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ namespace DB
namespace FailPoints
{
extern const char exception_in_creating_set_input_stream[];
}
extern const char exception_mpp_hash_build[];
} // namespace FailPoints
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
Expand Down Expand Up @@ -108,9 +109,10 @@ void CreatingSetsBlockInputStream::createAll()
for (auto & elem : subqueries_for_sets)
{
if (elem.second.join)
elem.second.join->setFinishBuildTable(false);
elem.second.join->setBuildTableState(Join::BuildTableState::WAITING);
}
}
Stopwatch watch;
auto thread_manager = newThreadManager();
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
Expand All @@ -129,27 +131,31 @@ void CreatingSetsBlockInputStream::createAll()
thread_manager->wait();

if (!exception_from_workers.empty())
{
LOG_FMT_ERROR(log, "Creating all tasks of {} takes {} sec with exception and rethrow the first of total {} exceptions", mpp_task_id.toString(), watch.elapsedSeconds(), exception_from_workers.size());
std::rethrow_exception(exception_from_workers.front());
}
LOG_FMT_DEBUG(log, "Creating all tasks of {} takes {} sec. ", mpp_task_id.toString(), watch.elapsedSeconds());

created = true;
}
}

void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
auto log_msg = fmt::format("{} for task {}",
subquery.set ? "Creating set. " : subquery.join ? "Creating join. "
: subquery.table ? "Filling temporary table. "
: "null subquery",
mpp_task_id.toString());
Stopwatch watch;
try
{
LOG_DEBUG(log,
(subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task "
<< mpp_task_id.toString());
Stopwatch watch;

LOG_FMT_DEBUG(log, "{}", log_msg);
BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write({}, {});


bool done_with_set = !subquery.set;
bool done_with_join = !subquery.join;
bool done_with_table = !subquery.table;
Expand All @@ -164,7 +170,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
if (isCancelled())
{
LOG_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
LOG_FMT_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
return;
}

Expand Down Expand Up @@ -209,7 +215,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)


if (subquery.join)
subquery.join->setFinishBuildTable(true);
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build);
subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED);
}

if (table_out)
table_out->writeSuffix();
Expand Down Expand Up @@ -243,20 +252,20 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
msg << "In " << watch.elapsedSeconds() << " sec. ";
msg << "using " << std::to_string(subquery.join == nullptr ? 1 : subquery.join->getBuildConcurrency()) << " threads ";

if (log != nullptr)
LOG_DEBUG(log, msg.rdbuf());
else
LOG_DEBUG(log, msg.rdbuf());
LOG_FMT_DEBUG(log, "{}", msg.rdbuf()->str());
}
else
{
LOG_DEBUG(log, "Subquery has empty result for task " << mpp_task_id.toString() << ".");
LOG_FMT_DEBUG(log, "Subquery has empty result for task {}. ", mpp_task_id.toString());
}
}
catch (std::exception & e)
catch (...)
{
std::unique_lock<std::mutex> lock(exception_mutex);
exception_from_workers.push_back(std::current_exception());
if (subquery.join)
subquery.join->setBuildTableState(Join::BuildTableState::FAILED);
LOG_FMT_ERROR(log, "{} throw exception: {} In {} sec. ", log_msg, getCurrentExceptionMessage(false, true), watch.elapsedSeconds());
}
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/tests/exchange_perftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ struct ReceiverHelper
{
if (join_ptr)
{
join_ptr->setFinishBuildTable(true);
join_ptr->setBuildTableState(Join::BuildTableState::SUCCEED);
std::cout << fmt::format("Hash table size: {} bytes", join_ptr->getTotalByteCount()) << std::endl;
}
}
Expand Down
10 changes: 6 additions & 4 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
, other_condition_ptr(other_condition_ptr_)
, original_strictness(strictness)
, max_block_size_for_cross_join(max_block_size_)
, have_finish_build(true)
, build_table_state(BuildTableState::SUCCEED)
, log(getLogWithPrefix(log_, "Join"))
, limits(limits)
{
Expand All @@ -96,10 +96,10 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
throw Exception("Not supported: non right join with right conditions");
}

void Join::setFinishBuildTable(bool finish_)
void Join::setBuildTableState(BuildTableState state_)
{
std::lock_guard<std::mutex> lk(build_table_mutex);
have_finish_build = finish_;
build_table_state = state_;
build_table_cv.notify_all();
}

Expand Down Expand Up @@ -1622,7 +1622,9 @@ void Join::joinBlock(Block & block) const
{
std::unique_lock lk(build_table_mutex);

build_table_cv.wait(lk, [&]() { return have_finish_build; });
build_table_cv.wait(lk, [&]() { return build_table_state != BuildTableState::WAITING; });
if (build_table_state == BuildTableState::FAILED) /// throw this exception once failed to build the hash table
throw Exception("Build failed before join probe!");
}

std::shared_lock lock(rwlock);
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,13 @@ class Join
bool isBuildSetExceeded() const { return build_set_exceeded.load(); }
size_t getNotJoinedStreamConcurrency() const { return build_concurrency; };

void setFinishBuildTable(bool);
/// State of building this join's hash table, updated through setBuildTableState().
/// joinBlock() waits on build_table_cv until the state is no longer WAITING.
enum BuildTableState
{
WAITING, /// build has not finished yet; joinBlock() keeps waiting while in this state
FAILED, /// build threw an exception; joinBlock() throws instead of probing
SUCCEED /// build finished (also the initial value set by the constructor); joinBlock() proceeds
};
void setBuildTableState(BuildTableState state_);

/// Reference to the row in block.
struct RowRef
Expand Down Expand Up @@ -300,7 +306,7 @@ class Join

mutable std::mutex build_table_mutex;
mutable std::condition_variable build_table_cv;
bool have_finish_build;
BuildTableState build_table_state;

const LogWithPrefixPtr log;

Expand Down
32 changes: 32 additions & 0 deletions tests/fullstack-test/mpp/mpp_fail.test
Original file line number Diff line number Diff line change
Expand Up @@ -81,5 +81,37 @@ ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Exchang
=> DBGInvoke __disable_fail_point(exception_during_mpp_non_root_task_run)
=> DBGInvoke __disable_fail_point(exception_during_mpp_close_tunnel)

## exception during mpp hash build
## desc format='brief' select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id;
## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+
## | id | estRows | task | access object | operator info |
## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+
## | Projection | 0.99 | root | | test.t.id |
## | └─TableReader | 0.99 | root | | data:ExchangeSender |
## | └─ExchangeSender | 0.99 | batchCop[tiflash] | | ExchangeType: PassThrough |
## | └─HashJoin | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] |
## | ├─HashJoin(Build) | 0.99 | batchCop[tiflash] | | inner join, equal:[eq(test.t.id, test.t.id)] |
## | │ ├─ExchangeReceiver(Build) | 1.00 | batchCop[tiflash] | | |
## | │ │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] |
## | │ │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) |
## | │ │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t1 | keep order:false, stats:pseudo |
## | │ └─ExchangeReceiver(Probe) | 1.00 | batchCop[tiflash] | | |
## | │ └─ExchangeSender | 1.00 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] |
## | │ └─Selection | 1.00 | batchCop[tiflash] | | lt(test.t.id, 2), not(isnull(test.t.id)) |
## | │ └─TableFullScan | 3.00 | batchCop[tiflash] | table:t2 | keep order:false, stats:pseudo |
## | └─Projection(Probe) | 2.40 | batchCop[tiflash] | | test.t.id |
## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, funcs:firstrow(test.t.id)->test.t.id |
## | └─ExchangeReceiver | 2.40 | batchCop[tiflash] | | |
## | └─ExchangeSender | 2.40 | batchCop[tiflash] | | ExchangeType: HashPartition, Hash Cols: [name: test.t.id, collate: N/A] |
## | └─HashAgg | 2.40 | batchCop[tiflash] | | group by:test.t.id, |
## | └─Selection | 3.00 | batchCop[tiflash] | | not(isnull(test.t.id)) |
## | └─TableFullScan | 3.00 | batchCop[tiflash] | table:t | keep order:false, stats:pseudo |
## +-----------------------------------------+---------+-------------------+---------------+-------------------------------------------------------------------------+
## Ensure build1, build2-probe1 and probe2 all run in the CreatingSets. This tests the bug where build1 throws an exception but does not change the build state, which blocks build2-probe1, so the query hangs.
=> DBGInvoke __enable_fail_point(exception_mpp_hash_build)
mysql> use test; set @@tidb_isolation_read_engines='tiflash'; set @@tidb_allow_mpp=1; set @@tidb_broadcast_join_threshold_count=0; set @@tidb_broadcast_join_threshold_size=0; select t1.id from test.t t1 join test.t t2 on t1.id = t2.id and t1.id <2 join (select id from test.t group by id) t3 on t2.id=t3.id;
ERROR 1105 (HY000) at line 1: other error for mpp stream: DB::Exception: Fail point FailPoints::exception_mpp_hash_build is triggered.
=> DBGInvoke __disable_fail_point(exception_mpp_hash_build)

# Clean up.
mysql> drop table if exists test.t