Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mqbc::StorageMgr, RecoveryMgr: Thread safety improvements #367

Merged
merged 16 commits into from
Aug 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 16 additions & 17 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ void Cluster::startDispatched(bsl::ostream* errorDescription, int* rc)
// Start the StorageManager
d_storageManager_mp.load(
isFSMWorkflow()
? static_cast<mqbc::StorageManager*>(
? static_cast<mqbi::StorageManager*>(
new (*storageManagerAllocator) mqbc::StorageManager(
d_clusterData.clusterConfig(),
this,
Expand Down Expand Up @@ -219,12 +219,12 @@ void Cluster::startDispatched(bsl::ostream* errorDescription, int* rc)
bdlf::PlaceHolders::_3), // primary leaseId
d_clusterData.domainFactory(),
dispatcher(),
d_clusterData.miscWorkThreadPool(),
&d_clusterData.miscWorkThreadPool(),
storageManagerAllocator)),
storageManagerAllocator);

// Start the misc work thread pool
*rc = d_clusterData.miscWorkThreadPool()->start();
*rc = d_clusterData.miscWorkThreadPool().start();
if (*rc != 0) {
d_clusterOrchestrator.stop();
*rc = *rc * 10 + rc_MISC_FAILURE;
Expand Down Expand Up @@ -278,14 +278,14 @@ void Cluster::startDispatched(bsl::ostream* errorDescription, int* rc)
d_clusterMonitor.registerObserver(this);

// Start a recurring clock for summary print
d_clusterData.scheduler()->scheduleRecurringEvent(
d_clusterData.scheduler().scheduleRecurringEvent(
&d_logSummarySchedulerHandle,
bsls::TimeInterval(k_LOG_SUMMARY_INTERVAL),
bdlf::BindUtil::bind(&Cluster::logSummaryState, this));

// Start a recurring clock for gc'ing expired queues.

d_clusterData.scheduler()->scheduleRecurringEvent(
d_clusterData.scheduler().scheduleRecurringEvent(
&d_queueGcSchedulerHandle,
bsls::TimeInterval(k_QUEUE_GC_INTERVAL),
bdlf::BindUtil::bind(&Cluster::gcExpiredQueues, this));
Expand Down Expand Up @@ -314,9 +314,8 @@ void Cluster::stopDispatched()

// Cancel recurring events.

d_clusterData.scheduler()->cancelEventAndWait(&d_queueGcSchedulerHandle);
d_clusterData.scheduler()->cancelEventAndWait(
&d_logSummarySchedulerHandle);
d_clusterData.scheduler().cancelEventAndWait(&d_queueGcSchedulerHandle);
d_clusterData.scheduler().cancelEventAndWait(&d_logSummarySchedulerHandle);
// NOTE: The scheduler event does a dispatching to execute 'logSummary'
// from the scheduler thread to the dispatcher thread, but there is
// no race issue here because stop does a double synchronize, so it's
Expand All @@ -335,7 +334,7 @@ void Cluster::stopDispatched()
d_state.unregisterObserver(this);
d_clusterData.electorInfo().unregisterObserver(this);

d_clusterData.scheduler()->cancelEventAndWait(
d_clusterData.scheduler().cancelEventAndWait(
d_clusterData.electorInfo().leaderSyncEventHandle());
// Ignore rc

Expand All @@ -347,7 +346,7 @@ void Cluster::stopDispatched()

d_clusterOrchestrator.stop();

d_clusterData.miscWorkThreadPool()->stop();
d_clusterData.miscWorkThreadPool().stop();

// Notify peers before going down. This should be the last message sent
// out.
Expand Down Expand Up @@ -646,7 +645,7 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback)

SessionSpVec sessions;
for (mqbnet::TransportManagerIterator sessIt(
d_clusterData.transportManager());
&d_clusterData.transportManager());
sessIt;
++sessIt) {
bsl::shared_ptr<mqbnet::Session> sessionSp = sessIt.session().lock();
Expand Down Expand Up @@ -1025,7 +1024,7 @@ void Cluster::onPutEvent(const mqbi::DispatcherEvent& event)
BSLS_ASSERT_SAFE(ns);

bmqp::Event rawEvent(realEvent->blob().get(), d_allocator_p);
bmqp::PutMessageIterator putIt(d_clusterData.bufferFactory(),
bmqp::PutMessageIterator putIt(&d_clusterData.bufferFactory(),
d_allocator_p);

BSLS_ASSERT_SAFE(rawEvent.isPutEvent());
Expand Down Expand Up @@ -1149,7 +1148,7 @@ void Cluster::onPutEvent(const mqbi::DispatcherEvent& event)

// Retrieve the payload of that message
bsl::shared_ptr<bdlbb::Blob> appDataSp =
d_clusterData.blobSpPool()->getObject();
d_clusterData.blobSpPool().getObject();
rc = putIt.loadApplicationData(appDataSp.get());
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(rc != 0)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
Expand Down Expand Up @@ -1984,7 +1983,7 @@ void Cluster::onRelayPushEvent(const mqbi::DispatcherEvent& event)
bmqp::Event rawEvent(realEvent->blob().get(), d_allocator_p);
BSLS_ASSERT_SAFE(rawEvent.isPushEvent());
bdlma::LocalSequentialAllocator<1024> lsa(d_allocator_p);
bmqp::PushMessageIterator pushIt(d_clusterData.bufferFactory(), &lsa);
bmqp::PushMessageIterator pushIt(&d_clusterData.bufferFactory(), &lsa);
rawEvent.loadPushMessageIterator(&pushIt, false);
BSLS_ASSERT_SAFE(pushIt.isValid());

Expand Down Expand Up @@ -2041,12 +2040,12 @@ void Cluster::onRelayPushEvent(const mqbi::DispatcherEvent& event)
bsl::shared_ptr<bdlbb::Blob> optionsSp;
if (atMostOnce) {
// If it's at-most-once delivery, forward the blob too.
appDataSp = d_clusterData.blobSpPool()->getObject();
appDataSp = d_clusterData.blobSpPool().getObject();
rc = pushIt.loadApplicationData(appDataSp.get());
BSLS_ASSERT_SAFE(rc == 0);
}
else if (pushIt.hasOptions()) {
optionsSp = d_clusterData.blobSpPool()->getObject();
optionsSp = d_clusterData.blobSpPool().getObject();
rc = pushIt.loadOptions(optionsSp.get());
BSLS_ASSERT_SAFE(0 == rc);
}
Expand Down Expand Up @@ -3207,7 +3206,7 @@ void Cluster::processEvent(const bmqp::Event& event,
{ \
mqbi::DispatcherEvent* _evt = dispatcher()->getEvent(this); \
bsl::shared_ptr<bdlbb::Blob> _blobSp = \
d_clusterData.blobSpPool()->getObject(); \
d_clusterData.blobSpPool().getObject(); \
*_blobSp = *(event.blob()); \
(*_evt) \
.setType(T) \
Expand Down
9 changes: 5 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@
#include <bslma_usesbslmaallocator.h>
#include <bslmf_nestedtraitdeclaration.h>
#include <bsls_assert.h>
#include <bsls_cpp11.h>
#include <bsls_atomic.h>
#include <bsls_keyword.h>

namespace BloombergLP {

Expand Down Expand Up @@ -265,7 +266,7 @@ class Cluster : public mqbi::Cluster,
// This flag is used only inside this
// component.

bool d_isStopping;
bsls::AtomicBool d_isStopping;
// Flag to indicate if this cluster is
// stopping. This flag is exposed via
// an accessor.
Expand Down Expand Up @@ -345,10 +346,10 @@ class Cluster : public mqbi::Cluster,

private:
// NOT IMPLEMENTED
Cluster(const Cluster&) BSLS_CPP11_DELETED;
Cluster(const Cluster&) BSLS_KEYWORD_DELETED;

/// Copy constructor and assignment operator are not implemented.
Cluster& operator=(const Cluster&) BSLS_CPP11_DELETED;
Cluster& operator=(const Cluster&) BSLS_KEYWORD_DELETED;

private:
// PRIVATE MANIPULATORS
Expand Down
20 changes: 10 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ void ClusterOrchestrator::timerCb()

d_clusterData_p->dispatcherClientData().dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::timerCbDispatched, this),
d_clusterData_p->cluster());
&d_clusterData_p->cluster());
}

void ClusterOrchestrator::timerCbDispatched()
Expand All @@ -511,7 +511,7 @@ void ClusterOrchestrator::timerCbDispatched()

// PRECONDITIONS
BSLS_ASSERT_SAFE(
dispatcher()->inDispatcherThread(d_clusterData_p->cluster()));
dispatcher()->inDispatcherThread(&d_clusterData_p->cluster()));

const bsls::Types::Int64 timer = mwcsys::Time::highResolutionTimer();

Expand Down Expand Up @@ -578,7 +578,7 @@ ClusterOrchestrator::ClusterOrchestrator(
, d_stateManager_mp(
clusterConfig.clusterAttributes().isFSMWorkflow()
? static_cast<mqbi::ClusterStateManager*>(
new (*d_allocator_p) mqbc::ClusterStateManager(
new(*d_allocator_p) mqbc::ClusterStateManager(
clusterConfig,
d_cluster_p,
d_clusterData_p,
Expand All @@ -593,11 +593,11 @@ ClusterOrchestrator::ClusterOrchestrator(
// Strong
d_clusterData_p,
clusterState,
d_clusterData_p->bufferFactory())),
&d_clusterData_p->bufferFactory())),
k_WATCHDOG_TIMEOUT_DURATION,
d_allocators.get("ClusterStateManager")))
: static_cast<mqbi::ClusterStateManager*>(
new (*d_allocator_p) ClusterStateManager(
new(*d_allocator_p) ClusterStateManager(
clusterConfig,
d_cluster_p,
d_clusterData_p,
Expand All @@ -612,7 +612,7 @@ ClusterOrchestrator::ClusterOrchestrator(
// Strong
d_clusterData_p,
clusterState,
d_clusterData_p->bufferFactory())),
&d_clusterData_p->bufferFactory())),
d_allocators.get("ClusterStateManager"))),
d_allocator_p)
, d_queueHelper(d_clusterData_p,
Expand Down Expand Up @@ -699,15 +699,15 @@ int ClusterOrchestrator::start(bsl::ostream& errorDescription)
d_elector_mp.load(
new (*d_allocator_p) mqbnet::Elector(
d_clusterConfig.elector(),
d_clusterData_p->cluster(),
&d_clusterData_p->cluster(),
bdlf::BindUtil::bind(&ClusterOrchestrator::onElectorStateChange,
this,
_1, // ElectorState
_2, // ElectorTransitionCode
_3, // LeaderNodeId
_4), // Term
electorTerm,
d_clusterData_p->bufferFactory(),
&d_clusterData_p->bufferFactory(),
d_allocator_p),
d_allocator_p);

Expand All @@ -720,7 +720,7 @@ int ClusterOrchestrator::start(bsl::ostream& errorDescription)
bsls::TimeInterval interval;
interval.setTotalMilliseconds(
d_clusterConfig.queueOperations().consumptionMonitorPeriodMs());
d_clusterData_p->scheduler()->scheduleRecurringEvent(
d_clusterData_p->scheduler().scheduleRecurringEvent(
&d_consumptionMonitorEventHandle,
interval,
bdlf::BindUtil::bind(&ClusterOrchestrator::timerCb, this));
Expand All @@ -740,7 +740,7 @@ void ClusterOrchestrator::stop()
d_isStarted = false;

BSLS_ASSERT_SAFE(d_clusterData_p);
d_clusterData_p->scheduler()->cancelEventAndWait(
d_clusterData_p->scheduler().cancelEventAndWait(
&d_consumptionMonitorEventHandle);

d_stateManager_mp->stop();
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ inline mqbi::Dispatcher* ClusterOrchestrator::dispatcher()
// PRIVATE ACCESSORS
inline bool ClusterOrchestrator::isLocal() const
{
return d_clusterData_p->cluster()->isLocal();
return d_clusterData_p->cluster().isLocal();
}

// MANIPULATORS
Expand Down
20 changes: 10 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void ClusterProxy::startDispatched()
// and all session to build initial state and then schedule a refresh to
// find active node if there is none after processing all pending events.

d_activeNodeManager.initialize(d_clusterData.transportManager());
d_activeNodeManager.initialize(&d_clusterData.transportManager());

// Ready to read.
d_clusterData.membership().netCluster()->enableRead();
Expand All @@ -150,7 +150,7 @@ void ClusterProxy::startDispatched()
bsls::TimeInterval interval = mwcsys::Time::nowMonotonicClock() +
bsls::TimeInterval(
k_ACTIVE_NODE_INITIAL_WAIT);
d_clusterData.scheduler()->scheduleEvent(
d_clusterData.scheduler().scheduleEvent(
&d_activeNodeLookupEventHandle,
interval,
bdlf::BindUtil::bind(&ClusterProxy::onActiveNodeLookupTimerExpired,
Expand Down Expand Up @@ -178,7 +178,7 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback)
clusterProxyConfig()->queueOperations().shutdownTimeoutMs());

for (mqbnet::TransportManagerIterator sessIt(
d_clusterData.transportManager());
&d_clusterData.transportManager());
sessIt;
++sessIt) {
bsl::shared_ptr<mqbnet::Session> sessionSp = sessIt.session().lock();
Expand Down Expand Up @@ -242,7 +242,7 @@ void ClusterProxy::stopDispatched()

// Cancel scheduler event
if (d_activeNodeLookupEventHandle) {
d_clusterData.scheduler()->cancelEventAndWait(
d_clusterData.scheduler().cancelEventAndWait(
&d_activeNodeLookupEventHandle);
}

Expand Down Expand Up @@ -336,7 +336,7 @@ void ClusterProxy::processActiveNodeManagerResult(
if (result & mqbnet::ClusterActiveNodeManager::e_NEW_ACTIVE) {
// Cancel the scheduler event, if any.
if (d_activeNodeLookupEventHandle) {
d_clusterData.scheduler()->cancelEvent(
d_clusterData.scheduler().cancelEvent(
&d_activeNodeLookupEventHandle);
d_activeNodeManager.enableExtendedSelection();
}
Expand Down Expand Up @@ -435,7 +435,7 @@ void ClusterProxy::onPushEvent(const mqbi::DispatcherPushEvent& event)

bmqp::Event rawEvent(event.blob().get(), d_allocator_p);
bdlma::LocalSequentialAllocator<1024> lsa(d_allocator_p);
bmqp::PushMessageIterator iter(d_clusterData.bufferFactory(), &lsa);
bmqp::PushMessageIterator iter(&d_clusterData.bufferFactory(), &lsa);
rawEvent.loadPushMessageIterator(&iter, false);

BSLS_ASSERT_SAFE(iter.isValid());
Expand All @@ -450,13 +450,13 @@ void ClusterProxy::onPushEvent(const mqbi::DispatcherPushEvent& event)
}

bsl::shared_ptr<bdlbb::Blob> appDataSp =
d_clusterData.blobSpPool()->getObject();
d_clusterData.blobSpPool().getObject();
rc = iter.loadApplicationData(appDataSp.get());
BSLS_ASSERT_SAFE(rc == 0);

bsl::shared_ptr<bdlbb::Blob> optionsSp;
if (iter.hasOptions()) {
optionsSp = d_clusterData.blobSpPool()->getObject();
optionsSp = d_clusterData.blobSpPool().getObject();
rc = iter.loadOptions(optionsSp.get());
BSLS_ASSERT_SAFE(0 == rc);
}
Expand Down Expand Up @@ -717,7 +717,7 @@ void ClusterProxy::processEvent(const bmqp::Event& event,
case bmqp::EventType::e_PUSH: {
mqbi::DispatcherEvent* dispEvent = dispatcher()->getEvent(this);
bsl::shared_ptr<bdlbb::Blob> blobSp =
d_clusterData.blobSpPool()->getObject();
d_clusterData.blobSpPool().getObject();
*blobSp = *(event.blob());
(*dispEvent)
.setType(mqbi::DispatcherEventType::e_PUSH)
Expand All @@ -728,7 +728,7 @@ void ClusterProxy::processEvent(const bmqp::Event& event,
case bmqp::EventType::e_ACK: {
mqbi::DispatcherEvent* dispEvent = dispatcher()->getEvent(this);
bsl::shared_ptr<bdlbb::Blob> blobSp =
d_clusterData.blobSpPool()->getObject();
d_clusterData.blobSpPool().getObject();
*blobSp = *(event.blob());
(*dispEvent)
.setType(mqbi::DispatcherEventType::e_ACK)
Expand Down
Loading
Loading