MPP: update the state of building a hash table when createOnce throws exceptions (#4202) #4270

Merged
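At a glance, the change replaces the boolean `have_finish_build` flag in `Join` with a three-state `BuildTableState` (`WAITING` / `FAILED` / `SUCCEED`). The build side (`CreatingSetsBlockInputStream::createOne`) now reports failures explicitly, so a probe thread waiting in `Join::joinBlock` wakes up and throws instead of blocking when the hash-table build throws. A minimal standalone sketch of that handshake, using a hypothetical `BuildTableGate` class rather than the actual TiFlash types:

```cpp
#include <condition_variable>
#include <mutex>
#include <stdexcept>

// Sketch of the build/probe handshake this PR moves to (illustrative only).
class BuildTableGate
{
public:
    enum class State
    {
        WAITING, // build still running
        FAILED,  // build threw an exception
        SUCCEED  // hash table is ready
    };

    // Build side: called on success, or from a catch(...) handler with FAILED.
    void setState(State next)
    {
        std::lock_guard<std::mutex> lock(mutex);
        state = next;
        cv.notify_all();
    }

    // Probe side: block until the build finishes one way or the other.
    void waitForBuild()
    {
        std::unique_lock<std::mutex> lock(mutex);
        cv.wait(lock, [this] { return state != State::WAITING; });
        if (state == State::FAILED)
            throw std::runtime_error("hash table build failed before join probe");
    }

private:
    std::mutex mutex;
    std::condition_variable cv;
    State state = State::WAITING;
};
```

With only a boolean, a build that threw could leave the flag unset and probe threads waiting indefinitely; the third state makes the failure observable to the probe side.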
3 changes: 2 additions & 1 deletion dbms/src/Common/FailPoint.cpp
@@ -48,7 +48,8 @@ std::unordered_map<String, std::shared_ptr<FailPointChannel>> FailPointHelper::f
M(force_formal_page_file_not_exists) \
M(force_legacy_or_checkpoint_page_file_exists) \
M(exception_in_creating_set_input_stream) \
M(exception_when_read_from_log)
M(exception_when_read_from_log) \
M(exception_mpp_hash_build)

#define APPLY_FOR_FAILPOINTS(M) \
M(force_set_page_file_write_errno) \
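The new `exception_mpp_hash_build` entry registers a failpoint name alongside `exception_in_creating_set_input_stream`; it is triggered via `FAIL_POINT_TRIGGER_EXCEPTION` right before the build state would be marked `SUCCEED` (see `CreatingSetsBlockInputStream.cpp` below), letting tests force the new failure path deterministically. Conceptually, a failpoint of this kind is a named switch that throws when enabled; a rough sketch of the idea, with hypothetical `enable`/`trigger` helpers rather than the real `FailPointHelper` API:

```cpp
#include <set>
#include <stdexcept>
#include <string>

// Toy failpoint registry (illustrative, not the TiFlash implementation).
namespace mini_failpoints
{
inline std::set<std::string> enabled;

inline void enable(const std::string & name) { enabled.insert(name); }

// Throws only if a test has enabled the named failpoint beforehand.
inline void trigger(const std::string & name)
{
    if (enabled.count(name) > 0)
        throw std::runtime_error("failpoint triggered: " + name);
}
} // namespace mini_failpoints

// In a test: mini_failpoints::enable("exception_mpp_hash_build");
// the build path then calls mini_failpoints::trigger("exception_mpp_hash_build"),
// so the FAILED-state handling can be exercised without a real build error.
```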
43 changes: 26 additions & 17 deletions dbms/src/DataStreams/CreatingSetsBlockInputStream.cpp
@@ -17,7 +17,8 @@ namespace DB
namespace FailPoints
{
extern const char exception_in_creating_set_input_stream[];
}
extern const char exception_mpp_hash_build[];
} // namespace FailPoints
namespace ErrorCodes
{
extern const int SET_SIZE_LIMIT_EXCEEDED;
@@ -108,9 +109,10 @@ void CreatingSetsBlockInputStream::createAll()
for (auto & elem : subqueries_for_sets)
{
if (elem.second.join)
elem.second.join->setFinishBuildTable(false);
elem.second.join->setBuildTableState(Join::BuildTableState::WAITING);
}
}
Stopwatch watch;
auto thread_manager = newThreadManager();
for (auto & subqueries_for_sets : subqueries_for_sets_list)
{
@@ -129,27 +131,31 @@ void CreatingSetsBlockInputStream::createAll()
thread_manager->wait();

if (!exception_from_workers.empty())
{
LOG_FMT_ERROR(log, "Creating all tasks of {} takes {} sec with exception and rethrow the first of total {} exceptions", mpp_task_id.toString(), watch.elapsedSeconds(), exception_from_workers.size());
std::rethrow_exception(exception_from_workers.front());
}
LOG_FMT_DEBUG(log, "Creating all tasks of {} takes {} sec. ", mpp_task_id.toString(), watch.elapsedSeconds());

created = true;
}
}

void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
auto log_msg = fmt::format("{} for task {}",
subquery.set ? "Creating set. " : subquery.join ? "Creating join. "
: subquery.table ? "Filling temporary table. "
: "null subquery",
mpp_task_id.toString());
Stopwatch watch;
try
{
LOG_DEBUG(log,
(subquery.set ? "Creating set. " : "")
<< (subquery.join ? "Creating join. " : "") << (subquery.table ? "Filling temporary table. " : "") << " for task "
<< mpp_task_id.toString());
Stopwatch watch;

LOG_FMT_DEBUG(log, "{}", log_msg);
BlockOutputStreamPtr table_out;
if (subquery.table)
table_out = subquery.table->write({}, {});


bool done_with_set = !subquery.set;
bool done_with_join = !subquery.join;
bool done_with_table = !subquery.table;
@@ -164,7 +170,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
{
if (isCancelled())
{
LOG_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
LOG_FMT_DEBUG(log, "Query was cancelled during set / join or temporary table creation.");
return;
}

@@ -209,7 +215,10 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)


if (subquery.join)
subquery.join->setFinishBuildTable(true);
{
FAIL_POINT_TRIGGER_EXCEPTION(FailPoints::exception_mpp_hash_build);
subquery.join->setBuildTableState(Join::BuildTableState::SUCCEED);
}

if (table_out)
table_out->writeSuffix();
@@ -243,20 +252,20 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
msg << "In " << watch.elapsedSeconds() << " sec. ";
msg << "using " << std::to_string(subquery.join == nullptr ? 1 : subquery.join->getBuildConcurrency()) << " threads ";

if (log != nullptr)
LOG_DEBUG(log, msg.rdbuf());
else
LOG_DEBUG(log, msg.rdbuf());
LOG_FMT_DEBUG(log, "{}", msg.rdbuf()->str());
}
else
{
LOG_DEBUG(log, "Subquery has empty result for task " << mpp_task_id.toString() << ".");
LOG_FMT_DEBUG(log, "Subquery has empty result for task {}. ", mpp_task_id.toString());
}
}
catch (std::exception & e)
catch (...)
{
std::unique_lock<std::mutex> lock(exception_mutex);
exception_from_workers.push_back(std::current_exception());
if (subquery.join)
subquery.join->setBuildTableState(Join::BuildTableState::FAILED);
LOG_FMT_ERROR(log, "{} throw exception: {} In {} sec. ", log_msg, getCurrentExceptionMessage(false, true), watch.elapsedSeconds());
}
}
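The `catch (...)` block above is what feeds the new `FAILED` state: each worker thread records its exception into `exception_from_workers` under `exception_mutex` and marks the join build as failed, while `createAll` waits for all workers and then rethrows the first recorded exception. A condensed sketch of that collect-and-rethrow pattern (simplified names, not the real `CreatingSetsBlockInputStream`):

```cpp
#include <exception>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Workers store their exceptions instead of letting them escape the thread;
// the coordinator rethrows the first one after every worker has joined.
struct ExceptionCollector
{
    std::mutex mutex;
    std::vector<std::exception_ptr> exceptions;

    void runAll(const std::vector<std::function<void()>> & tasks)
    {
        std::vector<std::thread> workers;
        for (const auto & task : tasks)
        {
            workers.emplace_back([&, task] {
                try
                {
                    task(); // e.g. build one hash table / set / temporary table
                }
                catch (...)
                {
                    std::lock_guard<std::mutex> lock(mutex);
                    exceptions.push_back(std::current_exception());
                }
            });
        }
        for (auto & worker : workers)
            worker.join();
        if (!exceptions.empty())
            std::rethrow_exception(exceptions.front()); // surface the first failure
    }
};
```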

2 changes: 1 addition & 1 deletion dbms/src/Flash/tests/exchange_perftest.cpp
@@ -458,7 +458,7 @@ struct ReceiverHelper
{
if (join_ptr)
{
join_ptr->setFinishBuildTable(true);
join_ptr->setBuildTableState(Join::BuildTableState::SUCCEED);
std::cout << fmt::format("Hash table size: {} bytes", join_ptr->getTotalByteCount()) << std::endl;
}
}
58 changes: 29 additions & 29 deletions dbms/src/Interpreters/Join.cpp
@@ -70,7 +70,7 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
, other_condition_ptr(other_condition_ptr_)
, original_strictness(strictness)
, max_block_size_for_cross_join(max_block_size_)
, have_finish_build(true)
, build_table_state(BuildTableState::SUCCEED)
, log(getLogWithPrefix(log_, "Join"))
, limits(limits)
{
@@ -96,10 +96,10 @@ Join::Join(const Names & key_names_left_, const Names & key_names_right_, bool u
throw Exception("Not supported: non right join with right conditions");
}

void Join::setFinishBuildTable(bool finish_)
void Join::setBuildTableState(BuildTableState state_)
{
std::lock_guard<std::mutex> lk(build_table_mutex);
have_finish_build = finish_;
build_table_state = state_;
build_table_cv.notify_all();
}

@@ -344,7 +344,7 @@ size_t Join::getTotalByteCount() const
res += getTotalByteCountImpl(maps_all, type);
res += getTotalByteCountImpl(maps_any_full, type);
res += getTotalByteCountImpl(maps_all_full, type);
for (auto & pool : pools)
for (const auto & pool : pools)
{
/// note the return value might not be accurate since it does not use lock, but should be enough for current usage
res += pool->size();
@@ -421,7 +421,7 @@ namespace
{
void insertRowToList(Join::RowRefList * list, Join::RowRefList * elem, Block * stored_block, size_t index)
{
elem->next = list->next;
elem->next = list->next; // NOLINT(clang-analyzer-core.NullDereference)
list->next = elem;
elem->block = stored_block;
elem->row_num = index;
@@ -493,7 +493,7 @@ void NO_INLINE insertFromBlockImplTypeCase(
if (rows_not_inserted_to_map)
{
/// for right/full out join, need to record the rows not inserted to map
auto elem = reinterpret_cast<Join::RowRefList *>(pool.alloc(sizeof(Join::RowRefList)));
auto * elem = reinterpret_cast<Join::RowRefList *>(pool.alloc(sizeof(Join::RowRefList)));
insertRowToList(rows_not_inserted_to_map, elem, stored_block, i);
}
continue;
@@ -536,9 +536,9 @@ void NO_INLINE insertFromBlockImplTypeCaseWithLock(
segment_index_info.resize(segment_size);
}
size_t rows_per_seg = rows / segment_index_info.size();
for (size_t i = 0; i < segment_index_info.size(); i++)
for (auto & segment_index : segment_index_info)
{
segment_index_info[i].reserve(rows_per_seg);
segment_index.reserve(rows_per_seg);
}
for (size_t i = 0; i < rows; i++)
{
@@ -567,16 +567,16 @@
{
/// null value
/// here ignore mutex because rows_not_inserted_to_map is privately owned by each stream thread
for (size_t i = 0; i < segment_index_info[segment_index].size(); i++)
for (auto index : segment_index_info[segment_index])
{
/// for right/full out join, need to record the rows not inserted to map
auto elem = reinterpret_cast<Join::RowRefList *>(pool.alloc(sizeof(Join::RowRefList)));
insertRowToList(rows_not_inserted_to_map, elem, stored_block, segment_index_info[segment_index][i]);
auto * elem = reinterpret_cast<Join::RowRefList *>(pool.alloc(sizeof(Join::RowRefList)));
insertRowToList(rows_not_inserted_to_map, elem, stored_block, index);
}
}
else
{
std::lock_guard<std::mutex> lk(map.getSegmentMutex(segment_index));
std::lock_guard lk(map.getSegmentMutex(segment_index));
for (size_t i = 0; i < segment_index_info[segment_index].size(); i++)
{
Inserter<STRICTNESS, typename Map::SegmentType::HashTable, KeyGetter>::insert(map.getSegmentTable(segment_index), key_getter, stored_block, segment_index_info[segment_index][i], pool, sort_key_containers);
@@ -705,7 +705,7 @@ void recordFilteredRows(const Block & block, const String & filter_column, Colum
MutableColumnPtr mutable_null_map_holder = (*std::move(null_map_holder)).mutate();
PaddedPODArray<UInt8> & mutable_null_map = static_cast<ColumnUInt8 &>(*mutable_null_map_holder).getData();

auto & nested_column = column->isColumnNullable() ? static_cast<const ColumnNullable &>(*column).getNestedColumnPtr() : column;
const auto & nested_column = column->isColumnNullable() ? static_cast<const ColumnNullable &>(*column).getNestedColumnPtr() : column;
for (size_t i = 0, size = nested_column->size(); i < size; ++i)
mutable_null_map[i] |= (!nested_column->getInt(i));

@@ -732,7 +732,7 @@ void Join::insertFromBlock(const Block & block, size_t stream_index)
std::shared_lock lock(rwlock);
Block * stored_block = nullptr;
{
std::lock_guard<std::mutex> lk(blocks_lock);
std::lock_guard lk(blocks_lock);
blocks.push_back(block);
stored_block = &blocks.back();
original_blocks.push_back(block);
@@ -753,6 +753,7 @@ bool Join::insertFromBlockInternal(Block * stored_block, size_t stream_index)
const Block & block = *stored_block;

/// Rare case, when keys are constant. To avoid code bloat, simply materialize them.
/// Note: this variable can't be removed because it will take smart pointers' lifecycle to the end of this function.
Columns materialized_columns;

/// Memoize key columns to work.
@@ -981,11 +982,11 @@ void NO_INLINE joinBlockImplTypeCase(
hash_value = map.hash(key);
segment_index = hash_value % map.getSegmentSize();
}
auto & internalMap = map.getSegmentTable(segment_index);
auto & internal_map = map.getSegmentTable(segment_index);
/// do not require segment lock because in join, the hash table can not be changed in probe stage.
auto it = map.getSegmentSize() > 0 ? internalMap.find(key, hash_value) : internalMap.find(key);
auto it = map.getSegmentSize() > 0 ? internal_map.find(key, hash_value) : internal_map.find(key);

if (it != internalMap.end())
if (it != internal_map.end())
{
it->getMapped().setUsed();
Adder<KIND, STRICTNESS, Map>::addFound(
@@ -1061,8 +1062,8 @@ void mergeNullAndFilterResult(Block & block, ColumnVector<UInt8>::Container & fi
orig_filter_column = orig_filter_column->convertToFullColumnIfConst();
if (orig_filter_column->isColumnNullable())
{
auto * nullable_column = checkAndGetColumn<ColumnNullable>(orig_filter_column.get());
auto & nested_column_data = static_cast<const ColumnVector<UInt8> *>(nullable_column->getNestedColumnPtr().get())->getData();
const auto * nullable_column = checkAndGetColumn<ColumnNullable>(orig_filter_column.get());
const auto & nested_column_data = static_cast<const ColumnVector<UInt8> *>(nullable_column->getNestedColumnPtr().get())->getData();
for (size_t i = 0; i < nullable_column->size(); i++)
{
if (filter_column[i] == 0)
@@ -1075,8 +1076,8 @@
}
else
{
auto * other_filter_column = checkAndGetColumn<ColumnVector<UInt8>>(orig_filter_column.get());
auto & other_filter_column_data = static_cast<const ColumnVector<UInt8> *>(other_filter_column)->getData();
const auto * other_filter_column = checkAndGetColumn<ColumnVector<UInt8>>(orig_filter_column.get());
const auto & other_filter_column_data = static_cast<const ColumnVector<UInt8> *>(other_filter_column)->getData();
for (size_t i = 0; i < other_filter_column->size(); i++)
filter_column[i] = filter_column[i] && other_filter_column_data[i];
}
@@ -1100,7 +1101,7 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>

auto filter_column = ColumnUInt8::create();
auto & filter = filter_column->getData();
filter.assign(block.rows(), (UInt8)1);
filter.assign(block.rows(), static_cast<UInt8>(1));
if (!other_filter_column.empty())
{
mergeNullAndFilterResult(block, filter, other_filter_column, false);
@@ -1179,9 +1180,9 @@ void Join::handleOtherConditions(Block & block, std::unique_ptr<IColumn::Filter>
if (isLeftJoin(kind))
{
/// for left join, convert right column to null if not joined
for (size_t i = 0; i < right_table_columns.size(); i++)
for (size_t right_table_column : right_table_columns)
{
auto & column = block.getByPosition(right_table_columns[i]);
auto & column = block.getByPosition(right_table_column);
auto full_column = column.column->isColumnConst() ? column.column->convertToFullColumnIfConst() : column.column;
if (!full_column->isColumnNullable())
{
@@ -1561,9 +1562,8 @@ void Join::joinBlockImplCrossInternal(Block & block, ConstNullMapPtr null_map [[
{
dst_columns[i] = sample_block.getByPosition(i).column->cloneEmpty();
}
for (size_t i = 0; i < result_blocks.size(); i++)
for (auto & current_block : result_blocks)
{
auto & current_block = result_blocks[i];
if (current_block.rows() > 0)
{
for (size_t column = 0; column < current_block.columns(); column++)
@@ -1613,7 +1613,6 @@ void Join::checkTypesOfKeys(const Block & block_left, const Block & block_right)
}
}


void Join::joinBlock(Block & block) const
{
// std::cerr << "joinBlock: " << block.dumpStructure() << "\n";
@@ -1622,7 +1621,9 @@ void Join::joinBlock(Block & block) const
{
std::unique_lock lk(build_table_mutex);

build_table_cv.wait(lk, [&]() { return have_finish_build; });
build_table_cv.wait(lk, [&]() { return build_table_state != BuildTableState::WAITING; });
if (build_table_state == BuildTableState::FAILED) /// throw this exception once failed to build the hash table
throw Exception("Build failed before join probe!");
}

std::shared_lock lock(rwlock);
@@ -1967,5 +1968,4 @@ BlockInputStreamPtr Join::createStreamWithNonJoinedRows(const Block & left_sampl
return std::make_shared<NonJoinedBlockInputStream>(*this, left_sample_block, index, step, max_block_size);
}


} // namespace DB
10 changes: 8 additions & 2 deletions dbms/src/Interpreters/Join.h
@@ -142,7 +142,13 @@ class Join
bool isBuildSetExceeded() const { return build_set_exceeded.load(); }
size_t getNotJoinedStreamConcurrency() const { return build_concurrency; };

void setFinishBuildTable(bool);
enum BuildTableState
{
WAITING,
FAILED,
SUCCEED
};
void setBuildTableState(BuildTableState state_);

/// Reference to the row in block.
struct RowRef
@@ -300,7 +306,7 @@ class Join

mutable std::mutex build_table_mutex;
mutable std::condition_variable build_table_cv;
bool have_finish_build;
BuildTableState build_table_state;

const LogWithPrefixPtr log;
