Skip to content

Commit

Permalink
clang-format
Browse files Browse the repository at this point in the history
Signed-off-by: Vitaly Dzhitenov <[email protected]>
  • Loading branch information
dorjesinpo committed Oct 25, 2023
1 parent 21bc9dd commit e6eec92
Show file tree
Hide file tree
Showing 7 changed files with 100 additions and 130 deletions.
29 changes: 12 additions & 17 deletions src/groups/bmq/bmqimp/bmqimp_brokersession.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1350,7 +1350,7 @@ BrokerSession::QueueFsm::QueueFsm(BrokerSession& session)
typedef QueueState S;
typedef QueueFsmEvent E;
QueueStateTransition table[] = {
// current state event new state
// current state event new state
{S::e_CLOSED, E::e_OPEN_CMD, S::e_OPENING_OPN},
//
{S::e_OPENING_OPN, E::e_RESP_OK, S::e_OPENING_CFG},
Expand Down Expand Up @@ -3533,10 +3533,9 @@ void BrokerSession::processPushEvent(const bmqp::Event& event)
bmqt::CorrelationId correlationId;
unsigned int subscriptionHandleId;
const QueueManager::QueueSp queue =
d_queueManager.observePushEvent(
&correlationId,
&subscriptionHandleId,
*citer);
d_queueManager.observePushEvent(&correlationId,
&subscriptionHandleId,
*citer);

BSLS_ASSERT(queue);
queueEvent->insertQueue(citer->d_subscriptionId, queue);
Expand All @@ -3545,9 +3544,7 @@ void BrokerSession::processPushEvent(const bmqp::Event& event)
// 'citer->d_subscriptionId' so that
// 'bmqimp::Event::subscriptionId()' returns 'subscriptionHandle'

queueEvent->addCorrelationId(
correlationId,
subscriptionHandleId);
queueEvent->addCorrelationId(correlationId, subscriptionHandleId);
}

// Update event bytes
Expand Down Expand Up @@ -5275,10 +5272,9 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr<Queue>& queue,
subscription.expression().text() = from.expression().text();

streamParams.subscriptions().emplace_back(subscription);
queue->registerInternalSubscriptionId(
internalSubscriptionId,
cit->first.id(),
cit->first.correlationId());
queue->registerInternalSubscriptionId(internalSubscriptionId,
cit->first.id(),
cit->first.correlationId());
}
return context; // RETURN
}
Expand Down Expand Up @@ -5314,10 +5310,9 @@ BrokerSession::createConfigureQueueContext(const bsl::shared_ptr<Queue>& queue,
streamParams.consumerPriority() = options.consumerPriority();
streamParams.consumerPriorityCount() = 1;

queue->registerInternalSubscriptionId(
queue->subQueueId(),
queue->subQueueId(),
bmqt::CorrelationId());
queue->registerInternalSubscriptionId(queue->subQueueId(),
queue->subQueueId(),
bmqt::CorrelationId());
}

return context;
Expand Down Expand Up @@ -6140,7 +6135,7 @@ void BrokerSession::onConfigureQueueResponse(
res == bmqt::GenericResult::e_NOT_CONNECTED ||
res == bmqt::GenericResult::e_NOT_SUPPORTED);

(void) res;
(void)res;
BALL_LOG_INFO << "Ignore cancelled request: "
<< context->request();
return; // RETURN
Expand Down
9 changes: 3 additions & 6 deletions src/groups/bmq/bmqimp/bmqimp_event.h
Original file line number Diff line number Diff line change
Expand Up @@ -797,9 +797,8 @@ inline bmqp::PutEventBuilder* Event::putEventBuilder()
return &(d_putEventBuilderBuffer.object());
}

inline void Event::addCorrelationId(
const bmqt::CorrelationId& correlationId,
unsigned int subscriptionHandleId)
inline void Event::addCorrelationId(const bmqt::CorrelationId& correlationId,
unsigned int subscriptionHandleId)
{
// TODO: when ACK event is created locally we have to fill d_correlationIds
// before the raw ACK 'bmqp::Event' is created and may be used to
Expand All @@ -812,9 +811,7 @@ inline void Event::addCorrelationId(
// BSLS_ASSERT_SAFE(d_rawEvent.isAckEvent());

d_correlationIds.push_back(
bsl::make_pair(
correlationId,
subscriptionHandleId));
bsl::make_pair(correlationId, subscriptionHandleId));
}

} // close package namespace
Expand Down
9 changes: 4 additions & 5 deletions src/groups/bmq/bmqimp/bmqimp_messagedumper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -346,11 +346,10 @@ void MessageDumper::dumpPushEvent(bsl::ostream& out, const bmqp::Event& event)
iter.extractQueueInfo(&qId, &subscriptionId, &rdaInfo);

QueueManager::QueueSp queue =
d_queueManager_p->lookupQueueBySubscriptionId(
&correlationId,
&subscriptionHandle,
qId,
subscriptionId);
d_queueManager_p->lookupQueueBySubscriptionId(&correlationId,
&subscriptionHandle,
qId,
subscriptionId);
BSLS_ASSERT_SAFE(queue);

out << "PUSH Message #" << ++msgNum << ": "
Expand Down
12 changes: 5 additions & 7 deletions src/groups/bmq/bmqimp/bmqimp_messagedumper.t.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -545,16 +545,14 @@ void Tester::registerSubscription(const bslstl::StringRef& uri,
unsigned int subscriptionId,
const bmqt::CorrelationId& correlationId)
{
const bmqimp::QueueManager::QueueSp& queue =
d_queueManager.lookupQueue(
uri);
const bmqimp::QueueManager::QueueSp& queue = d_queueManager.lookupQueue(
uri);

BSLS_ASSERT_SAFE(queue);

queue->registerInternalSubscriptionId(
subscriptionId,
subscriptionId,
correlationId);
queue->registerInternalSubscriptionId(subscriptionId,
subscriptionId,
correlationId);
}

void Tester::updateSubscriptions(const bslstl::StringRef& uri,
Expand Down
60 changes: 29 additions & 31 deletions src/groups/bmq/bmqimp/bmqimp_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,8 @@ struct QueueStatsUtil {
class Queue {
public:
// PUBLIC TYPES
typedef bsl::pair<unsigned int, bmqt::CorrelationId> SubscriptionHandle;
// Not using private 'bmqt::SubscriptionHandle' ctor
typedef bsl::pair<unsigned int, bmqt::CorrelationId> SubscriptionHandle;
// Not using private 'bmqt::SubscriptionHandle' ctor

private:
// DATA
Expand Down Expand Up @@ -262,11 +262,11 @@ class Queue {
bmqp_ctrlmsg::StreamParameters d_config;

bsl::unordered_map<unsigned int, SubscriptionHandle>
d_registeredInternalSubscriptionIds;
// This keeps SubscriptionHandle (id and CorrelationId) for Configure
// response processing.
// Supporting multiple concurrent Configure requests.
// TODO: This should go into ConfigureRequest context.
d_registeredInternalSubscriptionIds;
// This keeps SubscriptionHandle (id and CorrelationId) for Configure
// response processing.
// Supporting multiple concurrent Configure requests.
// TODO: This should go into ConfigureRequest context.

private:
// NOT IMPLEMENTED
Expand Down Expand Up @@ -337,19 +337,19 @@ class Queue {
/// reinitialize the state before a new start).
void clearStatContext();

void registerInternalSubscriptionId(
unsigned int internalSubscriptionId,
unsigned int subscriptionHandleId,
const bmqt::CorrelationId& correlationId);
// Keep the specified 'subscriptionHandleId' and 'correlationId'
// associated with the specified 'internalSubscriptionId' between
// Configure request and Configure response (until
// 'extractSubscriptionHandle').
void
registerInternalSubscriptionId(unsigned int internalSubscriptionId,
unsigned int subscriptionHandleId,
const bmqt::CorrelationId& correlationId);
// Keep the specified 'subscriptionHandleId' and 'correlationId'
// associated with the specified 'internalSubscriptionId' between
// Configure request and Configure response (until
// 'extractSubscriptionHandle').

SubscriptionHandle extractSubscriptionHandle(
unsigned int internalSubscriptionId);
// Lookup, copy, erase, and return the copy of what was registered
// by 'registerInternalSubscriptionId'.
SubscriptionHandle
extractSubscriptionHandle(unsigned int internalSubscriptionId);
// Lookup, copy, erase, and return the copy of what was registered
// by 'registerInternalSubscriptionId'.

// ACCESSORS

Expand Down Expand Up @@ -554,26 +554,24 @@ inline Queue& Queue::setConfig(const bmqp_ctrlmsg::StreamParameters& value)
return *this;
}

inline
void Queue::registerInternalSubscriptionId(
unsigned int internalSubscriptionId,
unsigned int subscriptionHandleId,
const bmqt::CorrelationId& correlationId)
inline void
Queue::registerInternalSubscriptionId(unsigned int internalSubscriptionId,
unsigned int subscriptionHandleId,
const bmqt::CorrelationId& correlationId)
{
d_registeredInternalSubscriptionIds.emplace(
internalSubscriptionId,
SubscriptionHandle(subscriptionHandleId, correlationId));
internalSubscriptionId,
SubscriptionHandle(subscriptionHandleId, correlationId));
}

inline
Queue::SubscriptionHandle
inline Queue::SubscriptionHandle
Queue::extractSubscriptionHandle(unsigned int internalSubscriptionId)
{
bsl::unordered_map<unsigned int, SubscriptionHandle>::const_iterator
cit = d_registeredInternalSubscriptionIds.find(internalSubscriptionId);
bsl::unordered_map<unsigned int, SubscriptionHandle>::const_iterator cit =
d_registeredInternalSubscriptionIds.find(internalSubscriptionId);

if (cit == d_registeredInternalSubscriptionIds.end()) {
return {internalSubscriptionId, bmqt::CorrelationId()}; // RETURN
return {internalSubscriptionId, bmqt::CorrelationId()}; // RETURN
}

SubscriptionHandle result(cit->second);
Expand Down
48 changes: 19 additions & 29 deletions src/groups/bmq/bmqimp/bmqimp_queuemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,10 +66,10 @@ QueueManager::lookupQueueLocked(const bmqp::QueueId& queueId) const
}

QueueManager::QueueSp QueueManager::lookupQueueBySubscriptionIdLocked(
bmqt::CorrelationId *correlationId,
unsigned int *subscriptionHandleId,
int qId,
unsigned int internalSubscriptionId) const
bmqt::CorrelationId* correlationId,
unsigned int* subscriptionHandleId,
int qId,
unsigned int internalSubscriptionId) const
{
// PRECONDITIONS
// d_queuesLock locked
Expand All @@ -87,8 +87,8 @@ QueueManager::QueueSp QueueManager::lookupQueueBySubscriptionIdLocked(
// lookup by 'subscriptionId'
SubscriptionId id(qId, internalSubscriptionId);

QueuesBySubscriptions::const_iterator cit =
d_queuesBySubscriptionIds.find(id);
QueuesBySubscriptions::const_iterator cit = d_queuesBySubscriptionIds.find(
id);

if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(
cit == d_queuesBySubscriptionIds.end())) {
Expand All @@ -98,8 +98,8 @@ QueueManager::QueueSp QueueManager::lookupQueueBySubscriptionIdLocked(

BSLS_ASSERT_SAFE(cit->second.d_queue);

*subscriptionHandleId = cit->second.d_subscriptionHandle.first;
*correlationId = cit->second.d_subscriptionHandle.second;
*subscriptionHandleId = cit->second.d_subscriptionHandle.first;
*correlationId = cit->second.d_subscriptionHandle.second;

return cit->second.d_queue;
}
Expand Down Expand Up @@ -289,10 +289,9 @@ void QueueManager::resetState()
}

const QueueManager::QueueSp
QueueManager::observePushEvent(
bmqt::CorrelationId *correlationId,
unsigned int *subscriptionHandle,
const bmqp::EventUtilQueueInfo& info)
QueueManager::observePushEvent(bmqt::CorrelationId* correlationId,
unsigned int* subscriptionHandle,
const bmqp::EventUtilQueueInfo& info)
{
// Update stats
const QueueSp queue = lookupQueueBySubscriptionIdLocked(
Expand Down Expand Up @@ -497,37 +496,28 @@ void QueueManager::resetSubStreamCount(const bsl::string& canonicalUri)
}

void QueueManager::updateSubscriptions(
const bsl::shared_ptr<Queue>& queue,
const bmqp_ctrlmsg::StreamParameters& config)
const bsl::shared_ptr<Queue>& queue,
const bmqp_ctrlmsg::StreamParameters& config)
{
BSLS_ASSERT_SAFE(queue);

const bmqp_ctrlmsg::StreamParameters& previous = queue->config();

for (size_t i = 0; i < previous.subscriptions().size(); ++i) {
unsigned int internalSubscriptionId =
previous.subscriptions()[i].sId();
previous.subscriptions()[i].sId();

SubscriptionId id(
queue->id(),
internalSubscriptionId);
SubscriptionId id(queue->id(), internalSubscriptionId);

d_queuesBySubscriptionIds.erase(id);
}

for (size_t i = 0; i < config.subscriptions().size(); ++i) {
unsigned int internalSubscriptionId = config.subscriptions()[i].sId();

unsigned int internalSubscriptionId =
config.subscriptions()[i].sId();

d_queuesBySubscriptionIds.insert(
bsl::make_pair(
SubscriptionId(
queue->id(),
internalSubscriptionId),
QueueBySubscription(
internalSubscriptionId,
queue)));
d_queuesBySubscriptionIds.insert(bsl::make_pair(
SubscriptionId(queue->id(), internalSubscriptionId),
QueueBySubscription(internalSubscriptionId, queue)));
}
}

Expand Down
Loading

0 comments on commit e6eec92

Please sign in to comment.