Skip to content

Commit

Permalink
This is an automated cherry-pick of #9450
Browse files Browse the repository at this point in the history
Signed-off-by: ti-chi-bot <[email protected]>
  • Loading branch information
windtalker authored and ti-chi-bot committed Sep 21, 2024
1 parent 50bd2b0 commit a9488da
Show file tree
Hide file tree
Showing 19 changed files with 3,174 additions and 9 deletions.
2 changes: 1 addition & 1 deletion dbms/src/DataStreams/AggregatingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ Block AggregatingBlockInputStream::readImpl()
executed = true;
AggregatedDataVariantsPtr data_variants = std::make_shared<AggregatedDataVariants>();

Aggregator::CancellationHook hook = [&]() {
CancellationHook hook = [&]() {
return this->isCancelled();
};
aggregator.setCancellationHook(hook);
Expand Down
109 changes: 109 additions & 0 deletions dbms/src/DataStreams/HashJoinProbeExec.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Common/PtrHolder.h>
#include <Core/Block.h>
#include <DataStreams/IBlockInputStream.h>
#include <Interpreters/Join.h>

#include <memory>

#pragma once

namespace DB
{
class HashJoinProbeExec;
using HashJoinProbeExecPtr = std::shared_ptr<HashJoinProbeExec>;

class HashJoinProbeExec : public std::enable_shared_from_this<HashJoinProbeExec>
{
public:
static HashJoinProbeExecPtr build(
const String & req_id,
const JoinPtr & join,
size_t stream_index,
const BlockInputStreamPtr & probe_stream,
size_t max_block_size);

HashJoinProbeExec(
const String & req_id,
const JoinPtr & join_,
size_t stream_index_,
const BlockInputStreamPtr & restore_build_stream_,
const BlockInputStreamPtr & probe_stream_,
bool need_scan_hash_map_after_probe_,
const BlockInputStreamPtr & scan_hash_map_after_probe_stream_,
size_t max_block_size_);

void waitUntilAllBuildFinished();

void waitUntilAllProbeFinished();

HashJoinProbeExecPtr tryGetRestoreExec();

void cancel();

void meetError(const String & error_message);

void restoreBuild();

void onProbeStart();
// Returns empty block if probe finish.
Block probe();
// Returns true if the probe_exec ends.
// Returns false if the probe_exec continues to execute.
bool onProbeFinish();

bool needScanHashMap() const { return need_scan_hash_map_after_probe; }
void onScanHashMapAfterProbeStart();
Block fetchScanHashMapData();
// Returns true if the probe_exec ends.
// Returns false if the probe_exec continues to execute.
bool onScanHashMapAfterProbeFinish();

void setCancellationHook(CancellationHook cancellation_hook) { is_cancelled = std::move(cancellation_hook); }

private:
PartitionBlock getProbeBlock();

HashJoinProbeExecPtr doTryGetRestoreExec();

private:
const LoggerPtr log;

const JoinPtr join;

const size_t stream_index;

const BlockInputStreamPtr restore_build_stream;

const BlockInputStreamPtr probe_stream;

const bool need_scan_hash_map_after_probe;
const BlockInputStreamPtr scan_hash_map_after_probe_stream;

const size_t max_block_size;

CancellationHook is_cancelled{[]() {
return false;
}};

ProbeProcessInfo probe_process_info;
PartitionBlocks probe_partition_blocks;

HashJoinProbeExecPtr parent;
};

using HashJoinProbeExecHolder = PtrHolder<HashJoinProbeExecPtr>;
} // namespace DB
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ Block ParallelAggregatingBlockInputStream::readImpl()
{
if (!executed)
{
Aggregator::CancellationHook hook = [&]() {
CancellationHook hook = [&]() {
return this->isCancelled();
};
aggregator.setCancellationHook(hook);
Expand Down
14 changes: 11 additions & 3 deletions dbms/src/Flash/Mpp/MPPTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,13 @@ void MPPTask::runImpl()
GET_METRIC(tiflash_coprocessor_handling_request_count, type_run_mpp_task).Decrement();
GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_run_mpp_task).Observe(stopwatch.elapsedSeconds());
});
<<<<<<< HEAD
=======

// set cancellation hook
context->setCancellationHook([this] { return is_cancelled.load(); });

>>>>>>> 8aba9f0ce3 (join be aware of cancel signal (#9450))
String err_msg;
try
{
Expand Down Expand Up @@ -494,7 +501,7 @@ void MPPTask::abort(const String & message, AbortType abort_type)
if (previous_status == FINISHED || previous_status == CANCELLED || previous_status == FAILED)
{
LOG_WARNING(log, "task already in {} state", magic_enum::enum_name(previous_status));
return;
break;
}
else if (previous_status == INITIALIZING && switchStatus(INITIALIZING, next_task_status))
{
Expand All @@ -503,7 +510,7 @@ void MPPTask::abort(const String & message, AbortType abort_type)
/// so just close all tunnels here
abortTunnels("", false);
LOG_WARNING(log, "Finish abort task from uninitialized");
return;
break;
}
else if (previous_status == RUNNING && switchStatus(RUNNING, next_task_status))
{
Expand All @@ -517,9 +524,10 @@ void MPPTask::abort(const String & message, AbortType abort_type)
scheduleThisTask(ScheduleState::FAILED);
/// runImpl is running, leave remaining work to runImpl
LOG_WARNING(log, "Finish abort task from running");
return;
break;
}
}
is_cancelled = true;
}

bool MPPTask::switchStatus(TaskStatus from, TaskStatus to)
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/Flash/Mpp/MPPTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,12 @@ class MPPTask : public std::enable_shared_from_this<MPPTask>
ContextPtr context;

MPPTaskManager * manager;
<<<<<<< HEAD
std::atomic<bool> registered{false};
=======
std::atomic<bool> is_registered{false};
std::atomic<bool> is_cancelled{false};
>>>>>>> 8aba9f0ce3 (join be aware of cancel signal (#9450))

MPPTaskScheduleEntry schedule_entry;

Expand Down
Loading

0 comments on commit a9488da

Please sign in to comment.