From 03058041643ece0c23236710639766077a0dcdc7 Mon Sep 17 00:00:00 2001 From: JaySon Date: Mon, 23 Oct 2023 10:44:01 +0800 Subject: [PATCH] disagg: Fix error when there are empty partitions (#8221) (#110) close pingcap/tiflash#8220 --- dbms/src/Common/MPMCQueue.h | 2 +- dbms/src/Flash/Coprocessor/DAGContext.cpp | 8 +++---- .../Storages/DeltaMerge/Remote/RNWorkers.cpp | 23 +++++++++++++++---- .../Storages/DeltaMerge/Remote/RNWorkers.h | 2 ++ 4 files changed, 25 insertions(+), 10 deletions(-) diff --git a/dbms/src/Common/MPMCQueue.h b/dbms/src/Common/MPMCQueue.h index cfe5956d23c..f152d40e47d 100644 --- a/dbms/src/Common/MPMCQueue.h +++ b/dbms/src/Common/MPMCQueue.h @@ -131,7 +131,7 @@ class MPMCQueue using Result = MPMCQueueResult; using ElementAuxiliaryMemoryUsageFunc = std::function; - MPMCQueue( + explicit MPMCQueue( const CapacityLimits & capacity_limits_, ElementAuxiliaryMemoryUsageFunc && get_auxiliary_memory_usage_ = [](const T &) { return 0; }) : capacity_limits(capacity_limits_) diff --git a/dbms/src/Flash/Coprocessor/DAGContext.cpp b/dbms/src/Flash/Coprocessor/DAGContext.cpp index 0dedcf1510f..fd39701b97b 100644 --- a/dbms/src/Flash/Coprocessor/DAGContext.cpp +++ b/dbms/src/Flash/Coprocessor/DAGContext.cpp @@ -51,7 +51,7 @@ DAGContext::DAGContext( const String & resource_group_name_, LoggerPtr log_) : dag_request(&dag_request_) - , dummy_query_string(dag_request->DebugString()) + , dummy_query_string(dag_request->ShortDebugString()) , dummy_ast(makeDummyQuery()) , tidb_host(tidb_host_) , collect_execution_summaries( @@ -76,7 +76,7 @@ DAGContext::DAGContext( // for mpp DAGContext::DAGContext(tipb::DAGRequest & dag_request_, const mpp::TaskMeta & meta_, bool is_root_mpp_task_) : dag_request(&dag_request_) - , dummy_query_string(dag_request->DebugString()) + , dummy_query_string(dag_request->ShortDebugString()) , dummy_ast(makeDummyQuery()) , collect_execution_summaries( dag_request->has_collect_execution_summaries() && dag_request->collect_execution_summaries()) @@ -106,7 +106,7 @@ DAGContext::DAGContext( const String & compute_node_host_, LoggerPtr log_) : dag_request(&dag_request_) - , dummy_query_string(dag_request->DebugString()) + , dummy_query_string(dag_request->ShortDebugString()) , dummy_ast(makeDummyQuery()) , tidb_host(compute_node_host_) , collect_execution_summaries( @@ -146,7 +146,7 @@ DAGContext::DAGContext(UInt64 max_error_count_) // for tests need to run query tasks. DAGContext::DAGContext(tipb::DAGRequest & dag_request_, String log_identifier, size_t concurrency) : dag_request(&dag_request_) - , dummy_query_string(dag_request->DebugString()) + , dummy_query_string(dag_request->ShortDebugString()) , dummy_ast(makeDummyQuery()) , initialize_concurrency(concurrency) , collect_execution_summaries( diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp index fda616d6cc4..c9bcbcbedba 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.cpp @@ -22,7 +22,12 @@ RNWorkers::RNWorkers(const Context & context, const Options & options, size_t nu { RUNTIME_CHECK(num_streams > 0, num_streams); size_t n = options.read_task->segment_read_tasks.size(); - RUNTIME_CHECK(n > 0, n); + if (n == 0) + { + empty_channel = std::make_shared(0); + empty_channel->finish(); + return; + } auto fetch_pages_concurrency = n; auto prepare_streams_concurrency = n; @@ -69,18 +74,26 @@ RNWorkers::RNWorkers(const Context & context, const Options & options, size_t nu void RNWorkers::startInBackground() { - worker_fetch_pages->startInBackground(); - worker_prepare_streams->startInBackground(); + if (!empty_channel) + { + worker_fetch_pages->startInBackground(); + worker_prepare_streams->startInBackground(); + } } void RNWorkers::wait() { - worker_fetch_pages->wait(); - worker_prepare_streams->wait(); + if (!empty_channel) + { + worker_fetch_pages->wait(); + worker_prepare_streams->wait(); + } } RNWorkers::ChannelPtr RNWorkers::getReadyChannel() const { + if (empty_channel) + return empty_channel; return worker_prepare_streams->result_queue; } } // namespace DB::DM::Remote diff --git a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h index ed07341d3c6..36c452151c3 100644 --- a/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h +++ b/dbms/src/Storages/DeltaMerge/Remote/RNWorkers.h @@ -61,6 +61,8 @@ class RNWorkers : private boost::noncopyable } private: + ChannelPtr empty_channel; + RNWorkerFetchPagesPtr worker_fetch_pages; RNWorkerPrepareStreamsPtr worker_prepare_streams; };