Skip to content

Commit

Permalink
set BuildTableState
Browse files Browse the repository at this point in the history
Signed-off-by: fzhedu <[email protected]>
  • Loading branch information
fzhedu authored and ti-chi-bot committed Mar 14, 2022
1 parent c1724ba commit a986286
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 17 deletions.
19 changes: 9 additions & 10 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ void CreatingSetsBlockInputStream::createAll()
for (auto & elem : subqueries_for_sets)
{
if (elem.second.join)
elem.second.join->setBuildTableState(0);
elem.second.join->setBuildTableState(Join::BuildTableState::WAITING);
}
}
Stopwatch watch;
Expand Down Expand Up @@ -143,15 +143,14 @@ void CreatingSetsBlockInputStream::createAll()

void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
std::stringstream log_msg;
log_msg << std::fixed << std::setprecision(3);
log_msg << (subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task "
<< mpp_task_id.toString();
auto log_msg = fmt::format("{} for task {}", (subquery.set ? "Creating set. " : subquery.join ? "Creating join. "
: subquery.table ? "Filling temporary table. "
: ""),
mpp_task_id.toString());
Stopwatch watch;
try
{
LOG_FMT_DEBUG(log, "{}", log_msg.rdbuf()->str());
LOG_FMT_DEBUG(log, "{}", log_msg);
BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write({}, {});
Expand Down Expand Up @@ -217,7 +216,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (subquery.join)
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build);
subquery.join->setBuildTableState(1);
subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED);
}

if (table_out)
Expand Down Expand Up @@ -264,8 +263,8 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
std::unique_lock<std::mutex> lock(exception_mutex);
exception_from_workers.push_back(std::current_exception());
if (subquery.join)
subquery.join->setBuildTableState(-1);
LOG_FMT_ERROR(log, "{} throw exception: {} In {} sec. ", log_msg.rdbuf()->str(), getCurrentExceptionMessage(false, true), watch.elapsedSeconds());
subquery.join->setBuildTableState(Join::BuildTableState::FAILED);
LOG_FMT_ERROR(log, "{} throw exception: {} In {} sec. ", log_msg, getCurrentExceptionMessage(false, true), watch.elapsedSeconds());
}
}

Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Flash/tests/exchange_perftest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -458,7 +458,7 @@ struct ReceiverHelper
{
if (join_ptr)
{
join_ptr->setBuildTableState(1);
join_ptr->setBuildTableState(Join::BuildTableState::SUCCEED);
std::cout << fmt::format("Hash table size: {} bytes", join_ptr->getTotalByteCount()) << std::endl;
}
}
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Interpreters/Join.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
, other_condition_ptr(other_condition_ptr_)
, original_strictness(strictness)
, max_block_size_for_cross_join(max_block_size_)
, build_table_state(1)
, build_table_state(BuildTableState::SUCCEED)
, log(getLogWithPrefix(log_, "Join"))
, limits(limits)
{
Expand All @@ -96,7 +96,7 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
throw Exception("Not supported: non right join with right conditions");
}

void Join::setBuildTableState(int state_)
void Join::setBuildTableState(BuildTableState state_)
{
std::lock_guard<std::mutex> lk(build_table_mutex);
build_table_state = state_;
Expand Down Expand Up @@ -1622,8 +1622,8 @@ void Join::joinBlock(Block & block) const
{
std::unique_lock lk(build_table_mutex);

build_table_cv.wait(lk, [&]() { return build_table_state != 0; });
if (build_table_state == -1) /// throw this exception once failed to build the hash table
build_table_cv.wait(lk, [&]() { return build_table_state != BuildTableState::WAITING; });
if (build_table_state == BuildTableState::FAILED) /// throw this exception once failed to build the hash table
throw Exception("Build failed before join probe!");
}

Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Interpreters/Join.h
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,13 @@ class Join
bool isBuildSetExceeded() const { return build_set_exceeded.load(); }
size_t getNotJoinedStreamConcurrency() const { return build_concurrency; };

void setBuildTableState(int);
enum BuildTableState
{
WAITING,
FAILED,
SUCCEED
};
void setBuildTableState(BuildTableState state_);

/// Reference to the row in block.
struct RowRef
Expand Down Expand Up @@ -300,7 +306,7 @@ class Join

mutable std::mutex build_table_mutex;
mutable std::condition_variable build_table_cv;
int build_table_state; /// -1 means failure, 0 means waiting , 1 means success
BuildTableState build_table_state;

const LogWithPrefixPtr log;

Expand Down

0 comments on commit a986286

Please sign in to comment.