diff --git a/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp b/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp index 4c93ef73c7..bf9229a988 100644 --- a/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_brokersession.cpp @@ -1350,7 +1350,7 @@ BrokerSession::QueueFsm::QueueFsm(BrokerSession& session) typedef QueueState S; typedef QueueFsmEvent E; QueueStateTransition table[] = { - // current state event new state + // current state event new state {S::e_CLOSED, E::e_OPEN_CMD, S::e_OPENING_OPN}, // {S::e_OPENING_OPN, E::e_RESP_OK, S::e_OPENING_CFG}, @@ -3533,10 +3533,9 @@ void BrokerSession::processPushEvent(const bmqp::Event& event) bmqt::CorrelationId correlationId; unsigned int subscriptionHandleId; const QueueManager::QueueSp queue = - d_queueManager.observePushEvent( - &correlationId, - &subscriptionHandleId, - *citer); + d_queueManager.observePushEvent(&correlationId, + &subscriptionHandleId, + *citer); BSLS_ASSERT(queue); queueEvent->insertQueue(citer->d_subscriptionId, queue); @@ -3545,9 +3544,7 @@ void BrokerSession::processPushEvent(const bmqp::Event& event) // 'citer->d_subscriptionId' so that // 'bmqimp::Event::subscriptionId()' returns 'subscriptionHandle' - queueEvent->addCorrelationId( - correlationId, - subscriptionHandleId); + queueEvent->addCorrelationId(correlationId, subscriptionHandleId); } // Update event bytes @@ -5275,10 +5272,9 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr& queue, subscription.expression().text() = from.expression().text(); streamParams.subscriptions().emplace_back(subscription); - queue->registerInternalSubscriptionId( - internalSubscriptionId, - cit->first.id(), - cit->first.correlationId()); + queue->registerInternalSubscriptionId(internalSubscriptionId, + cit->first.id(), + cit->first.correlationId()); } return context; // RETURN } @@ -5314,10 +5310,9 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr& queue, streamParams.consumerPriority() = options.consumerPriority(); streamParams.consumerPriorityCount() = 1; - queue->registerInternalSubscriptionId( - queue->subQueueId(), - queue->subQueueId(), - bmqt::CorrelationId()); + queue->registerInternalSubscriptionId(queue->subQueueId(), + queue->subQueueId(), + bmqt::CorrelationId()); } return context; @@ -6140,7 +6135,7 @@ void BrokerSession::onConfigureQueueResponse( res == bmqt::GenericResult::e_NOT_CONNECTED || res == bmqt::GenericResult::e_NOT_SUPPORTED); - (void) res; + (void)res; BALL_LOG_INFO << "Ignore cancelled request: " << context->request(); return; // RETURN diff --git a/src/groups/bmq/bmqimp/bmqimp_event.h b/src/groups/bmq/bmqimp/bmqimp_event.h index 8e75a6faea..9e9d39658b 100644 --- a/src/groups/bmq/bmqimp/bmqimp_event.h +++ b/src/groups/bmq/bmqimp/bmqimp_event.h @@ -797,9 +797,8 @@ inline bmqp::PutEventBuilder* Event::putEventBuilder() return &(d_putEventBuilderBuffer.object()); } -inline void Event::addCorrelationId( - const bmqt::CorrelationId& correlationId, - unsigned int subscriptionHandleId) +inline void Event::addCorrelationId(const bmqt::CorrelationId& correlationId, + unsigned int subscriptionHandleId) { // TODO: when ACK event is created locally we have to fill d_correlationIds // before the raw ACK 'bmqp::Event' is created and may be used to @@ -812,9 +811,7 @@ inline void Event::addCorrelationId( // BSLS_ASSERT_SAFE(d_rawEvent.isAckEvent()); d_correlationIds.push_back( - bsl::make_pair( - correlationId, - subscriptionHandleId)); + bsl::make_pair(correlationId, subscriptionHandleId)); } } // close package namespace diff --git a/src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp b/src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp index ac76849bd4..65a83be8d6 100644 --- a/src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp @@ -346,11 +346,10 @@ void MessageDumper::dumpPushEvent(bsl::ostream& out, const bmqp::Event& event) iter.extractQueueInfo(&qId, &subscriptionId, &rdaInfo); QueueManager::QueueSp queue = - d_queueManager_p->lookupQueueBySubscriptionId( - &correlationId, - &subscriptionHandle, - qId, - subscriptionId); + d_queueManager_p->lookupQueueBySubscriptionId(&correlationId, + &subscriptionHandle, + qId, + subscriptionId); BSLS_ASSERT_SAFE(queue); out << "PUSH Message #" << ++msgNum << ": " diff --git a/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp b/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp index 9f42d6aa96..52c6c05da7 100644 --- a/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp @@ -545,16 +545,14 @@ void Tester::registerSubscription(const bslstl::StringRef& uri, unsigned int subscriptionId, const bmqt::CorrelationId& correlationId) { - const bmqimp::QueueManager::QueueSp& queue = - d_queueManager.lookupQueue( - uri); + const bmqimp::QueueManager::QueueSp& queue = d_queueManager.lookupQueue( + uri); BSLS_ASSERT_SAFE(queue); - queue->registerInternalSubscriptionId( - subscriptionId, - subscriptionId, - correlationId); + queue->registerInternalSubscriptionId(subscriptionId, + subscriptionId, + correlationId); } void Tester::updateSubscriptions(const bslstl::StringRef& uri, diff --git a/src/groups/bmq/bmqimp/bmqimp_queue.h b/src/groups/bmq/bmqimp/bmqimp_queue.h index 1d19b326d2..659f1cba43 100644 --- a/src/groups/bmq/bmqimp/bmqimp_queue.h +++ b/src/groups/bmq/bmqimp/bmqimp_queue.h @@ -174,8 +174,8 @@ struct QueueStatsUtil { class Queue { public: // PUBLIC TYPES - typedef bsl::pair SubscriptionHandle; - // Not using private 'bmqt::SubscriptionHandle' ctor + typedef bsl::pair SubscriptionHandle; + // Not using private 'bmqt::SubscriptionHandle' ctor private: // DATA @@ -262,11 +262,11 @@ class Queue { bmqp_ctrlmsg::StreamParameters d_config; bsl::unordered_map - d_registeredInternalSubscriptionIds; - // This keeps SubscriptionHandle (id and CorrelationId) for Configure - // response processing. - // Supporting multiple concurrent Configure requests. - // TODO: This should go into ConfigureRequest context. + d_registeredInternalSubscriptionIds; + // This keeps SubscriptionHandle (id and CorrelationId) for Configure + // response processing. + // Supporting multiple concurrent Configure requests. + // TODO: This should go into ConfigureRequest context. private: // NOT IMPLEMENTED @@ -337,19 +337,19 @@ class Queue { /// reinitialize the state before a new start). void clearStatContext(); - void registerInternalSubscriptionId( - unsigned int internalSubscriptionId, - unsigned int subscriptionHandleId, - const bmqt::CorrelationId& correlationId); - // Keep the specified 'subscriptionHandleId' and 'correlationId' - // associated with the specified 'internalSubscriptionId' between - // Configure request and Configure response (until - // 'extractSubscriptionHandle'). + void + registerInternalSubscriptionId(unsigned int internalSubscriptionId, + unsigned int subscriptionHandleId, + const bmqt::CorrelationId& correlationId); + // Keep the specified 'subscriptionHandleId' and 'correlationId' + // associated with the specified 'internalSubscriptionId' between + // Configure request and Configure response (until + // 'extractSubscriptionHandle'). - SubscriptionHandle extractSubscriptionHandle( - unsigned int internalSubscriptionId); - // Lookup, copy, erase, and return the copy of what was registered - // by 'registerInternalSubscriptionId'. + SubscriptionHandle + extractSubscriptionHandle(unsigned int internalSubscriptionId); + // Lookup, copy, erase, and return the copy of what was registered + // by 'registerInternalSubscriptionId'. // ACCESSORS @@ -554,26 +554,24 @@ inline Queue& Queue::setConfig(const bmqp_ctrlmsg::StreamParameters& value) return *this; } -inline -void Queue::registerInternalSubscriptionId( - unsigned int internalSubscriptionId, - unsigned int subscriptionHandleId, - const bmqt::CorrelationId& correlationId) +inline void +Queue::registerInternalSubscriptionId(unsigned int internalSubscriptionId, + unsigned int subscriptionHandleId, + const bmqt::CorrelationId& correlationId) { d_registeredInternalSubscriptionIds.emplace( - internalSubscriptionId, - SubscriptionHandle(subscriptionHandleId, correlationId)); + internalSubscriptionId, + SubscriptionHandle(subscriptionHandleId, correlationId)); } -inline -Queue::SubscriptionHandle +inline Queue::SubscriptionHandle Queue::extractSubscriptionHandle(unsigned int internalSubscriptionId) { - bsl::unordered_map::const_iterator - cit = d_registeredInternalSubscriptionIds.find(internalSubscriptionId); + bsl::unordered_map::const_iterator cit = + d_registeredInternalSubscriptionIds.find(internalSubscriptionId); if (cit == d_registeredInternalSubscriptionIds.end()) { - return {internalSubscriptionId, bmqt::CorrelationId()}; // RETURN + return {internalSubscriptionId, bmqt::CorrelationId()}; // RETURN } SubscriptionHandle result(cit->second); diff --git a/src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp b/src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp index 2ab0aae6de..0aea8d0bd6 100644 --- a/src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp +++ b/src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp @@ -66,10 +66,10 @@ QueueManager::lookupQueueLocked(const bmqp::QueueId& queueId) const } QueueManager::QueueSp QueueManager::lookupQueueBySubscriptionIdLocked( - bmqt::CorrelationId *correlationId, - unsigned int *subscriptionHandleId, - int qId, - unsigned int internalSubscriptionId) const + bmqt::CorrelationId* correlationId, + unsigned int* subscriptionHandleId, + int qId, + unsigned int internalSubscriptionId) const { // PRECONDITIONS // d_queuesLock locked @@ -87,8 +87,8 @@ QueueManager::QueueSp QueueManager::lookupQueueBySubscriptionIdLocked( // lookup by 'subscriptionId' SubscriptionId id(qId, internalSubscriptionId); - QueuesBySubscriptions::const_iterator cit = - d_queuesBySubscriptionIds.find(id); + QueuesBySubscriptions::const_iterator cit = d_queuesBySubscriptionIds.find( + id); if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY( cit == d_queuesBySubscriptionIds.end())) { @@ -98,8 +98,8 @@ QueueManager::QueueSp QueueManager::lookupQueueBySubscriptionIdLocked( BSLS_ASSERT_SAFE(cit->second.d_queue); - *subscriptionHandleId = cit->second.d_subscriptionHandle.first; - *correlationId = cit->second.d_subscriptionHandle.second; + *subscriptionHandleId = cit->second.d_subscriptionHandle.first; + *correlationId = cit->second.d_subscriptionHandle.second; return cit->second.d_queue; } @@ -289,10 +289,9 @@ void QueueManager::resetState() } const QueueManager::QueueSp -QueueManager::observePushEvent( - bmqt::CorrelationId *correlationId, - unsigned int *subscriptionHandle, - const bmqp::EventUtilQueueInfo& info) +QueueManager::observePushEvent(bmqt::CorrelationId* correlationId, + unsigned int* subscriptionHandle, + const bmqp::EventUtilQueueInfo& info) { // Update stats const QueueSp queue = lookupQueueBySubscriptionIdLocked( @@ -497,8 +496,8 @@ void QueueManager::resetSubStreamCount(const bsl::string& canonicalUri) } void QueueManager::updateSubscriptions( - const bsl::shared_ptr& queue, - const bmqp_ctrlmsg::StreamParameters& config) + const bsl::shared_ptr& queue, + const bmqp_ctrlmsg::StreamParameters& config) { BSLS_ASSERT_SAFE(queue); @@ -506,28 +505,19 @@ void QueueManager::updateSubscriptions( for (size_t i = 0; i < previous.subscriptions().size(); ++i) { unsigned int internalSubscriptionId = - previous.subscriptions()[i].sId(); + previous.subscriptions()[i].sId(); - SubscriptionId id( - queue->id(), - internalSubscriptionId); + SubscriptionId id(queue->id(), internalSubscriptionId); d_queuesBySubscriptionIds.erase(id); } for (size_t i = 0; i < config.subscriptions().size(); ++i) { + unsigned int internalSubscriptionId = config.subscriptions()[i].sId(); - unsigned int internalSubscriptionId = - config.subscriptions()[i].sId(); - - d_queuesBySubscriptionIds.insert( - bsl::make_pair( - SubscriptionId( - queue->id(), - internalSubscriptionId), - QueueBySubscription( - internalSubscriptionId, - queue))); + d_queuesBySubscriptionIds.insert(bsl::make_pair( + SubscriptionId(queue->id(), internalSubscriptionId), + QueueBySubscription(internalSubscriptionId, queue))); } } diff --git a/src/groups/bmq/bmqimp/bmqimp_queuemanager.h b/src/groups/bmq/bmqimp/bmqimp_queuemanager.h index 90aa911d03..4eb13cf4f1 100644 --- a/src/groups/bmq/bmqimp/bmqimp_queuemanager.h +++ b/src/groups/bmq/bmqimp/bmqimp_queuemanager.h @@ -151,7 +151,7 @@ class QueueManager { public: // TYPES - typedef bsl::shared_ptr QueueSp; + typedef bsl::shared_ptr QueueSp; typedef bsl::pair SubscriptionUandle; struct QueueBySubscription { @@ -161,9 +161,8 @@ class QueueManager { // The Subscription correlationId as // specified in the configure request. - QueueBySubscription( - unsigned int internalSubscriptionId, - const bsl::shared_ptr& queue); + QueueBySubscription(unsigned int internalSubscriptionId, + const bsl::shared_ptr& queue); }; /// Subscription id -> {Queue, Subscription correlationId} @@ -247,11 +246,10 @@ class QueueManager { /// successful lookup Load the Subscription correlationId into the /// specified `correlationId`. QueueSp - lookupQueueBySubscriptionIdLocked( - bmqt::CorrelationId *correlationId, - unsigned int *subscriptionHandleId, - int qid, - unsigned int innerSubscriptionId) const; + lookupQueueBySubscriptionIdLocked(bmqt::CorrelationId* correlationId, + unsigned int* subscriptionHandleId, + int qid, + unsigned int innerSubscriptionId) const; public: // TRAITS @@ -312,10 +310,9 @@ class QueueManager { bool* hasMessageWithMultipleSubQueueIds, const bmqp::PushMessageIterator& iterator); - const QueueSp observePushEvent( - bmqt::CorrelationId *correlationId, - unsigned int *subscriptionHandle, - const bmqp::EventUtilQueueInfo& info); + const QueueSp observePushEvent(bmqt::CorrelationId* correlationId, + unsigned int* subscriptionHandle, + const bmqp::EventUtilQueueInfo& info); /// Update stats for the queue(s) corresponding to the messages pointed /// to by the specified `iterator` and populate the specified @@ -343,9 +340,8 @@ class QueueManager { /// object. void resetSubStreamCount(const bsl::string& canonicalUri); - void updateSubscriptions( - const bsl::shared_ptr& queue, - const bmqp_ctrlmsg::StreamParameters& config); + void updateSubscriptions(const bsl::shared_ptr& queue, + const bmqp_ctrlmsg::StreamParameters& config); // ACCESSORS QueueSp lookupQueue(const bmqt::Uri& uri) const; @@ -355,11 +351,10 @@ class QueueManager { /// Lookup the queue with the specified `queueId`, or `uri`, or /// `correlationId`, and return a shared pointer to the Queue object (if /// found), or an empty shared pointer (if not found). - QueueSp lookupQueueBySubscriptionId( - bmqt::CorrelationId* correlationId, - unsigned int *subscriptionHandle, - int queueId, - unsigned int subscriptionId) const; + QueueSp lookupQueueBySubscriptionId(bmqt::CorrelationId* correlationId, + unsigned int* subscriptionHandle, + int queueId, + unsigned int subscriptionId) const; // TBD: Temporary method to enable refactoring of 'BrokerSession'. // Specifically, reopen logic in 'BrokerSession::processPacket'. @@ -386,11 +381,11 @@ class QueueManager { // ---------------------------------------- inline QueueManager::QueueBySubscription::QueueBySubscription( - unsigned int internalSubscriptionId, - const bsl::shared_ptr& queue) + unsigned int internalSubscriptionId, + const bsl::shared_ptr& queue) : d_queue(queue) , d_subscriptionHandle( - queue->extractSubscriptionHandle(internalSubscriptionId)) + queue->extractSubscriptionHandle(internalSubscriptionId)) { // NOTHING } @@ -441,20 +436,18 @@ QueueManager::lookupQueue(const bmqp::QueueId& queueId) const return lookupQueueLocked(queueId); } -inline QueueManager::QueueSp -QueueManager::lookupQueueBySubscriptionId( - bmqt::CorrelationId* correlationId, - unsigned int *subscriptionHandle, - int queueId, - unsigned int internalSubscriptionId) const +inline QueueManager::QueueSp QueueManager::lookupQueueBySubscriptionId( + bmqt::CorrelationId* correlationId, + unsigned int* subscriptionHandle, + int queueId, + unsigned int internalSubscriptionId) const { bsls::SpinLockGuard guard(&d_queuesLock); // d_queuesLock LOCKED - return lookupQueueBySubscriptionIdLocked( - correlationId, - subscriptionHandle, - queueId, - internalSubscriptionId); + return lookupQueueBySubscriptionIdLocked(correlationId, + subscriptionHandle, + queueId, + internalSubscriptionId); } } // close package namespace