Corrections after reworking synchronization of ON CLUSTER BACKUPs/RESTOREs #2.
vitlibar committed Nov 14, 2024
1 parent 19bcc55 commit 380aea0
Showing 2 changed files with 20 additions and 14 deletions.
28 changes: 17 additions & 11 deletions src/Backups/BackupCoordinationStageSync.cpp
@@ -121,8 +121,7 @@ BackupCoordinationStageSync::BackupCoordinationStageSync(
 
     try
     {
-        concurrency_check.emplace(is_restore, /* on_cluster = */ true, zookeeper_path, allow_concurrency, concurrency_counters_);
-        createStartAndAliveNodes();
+        createStartAndAliveNodesAndCheckConcurrency(concurrency_counters_);
         startWatchingThread();
     }
     catch (...)
@@ -221,7 +220,7 @@ void BackupCoordinationStageSync::createRootNodes()
         throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected path in ZooKeeper specified: {}", zookeeper_path);
     }
 
-    auto holder = with_retries.createRetriesControlHolder("BackupStageSync::createRootNodes", WithRetries::kInitialization);
+    auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::createRootNodes", WithRetries::kInitialization);
     holder.retries_ctl.retryLoop(
         [&, &zookeeper = holder.faulty_zookeeper]()
         {
@@ -232,18 +231,22 @@
 }
 
 
-void BackupCoordinationStageSync::createStartAndAliveNodes()
+void BackupCoordinationStageSync::createStartAndAliveNodesAndCheckConcurrency(BackupConcurrencyCounters & concurrency_counters_)
 {
-    auto holder = with_retries.createRetriesControlHolder("BackupStageSync::createStartAndAliveNodes", WithRetries::kInitialization);
+    auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::createStartAndAliveNodes", WithRetries::kInitialization);
     holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]()
     {
         with_retries.renewZooKeeper(zookeeper);
-        createStartAndAliveNodes(zookeeper);
+        createStartAndAliveNodesAndCheckConcurrency(zookeeper);
     });
+
+    /// The local concurrency check should be done here after BackupCoordinationStageSync::checkConcurrency() checked that
+    /// there are no 'alive' nodes corresponding to other backups or restores.
+    local_concurrency_check.emplace(is_restore, /* on_cluster = */ true, zookeeper_path, allow_concurrency, concurrency_counters_);
 }
 
 
-void BackupCoordinationStageSync::createStartAndAliveNodes(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper)
+void BackupCoordinationStageSync::createStartAndAliveNodesAndCheckConcurrency(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper)
 {
     /// The "num_hosts" node keeps the number of hosts which started (created the "started" node)
     /// but not yet finished (not created the "finished" node).
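This hunk is the substance of the commit: the ZooKeeper-side concurrency check (looking for 'alive' nodes of other backups or restores) now runs first, and the process-local check is registered only after it has passed. A minimal sketch of that ordering, with simplified stand-ins (StageSyncSketch, LocalConcurrencyCheck, start) for the real ClickHouse classes:

#include <optional>
#include <stdexcept>

/// Minimal sketch (not the actual ClickHouse API) of the ordering this commit
/// establishes: distributed check first, local check second.
struct LocalConcurrencyCheck
{
    /// RAII stand-in for BackupConcurrencyCheck; see the header diff below.
    explicit LocalConcurrencyCheck(bool /*allow_concurrency*/) {}
};

class StageSyncSketch
{
public:
    void start(bool allow_concurrency)
    {
        /// 1. Distributed check: create the 'start' and 'alive' nodes in
        ///    ZooKeeper, failing if another operation's 'alive' node exists.
        createStartAndAliveNodesAndCheckConcurrency(allow_concurrency);

        /// 2. Local check, registered only once step 1 has succeeded;
        ///    mirrors local_concurrency_check.emplace(...) in the hunk above.
        local_concurrency_check.emplace(allow_concurrency);
    }

private:
    void createStartAndAliveNodesAndCheckConcurrency(bool allow_concurrency)
    {
        bool another_operation_alive = false; /// would be read from ZooKeeper
        if (!allow_concurrency && another_operation_alive)
            throw std::runtime_error("Concurrent backups or restores are not allowed");
    }

    std::optional<LocalConcurrencyCheck> local_concurrency_check;
};

Doing it in this order means a backup rejected by the ZooKeeper check never claims a local slot that would then have to be unwound.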
@@ -464,7 +467,7 @@ void BackupCoordinationStageSync::watchingThread()
         try
         {
             /// Recreate the 'alive' node if necessary and read a new state from ZooKeeper.
-            auto holder = with_retries.createRetriesControlHolder("BackupStageSync::watchingThread");
+            auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::watchingThread");
             auto & zookeeper = holder.faulty_zookeeper;
             with_retries.renewZooKeeper(zookeeper);
 
@@ -496,6 +499,9 @@ void BackupCoordinationStageSync::watchingThread()
             tryLogCurrentException(log, "Caught exception while watching");
         }
 
+        if (should_stop())
+            return;
+
         zk_nodes_changed->tryWait(sync_period_ms.count());
     }
 }
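The second watchingThread() hunk adds an early return between the try/catch body and the timed wait, so a thread asked to stop while the body was running exits immediately instead of sleeping for up to another sync_period_ms. A stop-aware polling loop of the same shape might look like the sketch below, where std::condition_variable stands in for the Poco::Event (zk_nodes_changed->tryWait) used in the real code:

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>

/// Sketch of a stop-aware polling loop shaped like watchingThread();
/// std::condition_variable stands in for the Poco::Event in the real code.
class WatcherSketch
{
public:
    void run(std::chrono::milliseconds sync_period)
    {
        while (!stop_requested.load())
        {
            try
            {
                /// ... reconnect to ZooKeeper and read the new state ...
            }
            catch (...)
            {
                /// ... log the exception and fall through to the wait ...
            }

            /// Added by this commit: bail out before sleeping, so a stop
            /// request raised during the body above is honored immediately.
            if (stop_requested.load())
                return;

            std::unique_lock lock(mutex);
            changed.wait_for(lock, sync_period, [this] { return stop_requested.load(); });
        }
    }

    void stop()
    {
        stop_requested.store(true);
        changed.notify_all();
    }

private:
    std::atomic<bool> stop_requested{false};
    std::mutex mutex;
    std::condition_variable changed;
};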
@@ -769,7 +775,7 @@ void BackupCoordinationStageSync::setStage(const String & stage, const String &
         stopWatchingThread();
     }
 
-    auto holder = with_retries.createRetriesControlHolder("BackupStageSync::setStage");
+    auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::setStage");
     holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]()
     {
         with_retries.renewZooKeeper(zookeeper);
@@ -938,7 +944,7 @@ bool BackupCoordinationStageSync::finishImpl(bool throw_if_error, WithRetries::K
 
     try
     {
-        auto holder = with_retries.createRetriesControlHolder("BackupStageSync::finish", retries_kind);
+        auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::finish", retries_kind);
         holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]()
         {
             with_retries.renewZooKeeper(zookeeper);
@@ -1309,7 +1315,7 @@ bool BackupCoordinationStageSync::setError(const Exception & exception, bool thr
         }
     }
 
-    auto holder = with_retries.createRetriesControlHolder("BackupStageSync::setError", WithRetries::kErrorHandling);
+    auto holder = with_retries.createRetriesControlHolder("BackupCoordinationStageSync::setError", WithRetries::kErrorHandling);
     holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]()
     {
         with_retries.renewZooKeeper(zookeeper);
6 changes: 3 additions & 3 deletions src/Backups/BackupCoordinationStageSync.h
@@ -72,8 +72,8 @@ class BackupCoordinationStageSync
     void createRootNodes();
 
     /// Atomically creates both 'start' and 'alive' nodes and also checks that there is no concurrent backup or restore if `allow_concurrency` is false.
-    void createStartAndAliveNodes();
-    void createStartAndAliveNodes(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
+    void createStartAndAliveNodesAndCheckConcurrency(BackupConcurrencyCounters & concurrency_counters_);
+    void createStartAndAliveNodesAndCheckConcurrency(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
 
     /// Deserialize the version of a node stored in the 'start' node.
     int parseStartNode(const String & start_node_contents, const String & host) const;
@@ -171,7 +171,7 @@ class BackupCoordinationStageSync
     const String alive_node_path;
     const String alive_tracker_node_path;
 
-    std::optional<BackupConcurrencyCheck> concurrency_check;
+    std::optional<BackupConcurrencyCheck> local_concurrency_check;
 
     std::shared_ptr<Poco::Event> zk_nodes_changed;
 
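The member rename from concurrency_check to local_concurrency_check makes explicit that this guard covers only operations within the current server process; cross-cluster conflicts are caught through the 'alive' nodes in ZooKeeper. A minimal sketch of such an RAII guard, assuming a simplified constructor (the real BackupConcurrencyCheck also takes on_cluster and zookeeper_path arguments, as the call site in the .cpp diff shows):

#include <atomic>
#include <cstddef>
#include <stdexcept>

/// Hypothetical, simplified stand-ins for BackupConcurrencyCounters and
/// BackupConcurrencyCheck: a process-local RAII guard over per-kind counters.
struct ConcurrencyCounters
{
    std::atomic<size_t> active_backups{0};
    std::atomic<size_t> active_restores{0};
};

class LocalConcurrencyCheck
{
public:
    LocalConcurrencyCheck(bool is_restore, bool allow_concurrency, ConcurrencyCounters & counters)
        : counter(is_restore ? counters.active_restores : counters.active_backups)
    {
        /// fetch_add returns the previous value: nonzero means another
        /// operation of the same kind is already running in this process.
        if (counter.fetch_add(1) > 0 && !allow_concurrency)
        {
            counter.fetch_sub(1); /// roll back before reporting the conflict
            throw std::runtime_error("Concurrent backups or restores are not allowed");
        }
    }

    ~LocalConcurrencyCheck() { counter.fetch_sub(1); } /// release the slot

private:
    std::atomic<size_t> & counter;
};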
