Skip to content

Commit

Permalink
Use wait instead of sleep for materialized view recovery (#638)
Browse files Browse the repository at this point in the history
* Use wait instead of sleep for materialized view recovery

* fix flaky smoke test
  • Loading branch information
yl-lisen authored Apr 1, 2024
1 parent c19b770 commit aa9d940
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 14 deletions.
41 changes: 29 additions & 12 deletions src/Storages/Streaming/StorageMaterializedView.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,12 @@ void StorageMaterializedView::shutdown()
if (shutdown_called.test_and_set())
return;

/// Wake up the thread waiting
{
std::lock_guard<std::mutex> lock(wait_cv_mutex);
wait_cv.notify_all();
}

background_state.terminate();

auto storage_id = getStorageID();
Expand All @@ -360,6 +366,14 @@ void StorageMaterializedView::shutdown()
DatabaseCatalog::instance().removeDependency(target_table_id, storage_id);
}

template <typename Duration>
void StorageMaterializedView::waitFor(Duration && duration)
{
/// No care of spurious awakenings
std::unique_lock lock(wait_cv_mutex);
wait_cv.wait_for(lock, std::forward<Duration>(duration));
}

void StorageMaterializedView::checkDependencies() const
{
std::vector<StoragePtr> waiting_storages;
Expand Down Expand Up @@ -481,7 +495,7 @@ void StorageMaterializedView::initBackgroundState()
bool log_error = (retry_times > 1);
background_state.setException(
err_code, fmt::format("{}, {}th wait for ready", getExceptionMessage(e, true), retry_times), log_error);
std::this_thread::sleep_for(recheck_dependencies_interval);
waitFor(recheck_dependencies_interval);
}
else if (err_code == ErrorCodes::DIRECTORY_DOESNT_EXIST)
{
Expand All @@ -506,16 +520,16 @@ void StorageMaterializedView::initBackgroundState()
background_state.setException(
err_code,
fmt::format(
"Wait for {}th recovering background pipeline from {}. Background runtime error: {}. "
"processed_sns={} checkpointed_sns={}",
"Wait for {}th recovering background pipeline from {}, processed_sns={} checkpointed_sns={}. "
"Background runtime error: {}",
retry_times,
dumpStreamingSourcesWithSNs(streaming_sources, /*recover_from_checkpointed*/ {}),
getExceptionMessage(e, true),
dumpSNs(getProcessedSNOfStreamingSources(streaming_sources)),
dumpSNs(getCheckpointedSNOfStreamingSources(streaming_sources))));
dumpSNs(getCheckpointedSNOfStreamingSources(streaming_sources)),
getExceptionMessage(e, true)));

/// For some queries that always go wrong, we don’t want to recover too frequently. Now wait 5s, 10s, 15s, ... 30s, 30s for each time
std::this_thread::sleep_for(recover_interval * std::min<size_t>(retry_times, 6));
waitFor(recover_interval * std::min<size_t>(retry_times, 6));
break;
}
case RecoveryPolicy::BestEffort:
Expand All @@ -541,17 +555,17 @@ void StorageMaterializedView::initBackgroundState()
background_state.setException(
err_code,
fmt::format(
"Wait for {}th recovering background pipeline from {}. Background runtime error: {}. "
"processed_sns={} checkpointed_sns={}",
"Wait for {}th recovering background pipeline from {}, processed_sns={} checkpointed_sns={}. "
"Background runtime error: {}",
retry_times,
dumpStreamingSourcesWithSNs(streaming_sources, recover_sns),
getExceptionMessage(e, true),
dumpSNs(current_failed_sns),
dumpSNs(getCheckpointedSNOfStreamingSources(streaming_sources))));
dumpSNs(getCheckpointedSNOfStreamingSources(streaming_sources)),
getExceptionMessage(e, true)));

failed_sn_queue.emplace_back(std::move(current_failed_sns));

std::this_thread::sleep_for(recover_interval);
waitFor(recover_interval);
break;
}
}
Expand Down Expand Up @@ -853,12 +867,15 @@ void StorageMaterializedView::createInnerTable()
{
try
{
if (shutdown_called.test(std::memory_order_relaxed))
return;

doCreateInnerTable(metadata_snapshot, local_context);
return;
}
catch (...)
{
std::this_thread::sleep_for(std::chrono::seconds(2));
waitFor(std::chrono::seconds(2));
LOG_DEBUG(log, "'{}' waiting for inner stream ready", getStorageID().getFullTableName());
}
}
Expand Down
6 changes: 6 additions & 0 deletions src/Storages/Streaming/StorageMaterializedView.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ class StorageMaterializedView final : public shared_ptr_helper<StorageMaterializ

void checkDependencies() const;

template <typename Duration>
void waitFor(Duration && duration);

private:
Poco::Logger * log;

Expand All @@ -98,6 +101,9 @@ class StorageMaterializedView final : public shared_ptr_helper<StorageMaterializ

std::atomic_flag shutdown_called;

std::condition_variable wait_cv;
std::mutex wait_cv_mutex;

/// Background state
struct State
{
Expand Down
4 changes: 2 additions & 2 deletions tests/stream/test_stream_smoke/0098_fixed_issues2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,13 @@ tests:
- client: python
query_id: fixed-issues2-1
query_type: stream
wait: 1
wait: 2
query: subscribe to select i in (select i from table(fixed_issues2_stream)), multi_if(i in (select i from table(fixed_issues2_stream)), 1, 0) as t from fixed_issues2_stream settings seek_to='earliest', checkpoint_interval=1;

- client: python
query_id: fixed-issues2-2
query_type: stream
wait: 1
wait: 2
query: subscribe to with cte as (select 1), cte2 as (select multi_if(i in (cte), '1', '2') as t from fixed_issues2_stream where t = '1') select t from cte2 settings seek_to='earliest', checkpoint_interval=1

- client: python
Expand Down

0 comments on commit aa9d940

Please sign in to comment.