From 4eabe37728d911f05d04eb57a62e90f0fade9975 Mon Sep 17 00:00:00 2001 From: Yuan Jing Vincent Yan Date: Tue, 28 Jan 2025 10:58:42 -0500 Subject: [PATCH] mqba::Application: Temporarily disable admin command re-routing (#586) Signed-off-by: Yuan Jing Vincent Yan --- src/groups/mqb/mqba/mqba_application.cpp | 11 +++- .../test_admin_command_routing.py | 12 +++++ src/integration-tests/test_admin_res_log.py | 6 ++- src/integration-tests/test_domain_remove.py | 54 +++++++++++++++++++ .../test_reconfigure_domains.py | 5 +- ...qprometheus_prometheusstatconsumer_test.py | 3 +- src/python/blazingmq/dev/fuzztest/__init__.py | 4 +- .../blazingmq/util/test/logging_test.py | 3 +- 8 files changed, 89 insertions(+), 9 deletions(-) diff --git a/src/groups/mqb/mqba/mqba_application.cpp b/src/groups/mqb/mqba/mqba_application.cpp index d93d2aceca..0933e9cc76 100644 --- a/src/groups/mqb/mqba/mqba_application.cpp +++ b/src/groups/mqb/mqba/mqba_application.cpp @@ -832,13 +832,22 @@ int Application::processCommand(const bslstl::StringRef& source, mqbcmd::InternalResult cmdResult; + // TODO This boolean is added to address failure during in-progress broker + // rollout from v92.8 -> v93.7. If 93.7 broker tries to re-route an admin + // command to a 92.8 broker, 92.8 broker will fail to handle. Therefore, we + // turn off re-routing until all nodes have been upgraded to 93.7. + // + // Also, once reroute is re-enabled, revert the integration test changes + // introduced in https://github.com/bloomberg/blazingmq/pull/586. + const bool neverReroute = true; + // Note that routed commands should never route again to another node. // This should always be the "end of the road" for a command. // Note that this logic is important to prevent a "deadlock" scenario // where two nodes are waiting on a response from each other to continue. // Currently commands from reroutes are executed on their own dedicated // thread. - if (fromReroute) { + if (fromReroute || neverReroute) { if (0 != executeCommand(command, &cmdResult)) { // early exit (caused by "dangerous" command) return rc_EARLY_EXIT; // RETURN diff --git a/src/integration-tests/test_admin_command_routing.py b/src/integration-tests/test_admin_command_routing.py index f593a304e1..844cc5f9fb 100644 --- a/src/integration-tests/test_admin_command_routing.py +++ b/src/integration-tests/test_admin_command_routing.py @@ -50,6 +50,9 @@ def test_primary_rerouting(multi_node: Cluster) -> None: - CLUSTERS CLUSTER STORAGE PARTITION ENABLE/DISABLE """ + # TODO Skip admin command routing tests until admin command routing is re-enabled + return + admin = AdminClient() # find the first node which is not a known leader @@ -129,6 +132,9 @@ def test_cluster_rerouting(multi_node: Cluster) -> None: - CLUSTERS CLUSTER STATE ELECTOR GET_ALL """ + # TODO Skip admin command routing tests until admin command routing is re-enabled + return + admin = AdminClient() node = multi_node.nodes()[0] @@ -197,6 +203,9 @@ def test_multi_response_encoding(multi_node: Cluster): """ + # TODO Skip admin command routing tests until admin command routing is re-enabled + return + def is_compact(json_str: str) -> bool: return " " not in json_str @@ -263,6 +272,9 @@ def test_concurrently_routed_commands(multi_node: Cluster): Stage 2: Issue command in parallel Stage 3: Expect """ + # TODO Skip admin command routing tests until admin command routing is re-enabled + return + # Connect a client to each node in the cluster clients = [] for node in multi_node.nodes(): diff --git a/src/integration-tests/test_admin_res_log.py b/src/integration-tests/test_admin_res_log.py index 57e9d71a89..2a0b907b20 100644 --- a/src/integration-tests/test_admin_res_log.py +++ b/src/integration-tests/test_admin_res_log.py @@ -132,7 +132,8 @@ def test_adminsession_res_log_reconfigure(multi_node: Cluster): res = admin.send_admin(f"DOMAINS RECONFIGURE {tc.DOMAIN_FANOUT}") success_count = res.split().count("SUCCESS") - assert success_count == num_nodes + # TODO Do `assert success_count == num_nodes` when admin command routing is re-enabled + assert success_count == 1 assert leader.capture("Send response message", TIMEOUT) assert not member1.capture("Send response message", TIMEOUT) @@ -145,7 +146,8 @@ def test_adminsession_res_log_reconfigure(multi_node: Cluster): res = admin.send_admin(f"DOMAINS RECONFIGURE {tc.DOMAIN_FANOUT}") success_count = res.split().count("SUCCESS") - assert success_count == num_nodes + # TODO Do `assert success_count == num_nodes` when admin command routing is re-enabled + assert success_count == 1 assert member1.capture("Send response message", TIMEOUT) assert not leader.capture("Send response message", TIMEOUT) diff --git a/src/integration-tests/test_domain_remove.py b/src/integration-tests/test_domain_remove.py index 68deabe9c6..fe1c48b099 100644 --- a/src/integration-tests/test_domain_remove.py +++ b/src/integration-tests/test_domain_remove.py @@ -56,6 +56,9 @@ def test_remove_domain_when_cluster_unhealthy(multi_node: Cluster): the command fails with a routing error resend the command and it should succeed """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = multi_node.proxy_cycle() proxy = next(proxies) @@ -103,6 +106,9 @@ def test_remove_different_domain(cluster: Cluster): send DOMAINS REMOVE command to remove a different domain the original one should be intact """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() # open queue in PRIORITY domain but remove PRIORITY_SC @@ -161,6 +167,9 @@ def test_open_queue_after_remove_domain(cluster: Cluster): try to open a queue after the first round of DOMAINS REMOVE command and it should fail since we started remove but not fully finished yet """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() next(proxies) # eastp proxy = next(proxies) # westp @@ -198,6 +207,9 @@ def test_remove_domain_with_producer_queue_open(cluster: Cluster): """ issue DOMAINS REMOVE command when consumer closes connection """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -234,6 +246,9 @@ def test_remove_domain_with_consumer_queue_open(cluster: Cluster): """ issue DOMAINS REMOVE command when producer closes connection """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -271,6 +286,9 @@ def test_remove_domain_with_both_queue_open_and_closed(cluster: Cluster): issue DOMAINS REMOVE command when both producer and consumer have queue open and both have queue closed """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -314,6 +332,9 @@ def test_try_open_removed_domain(cluster: Cluster): 3. close both producer and consumer 4. try open both, and they should all fail """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -358,6 +379,9 @@ def test_remove_domain_with_unconfirmed_message(cluster: Cluster): """ issue DOMAINS REMOVE command with unconfirmed messages """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -387,6 +411,9 @@ def test_remove_domain_not_on_disk(cluster: Cluster): """ issue DOMAINS REMOVE command when the domain is not on disk """ + # TODO Skip this test until admin command routing is re-enabled + return + admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) @@ -399,6 +426,9 @@ def test_remove_domain_on_disk_not_in_cache(cluster: Cluster): """ issue DOMAINS REMOVE command when the domain is not on disk """ + # TODO Skip this test until admin command routing is re-enabled + return + admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) @@ -412,6 +442,9 @@ def test_send_to_replicas(multi_node: Cluster): send DOMAINS REMOVE admin command to replica instead of primary replica will boardcast to all the nodes including the primary """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = multi_node.proxy_cycle() proxy = next(proxies) @@ -464,6 +497,9 @@ def test_second_round(cluster: Cluster): a queue and the removed domain can be opened after finalizing and when the domain exists on the disk """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -525,6 +561,9 @@ def test_purge_then_remove(cluster: Cluster): """ purge queue then remove """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) uri = tc.URI_PRIORITY @@ -549,6 +588,9 @@ def test_remove_without_connection(cluster: Cluster): """ issue DOMAINS REMOVE command without any connection to a domain on disk """ + # TODO Skip this test until admin command routing is re-enabled + return + admin = AdminClient() leader = cluster.last_known_leader admin.connect(leader.config.host, int(leader.config.port)) @@ -577,6 +619,9 @@ def test_remove_then_restart(cluster: Cluster): 6. produce 3 messages 7. completely remove the domain """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -642,6 +687,9 @@ def test_remove_with_reconfig(cluster: Cluster): 5. call reconfigure to load the domain 6. produce 3 messages """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) @@ -697,6 +745,9 @@ def test_remove_cache_remains(cluster: Cluster): 5. second round of DOMAINS REMOVE 6. try to open a queue and nothing in the cache """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) uri = tc.URI_PRIORITY @@ -741,6 +792,9 @@ def test_remove_cache_cleaned(cluster: Cluster): 5. second round of DOMAINS REMOVE 6. try to open a queue and nothing in the cache """ + # TODO Skip this test until admin command routing is re-enabled + return + proxies = cluster.proxy_cycle() proxy = next(proxies) uri = tc.URI_PRIORITY diff --git a/src/integration-tests/test_reconfigure_domains.py b/src/integration-tests/test_reconfigure_domains.py index 24967e6eee..58cf38ec1d 100644 --- a/src/integration-tests/test_reconfigure_domains.py +++ b/src/integration-tests/test_reconfigure_domains.py @@ -67,7 +67,10 @@ def reconfigure_to_limit_n_msgs(self, cluster: Cluster, max_num_msgs: int) -> bo tc.DOMAIN_PRIORITY_SC ].definition.parameters.storage.domain_limits.messages = max_num_msgs return cluster.reconfigure_domain( - tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True + # TODO: Set `leader_only=True` when admin command routing is re-enabled + tc.DOMAIN_PRIORITY_SC, + leader_only=False, + succeed=True, ) # Verify that reconfiguring domain message limits works as expected. diff --git a/src/plugins/bmqprometheus/tests/bmqprometheus_prometheusstatconsumer_test.py b/src/plugins/bmqprometheus/tests/bmqprometheus_prometheusstatconsumer_test.py index 67a54e26bd..fbcbabd406 100755 --- a/src/plugins/bmqprometheus/tests/bmqprometheus_prometheusstatconsumer_test.py +++ b/src/plugins/bmqprometheus/tests/bmqprometheus_prometheusstatconsumer_test.py @@ -40,7 +40,8 @@ -m {all,pull,push}, --mode {all,pull,push} prometheus mode --no-docker don't run Prometheus in docker, assume it is running on localhost - """ + +""" __test__ = False # This is not for pytest. import argparse diff --git a/src/python/blazingmq/dev/fuzztest/__init__.py b/src/python/blazingmq/dev/fuzztest/__init__.py index 644a0bd3dd..f0e84864d3 100644 --- a/src/python/blazingmq/dev/fuzztest/__init__.py +++ b/src/python/blazingmq/dev/fuzztest/__init__.py @@ -253,7 +253,7 @@ def make_put_message() -> BoofuzzSequence: math=lambda x: x // NumBytes.WORD, ) - guid = b"\x00\x00\x00\x00\x05\x78\x8D\xAE\xD4\xB8\xCA\x12\xAE\xF3\x2D\xCE" + guid = b"\x00\x00\x00\x00\x05\x78\x8d\xae\xd4\xb8\xca\x12\xae\xf3\x2d\xce" message_components = [ boofuzz.BitField(name="options_size", width=3 * NumBits.BYTE), @@ -302,7 +302,7 @@ def make_confirm_message() -> BoofuzzSequence: Constructs boofuzz structures representing ConfirmMessage. """ - guid = b"\x00\x00\x00\x00\x05\x78\x8D\xAE\xD4\xB8\xCA\x12\xAE\xF3\x2D\xCE" + guid = b"\x00\x00\x00\x00\x05\x78\x8d\xae\xd4\xb8\xca\x12\xae\xf3\x2d\xce" message_components = [ boofuzz.BitField( diff --git a/src/python/blazingmq/util/test/logging_test.py b/src/python/blazingmq/util/test/logging_test.py index e5f1d828e8..258ca2c34b 100644 --- a/src/python/blazingmq/util/test/logging_test.py +++ b/src/python/blazingmq/util/test/logging_test.py @@ -13,8 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""Test suite for blazingmq.util.logging. -""" +"""Test suite for blazingmq.util.logging.""" from logging import DEBUG, INFO, getLogger from unittest.mock import patch