diff --git a/src/groups/mqb/mqba/mqba_domainmanager.cpp b/src/groups/mqb/mqba/mqba_domainmanager.cpp index ec9bb2470..19c82153a 100644 --- a/src/groups/mqb/mqba/mqba_domainmanager.cpp +++ b/src/groups/mqb/mqba/mqba_domainmanager.cpp @@ -566,17 +566,21 @@ void DomainManager::stop() << k_MAX_WAIT_SECONDS_AT_SHUTDOWN << " seconds while shutting down" << " bmqbrkr. rc: " << rc << "."; - - // Note that 'self' variable will get invalidated when this function - // returns, which will ensure that any pending 'onDomainClosed' - // callbacks are not invoked. So there is no need to explicitly call - // 'self.invalidate()' here. } if (d_domainResolver_mp) { d_domainResolver_mp->stop(); d_domainResolver_mp.clear(); } + + // Notice that this invalidation is necessary. + // Without this explicit call, `self` will be invalidated + // when the function returns, which will ensure that any pending + // `onDomainClosed` callbacks are not invoked. But this is not enough + // since we want to prevent a (tiny) possibility where `latch` is + // destructed before `self` and `onDomainClosed` would be called on an + // invalid `latch`. + self.invalidate(); } int DomainManager::locateDomain(DomainSp* domain, @@ -706,7 +710,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, else if (command.isRemoveValue()) { const bsl::string& name = command.remove().domain(); - // First pass + // First round if (command.remove().finalize().isNull()) { DomainSp domainSp; @@ -717,25 +721,23 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, return -1; // RETURN } - // 1. Reject if there's any opened or opening queue + // 1. Reject if there's any open queue request on the fly + // Mark DOMAIN PREREMOVE to block openQueue requests if (!domainSp->tryRemove()) { bmqu::MemOutStream os; os << "Trying to remove the domain '" << name - << "' while there are queues opened or opening"; + << "' while there are open queue requests on the fly or " + "the domain is shutting down"; result->makeError().message() = os.str(); return -1; // RETURN } - // 2. Mark DOMAIN PREREMOVE to block openQueue requests - domainSp->removeDomainReset(); - - // 3. Purge inactive queues - // remove virtual storage; add a record in journal file + // 2. Purge and GC mqbcmd::DomainResult domainResult; mqbcmd::ClusterResult clusterResult; mqbi::Cluster* cluster = domainSp->cluster(); - cluster->purgeQueueOnDomain(&clusterResult, name); + cluster->purgeAndGCQueueOnDomain(&clusterResult, name); if (clusterResult.isErrorValue()) { result->makeError(clusterResult.error()); @@ -752,18 +754,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, clusterResult.storageResult().purgedQueues().queues(); result->makeDomainResult(domainResult); - // 4. Force GC queues - // unregister Queue from domain; - // remove queue storage from partition - mqbcmd::ClusterResult clusterForceGCResult; - int rc = cluster->gcQueueOnDomain(&clusterForceGCResult, name); - if (clusterForceGCResult.isErrorValue()) { - result->makeError(clusterForceGCResult.error()); - return -1; // RETURN - } - - // 5. Mark DOMAIN REMOVED to accecpt the second pass - + // 3. Mark DOMAIN REMOVED to accecpt the second pass bmqu::SharedResource self(this); bslmt::Latch latch(1, bsls::SystemClockType::e_MONOTONIC); @@ -777,18 +768,19 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, bmqsys::Time::nowMonotonicClock().addSeconds( k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE); - rc = latch.timedWait(timeout); + int rc = latch.timedWait(timeout); if (0 != rc) { BALL_LOG_ERROR << "DOMAINS REMOVE fail to finish in " << k_MAX_WAIT_SECONDS_AT_DOMAIN_REMOVE << " seconds. rc: " << rc << "."; - return rc; } - // 6. Mark DOMAINS REMOVE command first round as complete - domainSp->removeDomainComplete(); + // Refer to `DomainManager::stop` to see why we need to invalidate + // `self` explicitly. + self.invalidate(); + return rc; // RETURN } - // Second pass + // Second round else { DomainSp domainSp; @@ -802,7 +794,7 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, if (!domainSp->isRemoveComplete()) { bmqu::MemOutStream os; - os << "First pass of DOMAINS REMOVE '" << name + os << "First round of DOMAINS REMOVE '" << name << "' is not completed."; result->makeError().message() = os.str(); return -1; // RETURN @@ -823,8 +815,6 @@ int DomainManager::processCommand(mqbcmd::DomainsResult* result, result->makeSuccess(); return 0; // RETURN } - - return 0; } bmqu::MemOutStream os; diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp index 96c5bd6d0..d8915ca4a 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.cpp @@ -3665,65 +3665,61 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* result) storageResult.clusterStorageSummary(); } -int Cluster::gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) +void Cluster::purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) { // exected by *ANY* thread dispatcher()->execute( - bdlf::BindUtil::bind(&Cluster::gcQueueOnDomainDispatched, + bdlf::BindUtil::bind(&Cluster::purgeAndGCQueueOnDomainDispatched, this, result, domainName), this); dispatcher()->synchronize(this); - - return 0; } -void Cluster::gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result, - const bsl::string& domainName) +void Cluster::purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result, + const bsl::string& domainName) { // executed by the *DISPATCHER* thread // PRECONDITIONS BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(this)); - // 'true' implies immediate + // Check if there's any live connection to a queue + if (d_clusterOrchestrator.queueHelper().hasActiveQueue(domainName)) { + BALL_LOG_ERROR << "Trying to remove the domain '" << domainName + << "' while there are queues opened or opening"; + result->makeError().message() = + "Trying to remove the domain '" + domainName + + "' while there are queues opened or opening"; + return; // RETURN + } + + // Purge queues on the given domain + mqbcmd::StorageResult storageResult; + d_storageManager_mp->purgeQueueOnDomain(&storageResult, domainName); + result->makeStorageResult(storageResult); + + if (result->isErrorValue()) { + result->makeError(result->error()); + return; // RETURN + } + + // GC queues on the given domain const int rc = d_clusterOrchestrator.queueHelper().gcExpiredQueues(true, domainName); if (rc == -1 || rc == -3) { // TBD: We allow the node to not be an active primary for *any* // partition; this has to be changed once we allow leader != primary - BALL_LOG_ERROR << "Failed to execute force GC queues command (rc: " - << rc << ")"; - result->makeError().message() = "Failed to execute command (rc: " + - bsl::to_string(rc) + ")"; + BALL_LOG_ERROR << "Failed to force GC queues on domain '" << domainName + << "' (rc: " << rc << ")"; + result->makeError().message() = + "Failed to force GC queues on domain '" + domainName + + "' (rc: " + bsl::to_string(rc) + ")"; } - else { - // Otherwise the command succeeded. - result->makeSuccess(); - } -} - -void Cluster::purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) -{ - // exected by *ANY* thread - - mqbcmd::StorageResult storageResult; - - dispatcher()->execute( - bdlf::BindUtil::bind(&mqbi::StorageManager::purgeQueueOnDomain, - d_storageManager_mp.get(), - &storageResult, - domainName), - this); - - dispatcher()->synchronize(this); - - result->makeStorageResult(storageResult); } void Cluster::printClusterStateSummary(bsl::ostream& out, diff --git a/src/groups/mqb/mqbblp/mqbblp_cluster.h b/src/groups/mqb/mqbblp/mqbblp_cluster.h index 534c8eb78..452efa3e6 100644 --- a/src/groups/mqb/mqbblp/mqbblp_cluster.h +++ b/src/groups/mqb/mqbblp/mqbblp_cluster.h @@ -673,18 +673,14 @@ class Cluster : public mqbi::Cluster, /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; - /// Purge queues in this cluster on a given domain. - void - purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; - - /// Force GC queues in this cluster on a given domain. - int gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Purge and force GC queues in this cluster on a given domain. + void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) + BSLS_KEYWORD_OVERRIDE; /// Executed by dispatcher thread. - void gcQueueOnDomainDispatched(mqbcmd::ClusterResult* result, - const bsl::string& domainName); + void purgeAndGCQueueOnDomainDispatched(mqbcmd::ClusterResult* result, + const bsl::string& domainName); // MANIPULATORS // (virtual: mqbnet::SessionEventProcessor) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp index 25d608fd6..f290f7e79 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp @@ -1342,27 +1342,14 @@ void ClusterProxy::loadClusterStatus(mqbcmd::ClusterResult* out) loadQueuesInfo(&clusterProxyStatus.queuesInfo()); } -void ClusterProxy::purgeQueueOnDomain( +void ClusterProxy::purgeAndGCQueueOnDomain( mqbcmd::ClusterResult* result, BSLS_ANNOTATION_UNUSED const bsl::string& domainName) { - bmqu::MemOutStream os; - os << "MockCluster::gcQueueOnDomain not implemented!"; - result->makeError().message() = os.str(); -} - -int ClusterProxy::gcQueueOnDomain( - mqbcmd::ClusterResult* result, - BSLS_ANNOTATION_UNUSED const bsl::string& domainName) -{ - // exected by *ANY* thread - bdlma::LocalSequentialAllocator<256> localAllocator(d_allocator_p); bmqu::MemOutStream os(&localAllocator); - os << "GC Queue not supported on a Proxy."; + os << "Purge and GC queue not supported on a Proxy."; result->makeError().message() = os.str(); - - return 0; } // MANIPULATORS diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h index 096b59130..5393c4c7b 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterproxy.h @@ -539,14 +539,10 @@ class ClusterProxy : public mqbc::ClusterStateObserver, /// Load the cluster state in the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; - /// Purge queues in this cluster on a given domain. - void - purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; - - /// Force GC queues in this cluster on a given domain. - int gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Purge and force GC queues in this cluster on a given domain. + void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) + BSLS_KEYWORD_OVERRIDE; void getPrimaryNodes(int* rc, bsl::ostream& errorDescription, diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index e088cff81..257670f97 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -6222,6 +6222,47 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate, return rc_SUCCESS; // RETURN } +bool ClusterQueueHelper::hasActiveQueue(const bsl::string& domainName) +{ + // executed by the cluster *DISPATCHER* thread + + // PRECONDITIONS + BSLS_ASSERT_SAFE( + d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); + + const mqbc::ClusterState::DomainStates& domainStates = + d_clusterState_p->domainStates(); + + DomainStatesCIter domCit = domainStates.find(domainName); + + if (domCit == domainStates.end()) { + return false; // RETURN + } + + const UriToQueueInfoMap& queuesInfoPerDomain = + domCit->second->queuesInfo(); + + for (UriToQueueInfoMapCIter qCit = queuesInfoPerDomain.cbegin(); + qCit != queuesInfoPerDomain.cend(); + ++qCit) { + QueueContextMapConstIter queueContextCIt = d_queues.find( + qCit->second->uri()); + + if (queueContextCIt == d_queues.end()) { + continue; + } + + if (queueContextCIt->second->d_liveQInfo.d_inFlight != 0 || + queueContextCIt->second->d_liveQInfo + .d_numHandleCreationsInProgress != 0 || + queueContextCIt->second->d_liveQInfo.d_numQueueHandles != 0) { + return true; // RETURN + } + } + + return false; // RETURN +} + void ClusterQueueHelper::loadQueuesInfo(mqbcmd::StorageContent* out) const { // executed by the cluster *DISPATCHER* thread diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h index 3907bfb13..256c5449e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h @@ -1089,6 +1089,8 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL int gcExpiredQueues(bool immediate = false, const bsl::string& domainName = ""); + bool hasActiveQueue(const bsl::string& domainName); + /// Start executing multi-step processing of StopRequest or CLOSING node /// advisory received from the specified `clusterNode`. In the case of /// StopRequest the specified `request` references the request; in the diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 9955ff7f0..0e907fd1e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -14,6 +14,7 @@ // limitations under the License. // mqbblp_domain.cpp -*-C++-*- +#include #include #include @@ -375,8 +376,7 @@ Domain::Domain(const bsl::string& name, Domain::~Domain() { - BSLS_ASSERT_SAFE((e_STOPPING == d_state || e_STOPPED == d_state || - e_POSTREMOVE == d_state) && + BSLS_ASSERT_SAFE((e_STOPPING == d_state || e_STOPPED == d_state) && "'teardown' must be called before the destructor"); } @@ -485,7 +485,7 @@ int Domain::configure(bsl::ostream& errorDescription, void Domain::teardown(const mqbi::Domain::TeardownCb& teardownCb) { // PRECONDITIONS - BSLS_ASSERT_SAFE(d_state != e_STOPPING && d_state != e_STOPPED); + BSLS_ASSERT_SAFE(d_state != e_STOPPING); BSLS_ASSERT_SAFE(!d_teardownCb); BSLS_ASSERT_SAFE(teardownCb); @@ -507,7 +507,8 @@ void Domain::teardown(const mqbi::Domain::TeardownCb& teardownCb) if (d_queues.empty()) { d_teardownCb(d_name); - d_state = e_STOPPED; + d_teardownCb = bsl::nullptr_t(); + d_state = e_STOPPED; return; // RETURN } @@ -519,7 +520,7 @@ void Domain::teardown(const mqbi::Domain::TeardownCb& teardownCb) void Domain::teardownRemove(const TeardownCb& teardownCb) { - BSLS_ASSERT_SAFE(d_state != e_REMOVING && d_state != e_REMOVED); + // PRECONDITIONS BSLS_ASSERT_SAFE(!d_teardownRemoveCb); BSLS_ASSERT_SAFE(teardownCb); @@ -529,13 +530,13 @@ void Domain::teardownRemove(const TeardownCb& teardownCb) << d_queues.size() << " registered queues."; d_teardownRemoveCb = teardownCb; - d_state = e_REMOVING; d_cluster_sp->unregisterStateObserver(this); if (d_queues.empty()) { d_teardownRemoveCb(d_name); - d_state = e_REMOVED; + d_teardownRemoveCb = bsl::nullptr_t(); + d_state = e_STOPPED; return; // RETURN } @@ -565,8 +566,7 @@ void Domain::openQueue( bmqp_ctrlmsg::Status status; - if (d_state == e_REMOVING || d_state == e_REMOVED || - d_state == e_PREREMOVE || d_state == e_POSTREMOVE) { + if (d_state == e_REMOVING || d_state == e_STOPPED) { status.category() = bmqp_ctrlmsg::StatusCategory::E_REFUSED; status.code() = mqbi::ClusterErrorCode::e_UNKNOWN; status.message() = k_DOMAIN_IS_REMOVING_OR_REMOVED; @@ -731,20 +731,16 @@ void Domain::unregisterQueue(mqbi::Queue* queue) // Refer to note in 'teardown' routine to see why 'd_state' is updated // while 'd_mutex' is acquired. - if (d_state == e_STOPPING) { - BSLS_ASSERT_SAFE(d_teardownCb); - - if (d_queues.empty()) { + if (d_queues.empty()) { + if (d_teardownCb) { d_teardownCb(d_name); - d_state = e_STOPPED; + d_teardownCb = bsl::nullptr_t(); + d_state = e_STOPPED; } - } - else if (d_state == e_REMOVING) { - BSLS_ASSERT_SAFE(d_teardownRemoveCb); - - if (d_queues.empty()) { + if (d_teardownRemoveCb) { d_teardownRemoveCb(d_name); - d_state = e_REMOVED; + d_teardownRemoveCb = bsl::nullptr_t(); + d_state = e_STOPPED; } } } @@ -920,21 +916,6 @@ int Domain::processCommand(mqbcmd::DomainResult* result, return -1; } -void Domain::removeDomainReset() -{ - bslmt::LockGuard guard(&d_mutex); // LOCK - - d_state = e_PREREMOVE; - d_teardownRemoveCb = bsl::nullptr_t(); -} - -void Domain::removeDomainComplete() -{ - bslmt::LockGuard guard(&d_mutex); // LOCK - - d_state = e_POSTREMOVE; -} - // ACCESSORS int Domain::lookupQueue(bsl::shared_ptr* out, const bmqt::Uri& uri) const @@ -1030,34 +1011,29 @@ void Domain::loadRoutingConfiguration( } } -bool Domain::tryRemove() const +bool Domain::tryRemove() { bslmt::LockGuard guard(&d_mutex); // LOCK - if (d_pendingRequests != 0) { + if (d_state == e_STOPPING) { return false; } - // If there's queue in this domain, check to see if there's any active - // handle to it - if (!d_queues.empty()) { - for (QueueMapCIter it = d_queues.begin(); it != d_queues.end(); ++it) { - // Looks like in RootQueueEngine::releaseHandle, queueHandle is - // removed and r/w counts reset (in `proctor.releaseHandle`) before - // substreams are unregistered; should we check substream? - // handle->subStreamInfos().size() == 0 - if (it->second->hasActiveHandle()) { - return false; - } - } + if (d_pendingRequests != 0) { + return false; } + // Reset d_teardownRemoveCb in case the first round of + // DOMAINS REMOVE fails and we want to call it again + d_state = e_REMOVING; + d_teardownRemoveCb = bsl::nullptr_t(); + return true; } bool Domain::isRemoveComplete() const { - return d_state == e_POSTREMOVE; + return d_state == e_STOPPED; } } // close package namespace diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.h b/src/groups/mqb/mqbblp/mqbblp_domain.h index 093e5b5e1..e66f4b512 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.h +++ b/src/groups/mqb/mqbblp/mqbblp_domain.h @@ -112,14 +112,9 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, e_STARTED = 0, e_STOPPING = 1, e_STOPPED = 2, - // Used for teardownRemove function - e_REMOVING = 3, - e_REMOVED = 4, - // Used as flags to indicate - // the start and finish of - // the first round for DOMAINS REMOVE - e_PREREMOVE = 5, - e_POSTREMOVE = 6, + // indicate the start of the + // first round of DOMAINS REMOVE + e_REMOVING = 3 }; private: @@ -337,13 +332,6 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE; - /// Mark the state of domain to be PREREMOVE - void removeDomainReset() BSLS_KEYWORD_OVERRIDE; - - /// Mark the state of domain to be POSTREMOVE, - /// indicating the first round of DOMAINS REMOVE is completed - void removeDomainComplete() BSLS_KEYWORD_OVERRIDE; - // ACCESSORS /// Load into the specified `out` the queue corresponding to the @@ -381,8 +369,8 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, const BSLS_KEYWORD_OVERRIDE; /// Check the state of the queues in this domain, return false if there's - /// queues opened or opening. - bool tryRemove() const BSLS_KEYWORD_OVERRIDE; + /// queues opened or opening, or if the domain is closed or closing. + bool tryRemove() BSLS_KEYWORD_OVERRIDE; /// Check the state of the domain, return true if the first round /// of DOMAINS REMOVE is completed diff --git a/src/groups/mqb/mqbblp/mqbblp_queue.h b/src/groups/mqb/mqbblp/mqbblp_queue.h index 4b81866c4..b3c073445 100644 --- a/src/groups/mqb/mqbblp/mqbblp_queue.h +++ b/src/groups/mqb/mqbblp/mqbblp_queue.h @@ -448,9 +448,6 @@ class Queue BSLS_CPP11_FINAL : public mqbi::Queue { /// Return the Schema Leaner associated with this queue. bmqp::SchemaLearner& schemaLearner() const BSLS_KEYWORD_OVERRIDE; - - /// Return true if there's queue handle and they're actively used. - bool hasActiveHandle() const BSLS_KEYWORD_OVERRIDE; }; // ============================================================================ @@ -602,11 +599,6 @@ inline bmqp::SchemaLearner& Queue::schemaLearner() const return d_schemaLearner; } -inline bool Queue::hasActiveHandle() const -{ - return d_state.handleCatalog().handlesCount() != 0; -} - } // close package namespace } // close enterprise namespace diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp index 472e44d32..6da890132 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp @@ -421,8 +421,9 @@ bool ClusterState::unassignQueue(const bmqt::Uri& uri) } domIt->second->queuesInfo().erase(cit); + if (domIt->second->queuesInfo().empty()) { - domIt->second->setDomain(NULL); + d_domainStates.erase(domIt); } // POSTCONDITIONS diff --git a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp index cfc8f716c..db877b2ba 100644 --- a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp +++ b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp @@ -208,12 +208,30 @@ void IncoreClusterStateLeger_LogIdGenerator::generateLogId( // ------------------------------ // PRIVATE MANIPULATORS -int IncoreClusterStateLedger::cleanupLog( - BSLS_ANNOTATION_UNUSED const bsl::string& logPath) +int IncoreClusterStateLedger::cleanupLog(const bsl::string& logPath) { - // TODO: Implement + enum RcEnum { + // Value for the various RC error categories + rc_SUCCESS = 0 // Success + , + rc_REMOVE_FILE_FAILURE = -1 // Fail to remove log file + }; - return 0; + const bsl::string& cluster = d_clusterData_p->cluster().name(); + + const int rc = bdls::FilesystemUtil::remove(logPath); + if (0 != rc) { + BMQTSK_ALARMLOG_ALARM("FILE_IO") + << cluster << ": Failed to remove [" << logPath + << "] file during CSL file cleanup, rc: " << rc + << BMQTSK_ALARMLOG_END; + return rc_REMOVE_FILE_FAILURE; // RETURN + } + + BALL_LOG_INFO << cluster << ": Removed file [" << logPath + << "] during CSL file cleanup"; + + return rc_SUCCESS; } int IncoreClusterStateLedger::onLogRolloverCb(const mqbu::StorageKey& oldLogId, @@ -1194,6 +1212,7 @@ IncoreClusterStateLedger::IncoreClusterStateLedger( .setReserveOnDisk(partitionCfg.preallocate()) .setPrefaultPages(partitionCfg.prefaultPages()) .setLogIdGenerator(logIdGenerator) + .setScheduler(&clusterData->scheduler()) .setLogFactory(logFactory) .setExtractLogIdCallback(ClusterStateLedgerUtil::extractLogId) .setValidateLogCallback(ClusterStateLedgerUtil::validateLog) diff --git a/src/groups/mqb/mqbi/mqbi_cluster.h b/src/groups/mqb/mqbi/mqbi_cluster.h index 894642502..96b536766 100644 --- a/src/groups/mqb/mqbi/mqbi_cluster.h +++ b/src/groups/mqb/mqbi/mqbi_cluster.h @@ -371,13 +371,9 @@ class Cluster : public DispatcherClient { /// Load the cluster state to the specified `out` object. virtual void loadClusterStatus(mqbcmd::ClusterResult* out) = 0; - /// Purge queues in this cluster on a given domain. - virtual void purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) = 0; - - /// Force GC queues in this cluster on a given domain. - virtual int gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) = 0; + /// Purge and force GC queues in this cluster on a given domain. + virtual void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) = 0; // ACCESSORS diff --git a/src/groups/mqb/mqbi/mqbi_domain.h b/src/groups/mqb/mqbi/mqbi_domain.h index a80c22de9..791bb0e91 100644 --- a/src/groups/mqb/mqbi/mqbi_domain.h +++ b/src/groups/mqb/mqbi/mqbi_domain.h @@ -185,13 +185,6 @@ class Domain { virtual int processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) = 0; - /// Mark the state of domain to be PREREMOVE - virtual void removeDomainReset() = 0; - - /// Mark the state of domain to be POSTREMOVE, - /// indicating the first round of DOMAINS REMOVE is completed - virtual void removeDomainComplete() = 0; - // ACCESSORS /// Load into the specified `out` the queue corresponding to the @@ -229,8 +222,8 @@ class Domain { bmqp_ctrlmsg::RoutingConfiguration* config) const = 0; /// Check the state of the queues in this domain, return false if there's - /// queues opened or opening. - virtual bool tryRemove() const = 0; + /// queues opened or opening, or if the domain is closed or closing. + virtual bool tryRemove() = 0; /// Check the state of the domain, return true if the first round /// of DOMAINS REMOVE is completed diff --git a/src/groups/mqb/mqbi/mqbi_queue.h b/src/groups/mqb/mqbi/mqbi_queue.h index a8077172f..cd3b7dbf6 100644 --- a/src/groups/mqb/mqbi/mqbi_queue.h +++ b/src/groups/mqb/mqbi/mqbi_queue.h @@ -956,9 +956,6 @@ class Queue : public DispatcherClient { /// Return the Schema Leaner associated with this queue. virtual bmqp::SchemaLearner& schemaLearner() const = 0; - - /// Return true if there's queue handle and they're actively used. - virtual bool hasActiveHandle() const = 0; }; // ======================== diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp index 82ffc442d..75c6adabe 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.cpp @@ -489,25 +489,15 @@ void Cluster::loadClusterStatus(mqbcmd::ClusterResult* out) out->makeClusterStatus(); } -void Cluster::purgeQueueOnDomain( +void Cluster::purgeAndGCQueueOnDomain( mqbcmd::ClusterResult* result, BSLS_ANNOTATION_UNUSED const bsl::string& domainName) { bmqu::MemOutStream os; - os << "MockCluster::gcQueueOnDomain not implemented!"; + os << "MockCluster::purgeAndGCQueueOnDomain not implemented!"; result->makeError().message() = os.str(); } -int Cluster::gcQueueOnDomain( - mqbcmd::ClusterResult* result, - BSLS_ANNOTATION_UNUSED const bsl::string& domainName) -{ - bmqu::MemOutStream os; - os << "MockCluster::gcQueueOnDomain not implemented!"; - result->makeError().message() = os.str(); - return -1; -} - // MANIPULATORS // (specific to mqbmock::Cluster) Cluster& Cluster::_setIsClusterMember(bool value) diff --git a/src/groups/mqb/mqbmock/mqbmock_cluster.h b/src/groups/mqb/mqbmock/mqbmock_cluster.h index eb36702e5..098f6e6be 100644 --- a/src/groups/mqb/mqbmock/mqbmock_cluster.h +++ b/src/groups/mqb/mqbmock/mqbmock_cluster.h @@ -402,14 +402,10 @@ class Cluster : public mqbi::Cluster { /// Load the cluster state to the specified `out` object. void loadClusterStatus(mqbcmd::ClusterResult* out) BSLS_KEYWORD_OVERRIDE; - /// Purge queues in this cluster on a given domain. - void - purgeQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; - - /// Force GC queues in this cluster on a given domain. - int gcQueueOnDomain(mqbcmd::ClusterResult* result, - const bsl::string& domainName) BSLS_KEYWORD_OVERRIDE; + /// Purge and force GC queues in this cluster on a given domain. + void purgeAndGCQueueOnDomain(mqbcmd::ClusterResult* result, + const bsl::string& domainName) + BSLS_KEYWORD_OVERRIDE; // MANIPULATORS // (specific to mqbmock::Cluster) diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.cpp b/src/groups/mqb/mqbmock/mqbmock_domain.cpp index c67ddcd18..57bf4209a 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_domain.cpp @@ -155,16 +155,6 @@ int Domain::processCommand(mqbcmd::DomainResult* result, return -1; } -void Domain::removeDomainReset() -{ - BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); -} - -void Domain::removeDomainComplete() -{ - BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); -} - int Domain::lookupQueue(bsl::shared_ptr* out, const bmqt::Uri& uri) const { @@ -236,7 +226,7 @@ void Domain::loadRoutingConfiguration( // NOTHING } -bool Domain::tryRemove() const +bool Domain::tryRemove() { BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); return true; diff --git a/src/groups/mqb/mqbmock/mqbmock_domain.h b/src/groups/mqb/mqbmock/mqbmock_domain.h index 7f4fe059c..900c08008 100644 --- a/src/groups/mqb/mqbmock/mqbmock_domain.h +++ b/src/groups/mqb/mqbmock/mqbmock_domain.h @@ -211,13 +211,6 @@ class Domain : public mqbi::Domain { processCommand(mqbcmd::DomainResult* result, const mqbcmd::DomainCommand& command) BSLS_KEYWORD_OVERRIDE; - /// Mark the state of domain to be PREREMOVE - void removeDomainReset() BSLS_KEYWORD_OVERRIDE; - - /// Mark the state of domain to be POSTREMOVE, - /// indicating the first round of DOMAINS REMOVE is completed - void removeDomainComplete() BSLS_KEYWORD_OVERRIDE; - /// Load into the specified `out`, if `out` is not 0, the queue /// corresponding to the specified `uri`, if found. Return 0 on success, /// or a non-zero return code otherwise. @@ -254,7 +247,7 @@ class Domain : public mqbi::Domain { /// Check the state of the queues in this domain, return false if there's /// queues opened or opening. - bool tryRemove() const BSLS_KEYWORD_OVERRIDE; + bool tryRemove() BSLS_KEYWORD_OVERRIDE; /// Check the state of the domain, return true if the first round /// of DOMAINS REMOVE is completed diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.cpp b/src/groups/mqb/mqbmock/mqbmock_queue.cpp index 0c6963c03..98eaf2ca2 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.cpp +++ b/src/groups/mqb/mqbmock/mqbmock_queue.cpp @@ -527,13 +527,6 @@ bmqp::SchemaLearner& Queue::schemaLearner() const return d_schemaLearner; } -bool Queue::hasActiveHandle() const -{ - BSLS_ASSERT_SAFE(false && "NOT IMPLEMENTED!"); - - return false; -} - // ------------------- // class HandleFactory // ------------------- diff --git a/src/groups/mqb/mqbmock/mqbmock_queue.h b/src/groups/mqb/mqbmock/mqbmock_queue.h index 932f9555a..1c80082eb 100644 --- a/src/groups/mqb/mqbmock/mqbmock_queue.h +++ b/src/groups/mqb/mqbmock/mqbmock_queue.h @@ -430,9 +430,6 @@ class Queue : public mqbi::Queue { /// Return the Schema Leaner associated with this queue. bmqp::SchemaLearner& schemaLearner() const BSLS_KEYWORD_OVERRIDE; - /// Return true if there's queue handle and they're actively used. - bool hasActiveHandle() const BSLS_KEYWORD_OVERRIDE; - // ACCESSORS // (specific to mqbi::MockQueue) }; diff --git a/src/groups/mqb/mqbsi/mqbsi_ledger.cpp b/src/groups/mqb/mqbsi/mqbsi_ledger.cpp index 54ae66a60..df68451cd 100644 --- a/src/groups/mqb/mqbsi/mqbsi_ledger.cpp +++ b/src/groups/mqb/mqbsi/mqbsi_ledger.cpp @@ -185,6 +185,7 @@ LedgerConfig::LedgerConfig(bslma::Allocator* allocator) , d_keepOldLogs(false) , d_logFactory_sp() , d_logIdGenerator_sp() +, d_scheduler_p() , d_extractLogIdCallback(bsl::allocator_arg, allocator) , d_validateLogCallback(bsl::allocator_arg, allocator) , d_rolloverCallback(bsl::allocator_arg, allocator) @@ -203,6 +204,7 @@ LedgerConfig::LedgerConfig(const LedgerConfig& other, , d_keepOldLogs(other.d_keepOldLogs) , d_logFactory_sp(other.d_logFactory_sp) , d_logIdGenerator_sp(other.d_logIdGenerator_sp) +, d_scheduler_p(other.d_scheduler_p) , d_extractLogIdCallback(bsl::allocator_arg, allocator, other.d_extractLogIdCallback) @@ -266,6 +268,16 @@ LedgerConfig& LedgerConfig::setLogIdGenerator( return *this; } +LedgerConfig& LedgerConfig::setScheduler(bdlmt::EventScheduler* value) +{ + // PRECONDITIONS + BSLS_ASSERT_SAFE(value); + BSLS_ASSERT_SAFE(value->clockType() == bsls::SystemClockType::e_MONOTONIC); + + d_scheduler_p = value; + return *this; +} + LedgerConfig& LedgerConfig::setExtractLogIdCallback(const ExtractLogIdCb& value) { @@ -332,6 +344,11 @@ mqbsi::LogIdGenerator* LedgerConfig::logIdGenerator() const return d_logIdGenerator_sp.get(); } +bdlmt::EventScheduler* LedgerConfig::scheduler() const +{ + return d_scheduler_p; +} + const LedgerConfig::ExtractLogIdCb& LedgerConfig::extractLogIdCallback() const { return d_extractLogIdCallback; diff --git a/src/groups/mqb/mqbsi/mqbsi_ledger.h b/src/groups/mqb/mqbsi/mqbsi_ledger.h index dd564d7d9..bab984259 100644 --- a/src/groups/mqb/mqbsi/mqbsi_ledger.h +++ b/src/groups/mqb/mqbsi/mqbsi_ledger.h @@ -44,6 +44,7 @@ // BDE #include #include +#include #include #include #include @@ -55,6 +56,12 @@ #include namespace BloombergLP { + +// FORWARD DECLARATIONS +namespace bdlmt { +class EventScheduler; +} + namespace mqbsi { // ===================== @@ -294,6 +301,8 @@ class LedgerConfig { bsl::shared_ptr d_logIdGenerator_sp; // Pointer to generator of log ids. + bdlmt::EventScheduler* d_scheduler_p; + ExtractLogIdCb d_extractLogIdCallback; // Callback invoked to extract the ID // of a log. @@ -335,6 +344,7 @@ class LedgerConfig { setLogFactory(const bsl::shared_ptr& value); LedgerConfig& setLogIdGenerator(const bsl::shared_ptr& value); + LedgerConfig& setScheduler(bdlmt::EventScheduler* value); LedgerConfig& setExtractLogIdCallback(const ExtractLogIdCb& value); LedgerConfig& setValidateLogCallback(const ValidateLogCb& value); LedgerConfig& setRolloverCallback(const OnRolloverCb& value); @@ -352,6 +362,7 @@ class LedgerConfig { bool keepOldLogs() const; mqbsi::LogFactory* logFactory() const; mqbsi::LogIdGenerator* logIdGenerator() const; + bdlmt::EventScheduler* scheduler() const; const ExtractLogIdCb& extractLogIdCallback() const; const ValidateLogCb& validateLogCallback() const; const OnRolloverCb& rolloverCallback() const; diff --git a/src/groups/mqb/mqbsl/mqbsl_ledger.cpp b/src/groups/mqb/mqbsl/mqbsl_ledger.cpp index b4596a317..e3bff9380 100644 --- a/src/groups/mqb/mqbsl/mqbsl_ledger.cpp +++ b/src/groups/mqb/mqbsl/mqbsl_ledger.cpp @@ -20,6 +20,10 @@ // MQB #include +// BMQ +#include +#include + // BDE #include #include @@ -275,16 +279,12 @@ int Ledger::rollOver() return rc; // RETURN } - // If not keeping old logs, close the log and invoke the cleanup callback + // If not keeping old logs, enqueue an event in the scheduler thread + // to be executed right away to close and cleanup the log file. if (!d_config.keepOldLogs()) { - rc = lastLog->close(); - if (rc != LogOpResult::e_SUCCESS) { - return rc * 100 + LedgerOpResult::e_LOG_CLOSE_FAILURE; // RETURN - } - rc = d_config.cleanupCallback()(lastLog->logConfig().location()); - if (rc != 0) { - return LedgerOpResult::e_LOG_CLEANUP_FAILURE; // RETURN - } + d_config.scheduler()->scheduleEvent( + bmqsys::Time::nowMonotonicClock(), + bdlf::BindUtil::bind(&Ledger::closeAndCleanup, this, lastLog)); } return LedgerOpResult::e_SUCCESS; @@ -334,6 +334,31 @@ int Ledger::writeRecordImpl(LedgerRecordId* recordId, return LedgerOpResult::e_SUCCESS; } +void Ledger::closeAndCleanup(const LogSp& log) +{ + const bsls::Types::Int64 start = bmqsys::Time::highResolutionTimer(); + const bsl::string& logPath = log->logConfig().location(); + + int rc = log->close(); + if (rc != LogOpResult::e_SUCCESS) { + BALL_LOG_ERROR << "Failed to close the log " << logPath + << ", rc: " << rc; + return; // RETURN + } + + rc = d_config.cleanupCallback()(logPath); + if (rc != 0) { + BALL_LOG_ERROR << "Failed to clean up the log " << logPath + << ", rc: " << rc; + return; // RETURN + } + + const bsls::Types::Int64 end = bmqsys::Time::highResolutionTimer(); + + BALL_LOG_INFO << "Log closed and cleaned up. Time taken: " + << bmqu::PrintUtil::prettyTimeInterval(end - start); +} + // CREATORS Ledger::Ledger(const mqbsi::LedgerConfig& config, bslma::Allocator* allocator) : d_isFirstOpen(true) diff --git a/src/groups/mqb/mqbsl/mqbsl_ledger.h b/src/groups/mqb/mqbsl/mqbsl_ledger.h index 863981d69..1ed260c5d 100644 --- a/src/groups/mqb/mqbsl/mqbsl_ledger.h +++ b/src/groups/mqb/mqbsl/mqbsl_ledger.h @@ -151,7 +151,7 @@ class Ledger BSLS_KEYWORD_FINAL : public mqbsi::Ledger { /// Insert the specified `log` to the list and map of logs maintained by /// this object and return true if successful and false otherwise (e.g., /// a log with the same `logId` already exists). - bool insert(const bsl::shared_ptr& log); + bool insert(const LogSp& log); /// Create a new log, add it to the ledger, and populate the specified /// `logPtr`. Return true if was able to successfully add a new log, @@ -185,6 +185,9 @@ class Ledger BSLS_KEYWORD_FINAL : public mqbsi::Ledger { OFFSET offset, int length); + /// Close and cleanup a log. Scheduled to run in seperate thread. + void closeAndCleanup(const LogSp& log); + // PRIVATE ACCESSORS /// Return true if the current log being written to can accommodate diff --git a/src/integration-tests/test_csl_cleanup.py b/src/integration-tests/test_csl_cleanup.py new file mode 100644 index 000000000..1195177fd --- /dev/null +++ b/src/integration-tests/test_csl_cleanup.py @@ -0,0 +1,60 @@ +# Copyright 2025 Bloomberg Finance L.P. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Testing cleanup of CSL file. +""" +import blazingmq.dev.it.testconstants as tc +from blazingmq.dev.it.fixtures import ( # pylint: disable=unused-import + Cluster, + cluster, + order, + tweak, +) +import glob + +pytestmark = order(2) +timeout = 10 + + +@tweak.cluster.partition_config.max_qlist_file_size(2000) +def test_csl_cleanup(cluster: Cluster): + leader = cluster.last_known_leader + proxy = next(cluster.proxy_cycle()) + + producer = proxy.create_client("producer") + + cluster._logger.info("Start to write to clients") + + csl_files_before_rollover = glob.glob( + str(cluster.work_dir.joinpath(leader.name, "storage")) + "/*csl*" + ) + assert len(csl_files_before_rollover) == 1 + + # opening 10 queues would cause a rollover + for i in range(0, 10): + producer.open( + f"bmq://{tc.DOMAIN_PRIORITY_SC}/q{i}", flags=["write,ack"], succeed=True + ) + producer.close(f"bmq://{tc.DOMAIN_PRIORITY_SC}/q{i}", succeed=True) + + csl_files_after_rollover = glob.glob( + str(cluster.work_dir.joinpath(leader.name, "storage")) + "/*csl*" + ) + + assert leader.outputs_regex(r"Log closed and cleaned up. Time taken", timeout) + + assert len(csl_files_after_rollover) == 1 + assert csl_files_before_rollover[0] != csl_files_after_rollover[0] diff --git a/src/integration-tests/test_domain_remove.py b/src/integration-tests/test_domain_remove.py index 57358f457..68deabe9c 100644 --- a/src/integration-tests/test_domain_remove.py +++ b/src/integration-tests/test_domain_remove.py @@ -50,47 +50,6 @@ def write_messages(proxy, uri, n_msgs=5, do_confirm=True): consumer.close(uri, succeed=True) -def test_remove_domain_with_queue_closed(cluster: Cluster): - """ - send DOMAINS REMOVE command after both queue closed - command should succeed - """ - proxies = cluster.proxy_cycle() - proxy = next(proxies) - - # producer and consumer open the queue, - # post and confirm messages and both close - write_messages(proxy, tc.URI_PRIORITY) - - # send remove domain admin command - admin = AdminClient() - leader = cluster.last_known_leader - admin.connect(leader.config.host, int(leader.config.port)) - res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - assert "Purged 0 message(s)" in res - - -def test_remove_domain_with_queue_open(cluster: Cluster): - """ - send DOMAINS REMOVE command with a queue still open - command should fail - """ - proxies = cluster.proxy_cycle() - proxy = next(proxies) - - uri = tc.URI_PRIORITY - producer = proxy.create_client("producer") - producer.open(uri, flags=["write"], succeed=True) - producer.post(uri, [f"msg{i}" for i in range(5)], succeed=True, wait_ack=True) - - # send remove domain admin command - admin = AdminClient() - leader = cluster.last_known_leader - admin.connect(leader.config.host, int(leader.config.port)) - res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - assert "while there are queues open" in res - - def test_remove_domain_when_cluster_unhealthy(multi_node: Cluster): """ send DOMAINS REMOVE command when the cluster is not healthy @@ -235,16 +194,14 @@ def test_open_queue_after_remove_domain(cluster: Cluster): assert producer.open(uri, flags=["write"], block=True) != Client.e_SUCCESS -def test_remove_domain_with_queue_open(cluster: Cluster): +def test_remove_domain_with_producer_queue_open(cluster: Cluster): """ - issue DOMAINS REMOVE command when both producer and consumer close connections, - both open, or one of them has the connection open + issue DOMAINS REMOVE command when consumer closes connection """ proxies = cluster.proxy_cycle() proxy = next(proxies) # producer produces messages and consumer confirms - # then both close connections producer = proxy.create_client("producer") producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) @@ -259,41 +216,148 @@ def test_remove_domain_with_queue_open(cluster: Cluster): ) consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + # consumer closes connection + consumer.close(tc.URI_PRIORITY, succeed=True) + # send admin command - # when both producer and consumer open admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") assert ( - f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues open" + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues opened or opening" in res ) - # close producer and send the command again + +def test_remove_domain_with_consumer_queue_open(cluster: Cluster): + """ + issue DOMAINS REMOVE command when producer closes connection + """ + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + # producer produces messages and consumer confirms + producer = proxy.create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + + # producer closes connection producer.close(tc.URI_PRIORITY, succeed=True) + + # send admin command + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") assert ( - f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues open" + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues opened or opening" in res ) - # open producer and close consumer and send the command again + +def test_remove_domain_with_both_queue_open_and_closed(cluster: Cluster): + """ + issue DOMAINS REMOVE command when both producer and consumer have queue open + and both have queue closed + """ + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + # producer produces messages and consumer confirms + producer = proxy.create_client("producer") producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + + # send admin command + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert ( + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues opened or opening" + in res + ) + + # close connections and try again + producer.close(tc.URI_PRIORITY, succeed=True) consumer.close(tc.URI_PRIORITY, succeed=True) + + res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") + assert "Purged 0 message(s)" in res + + +def test_try_open_removed_domain(cluster: Cluster): + """ + 1. producer send messages and consumer confirms + 2. send DOMAINS REMOVE admin command + 3. close both producer and consumer + 4. try open both, and they should all fail + """ + proxies = cluster.proxy_cycle() + proxy = next(proxies) + + # producer produces messages and consumer confirms + producer = proxy.create_client("producer") + producer.open(tc.URI_PRIORITY, flags=["write"], succeed=True) + + consumer = proxy.create_client("consumer") + consumer.open(tc.URI_PRIORITY, flags=["read"], succeed=True) + + producer.post( + tc.URI_PRIORITY, + [f"msg{i}" for i in range(3)], + succeed=True, + wait_ack=True, + ) + consumer.confirm(tc.URI_PRIORITY, "*", succeed=True) + + # send admin command + # when both producer and consumer open + admin = AdminClient() + leader = cluster.last_known_leader + admin.connect(leader.config.host, int(leader.config.port)) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") assert ( - f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues open" + f"Trying to remove the domain '{tc.DOMAIN_PRIORITY}' while there are queues opened or opening" in res ) - # close both and send the command again + # close producer and send the command again producer.close(tc.URI_PRIORITY, succeed=True) + consumer.close(tc.URI_PRIORITY, succeed=True) res = admin.send_admin(f"DOMAINS REMOVE {tc.DOMAIN_PRIORITY}") - assert "while there are queues open" not in res + assert "Purged 0 message(s)" in res + + # try open producer and consumer again + assert producer.open(tc.URI_PRIORITY, flags=["write"], block=True) < 0 + assert consumer.open(tc.URI_PRIORITY, flags=["read"], block=True) < 0 def test_remove_domain_with_unconfirmed_message(cluster: Cluster): + """ + issue DOMAINS REMOVE command with unconfirmed messages + """ proxies = cluster.proxy_cycle() proxy = next(proxies)