Skip to content

Commit

Permalink
Merge branch 'main' into 678098-patch-4
Browse files Browse the repository at this point in the history
  • Loading branch information
678098 authored Jan 16, 2025
2 parents 8b14b0e + acbb52d commit 0a2d051
Show file tree
Hide file tree
Showing 27 changed files with 422 additions and 313 deletions.
58 changes: 24 additions & 34 deletions src/groups/mqb/mqba/mqba_domainmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,17 +566,21 @@ void DomainManager::stop()
<< k_MAX_WAIT_SECONDS_AT_SHUTDOWN
<< " seconds while shutting down"
<< " bmqbrkr. rc: " << rc << ".";

// Note that 'self' variable will get invalidated when this function
// returns, which will ensure that any pending 'onDomainClosed'
// callbacks are not invoked. So there is no need to explicitly call
// 'self.invalidate()' here.
}

if (d_domainResolver_mp) {
d_domainResolver_mp->stop();
d_domainResolver_mp.clear();
}

// Notice that this invalidation is necessary.
// Without this explicit call, `self` will be invalidated
// when the function returns, which will ensure that any pending
// `onDomainClosed` callbacks are not invoked. But this is not enough
// since we want to prevent a (tiny) possibility where `latch` is
// destructed before `self` and `onDomainClosed` would be called on an
// invalid `latch`.
self.invalidate();
}

int DomainManager::locateDomain(DomainSp* domain,
Expand Down Expand Up @@ -706,7 +710,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
else if (command.isRemoveValue()) {
const bsl::string& name = command.remove().domain();

// First pass
// First round
if (command.remove().finalize().isNull()) {
DomainSp domainSp;

Expand All @@ -717,25 +721,23 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
return -1; // RETURN
}

// 1. Reject if there's any opened or opening queue
// 1. Reject if there's any open queue request on the fly
// Mark DOMAIN PREREMOVE to block openQueue requests
if (!domainSp->tryRemove()) {
bmqu::MemOutStream os;
os << "Trying to remove the domain '" << name
<< "' while there are queues opened or opening";
<< "' while there are open queue requests on the fly or "
"the domain is shutting down";
result->makeError().message() = os.str();
return -1; // RETURN
}

// 2. Mark DOMAIN PREREMOVE to block openQueue requests
domainSp->removeDomainReset();

// 3. Purge inactive queues
// remove virtual storage; add a record in journal file
// 2. Purge and GC
mqbcmd::DomainResult domainResult;
mqbcmd::ClusterResult clusterResult;
mqbi::Cluster* cluster = domainSp->cluster();

cluster->purgeQueueOnDomain(&clusterResult, name);
cluster->purgeAndGCQueueOnDomain(&clusterResult, name);

if (clusterResult.isErrorValue()) {
result->makeError(clusterResult.error());
Expand All @@ -752,18 +754,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
clusterResult.storageResult().purgedQueues().queues();
result->makeDomainResult(domainResult);

// 4. Force GC queues
// unregister Queue from domain;
// remove queue storage from partition
mqbcmd::ClusterResult clusterForceGCResult;
int rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name);
if (clusterForceGCResult.isErrorValue()) {
result->makeError(clusterForceGCResult.error());
return -1; // RETURN
}

// 5. Mark DOMAIN REMOVED to accecpt the second pass

// 3. Mark DOMAIN REMOVED to accecpt the second pass
bmqu::SharedResource<DomainManager> self(this);
bslmt::Latch latch(1, bsls::SystemClockType::e_MONOTONIC);

Expand All @@ -777,18 +768,19 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
bmqsys::Time::nowMonotonicClock().addSeconds(
k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE);

rc = latch.timedWait(timeout);
int rc = latch.timedWait(timeout);
if (0 != rc) {
BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in "
<< k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE
<< " seconds. rc: " << rc << ".";
return rc;
}

// 6. Mark DOMAINS REMOVE command first round as complete
domainSp->removeDomainComplete();
// Refer to `DomainManager::stop` to see why we need to invalidate
// `self` explicitly.
self.invalidate();
return rc; // RETURN
}
// Second pass
// Second round
else {
DomainSp domainSp;

Expand All @@ -802,7 +794,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,

if (!domainSp->isRemoveComplete()) {
bmqu::MemOutStream os;
os << "First pass of DOMAINS REMOVE '" << name
os << "First round of DOMAINS REMOVE '" << name
<< "' is not completed.";
result->makeError().message() = os.str();
return -1; // RETURN
Expand All @@ -823,8 +815,6 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result,
result->makeSuccess();
return 0; // RETURN
}

return 0;
}

bmqu::MemOutStream os;
Expand Down
66 changes: 31 additions & 35 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3665,65 +3665,61 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* result)
storageResult.clusterStorageSummary();
}

int Cluster::gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
void Cluster::purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// exected by *ANY* thread

dispatcher()->execute(
bdlf::BindUtil::bind(&Cluster::gcQueueOnDomainDispatched,
bdlf::BindUtil::bind(&Cluster::purgeAndGCQueueOnDomainDispatched,
this,
result,
domainName),
this);

dispatcher()->synchronize(this);

return 0;
}

void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
void Cluster::purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// executed by the *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this));

// 'true' implies immediate
// Check if there's any live connection to a queue
if (d_clusterOrchestrator.queueHelper().hasActiveQueue(domainName)) {
BALL_LOG_ERROR << "Trying to remove the domain '" << domainName
<< "' while there are queues opened or opening";
result->makeError().message() =
"Trying to remove the domain '" + domainName +
"' while there are queues opened or opening";
return; // RETURN
}

// Purge queues on the given domain
mqbcmd::StorageResult storageResult;
d_storageManager_mp->purgeQueueOnDomain(&storageResult, domainName);
result->makeStorageResult(storageResult);

if (result->isErrorValue()) {
result->makeError(result->error());
return; // RETURN
}

// GC queues on the given domain
const int rc =
d_clusterOrchestrator.queueHelper().gcExpiredQueues(true, domainName);
if (rc == -1 || rc == -3) {
// TBD: We allow the node to not be an active primary for *any*
// partition; this has to be changed once we allow leader != primary
BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: "
<< rc << ")";
result->makeError().message() = "Failed to execute command (rc: " +
bsl::to_string(rc) + ")";
BALL_LOG_ERROR << "Failed to force GC queues on domain '" << domainName
<< "' (rc: " << rc << ")";
result->makeError().message() =
"Failed to force GC queues on domain '" + domainName +
"' (rc: " + bsl::to_string(rc) + ")";
}
else {
// Otherwise the command succeeded.
result->makeSuccess();
}
}

void Cluster::purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
{
// exected by *ANY* thread

mqbcmd::StorageResult storageResult;

dispatcher()->execute(
bdlf::BindUtil::bind(&mqbi::StorageManager::purgeQueueOnDomain,
d_storageManager_mp.get(),
&storageResult,
domainName),
this);

dispatcher()->synchronize(this);

result->makeStorageResult(storageResult);
}

void Cluster::printClusterStateSummary(bsl::ostream& out,
Expand Down
16 changes: 6 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -673,18 +673,14 @@ class Cluster : public mqbi::Cluster,
/// Load the cluster state to the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Purge queues in this cluster on a given domain.
void
purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;
/// Purge and force GC queues in this cluster on a given domain.
void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
BSLS_KEYWORD_OVERRIDE;

/// Executed by dispatcher thread.
void gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName);
void purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result,
const bsl::string& domainName);

// MANIPULATORS
// (virtual: mqbnet::SessionEventProcessor)
Expand Down
17 changes: 2 additions & 15 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1342,27 +1342,14 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out)
loadQueuesInfo(&clusterProxyStatus.queuesInfo());
}

void ClusterProxy::purgeQueueOnDomain(
void ClusterProxy::purgeAndGCQueueOnDomain(
mqbcmd::ClusterResult* result,
BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
{
bmqu::MemOutStream os;
os << "MockCluster::gcQueueOnDomain not implemented!";
result->makeError().message() = os.str();
}

int ClusterProxy::gcQueueOnDomain(
mqbcmd::ClusterResult* result,
BSLS_ANNOTATION_UNUSED const bsl::string& domainName)
{
// exected by *ANY* thread

bdlma::LocalSequentialAllocator<256> localAllocator(d_allocator_p);
bmqu::MemOutStream os(&localAllocator);
os << "GC Queue not supported on a Proxy.";
os << "Purge and GC queue not supported on a Proxy.";
result->makeError().message() = os.str();

return 0;
}

// MANIPULATORS
Expand Down
12 changes: 4 additions & 8 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -539,14 +539,10 @@ class ClusterProxy : public mqbc::ClusterStateObserver,
/// Load the cluster state in the specified `out` object.
void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE;

/// Purge queues in this cluster on a given domain.
void
purgeQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;

/// Force GC queues in this cluster on a given domain.
int gcQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE;
/// Purge and force GC queues in this cluster on a given domain.
void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result,
const bsl::string& domainName)
BSLS_KEYWORD_OVERRIDE;

void getPrimaryNodes(int* rc,
bsl::ostream& errorDescription,
Expand Down
41 changes: 41 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6222,6 +6222,47 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate,
return rc_SUCCESS; // RETURN
}

bool ClusterQueueHelper::hasActiveQueue(const bsl::string& domainName)
{
// executed by the cluster *DISPATCHER* thread

// PRECONDITIONS
BSLS_ASSERT_SAFE(
d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));

const mqbc::ClusterState::DomainStates& domainStates =
d_clusterState_p->domainStates();

DomainStatesCIter domCit = domainStates.find(domainName);

if (domCit == domainStates.end()) {
return false; // RETURN
}

const UriToQueueInfoMap& queuesInfoPerDomain =
domCit->second->queuesInfo();

for (UriToQueueInfoMapCIter qCit = queuesInfoPerDomain.cbegin();
qCit != queuesInfoPerDomain.cend();
++qCit) {
QueueContextMapConstIter queueContextCIt = d_queues.find(
qCit->second->uri());

if (queueContextCIt == d_queues.end()) {
continue;
}

if (queueContextCIt->second->d_liveQInfo.d_inFlight != 0 ||
queueContextCIt->second->d_liveQInfo
.d_numHandleCreationsInProgress != 0 ||
queueContextCIt->second->d_liveQInfo.d_numQueueHandles != 0) {
return true; // RETURN
}
}

return false; // RETURN
}

void ClusterQueueHelper::loadQueuesInfo(mqbcmd::StorageContent* out) const
{
// executed by the cluster *DISPATCHER* thread
Expand Down
2 changes: 2 additions & 0 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h
Original file line number Diff line number Diff line change
Expand Up @@ -1089,6 +1089,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL
int gcExpiredQueues(bool immediate = false,
const bsl::string& domainName = "");

bool hasActiveQueue(const bsl::string& domainName);

/// Start executing multi-step processing of StopRequest or CLOSING node
/// advisory received from the specified `clusterNode`. In the case of
/// StopRequest the specified `request` references the request; in the
Expand Down
Loading

0 comments on commit 0a2d051

Please sign in to comment.