Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: CQH calls Queue::getHandle #565

Merged
merged 3 commits into from
Jan 30, 2025
Merged

Feat: CQH calls Queue::getHandle #565

merged 3 commits into from
Jan 30, 2025

Conversation

dorjesinpo
Copy link
Collaborator

For the upcoming Dynamic Apps feature, we need ClusterQueueHelper to be the caller of Queue::getHandle (and inspect the status to make Dynamic Apps updates if necessary).
So, instead of Domain calling Queue::getHandle after Cluster::openQueue, it is the Cluster (ClusterQueueHelper) calling Queue::getHandle as part of Cluster::openQueue.

Roughly, the sequence looks like this:

    //  CQH::processPeerOpenQueueRequest  QueueSessionManager::processOpenQueue
    //              \                       /
    //               V                     V
    //              DomainFactory::createDomain
    //              /                      \
    //             V                        V
    //  CQH::onGetDomain                  QueueSessionManager::onDomainOpenCb
    //          |                           /
    //          V                          /
    //      CQH::onGetDomainDispatched    /
    //                      \            /
    //                       V          V
    //                      Domain::openQueue
    //                              |
    //                              V
    //                          Cluster::openQueue
    //                              |
    //                              V
    //                              CQH::openQueue
    //                                      \
    //                                       V
    //                                  Queue::getHandle
    //                                      /
    //                                     V
    //                              CQH::onGetQueueHandle. (future Dynamic Apps work)
    //                                    /
    //                                   V
    //                      Domain::onOpenQueueResponse
    //                        /                 \
    //                       V                   V
    //  CQH::onGetQueueHandleDispatched     QueueSessionManager::onQueueOpenCb
    //
    ```

@dorjesinpo dorjesinpo requested a review from a team as a code owner January 13, 2025 15:19
@dorjesinpo dorjesinpo added the enhancement New feature or request label Jan 13, 2025
@dorjesinpo dorjesinpo requested review from 678098 and removed request for a team January 13, 2025 15:19
@dorjesinpo dorjesinpo changed the title CQH calls Queue::getHandle Feat: CQH calls Queue::getHandle Jan 13, 2025
QueueLiveState& qinfo = context.d_queueContext_p->d_liveQInfo;
const int pid = context.d_queueContext_p->partitionId();
QueueLiveState& qinfo = context->d_queueContext_p->d_liveQInfo;
const int pid = context->d_queueContext_p->partitionId();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we also dereference context->d_queueContext_p without proper checks

@@ -1141,12 +1161,12 @@ void ClusterQueueHelper::onOpenQueueResponse(
failure = false;
}

QueueContext& qcontext = *context.d_queueContext_p;
QueueContext& qcontext = *context->d_queueContext_p;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dereferencing

QueueLiveState& qinfo = queueContext->d_liveQInfo;
const int pid = queueContext->partitionId();
const bool isPrimary = !d_cluster_p->isRemote() &&
d_clusterState_p->isSelfPrimary(pid);
bsl::shared_ptr<mqbi::Queue> queue = createQueueFactory(errorDescription,
context,
*context,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing check before that context is not a null shared ptr

@@ -89,8 +89,7 @@ namespace mqbblp {
// ============

/// Domain implementation
class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain,
public mqbc::ClusterStateObserver {
class Domain : public mqbi::Domain {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
class Domain : public mqbi::Domain {
class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain {

@dorjesinpo dorjesinpo force-pushed the dev/streamline-gethandle branch from 6e95521 to 2f548bf Compare January 14, 2025 16:07
Copy link

@bmq-oss-ci bmq-oss-ci bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Build 440 of commit 2f548bf has completed with FAILURE

, d_callback(callback)
{
BSLS_ASSERT_SAFE(domain);
// NOTHING
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
// NOTHING

{
QueueLiveState& info = context->d_queueContext_p->d_liveQInfo;
QueueLiveState& info = context->queueContext()->d_liveQInfo;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might rely on the checks done before calling this function, or might add them here as well

Suggested change
QueueLiveState& info = context->queueContext()->d_liveQInfo;
// executed by the cluster *DISPATCHER* thread
// PRECONDITIONS
BSLS_ASSERT_SAFE(context);
BSLS_ASSERT_SAFE(context->queueContext());
QueueLiveState& info = context->queueContext()->d_liveQInfo;


const int pid = context.d_queueContext_p->partitionId();
QueueContext* queueContext = context.queueContext();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
QueueContext* queueContext = context.queueContext();
BSLS_ASSERT_SAFE(context);
QueueContext* queueContext = context.queueContext();
BSLS_ASSERT_SAFE(queueContext);

@678098 678098 assigned dorjesinpo and unassigned 678098 Jan 17, 2025
@dorjesinpo dorjesinpo force-pushed the dev/streamline-gethandle branch from 2f548bf to af5e923 Compare January 23, 2025 15:15
Signed-off-by: dorjesinpo <[email protected]>
Signed-off-by: dorjesinpo <[email protected]>
@dorjesinpo dorjesinpo force-pushed the dev/streamline-gethandle branch from af5e923 to de99d90 Compare January 27, 2025 22:50
Signed-off-by: dorjesinpo <[email protected]>
@dorjesinpo dorjesinpo force-pushed the dev/streamline-gethandle branch from de99d90 to b6777a5 Compare January 28, 2025 15:15
Copy link
Collaborator

@678098 678098 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's run sanitizers and merge, the UT failure is unrelated

@dorjesinpo dorjesinpo merged commit 8a86d36 into main Jan 30, 2025
27 of 32 checks passed
emelialei88 pushed a commit to emelialei88/blazingmq that referenced this pull request Feb 6, 2025
* CQH calls Queue::getHandle

Signed-off-by: dorjesinpo <[email protected]>

* Asserts...

Signed-off-by: dorjesinpo <[email protected]>

* more asserts

Signed-off-by: dorjesinpo <[email protected]>

---------

Signed-off-by: dorjesinpo <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants