From 6e955216213f28c823b9b6b6c06eb8722ea5e4e2 Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Sun, 12 Jan 2025 14:47:08 -0500 Subject: [PATCH] CQH calls Queue::getHandle Signed-off-by: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> --- .../mqb/mqbblp/mqbblp_clusterqueuehelper.cpp | 362 ++++++++++-------- .../mqb/mqbblp/mqbblp_clusterqueuehelper.h | 35 +- src/groups/mqb/mqbblp/mqbblp_domain.cpp | 155 +------- src/groups/mqb/mqbblp/mqbblp_domain.h | 37 +- src/groups/mqb/mqbi/mqbi_cluster.h | 2 +- 5 files changed, 243 insertions(+), 348 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp index 919ac29cd6..ede55bc9e9 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp @@ -103,24 +103,6 @@ const bsls::Types::Int64 k_NS_PER_MESSAGE = BALL_LOGTHROTTLE_INFO(k_MAX_INSTANT_MESSAGES, k_NS_PER_MESSAGE) \ << "[THROTTLED] " -/// This function is a simple wrapper around the specified `callback`, to -/// ensure that the specified `refCount` is decremented after it gets -/// invoked with the specified `status`, `queue` and `confirmationCookie`. -void openQueueContextCallbackWrapper( - int* refCount, - const mqbi::Cluster::OpenQueueCallback& callback, - const bmqp_ctrlmsg::Status& status, - mqbi::Queue* queue, - const bmqp_ctrlmsg::OpenQueueResponse& openQueueResponse, - const mqbi::Cluster::OpenQueueConfirmationCookie& confirmationCookie) -{ - // PRECONDITIONS - BSLS_ASSERT_SAFE(*refCount >= 1); - - callback(status, queue, openQueueResponse, confirmationCookie); - --(*refCount); -} - /// Populate the specified `out` with the queueUriKey corresponding to the /// specified `uri` for the specified `cluster`; that is the canonical URI. /// The reason is that different queues with the same canonical URI are just @@ -178,6 +160,40 @@ void countUnconfirmed(bsls::Types::Int64* result, mqbi::Queue* queue) } // close unnamed namespace +// ------------------------------------------- +// struct ClusterQueueHelper::OpenQueueContext +// ------------------------------------------- + +ClusterQueueHelper::OpenQueueContext::OpenQueueContext( + mqbi::Domain* domain, + const bmqp_ctrlmsg::QueueHandleParameters& handleParameters, + const bsl::shared_ptr& clientContext, + const mqbi::Cluster::OpenQueueCallback& callback) +: d_queueContext_p(0) +, d_domain_p(domain) +, d_handleParameters(handleParameters) +, d_upstreamSubQueueId(bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID) +, d_clientContext(clientContext) +, d_callback(callback) +{ + // NOTHING +} + +ClusterQueueHelper::OpenQueueContext::~OpenQueueContext() +{ + BSLS_ASSERT_SAFE(d_queueContext_p); + + --d_queueContext_p->d_liveQInfo.d_inFlight; +} + +void ClusterQueueHelper::OpenQueueContext::setQueueContext( + QueueContext* queueContext) +{ + d_queueContext_p = queueContext; + // Bump 'd_inFlight' counter + ++(d_queueContext_p->d_liveQInfo.d_inFlight); +} + // ----------------------------------------- // struct ClusterQueueHelper::QueueLiveState // ----------------------------------------- @@ -276,7 +292,8 @@ unsigned int ClusterQueueHelper::getNextQueueId() return res; } -unsigned int ClusterQueueHelper::getNextSubQueueId(OpenQueueContext* context) +unsigned int +ClusterQueueHelper::getNextSubQueueId(const OpenQueueContextSp& context) { // executed by the cluster *DISPATCHER* thread @@ -769,17 +786,18 @@ void ClusterQueueHelper::processPendingContexts( // Swap the contexts to process them one by one and also clear the // pendingContexts of the queue info: they will be enqueued back, if // needed. - bsl::vector contexts(d_allocator_p); + bsl::vector contexts(d_allocator_p); contexts.swap(queueContext->d_liveQInfo.d_pending); - for (bsl::vector::iterator it = contexts.begin(); + for (bsl::vector::iterator it = contexts.begin(); it != contexts.end(); ++it) { processOpenQueueRequest(*it); } } -void ClusterQueueHelper::assignUpstreamSubqueueId(OpenQueueContext* context) +void ClusterQueueHelper::assignUpstreamSubqueueId( + const OpenQueueContextSp& context) { QueueLiveState& info = context->d_queueContext_p->d_liveQInfo; const bsl::string appId = bmqp::QueueUtil::extractAppId( @@ -826,44 +844,44 @@ void ClusterQueueHelper::assignUpstreamSubqueueId(OpenQueueContext* context) } void ClusterQueueHelper::processOpenQueueRequest( - const OpenQueueContext& context) + const OpenQueueContextSp& context) { // executed by the cluster *DISPATCHER* thread // PRECONDITIONS BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); - BSLS_ASSERT_SAFE(isQueueAssigned(*(context.d_queueContext_p))); + BSLS_ASSERT_SAFE(isQueueAssigned(*(context->d_queueContext_p))); // At this time, the Queue must have been assigned an id/partition. - OpenQueueContext* context_p = &const_cast(context); - if (d_cluster_p->isRemote()) { BSLS_ASSERT_SAFE(d_clusterData_p->electorInfo().hasActiveLeader()); - assignUpstreamSubqueueId(context_p); + assignUpstreamSubqueueId(context); sendOpenQueueRequest(context); return; // RETURN } - const int pid = context.d_queueContext_p->partitionId(); + const int pid = context->d_queueContext_p->partitionId(); if (hasActiveAvailablePrimary(pid)) { if (d_clusterState_p->isSelfPrimary(pid)) { - // At primary. Load the routing configuration and inject the - // downstream handle parameters into the "response" - BSLS_ASSERT_SAFE(context.d_domain_p); + // At primary. + + // Load the routing configuration and inject the downstream + // handle parameters into the "response" + BSLS_ASSERT_SAFE(context->d_domain_p); bmqp_ctrlmsg::OpenQueueResponse openQueueResp; - context.d_domain_p->loadRoutingConfiguration( + context->d_domain_p->loadRoutingConfiguration( &openQueueResp.routingConfiguration()); openQueueResp.deduplicationTimeMs() = - context.d_domain_p->config().deduplicationTimeMs(); + context->d_domain_p->config().deduplicationTimeMs(); openQueueResp.originalRequest().handleParameters() = - context.d_handleParameters; + context->d_handleParameters; openQueueResp.originalRequest().handleParameters().qId() = bmqp::QueueId::k_PRIMARY_QUEUE_ID; @@ -877,19 +895,20 @@ void ClusterQueueHelper::processOpenQueueRequest( // upstream id associated, if we haven't already done so, assign // one now. - if (context.d_queueContext_p->d_liveQInfo.d_id == + if (context->d_queueContext_p->d_liveQInfo.d_id == bmqp::QueueId::k_UNASSIGNED_QUEUE_ID) { - context.d_queueContext_p->d_liveQInfo.d_id = getNextQueueId(); + context->d_queueContext_p->d_liveQInfo.d_id = getNextQueueId(); } - assignUpstreamSubqueueId(context_p); + assignUpstreamSubqueueId(context); sendOpenQueueRequest(context); } } } -void ClusterQueueHelper::sendOpenQueueRequest(const OpenQueueContext& context) +void ClusterQueueHelper::sendOpenQueueRequest( + const OpenQueueContextSp& context) { // executed by the cluster *DISPATCHER* thread @@ -897,10 +916,10 @@ void ClusterQueueHelper::sendOpenQueueRequest(const OpenQueueContext& context) BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); - QueueLiveState& qinfo = context.d_queueContext_p->d_liveQInfo; - const int pid = context.d_queueContext_p->partitionId(); + QueueLiveState& qinfo = context->d_queueContext_p->d_liveQInfo; + const int pid = context->d_queueContext_p->partitionId(); - BSLS_ASSERT_SAFE(isQueueAssigned(*(context.d_queueContext_p))); + BSLS_ASSERT_SAFE(isQueueAssigned(*(context->d_queueContext_p))); BSLS_ASSERT_SAFE(qinfo.d_id != bmqp::QueueId::k_UNASSIGNED_QUEUE_ID); BSLS_ASSERT_SAFE((d_cluster_p->isRemote() && d_clusterData_p->electorInfo().hasActiveLeader()) || @@ -920,13 +939,13 @@ void ClusterQueueHelper::sendOpenQueueRequest(const OpenQueueContext& context) failure.code() = RC; \ failure.message().assign(os.str().data(), os.str().length()); \ \ - context.d_callback(failure, \ - 0, \ - bmqp_ctrlmsg::OpenQueueResponse(), \ - mqbi::Cluster::OpenQueueConfirmationCookie()); \ + context->d_callback(failure, \ + 0, \ + bmqp_ctrlmsg::OpenQueueResponse(), \ + mqbi::Cluster::OpenQueueConfirmationCookie()); \ } while (0) - if (bmqp::QueueUtil::isEmpty(context.d_handleParameters)) { + if (bmqp::QueueUtil::isEmpty(context->d_handleParameters)) { BALL_LOG_INFO << "#INVALID_OPENQUEUE_REQ " << d_cluster_p->description() << ": Not sending openQueueRequest to " @@ -934,7 +953,7 @@ void ClusterQueueHelper::sendOpenQueueRequest(const OpenQueueContext& context) .primaryNode() ->nodeDescription() << "[context.d_handleParameters: " - << context.d_handleParameters + << context->d_handleParameters << ", reason: 'All read,write,admin counts are <= 0]"; CALLBACK_FAILURE(bmqp_ctrlmsg::StatusCategory::E_UNKNOWN, bmqt::GenericResult::e_INVALID_ARGUMENT, @@ -947,18 +966,19 @@ void ClusterQueueHelper::sendOpenQueueRequest(const OpenQueueContext& context) bmqp_ctrlmsg::OpenQueue& openQueue = request->request().choice().makeOpenQueue(); - openQueue.handleParameters() = context.d_handleParameters; + openQueue.handleParameters() = context->d_handleParameters; openQueue.handleParameters().qId() = qinfo.d_id; // If we previously generated an upstream subQueueId, then set it here // before sending to upstream. - if (context.d_upstreamSubQueueId != bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID) { - BSLS_ASSERT_SAFE(!context.d_handleParameters.subIdInfo().isNull()); + if (context->d_upstreamSubQueueId != + bmqp::QueueId::k_DEFAULT_SUBQUEUE_ID) { + BSLS_ASSERT_SAFE(!context->d_handleParameters.subIdInfo().isNull()); openQueue.handleParameters().subIdInfo().makeValue( - context.d_handleParameters.subIdInfo().value()); + context->d_handleParameters.subIdInfo().value()); openQueue.handleParameters().subIdInfo().value().subId() = - context.d_upstreamSubQueueId; + context->d_upstreamSubQueueId; } mqbnet::ClusterNode* targetNode = 0; @@ -988,7 +1008,7 @@ void ClusterQueueHelper::sendOpenQueueRequest(const OpenQueueContext& context) if (rc == bmqt::GenericResult::e_NOT_CONNECTED) { // Put back the context to the pending list so that it will get // re-processed later. - context.d_queueContext_p->d_liveQInfo.d_pending.push_back(context); + context->d_queueContext_p->d_liveQInfo.d_pending.push_back(context); return; // RETURN } @@ -1001,10 +1021,10 @@ void ClusterQueueHelper::sendOpenQueueRequest(const OpenQueueContext& context) // Success. Update _upstream_ view on that particular subQueueId. StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds.findBySubId( - context.d_upstreamSubQueueId); + context->d_upstreamSubQueueId); bmqp::QueueUtil::mergeHandleParameters(&subStreamIt->value().d_parameters, - context.d_handleParameters); + context->d_handleParameters); #undef CALLBACK_FAILURE } @@ -1060,7 +1080,7 @@ bmqt::GenericResult::Enum ClusterQueueHelper::sendReopenQueueRequest( void ClusterQueueHelper::onOpenQueueResponse( const RequestManagerType::RequestSp& requestContext, - const OpenQueueContext& context, + const OpenQueueContextSp& context, mqbnet::ClusterNode* responder) { // executed by the cluster *DISPATCHER* thread @@ -1141,12 +1161,12 @@ void ClusterQueueHelper::onOpenQueueResponse( failure = false; } - QueueContext& qcontext = *context.d_queueContext_p; + QueueContext& qcontext = *context->d_queueContext_p; QueueLiveState& qinfo = qcontext.d_liveQInfo; BSLA_MAYBE_UNUSED const bmqp_ctrlmsg::OpenQueue& req = requestContext->request().choice().openQueue(); StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds.findBySubId( - context.d_upstreamSubQueueId); + context->d_upstreamSubQueueId); BSLS_ASSERT_SAFE(bmqp::QueueUtil::extractAppId(req.handleParameters()) == subStreamIt->appId()); @@ -1161,13 +1181,13 @@ void ClusterQueueHelper::onOpenQueueResponse( bmqp::QueueUtil::subtractHandleParameters( &subStreamIt->value().d_parameters, - context.d_handleParameters); + context->d_handleParameters); if (d_cluster_p->isStopping() || (!retry && !pushBack)) { - context.d_callback(requestContext->response().choice().status(), - 0, - bmqp_ctrlmsg::OpenQueueResponse(), - mqbi::Cluster::OpenQueueConfirmationCookie()); + context->d_callback(requestContext->response().choice().status(), + 0, + bmqp_ctrlmsg::OpenQueueResponse(), + mqbi::Cluster::OpenQueueConfirmationCookie()); return; // RETURN } @@ -1814,7 +1834,7 @@ void ClusterQueueHelper::onOpenQueueConfirmationCookieReleased( } bool ClusterQueueHelper::createQueue( - const OpenQueueContext& context, + const OpenQueueContextSp& context, const bmqp_ctrlmsg::OpenQueueResponse& openQueueResponse, mqbnet::ClusterNode* upstreamNode) { @@ -1824,15 +1844,16 @@ bool ClusterQueueHelper::createQueue( BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); - const unsigned int upstreamQueueId = - openQueueResponse.originalRequest().handleParameters().qId(); + const bmqp_ctrlmsg::QueueHandleParameters& parameters = + openQueueResponse.originalRequest().handleParameters(); + const unsigned int upstreamQueueId = parameters.qId(); BALL_LOG_INFO << d_cluster_p->description() << ": createQueue called [upstreamQueueId: " << upstreamQueueId << ", openQueueResponse: " << openQueueResponse << ", " << "context.d_handleParameters: " - << context.d_handleParameters << "]"; + << context->d_handleParameters << "]"; mqbi::Cluster::OpenQueueConfirmationCookie confirmationCookie( new (*d_allocator_p) mqbi::QueueHandle * (0), @@ -1840,22 +1861,20 @@ bool ClusterQueueHelper::createQueue( &ClusterQueueHelper::onOpenQueueConfirmationCookieReleased, this, bdlf::PlaceHolders::_1, // queue handle* - context.d_handleParameters), + context->d_handleParameters), d_allocator_p); bdlma::LocalSequentialAllocator<1024> la(d_allocator_p); bmqu::MemOutStream errorDescription(&la); bmqp_ctrlmsg::Status status; - const bmqp_ctrlmsg::QueueHandleParameters& parameters = - openQueueResponse.originalRequest().handleParameters(); - QueueContext* queueContext = context.d_queueContext_p; + QueueContext* queueContext = context->d_queueContext_p; QueueLiveState& qinfo = queueContext->d_liveQInfo; const int pid = queueContext->partitionId(); const bool isPrimary = !d_cluster_p->isRemote() && d_clusterState_p->isSelfPrimary(pid); bsl::shared_ptr queue = createQueueFactory(errorDescription, - context, + *context, openQueueResponse); if (queue) { @@ -1883,10 +1902,20 @@ bool ClusterQueueHelper::createQueue( true); // isWriterOnly } - context.d_callback(status, - queue.get(), - openQueueResponse, - confirmationCookie); + const unsigned int upstreamSubQueueId = + bmqp::QueueUtil::extractSubQueueId(parameters); + + queue->getHandle( + context->d_clientContext, + context->d_handleParameters, + upstreamSubQueueId, + bdlf::BindUtil::bind(&ClusterQueueHelper::onGetQueueHandle, + this, + bdlf::PlaceHolders::_1, // status + bdlf::PlaceHolders::_2, // handle + context, + openQueueResponse, + confirmationCookie)); return true; // RETURN } @@ -1896,10 +1925,13 @@ bool ClusterQueueHelper::createQueue( status.code() = -1; status.message().assign(errorDescription.str().data(), errorDescription.str().length()); - context.d_callback(status, - 0, - openQueueResponse, - mqbi::Cluster::OpenQueueConfirmationCookie()); + + // Explicitly call the callback with the status + onGetQueueHandle(status, + 0, + context, + openQueueResponse, + mqbi::Cluster::OpenQueueConfirmationCookie()); if (isPrimary) { // No further cleanup required. @@ -1914,7 +1946,7 @@ bool ClusterQueueHelper::createQueue( // Update _upstream_ view on that particular subQueueId StreamsMap::iterator subStreamIt = qinfo.d_subQueueIds.findBySubId( - context.d_upstreamSubQueueId); + context->d_upstreamSubQueueId); subtractCounters(&qinfo, parameters, subStreamIt); sendCloseQueueRequest(parameters, @@ -1934,6 +1966,7 @@ bsl::shared_ptr ClusterQueueHelper::createQueueFactory( BSLS_ASSERT_SAFE( d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p)); BSLS_ASSERT_SAFE(isQueueAssigned(*(context.d_queueContext_p))); + BSLS_ASSERT_SAFE(context.d_domain_p); const int pid = context.d_queueContext_p->partitionId(); if (!d_cluster_p->isRemote()) { @@ -2291,11 +2324,44 @@ void ClusterQueueHelper::onGetDomainDispatched( const bmqp_ctrlmsg::QueueHandleParameters& handleParams = request.choice().openQueue().handleParameters(); + // CQH::processPeerOpenQueueRequest QueueSessionManager::processOpenQueue + // \ / + // V V + // DomainFactory::createDomain + // / \ + // V V + // CQH::onGetDomain QueueSessionManager::onDomainOpenCb + // | / + // V / + // CQH::onGetDomainDispatched / + // \ / + // V V + // Domain::openQueue + // | + // V + // Cluster::openQueue + // | + // V + // CQH::openQueue + // \ + // V + // Queue::getHandle + // / + // V + // CQH::onGetQueueHandle + // / + // V + // Domain::onOpenQueueResponse + // / \ + // V V + // CQH::onGetQueueHandleDispatched QueueSessionManager::onQueueOpenCb + // + domain->openQueue( bmqt::Uri(handleParams.uri()), requester->handleRequesterContext(), handleParams, - bdlf::BindUtil::bind(&ClusterQueueHelper::onGetQueueHandle, + bdlf::BindUtil::bind(&ClusterQueueHelper::onGetQueueHandleDispatched, this, bdlf::PlaceHolders::_1, // status bdlf::PlaceHolders::_2, // queueHandle @@ -2309,19 +2375,19 @@ void ClusterQueueHelper::onGetDomainDispatched( void ClusterQueueHelper::onGetQueueHandle( const bmqp_ctrlmsg::Status& status, mqbi::QueueHandle* queueHandle, + const OpenQueueContextSp& context, const bmqp_ctrlmsg::OpenQueueResponse& openQueueResponse, - const mqbi::Domain::OpenQueueConfirmationCookie& confirmationCookie, - const bmqp_ctrlmsg::ControlMessage& request, - mqbc::ClusterNodeSession* requester, - const int peerInstanceId) + const mqbi::Domain::OpenQueueConfirmationCookie& confirmationCookie) { // executed by *ANY* thread + BSLS_ASSERT_SAFE(context); + // First step in this routine is to update the cookie with the queue handle // if 'confirmationCookie' is valid. If this open-queue request has // succeeded, this object should eventually set 'confirmationCookie' to 0 - // (see 'onGetQueueHandleDispatched'). Note that this is similar to what - // 'mqba::ClientSession' is doing in 'onQueueOpenCb'. The rough equivalent + // (see 'onGetQueueHandleDispatched'). Note that this also applies to the + // 'mqba::ClientSession' case ('onQueueOpenCb'). The rough equivalent // of a client session here is the cluster node session represented by // 'requester'. @@ -2329,17 +2395,21 @@ void ClusterQueueHelper::onGetQueueHandle( *confirmationCookie = queueHandle; } - d_cluster_p->dispatcher()->execute( - bdlf::BindUtil::bind(&ClusterQueueHelper::onGetQueueHandleDispatched, - this, - status, - queueHandle, - openQueueResponse, - confirmationCookie, - request, - requester, - peerInstanceId), - d_cluster_p); + if (context->d_clientContext->isClusterMember()) { + d_cluster_p->dispatcher()->execute( + bdlf::BindUtil::bind(context->d_callback, + status, + queueHandle, + openQueueResponse, + confirmationCookie), + d_cluster_p); + } + else { + context->d_callback(status, + queueHandle, + openQueueResponse, + confirmationCookie); + } } void ClusterQueueHelper::onGetQueueHandleDispatched( @@ -3232,6 +3302,7 @@ bool ClusterQueueHelper::subtractCounters( StreamsMap::iterator& itSubStream) { BSLS_ASSERT_SAFE(qinfo); + BSLS_ASSERT_SAFE(qinfo->d_queue_sp); bmqp::QueueUtil::subtractHandleParameters( &itSubStream->value().d_parameters, @@ -3295,15 +3366,15 @@ void ClusterQueueHelper::processRejectedQueueAssignments( for (bsl::vector::const_iterator sIt = rejected.begin(); sIt != rejected.end(); ++sIt) { - for (bsl::vector::iterator + for (bsl::vector::iterator cIt = (*sIt)->d_liveQInfo.d_pending.begin(), cLast = (*sIt)->d_liveQInfo.d_pending.end(); cIt != cLast; ++cIt) { - cIt->d_callback(failure, - 0, - bmqp_ctrlmsg::OpenQueueResponse(), - mqbi::Cluster::OpenQueueConfirmationCookie()); + (*cIt)->d_callback(failure, + 0, + bmqp_ctrlmsg::OpenQueueResponse(), + mqbi::Cluster::OpenQueueConfirmationCookie()); } d_queues.erase((*sIt)->uri()); } @@ -3948,6 +4019,9 @@ void ClusterQueueHelper::onQueueAssigned( QueueContextSp queueContext; QueueContextMapIter queueContextIt = d_queues.find(info->uri()); + mqbc::ClusterState::DomainState& domainState = + *d_clusterState_p->domainStates().at(info->uri().qualifiedDomain()); + if (queueContextIt != d_queues.end()) { // We already have a queueContext created for that queue queueContext = queueContextIt->second; @@ -4019,8 +4093,6 @@ void ClusterQueueHelper::onQueueAssigned( d_queues[info->uri()] = queueContext; } - mqbc::ClusterState::DomainState& domainState = - *d_clusterState_p->domainStates().at(info->uri().qualifiedDomain()); domainState.adjustQueueCount(1); @@ -4029,7 +4101,7 @@ void ClusterQueueHelper::onQueueAssigned( // above BALL_LOG_INFO << d_cluster_p->description() - << ": Assigned queue: " << info; + << ": Assigned queue: " << *info; // Note: In non-CSL mode, the queue creation callback is instead invoked at // replica nodes when they receive a queue creation record from the primary @@ -4042,24 +4114,18 @@ void ClusterQueueHelper::onQueueAssigned( // in the StorageMgr if it was a queue found during storage // recovery. Therefore, we will allow for duplicate registration // which will simply result in a no-op. - d_storageManager_p->registerQueueReplica( - info->partitionId(), - info->uri(), - info->key(), - d_clusterState_p->domainStates() - .at(info->uri().qualifiedDomain()) - ->domain(), - true); // allowDuplicate - - d_storageManager_p->updateQueueReplica( - info->partitionId(), - info->uri(), - info->key(), - info->appInfos(), - d_clusterState_p->domainStates() - .at(info->uri().qualifiedDomain()) - ->domain(), - true); // allowDuplicate + d_storageManager_p->registerQueueReplica(info->partitionId(), + info->uri(), + info->key(), + domainState.domain(), + true); // allowDuplicate + + d_storageManager_p->updateQueueReplica(info->partitionId(), + info->uri(), + info->key(), + info->appInfos(), + domainState.domain(), + true); // allowDuplicate } } @@ -4496,31 +4562,20 @@ void ClusterQueueHelper::openQueue( } // Create an OpenQueue context for that request. - OpenQueueContext context; - context.d_domain_p = domain; - context.d_handleParameters = handleParameters; - context.d_upstreamSubQueueId = bmqp::QueueId::k_UNASSIGNED_SUBQUEUE_ID; + OpenQueueContextSp context(new (*d_allocator_p) + OpenQueueContext(domain, + handleParameters, + clientContext, + callback), + d_allocator_p); - // NOTE: We wrap the 'callback' into the 'openQueueContextCallbackWrapper' - // so that we guarantee the 'd_inFlight' counter will always be - // decremented after the user provided 'callback' has been invoked. + // 'OpenQueueContext::~OpenQueueContext' decrements 'd_inFlight' counter. // Check if we are already aware of the queue. if (queueContextIt != d_queues.end()) { // Already aware of the queue; but the queue may not yet have been // assigned. - context.d_queueContext_p = queueContextIt->second.get(); - context.d_callback = bdlf::BindUtil::bind( - &openQueueContextCallbackWrapper, - &context.d_queueContext_p->d_liveQInfo.d_inFlight, - callback, - bdlf::PlaceHolders::_1, // status - bdlf::PlaceHolders::_2, // queue - bdlf::PlaceHolders::_3, // openQueueResponse - bdlf::PlaceHolders::_4); // confirmationCookie - - // Bump 'd_inFlight' counter - ++(context.d_queueContext_p->d_liveQInfo.d_inFlight); + context->setQueueContext(queueContextIt->second.get()); // In case queue was marked for expiration, explicitly unmark it. Note // that self may be a replica or a passive primary, but it's ok to @@ -4529,7 +4584,7 @@ void ClusterQueueHelper::openQueue( // point. queueContextIt->second->d_liveQInfo.d_queueExpirationTimestampMs = 0; - if (isQueuePrimaryAvailable(*(context.d_queueContext_p))) { + if (isQueuePrimaryAvailable(*(context->d_queueContext_p))) { // Queue is already assigned and the primary is AVAILABLE, all // good; move on to next step, i.e., processing the open request. processOpenQueueRequest(context); @@ -4605,18 +4660,7 @@ void ClusterQueueHelper::openQueue( queueContext.createInplace(d_allocator_p, uriKey, d_allocator_p); d_queues[uriKey] = queueContext; - context.d_queueContext_p = queueContext.get(); - context.d_callback = bdlf::BindUtil::bind( - &openQueueContextCallbackWrapper, - &context.d_queueContext_p->d_liveQInfo.d_inFlight, - callback, - bdlf::PlaceHolders::_1, // status - bdlf::PlaceHolders::_2, // queue - bdlf::PlaceHolders::_3, // openQueueResponse - bdlf::PlaceHolders::_4); // confirmationCookie - - // Bump 'd_inFlight' counter - ++(context.d_queueContext_p->d_liveQInfo.d_inFlight); + context->setQueueContext(queueContext.get()); // Register the context to the pending list. queueContext->d_liveQInfo.d_pending.push_back(context); @@ -6283,7 +6327,7 @@ void ClusterQueueHelper::loadState( for (QueueContextMapConstIter it = d_queues.begin(); it != d_queues.end(); ++it, ++qIdx) { const QueueLiveState& info = it->second->d_liveQInfo; - const bsl::vector& contexts = + const bsl::vector& contexts = it->second->d_liveQInfo.d_pending; const int pid = it->second->partitionId(); @@ -6333,7 +6377,7 @@ void ClusterQueueHelper::loadState( // Contexts clusterQueue.contexts().resize(contexts.size()); for (size_t ctxId = 0; ctxId != contexts.size(); ++ctxId) { - const OpenQueueContext& context = contexts[ctxId]; + const OpenQueueContext& context = *contexts[ctxId]; os << context.d_handleParameters; clusterQueue.contexts()[ctxId].queueHandleParametersJson() = os.str(); diff --git a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h index 3907bfb137..f17763bba7 100644 --- a/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h +++ b/src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.h @@ -246,7 +246,7 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL // expire (because it has non-zero // messages or handles or both). - bsl::vector d_pending; + bsl::vector > d_pending; // List of all open queue pending // contexts which are awaiting for a // next step on the queue (assignment, @@ -360,11 +360,26 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL // ::k_UNASSIGNED_SUBQUEUE_ID // if unassigned) + bsl::shared_ptr d_clientContext; + mqbi::Cluster::OpenQueueCallback d_callback; // Callback to invoke when the queue is // opened (whether success or failure). + + OpenQueueContext( + mqbi::Domain* domain, + const bmqp_ctrlmsg::QueueHandleParameters& handleParameters, + const bsl::shared_ptr& + clientContext, + const mqbi::Cluster::OpenQueueCallback& callback); + + ~OpenQueueContext(); + + void setQueueContext(QueueContext* queueContext); }; + typedef bsl::shared_ptr OpenQueueContextSp; + /// Structure representing all information and context associated to a /// queue, whether the queue is opened, being opened, or just aware due /// to a leader advisory message. @@ -498,7 +513,7 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// Get the next subQueueId for a subStream of the queue corresponding /// to the specified `context`. - unsigned int getNextSubQueueId(OpenQueueContext* context); + unsigned int getNextSubQueueId(const OpenQueueContextSp& context); /// Invoked after the specified `partitionId` gets assigned to the /// specified `primary` with the specified `status`. Note that null is @@ -558,14 +573,14 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// `context`: that is, depending on the cluster mode and queue /// assignment, either send an open queue request or create the queue. /// The queue must have been assigned at this point. - void processOpenQueueRequest(const OpenQueueContext& context); + void processOpenQueueRequest(const OpenQueueContextSp& context); /// Send an open queue request for the queue and its associated /// parameter as contained in the specified `context` to the primary /// node in charge of the queue. The queue must have been assigned at /// this point, and the current machine must either be a proxy, or not /// the primary of the queue. - void sendOpenQueueRequest(const OpenQueueContext& context); + void sendOpenQueueRequest(const OpenQueueContextSp& context); /// Send an open queue request for the queue and its associated /// parameters as contained in the specified `requestContext` to the @@ -585,14 +600,14 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// queue has already been opened with the appId in the `context`, /// assign the upstream subQueueId which was previously generated for /// that appId. Otherwise, generate and assign new unique id. - void assignUpstreamSubqueueId(OpenQueueContext* context); + void assignUpstreamSubqueueId(const OpenQueueContextSp& context); /// Response callback of an open queue request, in the specified /// `context` and with the request and its associated response in the /// specified `requestContext`. void onOpenQueueResponse(const RequestManagerType::RequestSp& requestContext, - const OpenQueueContext& context, + const OpenQueueContextSp& context, mqbnet::ClusterNode* responder); /// Response callback of an open queue request, that was sent due to the @@ -639,7 +654,7 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL /// this method is invoked at the primary node; for every other node, /// `upstreamNode` will represent the node to which open-queue request /// was sent. - bool createQueue(const OpenQueueContext& context, + bool createQueue(const OpenQueueContextSp& context, const bmqp_ctrlmsg::OpenQueueResponse& openQueueResponse, mqbnet::ClusterNode* upstreamNode); @@ -699,11 +714,9 @@ class ClusterQueueHelper BSLS_KEYWORD_FINAL void onGetQueueHandle( const bmqp_ctrlmsg::Status& status, mqbi::QueueHandle* queueHandle, + const OpenQueueContextSp& context, const bmqp_ctrlmsg::OpenQueueResponse& openQueueResponse, - const mqbi::Domain::OpenQueueConfirmationCookie& confirmationCookie, - const bmqp_ctrlmsg::ControlMessage& request, - mqbc::ClusterNodeSession* requester, - const int peerInstanceId); + const mqbi::Domain::OpenQueueConfirmationCookie& confirmationCookie); /// Callback invoked in response to an open queue request to the domain /// (in the specified `request`). If the specified `status` is SUCCESS, diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.cpp b/src/groups/mqb/mqbblp/mqbblp_domain.cpp index 7f91b64b6b..0c08645fd4 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_domain.cpp @@ -193,149 +193,26 @@ int normalizeConfig(mqbconfm::Domain* defn, void Domain::onOpenQueueResponse( const bmqp_ctrlmsg::Status& status, - mqbi::Queue* queue, + mqbi::QueueHandle* queuehandle, const bmqp_ctrlmsg::OpenQueueResponse& openQueueResponse, const mqbi::Cluster::OpenQueueConfirmationCookie& confirmationCookie, - const bsl::shared_ptr& clientContext, - const bmqp_ctrlmsg::QueueHandleParameters& handleParameters, - const mqbi::Domain::OpenQueueCallback& callback) + const mqbi::Domain::OpenQueueCallback& callback) { - // executed by the associated CLUSTER's DISPATCHER thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE( - d_cluster_sp->dispatcher()->inDispatcherThread(d_cluster_sp.get())); + // executed by *ANY* thread --d_pendingRequests; - if (status.category() != bmqp_ctrlmsg::StatusCategory::E_SUCCESS) { - // Failed to open queue - callback(status, - static_cast(0), - openQueueResponse, - confirmationCookie); - return; // RETURN - } - - // VALIDATION: The queue must exist at this point, i.e., have been - // registered. - BSLS_ASSERT_SAFE(queue); - BSLS_ASSERT_SAFE(lookupQueue(0, queue->uri()) == 0); - - const bmqp_ctrlmsg::QueueHandleParameters& upstreamHandleParams = - openQueueResponse.originalRequest().handleParameters(); - const unsigned int upstreamSubQueueId = bmqp::QueueUtil::extractSubQueueId( - upstreamHandleParams); - - queue->getHandle(clientContext, - handleParameters, - upstreamSubQueueId, - bdlf::BindUtil::bind(callback, - bdlf::PlaceHolders::_1, // status - bdlf::PlaceHolders::_2, // handle - openQueueResponse, - confirmationCookie)); -} - -void Domain::updateAuthorizedAppIds(const AppInfos& addedAppIds, - const AppInfos& removedAppIds) -{ - mqbconfm::QueueMode& queueMode = d_config.value().mode(); - if (!queueMode.isFanoutValue()) { - return; // RETURN - } - bsl::vector& authorizedAppIds = queueMode.fanout().appIDs(); - - for (AppInfosCIter cit = addedAppIds.cbegin(); cit != addedAppIds.cend(); - ++cit) { - if (bsl::find(authorizedAppIds.begin(), - authorizedAppIds.end(), - cit->first) != authorizedAppIds.end()) { - // No need to log error here. When a new appId is registered for a - // domain, multiple queues will be affected, so duplicate calls to - // this method is expected. - - continue; // CONTINUE - } - authorizedAppIds.push_back(cit->first); - } - - for (AppInfosCIter cit = removedAppIds.cbegin(); - cit != removedAppIds.cend(); - ++cit) { - const bsl::vector::const_iterator it = bsl::find( - authorizedAppIds.begin(), - authorizedAppIds.end(), - cit->first); - if (it == authorizedAppIds.end()) { - // No need to log error here. When an appId is unregistered from a - // domain, multiple queues will be affected, so duplicate calls to - // this method is expected. - - continue; // CONTINUE - } - authorizedAppIds.erase(it); - } -} - -void Domain::onQueueAssigned( - const bsl::shared_ptr& info) -{ - // executed by the associated CLUSTER's DISPATCHER thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE( - d_cluster_sp->dispatcher()->inDispatcherThread(d_cluster_sp.get())); - BSLS_ASSERT_SAFE(info); - - if (!d_cluster_sp->isCSLModeEnabled()) { - return; // RETURN - } - - if (d_state != e_STARTED) { - return; // RETURN - } - - if (info->uri().domain() != d_name) { - // Note: This method will fire on all domains which belong to the - // cluster having the queue assignment, but we examine the domain - // name from the 'uri' to guarantee that only one domain is - // updated. - - return; // RETURN + if (status.category() == bmqp_ctrlmsg::StatusCategory::E_SUCCESS) { + // VALIDATION: The queue must exist at this point, i.e., have been + // registered.FE( + BSLS_ASSERT_SAFE(queuehandle); + BSLS_ASSERT_SAFE(queuehandle->queue()); + BSLS_ASSERT_SAFE(lookupQueue(0, queuehandle->queue()->uri()) == 0); } - - updateAuthorizedAppIds(info->appInfos()); -} - -void Domain::onQueueUpdated(const bmqt::Uri& uri, - const bsl::string& domain, - const AppInfos& addedAppIds, - const AppInfos& removedAppIds) -{ - // executed by the associated CLUSTER's DISPATCHER thread - - // PRECONDITIONS - BSLS_ASSERT_SAFE( - d_cluster_sp->dispatcher()->inDispatcherThread(d_cluster_sp.get())); - - if (!d_cluster_sp->isCSLModeEnabled()) { - return; // RETURN - } - - if (d_state != e_STARTED) { - return; // RETURN - } - - if (uri.isValid()) { - BSLS_ASSERT_SAFE(uri.qualifiedDomain() == domain); + else { + queuehandle = 0; } - // Note: This method will fire on all domains which belong to the cluster - // having the queue update, but we examine the domain name to - // guarantee that only one domain is updated. - if (d_name == domain) { - updateAuthorizedAppIds(addedAppIds, removedAppIds); - } + callback(status, queuehandle, openQueueResponse, confirmationCookie); } Domain::Domain(const bsl::string& name, @@ -369,8 +246,6 @@ Domain::Domain(const bsl::string& name, // Initialize stats d_domainsStats.initialize(this, d_domainsStatContext_p, allocator); - - d_cluster_sp->registerStateObserver(this); } Domain::~Domain() @@ -503,8 +378,6 @@ void Domain::teardown(const mqbi::Domain::TeardownCb& teardownCb) d_teardownCb = teardownCb; d_state = e_STOPPING; - d_cluster_sp->unregisterStateObserver(this); - if (d_queues.empty()) { d_teardownCb(d_name); d_state = e_STOPPED; @@ -531,8 +404,6 @@ void Domain::teardownRemove(const TeardownCb& teardownCb) d_teardownRemoveCb = teardownCb; d_state = e_REMOVING; - d_cluster_sp->unregisterStateObserver(this); - if (d_queues.empty()) { d_teardownRemoveCb(d_name); d_state = e_REMOVED; @@ -598,8 +469,6 @@ void Domain::openQueue( bdlf::PlaceHolders::_2, // queue bdlf::PlaceHolders::_3, // openQueueResponse bdlf::PlaceHolders::_4, // confirmationCookie - clientContext, - handleParameters, callback)); } diff --git a/src/groups/mqb/mqbblp/mqbblp_domain.h b/src/groups/mqb/mqbblp/mqbblp_domain.h index 093e5b5e1f..a49678c857 100644 --- a/src/groups/mqb/mqbblp/mqbblp_domain.h +++ b/src/groups/mqb/mqbblp/mqbblp_domain.h @@ -89,8 +89,7 @@ namespace mqbblp { // ============ /// Domain implementation -class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, - public mqbc::ClusterStateObserver { +class Domain : public mqbi::Domain { private: // CLASS-SCOPE CATEGORY BALL_LOG_SET_CLASS_CATEGORY("MQBBLP.DOMAIN"); @@ -204,44 +203,14 @@ class Domain BSLS_KEYWORD_FINAL : public mqbi::Domain, /// `confirmationCookie` to propagate the result to the requester. void onOpenQueueResponse( const bmqp_ctrlmsg::Status& status, - mqbi::Queue* queue, + mqbi::QueueHandle* queuehandle, const bmqp_ctrlmsg::OpenQueueResponse& openQueueResponse, const mqbi::Cluster::OpenQueueConfirmationCookie& confirmationCookie, - const bsl::shared_ptr& - clientContext, - const bmqp_ctrlmsg::QueueHandleParameters& handleParameters, - const mqbi::Domain::OpenQueueCallback& callback); - - /// Update the list of authorized appIds by adding the specified - /// `addedAppIds` and removing the specified `removedAppIds`. - void updateAuthorizedAppIds(const AppInfos& addedAppIds, - const AppInfos& removedAppIds = AppInfos()); + const mqbi::Domain::OpenQueueCallback& callback); // PRIVATE MANIPULATORS // (virtual: mqbc::ClusterStateObserver) - /// Callback invoked when a queue with the specified `info` gets - /// assigned to the cluster. - /// - /// THREAD: This method is invoked in the associated cluster's - /// dispatcher thread. - void - onQueueAssigned(const bsl::shared_ptr& info) - BSLS_KEYWORD_OVERRIDE; - - /// Callback invoked when a queue with the specified `uri` belonging to - /// the specified `domain` is updated with the optionally specified - /// `addedAppIds` and `removedAppIds`. If the specified `uri` is empty, - /// the appId updates are applied to the entire `domain` instead. - /// - /// Note: The `uri` could belong to a different domain than this one, in - /// which case this queue update is ignored. - void onQueueUpdated(const bmqt::Uri& uri, - const bsl::string& domain, - const AppInfos& addedAppIds, - const AppInfos& removedAppIds = AppInfos()) - BSLS_KEYWORD_OVERRIDE; - private: // NOT IMPLEMENTED diff --git a/src/groups/mqb/mqbi/mqbi_cluster.h b/src/groups/mqb/mqbi/mqbi_cluster.h index 894642502f..ecfdc29174 100644 --- a/src/groups/mqb/mqbi/mqbi_cluster.h +++ b/src/groups/mqb/mqbi/mqbi_cluster.h @@ -224,7 +224,7 @@ class Cluster : public DispatcherClient { /// above). typedef bsl::function OpenQueueCallback;