Skip to content

Commit

Permalink
mqbc::StorageMgr, RecoveryMgr: Thread safety improvements (bloomberg#367)
Browse files Browse the repository at this point in the history

* mqbs::FileStore: Improve rc logging

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::StorageManager.t: Remove the concept of replica healing stages

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbmock::StorageManager [new]

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::ClusterData: Clean up manipulators and accessors

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::RecoveryMgr: Thread safety improvements

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* Fix mqbc::StorageMgr: Initialize queue key info map in cluster thread

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::StorageMgr.t: Set partition primary also in cluster state

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::StorageMgr: Thread safety improvements

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* Apply clang-format

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* PR#367: Address feedback

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::ClusterData: Return reference not pointer for non-nullables

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc: Make copy constructor and copy assignment private

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* blp::StorageManager: Improve assert logging

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbmock::StorageMgr: Fix compilation errors

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

* mqbc::RecoveryManager: Apply clang format

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>

---------

Signed-off-by: Yuan Jing Vincent Yan <[email protected]>
  • Loading branch information
kaikulimu authored and alexander-e1off committed Oct 24, 2024
1 parent 046a4da commit 0d5f94d
Show file tree
Hide file tree
Showing 48 changed files with 1,987 additions and 961 deletions.
33 changes: 16 additions & 17 deletions src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ void Cluster::startDispatched(bsl::ostream* errorDescription, int* rc)
// Start the StorageManager
d_storageManager_mp.load(
isFSMWorkflow()
? static_cast<mqbc::StorageManager*>(
? static_cast<mqbi::StorageManager*>(
new (*storageManagerAllocator) mqbc::StorageManager(
d_clusterData.clusterConfig(),
this,
Expand Down Expand Up @@ -219,12 +219,12 @@ void Cluster::startDispatched(bsl::ostream* errorDescription, int* rc)
bdlf::PlaceHolders::_3), // primary leaseId
d_clusterData.domainFactory(),
dispatcher(),
d_clusterData.miscWorkThreadPool(),
&d_clusterData.miscWorkThreadPool(),
storageManagerAllocator)),
storageManagerAllocator);

// Start the misc work thread pool
*rc = d_clusterData.miscWorkThreadPool()->start();
*rc = d_clusterData.miscWorkThreadPool().start();
if (*rc != 0) {
d_clusterOrchestrator.stop();
*rc = *rc * 10 + rc_MISC_FAILURE;
Expand Down Expand Up @@ -278,14 +278,14 @@ void Cluster::startDispatched(bsl::ostream* errorDescription, int* rc)
d_clusterMonitor.registerObserver(this);

// Start a recurring clock for summary print
d_clusterData.scheduler()->scheduleRecurringEvent(
d_clusterData.scheduler().scheduleRecurringEvent(
&d_logSummarySchedulerHandle,
bsls::TimeInterval(k_LOG_SUMMARY_INTERVAL),
bdlf::BindUtil::bind(&Cluster::logSummaryState, this));

// Start a recurring clock for gc'ing expired queues.

d_clusterData.scheduler()->scheduleRecurringEvent(
d_clusterData.scheduler().scheduleRecurringEvent(
&d_queueGcSchedulerHandle,
bsls::TimeInterval(k_QUEUE_GC_INTERVAL),
bdlf::BindUtil::bind(&Cluster::gcExpiredQueues, this));
Expand Down Expand Up @@ -314,9 +314,8 @@ void Cluster::stopDispatched()

// Cancel recurring events.

d_clusterData.scheduler()->cancelEventAndWait(&d_queueGcSchedulerHandle);
d_clusterData.scheduler()->cancelEventAndWait(
&d_logSummarySchedulerHandle);
d_clusterData.scheduler().cancelEventAndWait(&d_queueGcSchedulerHandle);
d_clusterData.scheduler().cancelEventAndWait(&d_logSummarySchedulerHandle);
// NOTE: The scheduler event does a dispatching to execute 'logSummary'
// from the scheduler thread to the dispatcher thread, but there is
// no race issue here because stop does a double synchronize, so it's
Expand All @@ -335,7 +334,7 @@ void Cluster::stopDispatched()
d_state.unregisterObserver(this);
d_clusterData.electorInfo().unregisterObserver(this);

d_clusterData.scheduler()->cancelEventAndWait(
d_clusterData.scheduler().cancelEventAndWait(
d_clusterData.electorInfo().leaderSyncEventHandle());
// Ignore rc

Expand All @@ -347,7 +346,7 @@ void Cluster::stopDispatched()

d_clusterOrchestrator.stop();

d_clusterData.miscWorkThreadPool()->stop();
d_clusterData.miscWorkThreadPool().stop();

// Notify peers before going down. This should be the last message sent
// out.
Expand Down Expand Up @@ -646,7 +645,7 @@ void Cluster::initiateShutdownDispatched(const VoidFunctor& callback)

SessionSpVec sessions;
for (mqbnet::TransportManagerIterator sessIt(
d_clusterData.transportManager());
&d_clusterData.transportManager());
sessIt;
++sessIt) {
bsl::shared_ptr<mqbnet::Session> sessionSp = sessIt.session().lock();
Expand Down Expand Up @@ -1025,7 +1024,7 @@ void Cluster::onPutEvent(const mqbi::DispatcherEvent& event)
BSLS_ASSERT_SAFE(ns);

bmqp::Event rawEvent(realEvent->blob().get(), d_allocator_p);
bmqp::PutMessageIterator putIt(d_clusterData.bufferFactory(),
bmqp::PutMessageIterator putIt(&d_clusterData.bufferFactory(),
d_allocator_p);

BSLS_ASSERT_SAFE(rawEvent.isPutEvent());
Expand Down Expand Up @@ -1149,7 +1148,7 @@ void Cluster::onPutEvent(const mqbi::DispatcherEvent& event)

// Retrieve the payload of that message
bsl::shared_ptr<bdlbb::Blob> appDataSp =
d_clusterData.blobSpPool()->getObject();
d_clusterData.blobSpPool().getObject();
rc = putIt.loadApplicationData(appDataSp.get());
if (BSLS_PERFORMANCEHINT_PREDICT_UNLIKELY(rc != 0)) {
BSLS_PERFORMANCEHINT_UNLIKELY_HINT;
Expand Down Expand Up @@ -1984,7 +1983,7 @@ void Cluster::onRelayPushEvent(const mqbi::DispatcherEvent& event)
bmqp::Event rawEvent(realEvent->blob().get(), d_allocator_p);
BSLS_ASSERT_SAFE(rawEvent.isPushEvent());
bdlma::LocalSequentialAllocator<1024> lsa(d_allocator_p);
bmqp::PushMessageIterator pushIt(d_clusterData.bufferFactory(), &lsa);
bmqp::PushMessageIterator pushIt(&d_clusterData.bufferFactory(), &lsa);
rawEvent.loadPushMessageIterator(&pushIt, false);
BSLS_ASSERT_SAFE(pushIt.isValid());

Expand Down Expand Up @@ -2041,12 +2040,12 @@ void Cluster::onRelayPushEvent(const mqbi::DispatcherEvent& event)
bsl::shared_ptr<bdlbb::Blob> optionsSp;
if (atMostOnce) {
// If it's at-most-once delivery, forward the blob too.
appDataSp = d_clusterData.blobSpPool()->getObject();
appDataSp = d_clusterData.blobSpPool().getObject();
rc = pushIt.loadApplicationData(appDataSp.get());
BSLS_ASSERT_SAFE(rc == 0);
}
else if (pushIt.hasOptions()) {
optionsSp = d_clusterData.blobSpPool()->getObject();
optionsSp = d_clusterData.blobSpPool().getObject();
rc = pushIt.loadOptions(optionsSp.get());
BSLS_ASSERT_SAFE(0 == rc);
}
Expand Down Expand Up @@ -3207,7 +3206,7 @@ void Cluster::processEvent(const bmqp::Event& event,
{ \
mqbi::DispatcherEvent* _evt = dispatcher()->getEvent(this); \
bsl::shared_ptr<bdlbb::Blob> _blobSp = \
d_clusterData.blobSpPool()->getObject(); \
d_clusterData.blobSpPool().getObject(); \
*_blobSp = *(event.blob()); \
(*_evt) \
.setType(T) \
Expand Down
9 changes: 5 additions & 4 deletions src/groups/mqb/mqbblp/mqbblp_cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@
#include <bslma_usesbslmaallocator.h>
#include <bslmf_nestedtraitdeclaration.h>
#include <bsls_assert.h>
#include <bsls_cpp11.h>
#include <bsls_atomic.h>
#include <bsls_keyword.h>

namespace BloombergLP {

Expand Down Expand Up @@ -265,7 +266,7 @@ class Cluster : public mqbi::Cluster,
// This flag is used only inside this
// component.

bool d_isStopping;
bsls::AtomicBool d_isStopping;
// Flag to indicate if this cluster is
// stopping. This flag is exposed via
// an accessor.
Expand Down Expand Up @@ -345,10 +346,10 @@ class Cluster : public mqbi::Cluster,

private:
// NOT IMPLEMENTED
Cluster(const Cluster&) BSLS_CPP11_DELETED;
Cluster(const Cluster&) BSLS_KEYWORD_DELETED;

/// Copy constructor and assignment operator are not implemented.
Cluster& operator=(const Cluster&) BSLS_CPP11_DELETED;
Cluster& operator=(const Cluster&) BSLS_KEYWORD_DELETED;

private:
// PRIVATE MANIPULATORS
Expand Down
20 changes: 10 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ void ClusterOrchestrator::timerCb()

d_clusterData_p->dispatcherClientData().dispatcher()->execute(
bdlf::BindUtil::bind(&ClusterOrchestrator::timerCbDispatched, this),
d_clusterData_p->cluster());
&d_clusterData_p->cluster());
}

void ClusterOrchestrator::timerCbDispatched()
Expand All @@ -511,7 +511,7 @@ void ClusterOrchestrator::timerCbDispatched()

// PRECONDITIONS
BSLS_ASSERT_SAFE(
dispatcher()->inDispatcherThread(d_clusterData_p->cluster()));
dispatcher()->inDispatcherThread(&d_clusterData_p->cluster()));

const bsls::Types::Int64 timer = mwcsys::Time::highResolutionTimer();

Expand Down Expand Up @@ -578,7 +578,7 @@ ClusterOrchestrator::ClusterOrchestrator(
, d_stateManager_mp(
clusterConfig.clusterAttributes().isFSMWorkflow()
? static_cast<mqbi::ClusterStateManager*>(
new (*d_allocator_p) mqbc::ClusterStateManager(
new(*d_allocator_p) mqbc::ClusterStateManager(
clusterConfig,
d_cluster_p,
d_clusterData_p,
Expand All @@ -593,11 +593,11 @@ ClusterOrchestrator::ClusterOrchestrator(
// Strong
d_clusterData_p,
clusterState,
d_clusterData_p->bufferFactory())),
&d_clusterData_p->bufferFactory())),
k_WATCHDOG_TIMEOUT_DURATION,
d_allocators.get("ClusterStateManager")))
: static_cast<mqbi::ClusterStateManager*>(
new (*d_allocator_p) ClusterStateManager(
new(*d_allocator_p) ClusterStateManager(
clusterConfig,
d_cluster_p,
d_clusterData_p,
Expand All @@ -612,7 +612,7 @@ ClusterOrchestrator::ClusterOrchestrator(
// Strong
d_clusterData_p,
clusterState,
d_clusterData_p->bufferFactory())),
&d_clusterData_p->bufferFactory())),
d_allocators.get("ClusterStateManager"))),
d_allocator_p)
, d_queueHelper(d_clusterData_p,
Expand Down Expand Up @@ -699,15 +699,15 @@ int ClusterOrchestrator::start(bsl::ostream& errorDescription)
d_elector_mp.load(
new (*d_allocator_p) mqbnet::Elector(
d_clusterConfig.elector(),
d_clusterData_p->cluster(),
&d_clusterData_p->cluster(),
bdlf::BindUtil::bind(&ClusterOrchestrator::onElectorStateChange,
this,
_1, // ElectorState
_2, // ElectorTransitionCode
_3, // LeaderNodeId
_4), // Term
electorTerm,
d_clusterData_p->bufferFactory(),
&d_clusterData_p->bufferFactory(),
d_allocator_p),
d_allocator_p);

Expand All @@ -720,7 +720,7 @@ int ClusterOrchestrator::start(bsl::ostream& errorDescription)
bsls::TimeInterval interval;
interval.setTotalMilliseconds(
d_clusterConfig.queueOperations().consumptionMonitorPeriodMs());
d_clusterData_p->scheduler()->scheduleRecurringEvent(
d_clusterData_p->scheduler().scheduleRecurringEvent(
&d_consumptionMonitorEventHandle,
interval,
bdlf::BindUtil::bind(&ClusterOrchestrator::timerCb, this));
Expand All @@ -740,7 +740,7 @@ void ClusterOrchestrator::stop()
d_isStarted = false;

BSLS_ASSERT_SAFE(d_clusterData_p);
d_clusterData_p->scheduler()->cancelEventAndWait(
d_clusterData_p->scheduler().cancelEventAndWait(
&d_consumptionMonitorEventHandle);

d_stateManager_mp->stop();
Expand Down
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_clusterorchestrator.h
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ inline mqbi::Dispatcher* ClusterOrchestrator::dispatcher()
// PRIVATE ACCESSORS
inline bool ClusterOrchestrator::isLocal() const
{
return d_clusterData_p->cluster()->isLocal();
return d_clusterData_p->cluster().isLocal();
}

// MANIPULATORS
Expand Down
20 changes: 10 additions & 10 deletions src/groups/mqb/mqbblp/mqbblp_clusterproxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void ClusterProxy::startDispatched()
// and all session to build initial state and then schedule a refresh to
// find active node if there is none after processing all pending events.

d_activeNodeManager.initialize(d_clusterData.transportManager());
d_activeNodeManager.initialize(&d_clusterData.transportManager());

// Ready to read.
d_clusterData.membership().netCluster()->enableRead();
Expand All @@ -150,7 +150,7 @@ void ClusterProxy::startDispatched()
bsls::TimeInterval interval = mwcsys::Time::nowMonotonicClock() +
bsls::TimeInterval(
k_ACTIVE_NODE_INITIAL_WAIT);
d_clusterData.scheduler()->scheduleEvent(
d_clusterData.scheduler().scheduleEvent(
&d_activeNodeLookupEventHandle,
interval,
bdlf::BindUtil::bind(&ClusterProxy::onActiveNodeLookupTimerExpired,
Expand Down Expand Up @@ -178,7 +178,7 @@ void ClusterProxy::initiateShutdownDispatched(const VoidFunctor& callback)
clusterProxyConfig()->queueOperations().shutdownTimeoutMs());

for (mqbnet::TransportManagerIterator sessIt(
d_clusterData.transportManager());
&d_clusterData.transportManager());
sessIt;
++sessIt) {
bsl::shared_ptr<mqbnet::Session> sessionSp = sessIt.session().lock();
Expand Down Expand Up @@ -242,7 +242,7 @@ void ClusterProxy::stopDispatched()

// Cancel scheduler event
if (d_activeNodeLookupEventHandle) {
d_clusterData.scheduler()->cancelEventAndWait(
d_clusterData.scheduler().cancelEventAndWait(
&d_activeNodeLookupEventHandle);
}

Expand Down Expand Up @@ -336,7 +336,7 @@ void ClusterProxy::processActiveNodeManagerResult(
if (result & mqbnet::ClusterActiveNodeManager::e_NEW_ACTIVE) {
// Cancel the scheduler event, if any.
if (d_activeNodeLookupEventHandle) {
d_clusterData.scheduler()->cancelEvent(
d_clusterData.scheduler().cancelEvent(
&d_activeNodeLookupEventHandle);
d_activeNodeManager.enableExtendedSelection();
}
Expand Down Expand Up @@ -435,7 +435,7 @@ void ClusterProxy::onPushEvent(const mqbi::DispatcherPushEvent& event)

bmqp::Event rawEvent(event.blob().get(), d_allocator_p);
bdlma::LocalSequentialAllocator<1024> lsa(d_allocator_p);
bmqp::PushMessageIterator iter(d_clusterData.bufferFactory(), &lsa);
bmqp::PushMessageIterator iter(&d_clusterData.bufferFactory(), &lsa);
rawEvent.loadPushMessageIterator(&iter, false);

BSLS_ASSERT_SAFE(iter.isValid());
Expand All @@ -450,13 +450,13 @@ void ClusterProxy::onPushEvent(const mqbi::DispatcherPushEvent& event)
}

bsl::shared_ptr<bdlbb::Blob> appDataSp =
d_clusterData.blobSpPool()->getObject();
d_clusterData.blobSpPool().getObject();
rc = iter.loadApplicationData(appDataSp.get());
BSLS_ASSERT_SAFE(rc == 0);

bsl::shared_ptr<bdlbb::Blob> optionsSp;
if (iter.hasOptions()) {
optionsSp = d_clusterData.blobSpPool()->getObject();
optionsSp = d_clusterData.blobSpPool().getObject();
rc = iter.loadOptions(optionsSp.get());
BSLS_ASSERT_SAFE(0 == rc);
}
Expand Down Expand Up @@ -717,7 +717,7 @@ void ClusterProxy::processEvent(const bmqp::Event& event,
case bmqp::EventType::e_PUSH: {
mqbi::DispatcherEvent* dispEvent = dispatcher()->getEvent(this);
bsl::shared_ptr<bdlbb::Blob> blobSp =
d_clusterData.blobSpPool()->getObject();
d_clusterData.blobSpPool().getObject();
*blobSp = *(event.blob());
(*dispEvent)
.setType(mqbi::DispatcherEventType::e_PUSH)
Expand All @@ -728,7 +728,7 @@ void ClusterProxy::processEvent(const bmqp::Event& event,
case bmqp::EventType::e_ACK: {
mqbi::DispatcherEvent* dispEvent = dispatcher()->getEvent(this);
bsl::shared_ptr<bdlbb::Blob> blobSp =
d_clusterData.blobSpPool()->getObject();
d_clusterData.blobSpPool().getObject();
*blobSp = *(event.blob());
(*dispEvent)
.setType(mqbi::DispatcherEventType::e_ACK)
Expand Down
Loading

0 comments on commit 0d5f94d

Please sign in to comment.