Skip to content

Commit

Permalink
mqba::Application: Temporarily disable admin command re-routing (#586)
Browse files Browse the repository at this point in the history
Signed-off-by: Yuan Jing Vincent Yan <[email protected]>
  • Loading branch information
kaikulimu authored Jan 28, 2025
1 parent c18ff5c commit 4eabe37
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 9 deletions.
11 changes: 10 additions & 1 deletion src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -832,13 +832,22 @@ int Application::processCommand(const bslstl::StringRef& source,

mqbcmd::InternalResult cmdResult;

// TODO This boolean is added to address a failure during the in-progress
// broker rollout from v92.8 -> v93.7. If a 93.7 broker tries to re-route an
// admin command to a 92.8 broker, the 92.8 broker will fail to handle it.
// Therefore, we turn off re-routing until all nodes have been upgraded to
// 93.7.
//
// Also, once reroute is re-enabled, revert the integration test changes
// introduced in https://github.com/bloomberg/blazingmq/pull/586.
const bool neverReroute = true;

// Note that routed commands should never route again to another node.
// This should always be the "end of the road" for a command.
// Note that this logic is important to prevent a "deadlock" scenario
// where two nodes are waiting on a response from each other to continue.
// Currently commands from reroutes are executed on their own dedicated
// thread.
if (fromReroute) {
if (fromReroute || neverReroute) {
if (0 != executeCommand(command, &cmdResult)) {
// early exit (caused by "dangerous" command)
return rc_EARLY_EXIT; // RETURN
Expand Down
12 changes: 12 additions & 0 deletions src/integration-tests/test_admin_command_routing.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ def test_primary_rerouting(multi_node: Cluster) -> None:
- CLUSTERS CLUSTER <name> STORAGE PARTITION <partitionId> ENABLE/DISABLE
"""

# TODO Skip admin command routing tests until admin command routing is re-enabled
return

admin = AdminClient()

# find the first node which is not a known leader
Expand Down Expand Up @@ -129,6 +132,9 @@ def test_cluster_rerouting(multi_node: Cluster) -> None:
- CLUSTERS CLUSTER <name> STATE ELECTOR GET_ALL <param> <value>
"""

# TODO Skip admin command routing tests until admin command routing is re-enabled
return

admin = AdminClient()

node = multi_node.nodes()[0]
Expand Down Expand Up @@ -197,6 +203,9 @@ def test_multi_response_encoding(multi_node: Cluster):
"""

# TODO Skip admin command routing tests until admin command routing is re-enabled
return

def is_compact(json_str: str) -> bool:
return " " not in json_str

Expand Down Expand Up @@ -263,6 +272,9 @@ def test_concurrently_routed_commands(multi_node: Cluster):
Stage 2: Issue command in parallel
Stage 3: Expect
"""
# TODO Skip admin command routing tests until admin command routing is re-enabled
return

# Connect a client to each node in the cluster
clients = []
for node in multi_node.nodes():
Expand Down
6 changes: 4 additions & 2 deletions src/integration-tests/test_admin_res_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,8 @@ def test_adminsession_res_log_reconfigure(multi_node: Cluster):
res = admin.send_admin(f"DOMAINS RECONFIGURE {tc.DOMAIN_FANOUT}")

success_count = res.split().count("SUCCESS")
assert success_count == num_nodes
# TODO Do `assert success_count == num_nodes` when admin command routing is re-enabled
assert success_count == 1

assert leader.capture("Send response message", TIMEOUT)
assert not member1.capture("Send response message", TIMEOUT)
Expand All @@ -145,7 +146,8 @@ def test_adminsession_res_log_reconfigure(multi_node: Cluster):
res = admin.send_admin(f"DOMAINS RECONFIGURE {tc.DOMAIN_FANOUT}")

success_count = res.split().count("SUCCESS")
assert success_count == num_nodes
# TODO Do `assert success_count == num_nodes` when admin command routing is re-enabled
assert success_count == 1

assert member1.capture("Send response message", TIMEOUT)
assert not leader.capture("Send response message", TIMEOUT)
Expand Down
54 changes: 54 additions & 0 deletions src/integration-tests/test_domain_remove.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ def test_remove_domain_when_cluster_unhealthy(multi_node: Cluster):
the command fails with a routing error
resend the command and it should succeed
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = multi_node.proxy_cycle()
proxy = next(proxies)

Expand Down Expand Up @@ -103,6 +106,9 @@ def test_remove_different_domain(cluster: Cluster):
send DOMAINS REMOVE command to remove a different domain
the original one should be intact
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()

# open queue in PRIORITY domain but remove PRIORITY_SC
Expand Down Expand Up @@ -161,6 +167,9 @@ def test_open_queue_after_remove_domain(cluster: Cluster):
try to open a queue after the first round of DOMAINS REMOVE command
and it should fail since we started remove but not fully finished yet
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()
next(proxies) # eastp
proxy = next(proxies) # westp
Expand Down Expand Up @@ -198,6 +207,9 @@ def test_remove_domain_with_producer_queue_open(cluster: Cluster):
"""
issue DOMAINS REMOVE command when consumer closes connection
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()
proxy = next(proxies)

Expand Down Expand Up @@ -234,6 +246,9 @@ def test_remove_domain_with_consumer_queue_open(cluster: Cluster):
"""
issue DOMAINS REMOVE command when producer closes connection
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()
proxy = next(proxies)

Expand Down Expand Up @@ -271,6 +286,9 @@ def test_remove_domain_with_both_queue_open_and_closed(cluster: Cluster):
issue DOMAINS REMOVE command when both producer and consumer have queue open
and both have queue closed
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()
proxy = next(proxies)

Expand Down Expand Up @@ -314,6 +332,9 @@ def test_try_open_removed_domain(cluster: Cluster):
3. close both producer and consumer
4. try open both, and they should all fail
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()
proxy = next(proxies)

Expand Down Expand Up @@ -358,6 +379,9 @@ def test_remove_domain_with_unconfirmed_message(cluster: Cluster):
"""
issue DOMAINS REMOVE command with unconfirmed messages
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()
proxy = next(proxies)

Expand Down Expand Up @@ -387,6 +411,9 @@ def test_remove_domain_not_on_disk(cluster: Cluster):
"""
issue DOMAINS REMOVE command when the domain is not on disk
"""
# TODO Skip this test until admin command routing is re-enabled
return

admin = AdminClient()
leader = cluster.last_known_leader
admin.connect(leader.config.host, int(leader.config.port))
Expand All @@ -399,6 +426,9 @@ def test_remove_domain_on_disk_not_in_cache(cluster: Cluster):
"""
issue DOMAINS REMOVE command when the domain is on disk but not in cache
"""
# TODO Skip this test until admin command routing is re-enabled
return

admin = AdminClient()
leader = cluster.last_known_leader
admin.connect(leader.config.host, int(leader.config.port))
Expand All @@ -412,6 +442,9 @@ def test_send_to_replicas(multi_node: Cluster):
send DOMAINS REMOVE admin command to replica instead of primary
replica will broadcast to all the nodes including the primary
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = multi_node.proxy_cycle()
proxy = next(proxies)

Expand Down Expand Up @@ -464,6 +497,9 @@ def test_second_round(cluster: Cluster):
a queue and the removed domain can be opened after finalizing
and when the domain exists on the disk
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()
proxy = next(proxies)

Expand Down Expand Up @@ -525,6 +561,9 @@ def test_purge_then_remove(cluster: Cluster):
"""
purge queue then remove
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()
proxy = next(proxies)
uri = tc.URI_PRIORITY
Expand All @@ -549,6 +588,9 @@ def test_remove_without_connection(cluster: Cluster):
"""
issue DOMAINS REMOVE command without any connection to a domain on disk
"""
# TODO Skip this test until admin command routing is re-enabled
return

admin = AdminClient()
leader = cluster.last_known_leader
admin.connect(leader.config.host, int(leader.config.port))
Expand Down Expand Up @@ -577,6 +619,9 @@ def test_remove_then_restart(cluster: Cluster):
6. produce 3 messages
7. completely remove the domain
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()
proxy = next(proxies)

Expand Down Expand Up @@ -642,6 +687,9 @@ def test_remove_with_reconfig(cluster: Cluster):
5. call reconfigure to load the domain
6. produce 3 messages
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()
proxy = next(proxies)

Expand Down Expand Up @@ -697,6 +745,9 @@ def test_remove_cache_remains(cluster: Cluster):
5. second round of DOMAINS REMOVE
6. try to open a queue and nothing in the cache
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()
proxy = next(proxies)
uri = tc.URI_PRIORITY
Expand Down Expand Up @@ -741,6 +792,9 @@ def test_remove_cache_cleaned(cluster: Cluster):
5. second round of DOMAINS REMOVE
6. try to open a queue and nothing in the cache
"""
# TODO Skip this test until admin command routing is re-enabled
return

proxies = cluster.proxy_cycle()
proxy = next(proxies)
uri = tc.URI_PRIORITY
Expand Down
5 changes: 4 additions & 1 deletion src/integration-tests/test_reconfigure_domains.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,10 @@ def reconfigure_to_limit_n_msgs(self, cluster: Cluster, max_num_msgs: int) -> bo
tc.DOMAIN_PRIORITY_SC
].definition.parameters.storage.domain_limits.messages = max_num_msgs
return cluster.reconfigure_domain(
tc.DOMAIN_PRIORITY_SC, leader_only=True, succeed=True
# TODO: Set `leader_only=True` when admin command routing is re-enabled
tc.DOMAIN_PRIORITY_SC,
leader_only=False,
succeed=True,
)

# Verify that reconfiguring domain message limits works as expected.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@
-m {all,pull,push}, --mode {all,pull,push}
prometheus mode
--no-docker don't run Prometheus in docker, assume it is running on localhost
"""
"""
__test__ = False # This is not for pytest.

import argparse
Expand Down
4 changes: 2 additions & 2 deletions src/python/blazingmq/dev/fuzztest/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ def make_put_message() -> BoofuzzSequence:
math=lambda x: x // NumBytes.WORD,
)

guid = b"\x00\x00\x00\x00\x05\x78\x8D\xAE\xD4\xB8\xCA\x12\xAE\xF3\x2D\xCE"
guid = b"\x00\x00\x00\x00\x05\x78\x8d\xae\xd4\xb8\xca\x12\xae\xf3\x2d\xce"

message_components = [
boofuzz.BitField(name="options_size", width=3 * NumBits.BYTE),
Expand Down Expand Up @@ -302,7 +302,7 @@ def make_confirm_message() -> BoofuzzSequence:
Constructs boofuzz structures representing ConfirmMessage.
"""

guid = b"\x00\x00\x00\x00\x05\x78\x8D\xAE\xD4\xB8\xCA\x12\xAE\xF3\x2D\xCE"
guid = b"\x00\x00\x00\x00\x05\x78\x8d\xae\xd4\xb8\xca\x12\xae\xf3\x2d\xce"

message_components = [
boofuzz.BitField(
Expand Down
3 changes: 1 addition & 2 deletions src/python/blazingmq/util/test/logging_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

"""Test suite for blazingmq.util.logging.
"""
"""Test suite for blazingmq.util.logging."""

from logging import DEBUG, INFO, getLogger
from unittest.mock import patch
Expand Down

0 comments on commit 4eabe37

Please sign in to comment.