diff --git a/src/Storages/Streaming/StorageMaterializedView.cpp b/src/Storages/Streaming/StorageMaterializedView.cpp index e767daaf60..a0900d0750 100644 --- a/src/Storages/Streaming/StorageMaterializedView.cpp +++ b/src/Storages/Streaming/StorageMaterializedView.cpp @@ -350,6 +350,12 @@ void StorageMaterializedView::shutdown() if (shutdown_called.test_and_set()) return; + /// Wake up the thread waiting + { + std::lock_guard lock(wait_cv_mutex); + wait_cv.notify_all(); + } + background_state.terminate(); auto storage_id = getStorageID(); @@ -360,6 +366,14 @@ void StorageMaterializedView::shutdown() DatabaseCatalog::instance().removeDependency(target_table_id, storage_id); } +template +void StorageMaterializedView::waitFor(Duration && duration) +{ + /// No care of spurious awakenings + std::unique_lock lock(wait_cv_mutex); + wait_cv.wait_for(lock, std::forward(duration)); +} + void StorageMaterializedView::checkDependencies() const { std::vector waiting_storages; @@ -481,7 +495,7 @@ void StorageMaterializedView::initBackgroundState() bool log_error = (retry_times > 1); background_state.setException( err_code, fmt::format("{}, {}th wait for ready", getExceptionMessage(e, true), retry_times), log_error); - std::this_thread::sleep_for(recheck_dependencies_interval); + waitFor(recheck_dependencies_interval); } else if (err_code == ErrorCodes::DIRECTORY_DOESNT_EXIST) { @@ -506,16 +520,16 @@ void StorageMaterializedView::initBackgroundState() background_state.setException( err_code, fmt::format( - "Wait for {}th recovering background pipeline from {}. Background runtime error: {}. " - "processed_sns={} checkpointed_sns={}", + "Wait for {}th recovering background pipeline from {}, processed_sns={} checkpointed_sns={}. " + "Background runtime error: {}", retry_times, dumpStreamingSourcesWithSNs(streaming_sources, /*recover_from_checkpointed*/ {}), - getExceptionMessage(e, true), dumpSNs(getProcessedSNOfStreamingSources(streaming_sources)), - dumpSNs(getCheckpointedSNOfStreamingSources(streaming_sources)))); + dumpSNs(getCheckpointedSNOfStreamingSources(streaming_sources)), + getExceptionMessage(e, true))); /// For some queries that always go wrong, we don’t want to recover too frequently. Now wait 5s, 10s, 15s, ... 30s, 30s for each time - std::this_thread::sleep_for(recover_interval * std::min(retry_times, 6)); + waitFor(recover_interval * std::min(retry_times, 6)); break; } case RecoveryPolicy::BestEffort: @@ -541,17 +555,17 @@ void StorageMaterializedView::initBackgroundState() background_state.setException( err_code, fmt::format( - "Wait for {}th recovering background pipeline from {}. Background runtime error: {}. " - "processed_sns={} checkpointed_sns={}", + "Wait for {}th recovering background pipeline from {}, processed_sns={} checkpointed_sns={}. " + "Background runtime error: {}", retry_times, dumpStreamingSourcesWithSNs(streaming_sources, recover_sns), - getExceptionMessage(e, true), dumpSNs(current_failed_sns), - dumpSNs(getCheckpointedSNOfStreamingSources(streaming_sources)))); + dumpSNs(getCheckpointedSNOfStreamingSources(streaming_sources)), + getExceptionMessage(e, true))); failed_sn_queue.emplace_back(std::move(current_failed_sns)); - std::this_thread::sleep_for(recover_interval); + waitFor(recover_interval); break; } } @@ -853,12 +867,15 @@ void StorageMaterializedView::createInnerTable() { try { + if (shutdown_called.test(std::memory_order_relaxed)) + return; + doCreateInnerTable(metadata_snapshot, local_context); return; } catch (...) { - std::this_thread::sleep_for(std::chrono::seconds(2)); + waitFor(std::chrono::seconds(2)); LOG_DEBUG(log, "'{}' waiting for inner stream ready", getStorageID().getFullTableName()); } } diff --git a/src/Storages/Streaming/StorageMaterializedView.h b/src/Storages/Streaming/StorageMaterializedView.h index 0c5cd04e87..49d6ba71fa 100644 --- a/src/Storages/Streaming/StorageMaterializedView.h +++ b/src/Storages/Streaming/StorageMaterializedView.h @@ -87,6 +87,9 @@ class StorageMaterializedView final : public shared_ptr_helper + void waitFor(Duration && duration); + private: Poco::Logger * log; @@ -98,6 +101,9 @@ class StorageMaterializedView final : public shared_ptr_helper