Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix mqbc::IncoreCSL: Rollover fixes and improvements #595

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion src/groups/mqb/mqbblp/mqbblp_cluster.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ namespace mqbblp {
namespace {
const double k_LOG_SUMMARY_INTERVAL = 60.0 * 5; // 5 minutes

const double k_QUEUE_GC_INTERVAL = 60.0; // 1 minutes
const double k_QUEUE_GC_INTERVAL = 60.0; // 1 minute

/// Timeout duration for Partition FSM watchdog -- 5 minutes
const bsls::Types::Int64 k_PARTITION_FSM_WATCHDOG_TIMEOUT_DURATION = 60 * 5;
Expand Down
9 changes: 9 additions & 0 deletions src/groups/mqb/mqbc/mqbc_clusterstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,15 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri,
// insists on re-assigning
isNewAssignment = false;

// TODO Remove protection, and ensure idempotency on all these
// mqbc::ClusterState modifier functions.
if (queueIt->second->key() == key &&
queueIt->second->partitionId() == partitionId &&
queueIt->second->appInfos() == appIdInfos) {
// If queue info is unchanged, can simply return
return false; // RETURN
}

updatePartitionQueueMapped(queueIt->second->partitionId(), -1);
}
queueIt->second->setKey(key).setPartitionId(partitionId);
Expand Down
18 changes: 8 additions & 10 deletions src/groups/mqb/mqbc/mqbc_clusterstateledger.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,15 +260,6 @@ class ClusterStateLedger : public ElectorInfoObserver {
/// dispatcher thread.
virtual int close() = 0;

// TODO: Declare these methods once the parameter object types have been
// defined in 'mqbc::ClusterState'.
// virtual int apply(const ClusterStateQueueInfo& queueInfo) = 0;
// virtual int apply(const UriToQueueInfoMap& queuesInfo) = 0;
// virtual int apply(const ClusterStatePartitionInfo& partitionInfo) = 0;
// virtual int apply(const PartitionsInfo& partitionsInfo) = 0;
// Apply the specified message to self and replicate if self is leader.
// Notify via 'commitCb' when consistency level has been achieved.

/// Apply the specified `advisory` to self and replicate to followers.
/// Notify via `commitCb` when consistency level has been achieved.
/// Note that *only* a leader node may invoke this routine.
Expand All @@ -282,7 +273,14 @@ class ClusterStateLedger : public ElectorInfoObserver {
virtual int
apply(const bmqp_ctrlmsg::QueueUnassignedAdvisory& advisory) = 0;
virtual int apply(const bmqp_ctrlmsg::QueueUpdateAdvisory& advisory) = 0;
virtual int apply(const bmqp_ctrlmsg::LeaderAdvisory& advisory) = 0;

/// Apply the specified `advisory` to self and replicate to followers.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is ClusterUtil::apply, same name, different meaning. That one gets called on commit, is that correct?

/// Notify via `commitCb` when consistency level has been achieved. Note
/// that *only* a leader node may invoke this routine.
///
/// THREAD: This method can be invoked only in the associated cluster's
/// dispatcher thread.
virtual int apply(const bmqp_ctrlmsg::LeaderAdvisory& advisory) = 0;

/// Apply the advisory contained in the specified `clusterMessage` to
/// self and replicate to followers. Notify via `commitCb` when
Expand Down
122 changes: 16 additions & 106 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1678,11 +1678,12 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription,
bsl::vector<ClusterStatePartitionInfo> incorrectPartitions;
for (size_t pid = 0; pid < state.partitions().size(); ++pid) {
const ClusterStatePartitionInfo& stateInfo = state.partition(pid);
BSLS_ASSERT_SAFE(stateInfo.partitionId() == pid);
BSLS_ASSERT_SAFE(static_cast<size_t>(stateInfo.partitionId()) == pid);

const ClusterStatePartitionInfo& referenceInfo = reference.partition(
pid);
BSLS_ASSERT_SAFE(referenceInfo.partitionId() == pid);
BSLS_ASSERT_SAFE(static_cast<size_t>(referenceInfo.partitionId()) ==
pid);
if (stateInfo.primaryLeaseId() != referenceInfo.primaryLeaseId()) {
// Partition information mismatch. Note that we don't compare
// primaryNodeIds here because 'state' is initialized with cluster
Expand Down Expand Up @@ -1723,7 +1724,8 @@ int ClusterUtil::validateState(bsl::ostream& errorDescription,
for (size_t pid = 0; pid < state.partitions().size(); ++pid) {
const ClusterStatePartitionInfo& referenceInfo =
reference.partitions()[pid];
BSLS_ASSERT_SAFE(referenceInfo.partitionId() == pid);
BSLS_ASSERT_SAFE(
static_cast<size_t>(referenceInfo.partitionId()) == pid);
bdlb::Print::newlineAndIndent(out, level + 1);
out << "Partition [" << pid
<< "]: primaryLeaseId: " << referenceInfo.primaryLeaseId()
Expand Down Expand Up @@ -1998,10 +2000,6 @@ int ClusterUtil::load(ClusterState* state,
return rc * 10 + rc_ITERATION_ERROR; // RETURN
}

typedef bsl::unordered_map<bmqp_ctrlmsg::LeaderMessageSequence,
bmqp_ctrlmsg::ClusterMessage>
AdvisoriesMap;
AdvisoriesMap advisories;
do {
BSLS_ASSERT_SAFE(latestIter->isValid());

Expand All @@ -2012,110 +2010,22 @@ int ClusterUtil::load(ClusterState* state,
return rc * 10 + rc_MESSAGE_LOAD_ERROR; // RETURN
}

// Track if advisory, apply if commit
// Apply advisories, whether committed or not. Can ignore commit
// records
typedef bmqp_ctrlmsg::ClusterMessageChoice MsgChoice; // shortcut
switch (clusterMessage.choice().selectionId()) {
case MsgChoice::SELECTION_ID_PARTITION_PRIMARY_ADVISORY: {
const bmqp_ctrlmsg::LeaderMessageSequence& lms =
clusterMessage.choice()
.partitionPrimaryAdvisory()
.sequenceNumber();
bsl::pair<AdvisoriesMap::iterator, bool> insertRc =
advisories.insert(bsl::make_pair(lms, clusterMessage));
if (!insertRc.second) {
BALL_LOG_WARN << clusterData.identity().description()
<< ": When loading from cluster state ledger, "
<< "discovered records with duplicate LSN ["
<< lms << "]. Older record type: "
<< advisories.at(lms).choice().selectionId()
<< "; newer record: " << clusterMessage;
};
} break; // BREAK
case MsgChoice::SELECTION_ID_LEADER_ADVISORY: {
const bmqp_ctrlmsg::LeaderMessageSequence& lms =
clusterMessage.choice().leaderAdvisory().sequenceNumber();
bsl::pair<AdvisoriesMap::iterator, bool> insertRc =
advisories.insert(bsl::make_pair(lms, clusterMessage));
if (!insertRc.second) {
BALL_LOG_WARN << clusterData.identity().description()
<< ": When loading from cluster state ledger, "
<< "discovered records with duplicate LSN ["
<< lms << "]. Older record type: "
<< advisories.at(lms).choice().selectionId()
<< "; newer record type:"
<< latestIter->header().recordType();
};
} break; // BREAK
case MsgChoice::SELECTION_ID_QUEUE_ASSIGNMENT_ADVISORY: {
const bmqp_ctrlmsg::LeaderMessageSequence& lms =
clusterMessage.choice()
.queueAssignmentAdvisory()
.sequenceNumber();
bsl::pair<AdvisoriesMap::iterator, bool> insertRc =
advisories.insert(bsl::make_pair(lms, clusterMessage));
if (!insertRc.second) {
BALL_LOG_WARN << clusterData.identity().description()
<< ": When loading from cluster state ledger, "
<< "discovered records with duplicate LSN ["
<< lms << "]. Older record type: "
<< advisories.at(lms).choice().selectionId()
<< "; newer record: " << clusterMessage;
};
} break; // BREAK
case MsgChoice::SELECTION_ID_QUEUE_UNASSIGNED_ADVISORY: {
const bmqp_ctrlmsg::LeaderMessageSequence& lms =
clusterMessage.choice()
.queueUnassignedAdvisory()
.sequenceNumber();
bsl::pair<AdvisoriesMap::iterator, bool> insertRc =
advisories.insert(bsl::make_pair(lms, clusterMessage));
if (!insertRc.second) {
BALL_LOG_WARN << clusterData.identity().description()
<< ": When loading from cluster state ledger, "
<< "discovered records with duplicate LSN ["
<< lms << "]. Older record type: "
<< advisories.at(lms).choice().selectionId()
<< "; newer record: " << clusterMessage;
};
} break; // BREAK
case MsgChoice::SELECTION_ID_PARTITION_PRIMARY_ADVISORY:
case MsgChoice::SELECTION_ID_LEADER_ADVISORY:
case MsgChoice::SELECTION_ID_QUEUE_ASSIGNMENT_ADVISORY:
case MsgChoice::SELECTION_ID_QUEUE_UNASSIGNED_ADVISORY:
case MsgChoice::SELECTION_ID_QUEUE_UPDATE_ADVISORY: {
const bmqp_ctrlmsg::LeaderMessageSequence& lms =
clusterMessage.choice().queueUpdateAdvisory().sequenceNumber();
bsl::pair<AdvisoriesMap::iterator, bool> insertRc =
advisories.insert(bsl::make_pair(lms, clusterMessage));
if (!insertRc.second) {
BALL_LOG_WARN << clusterData.identity().description()
<< ": When loading from cluster state ledger, "
<< "discovered records with duplicate LSN ["
<< lms << "]. Older record type: "
<< advisories.at(lms).choice().selectionId()
<< "; newer record: " << clusterMessage;
};
} break;
case MsgChoice::SELECTION_ID_LEADER_ADVISORY_COMMIT: {
const bmqp_ctrlmsg::LeaderMessageSequence& lmsCommitted =
clusterMessage.choice()
.leaderAdvisoryCommit()
.sequenceNumberCommitted();

AdvisoriesMap::const_iterator iter = advisories.find(lmsCommitted);
if (iter == advisories.end()) {
BALL_LOG_WARN << clusterData.identity().description()
<< ": Recovered a commit in IncoreCSL for which"
<< " a corresponding advisory was not found: "
<< clusterMessage;
break; // BREAK
}
// Finally, the advisory is applied to the state
const bmqp_ctrlmsg::ClusterMessage& advisory = iter->second;
BALL_LOG_INFO << "#CSL_RECOVERY "
<< clusterData.identity().description()
<< ": Applying a commit recovered from IncoreCSL. "
<< "Commit: "
<< clusterMessage.choice().leaderAdvisoryCommit()
<< ", advisory: " << advisory << ".";
apply(state, advisory, clusterData);
advisories.erase(iter);
<< ": Applying a recovered record from IncoreCSL: "
<< clusterMessage << ".";
apply(state, clusterMessage, clusterData);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This needs clarification (and comment).
We see an uncommitted advisory and apply it as if it is committed. What is the reasoning? If we are Primary and do_applyCSLSelf, we can assume all advisories are synchronized and that serves as the "commit"?
Is there a case when we apply uncommitted advisory as Replica?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider the case where the leader applies an advisory, receives enough acks, and commits the advisory, but then crashes before the followers have a chance to write the commit. One of the followers becomes the new leader. The new leader and the remaining followers, will see this as an uncommitted advisory; they must carry out the last wish of the previous leader and commit this advisory. That is why upon ClusterUtil::load, we apply the uncommitted advisories knowing that they are about to be committed. Note that the three callers of ClusterUtil::load: validateClusterStateLedger, do_applyCSLSelf, and do_sendFollowerClusterStateResponse all load into a temporary state, and that temporary state is either used to validate the ledger or to construct the snapshot advisory to be applied by the new leader. The new leader's snapshot advisory will soon be committed.

We apply uncommitted advisories as replica in validateClusterStateLedger and do_sendFollowerClusterStateResponse. validateClusterStateLedger is okay because it's just for validation. do_sendFollowerClusterStateResponse is okay because the FollowerClusterStateResponse is sent from the highest LSN follower to the leader to tell the leader what to put in its snapshot advisory.

Will add some comments in the code too to help clarify.

} break; // BREAK
case MsgChoice::SELECTION_ID_LEADER_ADVISORY_COMMIT: {
} break; // BREAK
case MsgChoice::SELECTION_ID_UNDEFINED:
default: {
Expand Down
Loading
Loading