Skip to content

Commit

Permalink
fix: always update CSL on QueueUpdateAdvisory (#581)
Browse files Browse the repository at this point in the history
* fix: always update CSL on QueueUpdateAdvisory

Signed-off-by: dorjesinpo <[email protected]>

* Updating IT

Signed-off-by: dorjesinpo <[email protected]>

---------

Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo authored Feb 5, 2025
1 parent 12bc29e commit a65e2c2
Show file tree
Hide file tree
Showing 3 changed files with 70 additions and 54 deletions.
19 changes: 11 additions & 8 deletions src/groups/mqb/mqbblp/mqbblp_clusterstatemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,19 +70,24 @@ void ClusterStateManager::onCommit(
BSLS_ASSERT_SAFE(dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(advisory.choice().isClusterMessageValue());

// NOTE: Even when using old workflow, we still apply all advisories to the
// CSL. We just don't invoke the commit callbacks.
if (!d_clusterConfig.clusterAttributes().isCSLModeEnabled()) {
return; // RETURN
}

if (status != mqbc::ClusterStateLedgerCommitStatus::e_SUCCESS) {
BALL_LOG_ERROR << d_clusterData_p->identity().description()
<< ": Failed to commit advisory: " << advisory
<< ", with status '" << status << "'";
return; // RETURN
}

const bmqp_ctrlmsg::ClusterMessage& clusterMessage =
advisory.choice().clusterMessage();

// NOTE: Even when using old workflow, we still apply all advisories to the
// CSL. We just don't invoke the commit callbacks.  The one exception is
// QueueUpdateAdvisory: it is not skipped here, even in the old workflow.
if (!d_clusterConfig.clusterAttributes().isCSLModeEnabled() &&
!clusterMessage.choice().isQueueUpdateAdvisoryValue()) {
return; // RETURN
}

// Commenting out following 'if' check to fix an assert during node
// shutdown.
// if ( d_clusterData_p->membership().selfNodeStatus()
Expand All @@ -94,8 +99,6 @@ void ClusterStateManager::onCommit(
<< ": Committed advisory: " << advisory << ", with status '"
<< status << "'";

const bmqp_ctrlmsg::ClusterMessage& clusterMessage =
advisory.choice().clusterMessage();
mqbc::ClusterUtil::apply(d_state_p, clusterMessage, *d_clusterData_p);
}

Expand Down
46 changes: 0 additions & 46 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1286,29 +1286,6 @@ void ClusterUtil::registerAppId(ClusterData* clusterData,

queueUpdate.addedAppIds().push_back(appIdInfo);
queueAdvisory.queueUpdates().push_back(queueUpdate);

if (!clusterData->cluster().isCSLModeEnabled()) {
// In CSL mode, we update the queue to ClusterState upon CSL
// commit callback of QueueUpdateAdvisory.

// In non-CSL mode this is the shortcut to call Primary CQH
// instead of waiting for the quorum of acks in the ledger.

AppInfos addedApps(allocator);
mqbc::ClusterUtil::parseQueueInfo(&addedApps,
queueUpdate.addedAppIds(),
allocator);

BSLA_MAYBE_UNUSED const int assignRc =
clusterState.updateQueue(queueUpdate.uri(),
queueUpdate.domain(),
addedApps,
AppInfos(allocator));
BSLS_ASSERT_SAFE(assignRc == 0);

BALL_LOG_INFO << clusterData->cluster().description()
<< ": Queue updated: " << queueAdvisory;
}
}
}

Expand Down Expand Up @@ -1437,29 +1414,6 @@ void ClusterUtil::unregisterAppId(ClusterData* clusterData,

return; // RETURN
}

if (!clusterData->cluster().isCSLModeEnabled()) {
// In CSL mode, we update the queue to ClusterState upon CSL
// commit callback of QueueUpdateAdvisory.

// In non-CSL mode this is the shortcut to call Primary CQH
// instead of waiting for the quorum of acks in the ledger.

AppInfos removedApps(allocator);
mqbc::ClusterUtil::parseQueueInfo(&removedApps,
queueUpdate.removedAppIds(),
allocator);

BSLA_MAYBE_UNUSED const int assignRc =
clusterState.updateQueue(queueUpdate.uri(),
queueUpdate.domain(),
AppInfos(allocator),
removedApps);
BSLS_ASSERT_SAFE(assignRc == 0);

BALL_LOG_INFO << clusterData->cluster().description()
<< ": Queue updated: " << queueAdvisory;
}
}
}

Expand Down
59 changes: 59 additions & 0 deletions src/integration-tests/test_appids.py
Original file line number Diff line number Diff line change
Expand Up @@ -641,3 +641,62 @@ def test_open_authorize_restart_from_non_FSM_to_FSM(cluster: Cluster):
consumers[app_id].close(f"{tc.URI_FANOUT}?id={app_id}", block=True)
== Client.e_SUCCESS
)


def test_open_authorize_change_primary(multi_node: Cluster):
    """Check that an App added to the Domain config of an existing queue is
    still usable after the Primary changes.

    Steps: open a producer and one consumer on the fanout queue, authorize an
    additional App ("new_app"), post a single message, kill the current
    leader, wait for a new leader, then open a consumer for the new App and
    expect it to see the previously posted message.

    This addresses the concern with a Replica not processing QueueUpdates
    before becoming Primary.
    """

    def holds_exactly_one(client, uri):
        # Build the zero-argument predicate polled by 'wait_until'.
        return lambda: len(client.list(uri, block=True)) == 1

    old_leader = multi_node.last_known_leader
    proxy_iter = multi_node.proxy_cycle()

    writer = next(proxy_iter).create_client("producer")
    writer.open(tc.URI_FANOUT, flags=["write,ack"], succeed=True)

    all_app_ids = default_app_ids + ["new_app"]

    # -- Open a consumer for the first App in the list -----------------------
    app_id = all_app_ids[0]
    existing_uri = f"{tc.URI_FANOUT}?id={app_id}"

    reader = next(proxy_iter).create_client(app_id)
    reader.open(existing_uri, flags=["read"], succeed=True)

    # -- Authorize the additional App ('new_app') ----------------------------
    set_app_ids(multi_node, all_app_ids)

    # -- Post one message and wait for its ack -------------------------------
    writer.post(tc.URI_FANOUT, ["msg1"], succeed=True, wait_ack=True)

    # Dump the queue internals on the current leader before checking delivery
    # (presumably for diagnostics in the broker log).
    old_leader.dump_queue_internals(tc.DOMAIN_FANOUT, tc.TEST_QUEUE)

    # -- The existing consumer must see exactly the one posted message -------
    assert wait_until(holds_exactly_one(reader, existing_uri), 3)

    reader.close(existing_uri, block=True, succeed=True)

    # -- Take the leader down ------------------------------------------------
    # The process is killed on purpose, so its exit code is not to be checked.
    old_leader.check_exit_code = False
    old_leader.kill()
    old_leader.wait()

    # Block until the cluster reports a new leader; the returned node itself
    # is not needed by the rest of the test.
    multi_node.wait_leader()

    # -- A consumer for the newly authorized App must see the message --------
    new_app_uri = f"{tc.URI_FANOUT}?id=new_app"

    reader = next(proxy_iter).create_client(app_id)
    reader.open(new_app_uri, flags=["read"], succeed=True)

    assert wait_until(holds_exactly_one(reader, new_app_uri), 3)

    reader.close(new_app_uri, block=True, succeed=True)

0 comments on commit a65e2c2

Please sign in to comment.