From 4eabe37728d911f05d04eb57a62e90f0fade9975 Mon Sep 17 00:00:00 2001 From: Yuan Jing Vincent Yan Date: Tue, 28 Jan 2025 10:58:42 -0500 Subject: [PATCH 1/2] mqba::Application: Temporarily disable admin command re-routing (#586) Signed-off-by: Yuan Jing Vincent Yan --- src/groups/mqb/mqba/mqba_application.cpp | 11 +++- .../test_admin_command_routing.py | 12 +++++ src/integration-tests/test_admin_res_log.py | 6 ++- src/integration-tests/test_domain_remove.py | 54 +++++++++++++++++++ .../test_reconfigure_domains.py | 5 +- ...qprometheus_prometheusstatconsumer_test.py | 3 +- src/python/blazingmq/dev/fuzztest/__init__.py | 4 +- .../blazingmq/util/test/logging_test.py | 3 +- 8 files changed, 89 insertions(+), 9 deletions(-) diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp index d93d2aceca..0933e9cc76 100644 --- a/src/groups/mqb/mqba/mqba_application.cpp +++ b/src/groups/mqb/mqba/mqba_application.cpp @@ -832,13 +832,22 @@ int Application::processCommand(const bslstl::StringRef& source, mqbcmd::InternalResult cmdResult; + // TODO This boolean is added to address failure during in-progress broker + // rollout from v92.8 -> v93.7. If 93.7 broker tries to re-route an admin + // command to a 92.8 broker, 92.8 broker will fail to handle. Therefore, we + // turn off re-routing until all nodes have been upgraded to 93.7. + // + // Also, once reroute is re-enabled, revert the integration test changes + // introduced in https://github.com/bloomberg/blazingmq/pull/586. + const bool neverReroute = true; + // Note that routed commands should never route again to another node. // This should always be the "end of the road" for a command. // Note that this logic is important to prevent a "deadlock" scenario // where two nodes are waiting on a response from each other to continue. // Currently commands from reroutes are executed on their own dedicated // thread. - if (fromReroute) { + if (fromReroute || neverReroute) { if (0 != executeCommand(command, &cmdResult)) { // early exit (caused by "dangerous" command) return rc_EARLY_EXIT; // RETURN diff --git a/src/integration-tests/test_admin_command_routing.py b/src/integration-tests/test_admin_command_routing.py index f593a304e1..844cc5f9fb 100644 --- a/src/integration-tests/test_admin_command_routing.py +++ b/src/integration-tests/test_admin_command_routing.py @@ -50,6 +50,9 @@ def test_primary_rerouting(multi_node: Cluster) -> None: - CLUSTERS CLUSTER STORAGE PARTITION ENABLE/DISABLE """ + # TODO Skip admin command routing tests until admin command routing is re-enabled + return + admin = AdminClient() # find the first node which is not a known leader @@ -129,6 +132,9 @@ def test_cluster_rerouting(multi_node: Cluster) -> None: - CLUSTERS CLUSTER STATE ELECTOR GET_ALL """ + # TODO Skip admin command routing tests until admin command routing is re-enabled + return + admin = AdminClient() node = multi_node.nodes()[0] @@ -197,6 +203,9 @@ def test_multi_response_encoding(multi_node: Cluster): """ + # TODO Skip admin command routing tests until admin command routing is re-enabled + return + def is_compact(json_str: str) -> bool: return " " not in json_str @@ -263,6 +272,9 @@ def test_concurrently_routed_commands(multi_node: Cluster): Stage 2: Issue command in parallel Stage 3: Expect """ + # TODO Skip admin command routing tests until admin command routing is re-enabled + return + # Connect a client to each node in the cluster clients = [] for node in multi_node.nodes(): diff --git a/src/integration-tests/test_admin_res_log.py b/src/integration-tests/test_admin_res_log.py index 57e9d71a89..2a0b907b20 100644 --- a/src/integration-tests/test_admin_res_log.py +++ b/src/integration-tests/test_admin_res_log.py @@ -132,7 +132,8 @@ def test_adminsession_res_log_reconfigure(multi_node: Cluster): res = admin.send_admin(f"DOMAINS RECONFIGURE {tc.DOMAIN_FANOUT}") success_count = res.split().count("SUCCESS") - assert success_count == num_nodes + # TODO Do `assert success_count == num_nodes` when admin command routing is re-enabled + assert success_count == 1 assert leader.capture("Send response message", TIMEOUT) assert not member1.capture("Send response message", TIMEOUT) @@ -145,7 +146,8 @@ def test_adminsession_res_log_reconfigure(multi_node: Cluster): res = admin.send_admin(f"DOMAINS RECONFIGURE {tc.DOMAIN_FANOUT}") success_count = res.split().count("SUCCESS") - assert success_count == num_nodes + # TODO Do `assert success_count == num_nodes` when admin command routing is re-enabled + assert success_count == 1 assert member1.capture("Send response message", TIMEOUT) assert not leader.capture("Send response message", TIMEOUT) diff --git a/src/integration-tests/test_domain_remove.py b/src/integration-tests/test_domain_remove.py index 68deabe9c6..fe1c48b099 100644 --- a/src/integration-tests/test_domain_remove.py +++ b/src/integration-tests/test_domain_remove.py @@ -56,6 +56,9 @@ def test_remove_domain_when_cluster_unhealthy(multi_node: Cluster): the command fails with a routing error resend the command and it should succeed """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = multi_node.proxy_cycle() proxy = next(proxies) @@ -103,6 +106,9 @@ def test_remove_different_domain(cluster: Cluster): send DOMAINS REMOVE command to remove a different domain the original one should be intact """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() # open queue in PRIORITY domain but remove PRIORITY_SC @@ -161,6 +167,9 @@ def test_open_queue_after_remove_domain(cluster: Cluster): try to open a queue after the first round of DOMAINS REMOVE command and it should fail since we started remove but not fully finished yet """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() next(proxies) # eastp proxy = next(proxies) # westp @@ -198,6 +207,9 @@ def test_remove_domain_with_producer_queue_open(cluster: Cluster): """ issue DOMAINS REMOVE command when consumer closes connection """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -234,6 +246,9 @@ def test_remove_domain_with_consumer_queue_open(cluster: Cluster): """ issue DOMAINS REMOVE command when producer closes connection """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -271,6 +286,9 @@ def test_remove_domain_with_both_queue_open_and_closed(cluster: Cluster): issue DOMAINS REMOVE command when both producer and consumer have queue open and both have queue closed """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -314,6 +332,9 @@ def test_try_open_removed_domain(cluster: Cluster): 3. close both producer and consumer 4. try open both, and they should all fail """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -358,6 +379,9 @@ def test_remove_domain_with_unconfirmed_message(cluster: Cluster): """ issue DOMAINS REMOVE command with unconfirmed messages """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -387,6 +411,9 @@ def test_remove_domain_not_on_disk(cluster: Cluster): """ issue DOMAINS REMOVE command when the domain is not on disk """ + # TODO Skip this test until admin command routing is re-enabled + return + admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) @@ -399,6 +426,9 @@ def test_remove_domain_on_disk_not_in_cache(cluster: Cluster): """ issue DOMAINS REMOVE command when the domain is not on disk """ + # TODO Skip this test until admin command routing is re-enabled + return + admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) @@ -412,6 +442,9 @@ def test_send_to_replicas(multi_node: Cluster): send DOMAINS REMOVE admin command to replica instead of primary replica will boardcast to all the nodes including the primary """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = multi_node.proxy_cycle() proxy = next(proxies) @@ -464,6 +497,9 @@ def test_second_round(cluster: Cluster): a queue and the removed domain can be opened after finalizing and when the domain exists on the disk """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -525,6 +561,9 @@ def test_purge_then_remove(cluster: Cluster): """ purge queue then remove """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) uri = tc.URI_PRIORITY @@ -549,6 +588,9 @@ def test_remove_without_connection(cluster: Cluster): """ issue DOMAINS REMOVE command without any connection to a domain on disk """ + # TODO Skip this test until admin command routing is re-enabled + return + admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) @@ -577,6 +619,9 @@ def test_remove_then_restart(cluster: Cluster): 6. produce 3 messages 7. completely remove the domain """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -642,6 +687,9 @@ def test_remove_with_reconfig(cluster: Cluster): 5. call reconfigure to load the domain 6. produce 3 messages """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -697,6 +745,9 @@ def test_remove_cache_remains(cluster: Cluster): 5. second round of DOMAINS REMOVE 6. try to open a queue and nothing in the cache """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) uri = tc.URI_PRIORITY @@ -741,6 +792,9 @@ def test_remove_cache_cleaned(cluster: Cluster): 5. second round of DOMAINS REMOVE 6. try to open a queue and nothing in the cache """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) uri = tc.URI_PRIORITY diff --git a/src/integration-tests/test_reconfigure_domains.py b/src/integration-tests/test_reconfigure_domains.py index 24967e6eee..58cf38ec1d 100644 --- a/src/integration-tests/test_reconfigure_domains.py +++ b/src/integration-tests/test_reconfigure_domains.py @@ -67,7 +67,10 @@ def reconfigure_to_limit_n_msgs(self, cluster: Cluster, max_num_msgs: int) -> bo tc.DOMAIN_PRIORITY_SC ].definition.parameters.storage.domain_limits.messages = max_num_msgs return cluster.reconfigure_domain( - tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True + # TODO: Set `leader_only=True` when admin command routing is re-enabled + tc.DOMAIN_PRIORITY_SC, + leader_only=False, + succeed=True, ) # Verify that reconfiguring domain message limits works as expected. diff --git a/src/plugins/bmqprometheus/tests/bmqprometheus_prometheusstatconsumer_test.py b/src/plugins/bmqprometheus/tests/bmqprometheus_prometheusstatconsumer_test.py index 67a54e26bd..fbcbabd406 100755 --- a/src/plugins/bmqprometheus/tests/bmqprometheus_prometheusstatconsumer_test.py +++ b/src/plugins/bmqprometheus/tests/bmqprometheus_prometheusstatconsumer_test.py @@ -40,7 +40,8 @@ -m {all,pull,push}, --mode {all,pull,push} prometheus mode --no-docker don't run Prometheus in docker, assume it is running on localhost - """ + +""" __test__ = False # This is not for pytest. import argparse diff --git a/src/python/blazingmq/dev/fuzztest/__init__.py b/src/python/blazingmq/dev/fuzztest/__init__.py index 644a0bd3dd..f0e84864d3 100644 --- a/src/python/blazingmq/dev/fuzztest/__init__.py +++ b/src/python/blazingmq/dev/fuzztest/__init__.py @@ -253,7 +253,7 @@ def make_put_message() -> BoofuzzSequence: math=lambda x: x // NumBytes.WORD, ) - guid = b"\x00\x00\x00\x00\x05\x78\x8D\xAE\xD4\xB8\xCA\x12\xAE\xF3\x2D\xCE" + guid = b"\x00\x00\x00\x00\x05\x78\x8d\xae\xd4\xb8\xca\x12\xae\xf3\x2d\xce" message_components = [ boofuzz.BitField(name="options_size", width=3 * NumBits.BYTE), @@ -302,7 +302,7 @@ def make_confirm_message() -> BoofuzzSequence: Constructs boofuzz structures representing ConfirmMessage. """ - guid = b"\x00\x00\x00\x00\x05\x78\x8D\xAE\xD4\xB8\xCA\x12\xAE\xF3\x2D\xCE" + guid = b"\x00\x00\x00\x00\x05\x78\x8d\xae\xd4\xb8\xca\x12\xae\xf3\x2d\xce" message_components = [ boofuzz.BitField( diff --git a/src/python/blazingmq/util/test/logging_test.py b/src/python/blazingmq/util/test/logging_test.py index e5f1d828e8..258ca2c34b 100644 --- a/src/python/blazingmq/util/test/logging_test.py +++ b/src/python/blazingmq/util/test/logging_test.py @@ -13,8 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Test suite for blazingmq.util.logging. -""" +"""Test suite for blazingmq.util.logging.""" from logging import DEBUG, INFO, getLogger from unittest.mock import patch From c9ee151449f75058a105387e462089c459138d85 Mon Sep 17 00:00:00 2001 From: Emelia Lei <69409730+emelialei88@users.noreply.github.com> Date: Tue, 28 Jan 2025 16:37:23 -0500 Subject: [PATCH 2/2] Fix[MQB]: fix pointer dereference in ClusterUtil::validateState (#588) * Fix clusterutil print dereference bug Add helper functions to print ClusterStateQueueInfo::State Signed-off-by: Emelia Lei * Change print functions to be member for ClusterStateQueueInfo::State Signed-off-by: Emelia Lei --------- Signed-off-by: Emelia Lei --- src/groups/mqb/mqbc/mqbc_clusterstate.cpp | 66 ++- src/groups/mqb/mqbc/mqbc_clusterstate.h | 100 +++-- src/groups/mqb/mqbc/mqbc_clusterutil.cpp | 51 +-- src/groups/mqb/mqbc/mqbc_clusterutil.t.cpp | 382 ++++++++++++++++++ .../mqbc/mqbc_incoreclusterstateledger.cpp | 2 +- 5 files changed, 551 insertions(+), 50 deletions(-) create mode 100644 src/groups/mqb/mqbc/mqbc_clusterutil.t.cpp diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp index 6da890132c..838e5d156d 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.cpp @@ -53,6 +53,67 @@ bsl::ostream& ClusterStateQueueInfo::print(bsl::ostream& stream, return stream; } +bsl::ostream& +ClusterStateQueueInfo::State::print(bsl::ostream& stream, + ClusterStateQueueInfo::State::Enum value, + int level, + int spacesPerLevel) +{ + if (stream.bad()) { + return stream; // RETURN + } + + bdlb::Print::indent(stream, level, spacesPerLevel); + stream << ClusterStateQueueInfo::State::toAscii(value); + + if (spacesPerLevel >= 0) { + stream << '\n'; + } + + return stream; +} + +const char* +ClusterStateQueueInfo::State::toAscii(ClusterStateQueueInfo::State::Enum value) +{ +#define CASE(X) \ + case k_##X: return #X; + + switch (value) { + CASE(NONE) + CASE(ASSIGNING) + CASE(ASSIGNED) + CASE(UNASSIGNING) + default: return "(* NONE *)"; + } + +#undef CASE +} + +bool ClusterStateQueueInfo::State::fromAscii( + ClusterStateQueueInfo::State::Enum* out, + const bslstl::StringRef& str) +{ +#define CHECKVALUE(M) \ + if (bdlb::String::areEqualCaseless( \ + toAscii(ClusterStateQueueInfo::State::k_##M), \ + str.data(), \ + static_cast(str.length()))) { \ + *out = ClusterStateQueueInfo::State::k_##M; \ + return true; \ + } + + CHECKVALUE(NONE) + CHECKVALUE(ASSIGNING) + CHECKVALUE(ASSIGNED) + CHECKVALUE(UNASSIGNING) + + // Invalid string + return false; + +#undef CHECKVALUE +} + // -------------------------- // class ClusterStateObserver // -------------------------- @@ -352,7 +413,8 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri, queueIt = domIt->second->queuesInfo().emplace(uri, queueInfo).first; } else { - if (queueIt->second->state() == ClusterStateQueueInfo::k_ASSIGNED) { + if (queueIt->second->state() == + ClusterStateQueueInfo::State::k_ASSIGNED) { // See 'ClusterStateManager::processQueueAssignmentAdvisory' which // insists on re-assigning isNewAssignment = false; @@ -364,7 +426,7 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri, } // Set the queue as assigned - queueIt->second->setState(ClusterStateQueueInfo::k_ASSIGNED); + queueIt->second->setState(ClusterStateQueueInfo::State::k_ASSIGNED); updatePartitionQueueMapped(partitionId, 1); diff --git a/src/groups/mqb/mqbc/mqbc_clusterstate.h b/src/groups/mqb/mqbc/mqbc_clusterstate.h index a6aaf967ee..facee974dd 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterstate.h +++ b/src/groups/mqb/mqbc/mqbc_clusterstate.h @@ -162,16 +162,55 @@ class ClusterStateQueueInfo { typedef mqbi::ClusterStateManager::AppInfos AppInfos; typedef mqbi::ClusterStateManager::AppInfosCIter AppInfosCIter; - enum State { - // State of Assignment. In CSL, assignment and unassignment are async, - // hence the need for k_ASSIGNING/k_UNASSIGNING - // Assigning following unassigning is also supported. - // On Replica, the only possible state is k_ASSIGNED. - - k_NONE = 0, - k_ASSIGNING = -1, - k_ASSIGNED = -2, - k_UNASSIGNING = -3 + struct State { + public: + enum Enum { + // State of Assignment. In CSL, assignment and unassignment are + // async, + // hence the need for k_ASSIGNING/k_UNASSIGNING + // Assigning following unassigning is also supported. + // On Replica, the only possible state is k_ASSIGNED. + + k_NONE = 0, + k_ASSIGNING = -1, + k_ASSIGNED = -2, + k_UNASSIGNING = -3 + }; + + /// Write the string representation of the specified enumeration + /// `value` + /// to the specified output `stream`, and return a reference to + /// `stream`. Optionally specify an initial indentation `level`, whose + /// absolute value is incremented recursively for nested objects. If + /// `level` is specified, optionally specify `spacesPerLevel`, whose + /// absolute value indicates the number of spaces per indentation level + /// for this and all of its nested objects. If `level` is negative, + /// suppress indentation of the first line. If `spacesPerLevel` is + /// negative, format the entire output on one line, suppressing all but + /// the initial indentation (as governed by `level`). See `toAscii` + /// for what constitutes the string representation of a + /// `ClusterStateQueueInfo::State` value. + static bsl::ostream& print(bsl::ostream& stream, + ClusterStateQueueInfo::State::Enum value, + int level = 0, + int spacesPerLevel = 4); + + /// Return the non-modifiable string representation corresponding to + /// the specified enumeration `value`, if it exists, and a unique + /// (error) string otherwise. The string representation of `value` + /// matches its corresponding enumerator name with the `e_` prefix + /// elided. Note that specifying a `value` that does not match any of + /// the enumerators will result in a string representation that is + /// distinct from any of those corresponding to the enumerators, but is + /// otherwise unspecified. + static const char* toAscii(ClusterStateQueueInfo::State::Enum value); + + /// Return true and fills the specified `out` with the enum value + /// corresponding to the specified `str`, if valid, or return false and + /// leave `out` untouched if `str` doesn't correspond to any value of + /// the enum. + static bool fromAscii(ClusterStateQueueInfo::State::Enum* out, + const bslstl::StringRef& str); }; private: @@ -192,7 +231,7 @@ class ClusterStateQueueInfo { // // TBD: Should also be added to mqbconfm::Domain - State d_state; + State::Enum d_state; // Flag indicating whether this queue is in the process of // being assigned / unassigned. @@ -228,7 +267,7 @@ class ClusterStateQueueInfo { /// Set the corresponding member to the specified `value` and return a /// reference offering modifiable access to this object. - void setState(State value); + void setState(State::Enum value); /// Get a modifiable reference to this object's appIdInfos. AppInfos& appInfos(); @@ -245,8 +284,8 @@ class ClusterStateQueueInfo { const AppInfos& appInfos() const; /// Return the value of the corresponding member of this object. - State state() const; - bool pendingUnassignment() const; + State::Enum state() const; + bool pendingUnassignment() const; /// Format this object to the specified output `stream` at the (absolute /// value of) the optionally specified indentation `level` and return a @@ -269,6 +308,11 @@ class ClusterStateQueueInfo { bsl::ostream& operator<<(bsl::ostream& stream, const ClusterStateQueueInfo& rhs); +/// Format the specified `value` to the specified output `stream` and return +/// a reference to the modifiable `stream`. +bsl::ostream& operator<<(bsl::ostream& stream, + ClusterStateQueueInfo::State::Enum value); + /// Return `true` if the specified `rhs` object contains the value of the /// same type as contained in the specified `lhs` object and the value /// itself is the same in both objects, return false otherwise. @@ -805,7 +849,7 @@ inline ClusterStateQueueInfo::ClusterStateQueueInfo( , d_key() , d_partitionId(mqbs::DataStore::k_INVALID_PARTITION_ID) , d_appInfos(allocator) -, d_state(k_NONE) +, d_state(State::k_NONE) { // NOTHING } @@ -820,7 +864,7 @@ inline ClusterStateQueueInfo::ClusterStateQueueInfo( , d_key(key) , d_partitionId(partitionId) , d_appInfos(appIdInfos, allocator) -, d_state(k_NONE) +, d_state(State::k_NONE) { // NOTHING } @@ -839,7 +883,8 @@ inline ClusterStateQueueInfo& ClusterStateQueueInfo::setPartitionId(int value) return *this; } -inline void ClusterStateQueueInfo::setState(ClusterStateQueueInfo::State value) +inline void +ClusterStateQueueInfo::setState(ClusterStateQueueInfo::State::Enum value) { // k_NONE // | | @@ -895,14 +940,14 @@ ClusterStateQueueInfo::appInfos() const return d_appInfos; } -inline ClusterStateQueueInfo::State ClusterStateQueueInfo::state() const +inline ClusterStateQueueInfo::State::Enum ClusterStateQueueInfo::state() const { return d_state; } inline bool ClusterStateQueueInfo::pendingUnassignment() const { - return d_state == k_UNASSIGNING; + return d_state == State::k_UNASSIGNING; } // ------------------ @@ -1099,8 +1144,9 @@ ClusterState::getAssigned(const bmqt::Uri& uri) const { ClusterStateQueueInfo* queue = getQueueInfo(uri); - return queue ? queue->state() == ClusterStateQueueInfo::k_ASSIGNED ? queue - : 0 + return queue ? queue->state() == ClusterStateQueueInfo::State::k_ASSIGNED + ? queue + : 0 : 0; } @@ -1110,8 +1156,9 @@ ClusterState::getAssignedOrUnassigning(const bmqt::Uri& uri) const ClusterStateQueueInfo* queue = getQueueInfo(uri); return queue - ? queue->state() == ClusterStateQueueInfo::k_ASSIGNED || - queue->state() == ClusterStateQueueInfo::k_UNASSIGNING + ? queue->state() == ClusterStateQueueInfo::State::k_ASSIGNED || + queue->state() == + ClusterStateQueueInfo::State::k_UNASSIGNING ? queue : 0 : 0; @@ -1172,6 +1219,13 @@ inline bsl::ostream& mqbc::operator<<(bsl::ostream& stream, return rhs.print(stream, 0, -1); } +inline bsl::ostream& +mqbc::operator<<(bsl::ostream& stream, + mqbc::ClusterStateQueueInfo::State::Enum value) +{ + return mqbc::ClusterStateQueueInfo::State::print(stream, value, 0, -1); +} + inline bool mqbc::operator==(const ClusterStateQueueInfo& lhs, const ClusterStateQueueInfo& rhs) { diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp index c3988cd51f..f6ab06f6e3 100644 --- a/src/groups/mqb/mqbc/mqbc_clusterutil.cpp +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.cpp @@ -344,7 +344,8 @@ void ClusterUtil::setPendingUnassignment(ClusterState* clusterState, if (iter != clusterState->domainStates().cend()) { UriToQueueInfoMapIter qiter = iter->second->queuesInfo().find(uri); if (qiter != iter->second->queuesInfo().cend()) { - qiter->second->setState(ClusterStateQueueInfo::k_UNASSIGNING); + qiter->second->setState( + ClusterStateQueueInfo::State::k_UNASSIGNING); } } } @@ -853,7 +854,8 @@ ClusterUtil::assignQueue(ClusterState* clusterState, k_ASSIGNMENT_WHILE_UNAVAILABLE; // RETURN } - ClusterStateQueueInfo::State previousState = ClusterStateQueueInfo::k_NONE; + ClusterStateQueueInfo::State::Enum previousState = + ClusterStateQueueInfo::State::k_NONE; ClusterState::DomainStates& domainStates = clusterState->domainStates(); DomainStatesIter domIt = domainStates.find(uri.qualifiedDomain()); @@ -882,13 +884,13 @@ ClusterUtil::assignQueue(ClusterState* clusterState, previousState = queueIt->second->state(); } - if (previousState == ClusterStateQueueInfo::k_ASSIGNING) { + if (previousState == ClusterStateQueueInfo::State::k_ASSIGNING) { BALL_LOG_INFO << cluster->description() << "queueAssignment of '" << uri << "' is already pending."; return QueueAssignmentResult::k_ASSIGNMENT_OK; } - if (previousState == ClusterStateQueueInfo::k_ASSIGNED) { + if (previousState == ClusterStateQueueInfo::State::k_ASSIGNED) { BALL_LOG_INFO << cluster->description() << "queueAssignment of '" << uri << "' is already done."; return QueueAssignmentResult::k_ASSIGNMENT_OK; @@ -966,12 +968,12 @@ ClusterUtil::assignQueue(ClusterState* clusterState, } // Set the queue as assigning (no longer pending unassignment) - queueIt->second->setState(ClusterStateQueueInfo::k_ASSIGNING); + queueIt->second->setState(ClusterStateQueueInfo::State::k_ASSIGNING); BALL_LOG_INFO << "Cluster [" << cluster->description() << "]: Transition: " << previousState << " -> " - << ClusterStateQueueInfo::k_ASSIGNING << " for [" << uri - << "]."; + << ClusterStateQueueInfo::State::k_ASSIGNING << " for [" + << uri << "]."; // Populate 'queueAssignmentAdvisory' bdlma::LocalSequentialAllocator<1024> localAllocator(allocator); @@ -1133,7 +1135,7 @@ void ClusterUtil::registerQueueInfo(ClusterState* clusterState, bmqu::Printer printer(&appInfos); BALL_LOG_INFO << cluster->description() - << ": Queue assigned: " << "[uri: " << uri + << ": Queue assigned: [uri: " << uri << ", queueKey: " << queueKey << ", partitionId: " << partitionId << ", appInfos: " << printer << "]"; @@ -1409,8 +1411,8 @@ void ClusterUtil::unregisterAppId(ClusterData* clusterData, queueUpdate.domain() = qinfoCit->second->uri().qualifiedDomain(); BSLS_ASSERT_SAFE(queueUpdate.domain() == domain->name()); - bool appIdFound = false; - const AppInfos& appInfos = qinfoCit->second->appInfos(); + bool appIdFound = false; + const AppInfos& appInfos = qinfoCit->second->appInfos(); for (AppInfosCIter appInfoCit = appInfos.cbegin(); appInfoCit != appInfos.cend(); ++appInfoCit) { @@ -1675,11 +1677,11 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, // Validate partition information bsl::vector incorrectPartitions; for (size_t pid = 0; pid < state.partitions().size(); ++pid) { - const ClusterStatePartitionInfo& stateInfo = state.partitions()[pid]; + const ClusterStatePartitionInfo& stateInfo = state.partition(pid); BSLS_ASSERT_SAFE(stateInfo.partitionId() == pid); - const ClusterStatePartitionInfo& referenceInfo = - reference.partitions()[pid]; + const ClusterStatePartitionInfo& referenceInfo = reference.partition( + pid); BSLS_ASSERT_SAFE(referenceInfo.partitionId() == pid); if (stateInfo.primaryLeaseId() != referenceInfo.primaryLeaseId()) { // Partition information mismatch. Note that we don't compare @@ -1708,14 +1710,14 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, ++citer) { bdlb::Print::newlineAndIndent(out, level + 1); out << "Partition [" << citer->partitionId() - << "]: " << " primaryLeaseId: " << citer->primaryLeaseId() + << "]: primaryLeaseId: " << citer->primaryLeaseId() << ", primaryNodeId: " << citer->primaryNodeId(); } bdlb::Print::newlineAndIndent(out, level); out << "--------------------------------"; bdlb::Print::newlineAndIndent(out, level); - out << "Partition Infos In Cluster State:"; + out << "Partition Infos In Cluster State :"; bdlb::Print::newlineAndIndent(out, level); out << "--------------------------------"; for (size_t pid = 0; pid < state.partitions().size(); ++pid) { @@ -1723,8 +1725,8 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, reference.partitions()[pid]; BSLS_ASSERT_SAFE(referenceInfo.partitionId() == pid); bdlb::Print::newlineAndIndent(out, level + 1); - out << "Partition [" << pid << "]: " << " primaryLeaseId: " - << referenceInfo.primaryLeaseId() + out << "Partition [" << pid + << "]: primaryLeaseId: " << referenceInfo.primaryLeaseId() << ", primaryNodeId: " << referenceInfo.primaryNodeId(); } } @@ -1782,7 +1784,7 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, bdlb::Print::newlineAndIndent(out, level); out << "-----------------"; bdlb::Print::newlineAndIndent(out, level); - out << "Incorrect Queues:"; + out << "Incorrect Queues :"; bdlb::Print::newlineAndIndent(out, level); out << "-----------------"; for (bsl::vector, @@ -1791,9 +1793,9 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, citer != incorrectQueues.cend(); ++citer) { bdlb::Print::newlineAndIndent(out, level + 1); - out << citer->first; + out << *citer->first; bdlb::Print::newlineAndIndent(out, level + 1); - out << "(correct queue info) " << citer->second; + out << "(correct queue info) " << *citer->second; } } @@ -1809,7 +1811,7 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, citer != extraQueues.cend(); ++citer) { bdlb::Print::newlineAndIndent(out, level + 1); - out << *citer; + out << **citer; } } @@ -1862,7 +1864,7 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, citer != missingQueues.cend(); ++citer) { bdlb::Print::newlineAndIndent(out, level + 1); - out << *citer; + out << **citer; } } @@ -1885,7 +1887,7 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription, citer != domCit->second->queuesInfo().cend(); ++citer) { bdlb::Print::newlineAndIndent(out, level + 1); - out << citer->second; + out << *citer->second; } } } @@ -2169,7 +2171,8 @@ void ClusterUtil::loadQueuesInfo(bsl::vector* out, for (UriToQueueInfoMapCIter qCit = queuesInfoPerDomain.cbegin(); qCit != queuesInfoPerDomain.cend(); ++qCit) { - if (qCit->second->state() != ClusterStateQueueInfo::k_ASSIGNED) { + if (qCit->second->state() != + ClusterStateQueueInfo::State::k_ASSIGNED) { continue; // CONTINUE } diff --git a/src/groups/mqb/mqbc/mqbc_clusterutil.t.cpp b/src/groups/mqb/mqbc/mqbc_clusterutil.t.cpp new file mode 100644 index 0000000000..28a2fd6be7 --- /dev/null +++ b/src/groups/mqb/mqbc/mqbc_clusterutil.t.cpp @@ -0,0 +1,382 @@ +// Copyright 2025 Bloomberg Finance L.P. +// SPDX-License-Identifier: Apache-2.0 +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// mqbc_clusterutil.t.cpp -*-C++-*- + +// MQB +#include +#include +#include + +// BDE +#include +#include +#include + +// TEST DRIVER +#include + +// CONVENIENCE +using namespace BloombergLP; +using namespace bsl; + +// CLASSES +// ============= +// struct Tester +// ============= + +struct Tester { + private: + // DATA + bdlbb::PooledBlobBufferFactory d_bufferFactory; + mqbmock::Cluster d_cluster; + bslma::Allocator* d_allocator_p; + + public: + // CREATORS + Tester(bslma::Allocator* allocator = bmqtst::TestHelperUtil::allocator()) + : d_bufferFactory(256, allocator) + , d_cluster(&d_bufferFactory, allocator) + , d_allocator_p(allocator) + { + } + + mqbi::Cluster* cluster() { return &d_cluster; } + + bslma::Allocator* allocator() const { return d_allocator_p; } + + mqbc::ClusterState::DomainStateSp createDomainState() + { + mqbc::ClusterState::DomainStateSp domainState; + domainState.createInplace(d_allocator_p, d_allocator_p); + return domainState; + } + + mqbc::ClusterState::QueueInfoSp + createQueueInfoSp(const bsl::string& uriString, + const mqbu::StorageKey& key, + int partitionId, + const mqbc::ClusterState::AppInfos& appIdInfos) + { + bmqt::Uri uri(uriString, d_allocator_p); + mqbc::ClusterState::QueueInfoSp queueInfo; + queueInfo.createInplace(d_allocator_p, + uri, + key, + partitionId, + appIdInfos, + d_allocator_p); + return queueInfo; + } +}; + +/// This class provides the mock cluster and other components necessary to +/// test the cluster state manager in isolation, as well as some helper +/// methods. + +// ============================================================================ +// TESTS +// ---------------------------------------------------------------------------- + +static void test1_validateState() +// ------------------------------------------------------------------------ +// VALIDATE STATE +// +// Concerns: +// Ensure proper behavior of 'validateState' method. +// +// Testing: +// validateState(...) +// ------------------------------------------------------------------------ +{ + bmqtst::TestHelper::printTestName("VALIDATE STATE"); + + Tester tester; + + // We need to generate two different states and make sure we have the + // expected outputs + mqbc::ClusterState original(tester.cluster(), 5, tester.allocator()); + mqbc::ClusterState reference(tester.cluster(), 5, tester.allocator()); + + // 0. Generate different and same primary lease Id + original.setPartitionPrimary(0, 10, 0); + reference.setPartitionPrimary(0, 9, 0); + original.setPartitionPrimary(1, 20, 0); + reference.setPartitionPrimary(1, 20, 0); + + // 1. Generate extraQueues from an extra domain + bsl::string domainExtraDomain = "domain.extra.domain"; + original.domainStates().emplace(domainExtraDomain, + tester.createDomainState()); + + // 2. Generate extraQueues from a corrrect domain + bsl::string domainExtraQueue = "domain.extra.queue"; + bsl::string queueExtraQueue = "bmq://" + domainExtraQueue + "/qqq"; + bsl::string keyExtraQueue = "extra.queue"; + + mqbu::StorageKey extraQueueKey(mqbu::StorageKey::BinaryRepresentation(), + keyExtraQueue.data()); + mqbc::ClusterState::QueueInfoSp extraQueueInfoSp = + tester.createQueueInfoSp(queueExtraQueue, + extraQueueKey, + 2, + mqbc::ClusterState::AppInfos()); + + mqbc::ClusterState::DomainStateSp extraQueueDomainStateSp = + tester.createDomainState(); + mqbc::ClusterState::DomainStateSp extraQueueDomainStateRefSp = + tester.createDomainState(); + extraQueueDomainStateSp->queuesInfo().emplace(queueExtraQueue, + extraQueueInfoSp); + + original.domainStates().emplace(domainExtraQueue, extraQueueDomainStateSp); + reference.domainStates().emplace(domainExtraQueue, + extraQueueDomainStateRefSp); + + // 3. Generate incorrect queues + bsl::string domainIncorrectQueue = "domain.incorrect.queue"; + bsl::string queueIncorrectQueue = "bmq://" + domainIncorrectQueue + "/qqq"; + bsl::string keyIncorrectQueue = "incorrect.queue"; + bsl::string keyIncorrectQueueRef = "reference.incorrect.queue"; + + // origin + mqbu::StorageKey incorrectQueueKey( + mqbu::StorageKey::BinaryRepresentation(), + keyIncorrectQueue.data()); + mqbc::ClusterState::QueueInfoSp incorrectQueueInfoSp = + tester.createQueueInfoSp(queueIncorrectQueue, + incorrectQueueKey, + 3, + mqbc::ClusterState::AppInfos()); + + mqbc::ClusterState::DomainStateSp incorrectQueueDomainStateSp = + tester.createDomainState(); + incorrectQueueDomainStateSp->queuesInfo().emplace(queueIncorrectQueue, + incorrectQueueInfoSp); + + original.domainStates().emplace(domainIncorrectQueue, + incorrectQueueDomainStateSp); + + // reference + mqbu::StorageKey incorrectQueueKeyRef( + mqbu::StorageKey::BinaryRepresentation(), + keyIncorrectQueueRef.data()); + mqbc::ClusterState::QueueInfoSp incorrectQueueInfoRefSp = + tester.createQueueInfoSp(queueIncorrectQueue, + incorrectQueueKeyRef, + 3, + mqbc::ClusterState::AppInfos()); + + mqbc::ClusterState::DomainStateSp incorrectQueueDomainStateRefSp = + tester.createDomainState(); + incorrectQueueDomainStateRefSp->queuesInfo().emplace( + queueIncorrectQueue, + incorrectQueueInfoRefSp); + + reference.domainStates().emplace(domainIncorrectQueue, + incorrectQueueDomainStateRefSp); + + // 4. Generate a missing queue + bsl::string domainMissingQueue = "domain.missing.queue"; + bsl::string queueMissingQueue = "bmq://" + domainMissingQueue + "/qqq"; + bsl::string keyMissingQueue = "missing.queue"; + + mqbu::StorageKey missingQueueKey(mqbu::StorageKey::BinaryRepresentation(), + keyMissingQueue.data()); + mqbc::ClusterState::QueueInfoSp missingQueueInfoSp = + tester.createQueueInfoSp(queueMissingQueue, + missingQueueKey, + 4, + mqbc::ClusterState::AppInfos()); + + mqbc::ClusterState::DomainStateSp missingQueueDomainStateSp = + tester.createDomainState(); + mqbc::ClusterState::DomainStateSp missingQueueDomainStateRefSp = + tester.createDomainState(); + missingQueueDomainStateRefSp->queuesInfo().emplace(queueMissingQueue, + missingQueueInfoSp); + + original.domainStates().emplace(domainMissingQueue, + missingQueueDomainStateSp); + reference.domainStates().emplace(domainMissingQueue, + missingQueueDomainStateRefSp); + + // 5. Generate a correct queue + bsl::string domainCorrectQueue = "domain.correct.queue"; + bsl::string queueCorrectQueue = "bmq://" + domainCorrectQueue + "/qqq"; + bsl::string keyCorrectQueue = "correct.queue"; + mqbu::StorageKey correctQueueKey(mqbu::StorageKey::BinaryRepresentation(), + keyCorrectQueue.data()); + mqbc::ClusterState::QueueInfoSp correctQueueInfoSp = + tester.createQueueInfoSp(queueCorrectQueue, + correctQueueKey, + 5, + mqbc::ClusterState::AppInfos()); + mqbc::ClusterState::DomainStateSp correctQueueDomainStateSp = + tester.createDomainState(); + mqbc::ClusterState::DomainStateSp correctQueueDomainStateRefSp = + tester.createDomainState(); + correctQueueDomainStateSp->queuesInfo().emplace(queueCorrectQueue, + correctQueueInfoSp); + correctQueueDomainStateRefSp->queuesInfo().emplace(queueCorrectQueue, + correctQueueInfoSp); + original.domainStates().emplace(domainCorrectQueue, + correctQueueDomainStateSp); + reference.domainStates().emplace(domainCorrectQueue, + correctQueueDomainStateRefSp); + + // validate state + bmqu::MemOutStream errorDescription; + const int rc = mqbc::ClusterUtil::validateState(errorDescription, + original, + reference); + BMQTST_ASSERT_NE(rc, 0); + + bmqu::MemOutStream out; + const int level = 0; + + bdlb::Print::newlineAndIndent(out, level); + out << "---------------------------"; + bdlb::Print::newlineAndIndent(out, level); + out << "Incorrect Partition Infos :"; + bdlb::Print::newlineAndIndent(out, level); + out << "---------------------------"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "Partition [0]: primaryLeaseId: 10, primaryNodeId: -1"; + + bdlb::Print::newlineAndIndent(out, level); + out << "--------------------------------"; + bdlb::Print::newlineAndIndent(out, level); + out << "Partition Infos In Cluster State :"; + bdlb::Print::newlineAndIndent(out, level); + out << "--------------------------------"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "Partition [0]: primaryLeaseId: 9, primaryNodeId: -1"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "Partition [1]: primaryLeaseId: 20, primaryNodeId: -1"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "Partition [2]: primaryLeaseId: 0, primaryNodeId: -1"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "Partition [3]: primaryLeaseId: 0, primaryNodeId: -1"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "Partition [4]: primaryLeaseId: 0, primaryNodeId: -1"; + + bdlb::Print::newlineAndIndent(out, level); + out << "-----------------"; + bdlb::Print::newlineAndIndent(out, level); + out << "Incorrect Queues :"; + bdlb::Print::newlineAndIndent(out, level); + out << "-----------------"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "[ uri = " << queueIncorrectQueue << " queueKey = "; + bdlb::Print::singleLineHexDump(out, + keyIncorrectQueue.begin(), + mqbu::StorageKey::e_KEY_LENGTH_BINARY); + out << " partitionId = 3 appIdInfos = [ ] " + "stateOfAssignment = NONE ]"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "(correct queue info) [ uri = " << queueIncorrectQueue + << " queueKey = "; + bdlb::Print::singleLineHexDump(out, + keyIncorrectQueueRef.begin(), + mqbu::StorageKey::e_KEY_LENGTH_BINARY); + out << " partitionId = 3 appIdInfos = [ ] " + "stateOfAssignment = NONE ]"; + + bdlb::Print::newlineAndIndent(out, level); + out << "--------------"; + bdlb::Print::newlineAndIndent(out, level); + out << "Extra queues :"; + bdlb::Print::newlineAndIndent(out, level); + out << "--------------"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "[ uri = " << queueExtraQueue << " queueKey = "; + bdlb::Print::singleLineHexDump(out, + keyExtraQueue.begin(), + mqbu::StorageKey::e_KEY_LENGTH_BINARY); + out << " partitionId = 2 appIdInfos = [ ] " + "stateOfAssignment = NONE ]"; + + bdlb::Print::newlineAndIndent(out, level); + out << "----------------"; + bdlb::Print::newlineAndIndent(out, level); + out << "Missing queues :"; + bdlb::Print::newlineAndIndent(out, level); + out << "----------------"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "[ uri = " << queueMissingQueue << " queueKey = "; + bdlb::Print::singleLineHexDump(out, + keyMissingQueue.begin(), + mqbu::StorageKey::e_KEY_LENGTH_BINARY); + out << " partitionId = 4 appIdInfos = [ ] " + "stateOfAssignment = NONE ]"; + + bdlb::Print::newlineAndIndent(out, level); + out << "-------------------------"; + bdlb::Print::newlineAndIndent(out, level); + out << "QUEUES IN CLUSTER STATE :"; + bdlb::Print::newlineAndIndent(out, level); + out << "-------------------------"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "[ uri = " << queueCorrectQueue << " queueKey = "; + bdlb::Print::singleLineHexDump(out, + keyCorrectQueue.begin(), + mqbu::StorageKey::e_KEY_LENGTH_BINARY); + out << " partitionId = 5 appIdInfos = [ ] " + "stateOfAssignment = NONE ]"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "[ uri = " << queueMissingQueue << " queueKey = "; + bdlb::Print::singleLineHexDump(out, + keyMissingQueue.begin(), + mqbu::StorageKey::e_KEY_LENGTH_BINARY); + out << " partitionId = 4 appIdInfos = [ ] " + "stateOfAssignment = NONE ]"; + bdlb::Print::newlineAndIndent(out, level + 1); + out << "[ uri = " << queueIncorrectQueue << " queueKey = "; + bdlb::Print::singleLineHexDump(out, + keyIncorrectQueueRef.begin(), + mqbu::StorageKey::e_KEY_LENGTH_BINARY); + out << " partitionId = 3 appIdInfos = [ ] " + "stateOfAssignment = NONE ]"; + + BMQTST_ASSERT_EQ(errorDescription.str(), out.str()); +} + +// ============================================================================ +// MAIN PROGRAM +// ---------------------------------------------------------------------------- + +int main(int argc, char* argv[]) +{ + TEST_PROLOG(bmqtst::TestHelper::e_DEFAULT); + + bmqt::UriParser::initialize(bmqtst::TestHelperUtil::allocator()); + + switch (_testCase) { + case 0: + case 1: test1_validateState(); break; + default: { + cerr << "WARNING: CASE '" << _testCase << "' NOT FOUND." << endl; + bmqtst::TestHelperUtil::testStatus() = -1; + } break; + } + + bmqt::UriParser::shutdown(); + + TEST_EPILOG(bmqtst::TestHelper::e_CHECK_GBL_ALLOC); + // Can't ensure no default memory is allocated because + // 'bdlmt::EventSchedulerTestTimeSource' inside 'mqbmock::Cluster' uses + // the default allocator in its constructor. +} diff --git a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp index db877b2ba8..e75b163a4a 100644 --- a/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp +++ b/src/groups/mqb/mqbc/mqbc_incoreclusterstateledger.cpp @@ -341,7 +341,7 @@ int IncoreClusterStateLedger::onLogRolloverCb(const mqbu::StorageKey& oldLogId, ++citer) { const mqbc::ClusterState::QueueInfoSp& infoSp = citer->second; - if (infoSp->state() == ClusterStateQueueInfo::k_ASSIGNED) { + if (infoSp->state() == ClusterStateQueueInfo::State::k_ASSIGNED) { bmqp_ctrlmsg::QueueInfo queueInfo; infoSp->key().loadBinary(&queueInfo.key()); queueInfo.uri() = infoSp->uri().asString();