Skip to content

Commit

Permalink
fix: handle pending assignment case
Browse files Browse the repository at this point in the history
Signed-off-by: dorjesinpo <[email protected]>
  • Loading branch information
dorjesinpo committed Nov 15, 2024
1 parent a23ad95 commit f2eb61b
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 42 deletions.
12 changes: 4 additions & 8 deletions src/groups/mqb/mqbblp/mqbblp_clusterqueuehelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,9 +386,8 @@ ClusterQueueHelper::assignQueue(const QueueContextSp& queueContext)
// PRECONDITIONS
BSLS_ASSERT_SAFE(
d_cluster_p->dispatcher()->inDispatcherThread(d_cluster_p));
BSLS_ASSERT_SAFE(!isQueueAssigned(*(queueContext.get())) ||
((d_cluster_p->isCSLModeEnabled() &&
queueContext->d_stateQInfo_sp->pendingUnassignment())));
BSLS_ASSERT_SAFE(!isQueueAssigned(*queueContext) ||
queueContext->d_stateQInfo_sp->pendingUnassignment());

if (d_cluster_p->isRemote()) {
// Assigning a queue in a remote, is simply giving it a new queueId.
Expand Down Expand Up @@ -4546,7 +4545,7 @@ void ClusterQueueHelper::openQueue(
if (!isQueueAssigned(*(queueContextIt->second)) ||
isQueuePendingUnassignment(*(queueContextIt->second))) {
// In CSL, unassignment is async.
// Since QueueUnassignemntAdvisory can contain multipe queues,
// Since QueueUnassignmentAdvisory can contain multiple queues,
// canceling pending Advisory is not an option.
// Instead, initiate new QueueAssignemntAdvisory which must
// take effect after old QueueUnassignemntAdvisory.
Expand Down Expand Up @@ -6145,9 +6144,7 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate)
<< "], queueKey [" << keyCopy << "] assigned to "
<< "Partition [" << pid << "] as it has expired.";

mqbc::ClusterUtil::setPendingUnassignment(d_clusterState_p,
uriCopy,
true);
mqbc::ClusterUtil::setPendingUnassignment(d_clusterState_p, uriCopy);

// Populate 'queueUnassignedAdvisory'
bdlma::LocalSequentialAllocator<1024> localAlloc(d_allocator_p);
Expand All @@ -6163,7 +6160,6 @@ int ClusterQueueHelper::gcExpiredQueues(bool immediate)
uriCopy,
keyCopy,
pid,

*d_clusterState_p);

// Apply 'queueUnassignedAdvisory' to CSL
Expand Down
4 changes: 2 additions & 2 deletions src/groups/mqb/mqbc/mqbc_clusterstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,8 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri,
updatePartitionQueueMapped(iter->second->partitionId(), -1);
iter->second->setKey(key).setPartitionId(partitionId);
iter->second->appInfos() = appIdInfos;
// TODO: in what scenario 'pendingUnassignment() == true'?
iter->second->setPendingUnassignment(false);

iter->second->setState(ClusterStateQueueInfo::k_ASSIGNED);
}
}

Expand Down
59 changes: 48 additions & 11 deletions src/groups/mqb/mqbc/mqbc_clusterstate.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,15 @@ class ClusterStateQueueInfo {
typedef mqbi::ClusterStateManager::AppInfos AppInfos;
typedef mqbi::ClusterStateManager::AppInfosCIter AppInfosCIter;

enum State {
// State of Assignment

k_NONE,
k_ASSIGNING,
k_ASSIGNED,
k_UNASSIGNING
};

private:
// DATA
bmqt::Uri d_uri;
Expand All @@ -183,9 +192,9 @@ class ClusterStateQueueInfo {
//
// TBD: Should also be added to mqbconfm::Domain

bool d_pendingUnassignment;
State d_state;
// Flag indicating whether this queue is in the process of
// being unassigned.
// being assigned / unassigned.

private:
// NOT IMPLEMENTED
Expand Down Expand Up @@ -219,7 +228,7 @@ class ClusterStateQueueInfo {

/// Set the corresponding member to the specified `value` and return a
/// reference offering modifiable access to this object.
ClusterStateQueueInfo& setPendingUnassignment(bool value);
bool setState(State value);

/// Get a modifiable reference to this object's appIdInfos.
AppInfos& appInfos();
Expand All @@ -236,7 +245,8 @@ class ClusterStateQueueInfo {
const AppInfos& appInfos() const;

/// Return the value of the corresponding member of this object.
bool pendingUnassignment() const;
State state() const;
bool pendingUnassignment() const;

/// Format this object to the specified output `stream` at the (absolute
/// value of) the optionally specified indentation `level` and return a
Expand Down Expand Up @@ -767,7 +777,7 @@ inline ClusterStateQueueInfo::ClusterStateQueueInfo(
, d_key()
, d_partitionId(mqbs::DataStore::k_INVALID_PARTITION_ID)
, d_appInfos(allocator)
, d_pendingUnassignment(false)
, d_state(k_NONE)
{
// NOTHING
}
Expand All @@ -782,7 +792,7 @@ inline ClusterStateQueueInfo::ClusterStateQueueInfo(
, d_key(key)
, d_partitionId(partitionId)
, d_appInfos(appIdInfos, allocator)
, d_pendingUnassignment(false)
, d_state(k_NONE)
{
// NOTHING
}
Expand All @@ -801,11 +811,33 @@ inline ClusterStateQueueInfo& ClusterStateQueueInfo::setPartitionId(int value)
return *this;
}

inline ClusterStateQueueInfo&
ClusterStateQueueInfo::setPendingUnassignment(bool value)
inline bool ClusterStateQueueInfo::setState(ClusterStateQueueInfo::State value)
{
d_pendingUnassignment = value;
return *this;
// k_NONE
// |
// ClusterUtil::assignQueue |
// V
// k_ASSIGNING <---+
// | |
// ClusterState::assignQueue | |
// V |
// k_ASSIGNED |
// | |
// | | ClusterState::assignQueue
// V |
// k_UNASSIGNING

bool result = false;

switch (d_state) {
case k_NONE: result = (value == k_ASSIGNING); break;
case k_ASSIGNING: result = (value == k_ASSIGNED); break;
case k_ASSIGNED: result = (value == k_UNASSIGNING); break;
case k_UNASSIGNING: result = (value == k_ASSIGNING); break;
}

d_state = value;
return result;
}

inline ClusterStateQueueInfo::AppInfos& ClusterStateQueueInfo::appInfos()
Expand Down Expand Up @@ -845,9 +877,14 @@ ClusterStateQueueInfo::appInfos() const
return d_appInfos;
}

inline ClusterStateQueueInfo::State ClusterStateQueueInfo::state() const
{
return d_state;
}

inline bool ClusterStateQueueInfo::pendingUnassignment() const
{
return d_pendingUnassignment;
return d_state == k_UNASSIGNING;
}

// ------------------
Expand Down
36 changes: 17 additions & 19 deletions src/groups/mqb/mqbc/mqbc_clusterutil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,7 @@ void createDomainCb(const bmqp_ctrlmsg::Status& status,
// ------------------

void ClusterUtil::setPendingUnassignment(ClusterState* clusterState,
const bmqt::Uri& uri,
bool pendingUnassignment)
const bmqt::Uri& uri)
{
// PRECONDITIONS
BSLS_ASSERT_SAFE(clusterState);
Expand All @@ -366,7 +365,7 @@ void ClusterUtil::setPendingUnassignment(ClusterState* clusterState,
if (iter != clusterState->domainStates().cend()) {
UriToQueueInfoMapIter qiter = iter->second->queuesInfo().find(uri);
if (qiter != iter->second->queuesInfo().cend()) {
qiter->second->setPendingUnassignment(pendingUnassignment);
qiter->second->setState(ClusterStateQueueInfo::k_UNASSIGNING);
}
}
}
Expand Down Expand Up @@ -751,7 +750,7 @@ void ClusterUtil::processQueueAssignmentRequest(
UriToQueueInfoMapCIter qcit = cit->second->queuesInfo().find(uri);
if (qcit != cit->second->queuesInfo().cend() &&
!(cluster->isCSLModeEnabled() &&
qcit->second->pendingUnassignment())) {
qcit->second->state() == ClusterStateQueueInfo::k_UNASSIGNING)) {
// Queue is already assigned
clusterData->messageTransmitter().sendMessage(response, requester);
return; // RETURN
Expand Down Expand Up @@ -897,6 +896,20 @@ ClusterUtil::assignQueue(ClusterState* clusterState,
allocator);
domIt = clusterState->domainStates().find(uri.qualifiedDomain());
}
else {
// Set the queue as assigning (no longer pending unassignment)

UriToQueueInfoMapCIter qcit = domIt->second->queuesInfo().find(uri);
if (qcit != domIt->second->queuesInfo().cend()) {
if (qcit->second->state() == ClusterStateQueueInfo::k_ASSIGNING) {
BALL_LOG_INFO << cluster->description()
<< "queueAssignment of '" << uri
<< "' is already pending.";
return QueueAssignmentResult::k_ASSIGNMENT_OK;
}
qcit->second->setState(ClusterStateQueueInfo::k_ASSIGNING);
}
}

if (domIt->second->domain() == 0) {
BSLS_ASSERT_SAFE(clusterData->domainFactory());
Expand Down Expand Up @@ -969,21 +982,6 @@ ClusterUtil::assignQueue(ClusterState* clusterState,
}
}

// Set the queue as no longer pending unassignment
const DomainStatesCIter citDomainState = clusterState->domainStates().find(
uri.qualifiedDomain());
if (citDomainState != clusterState->domainStates().cend()) {
UriToQueueInfoMapCIter qcit =
citDomainState->second->queuesInfo().find(uri);
if (qcit != citDomainState->second->queuesInfo().cend()) {
BSLS_ASSERT_SAFE(cluster->isCSLModeEnabled() &&
qcit->second->pendingUnassignment());

// TODO: cancel QUAA, if 'pendingUnassignment() == true'
qcit->second->setPendingUnassignment(false);
}
}

// Populate 'queueAssignmentAdvisory'
bdlma::LocalSequentialAllocator<1024> localAllocator(allocator);
bmqp_ctrlmsg::ControlMessage controlMsg(&localAllocator);
Expand Down
3 changes: 1 addition & 2 deletions src/groups/mqb/mqbc/mqbc_clusterutil.h
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,7 @@ struct ClusterUtil {
/// Set the specified `uri` to have the specified `pendingUnassignment`
/// status in the specified `clusterState`.
static void setPendingUnassignment(ClusterState* clusterState,
const bmqt::Uri& uri,
bool pendingUnassignment);
const bmqt::Uri& uri);

/// Load into the specified `message` the message encoded in the
/// specified `eventBlob` using the specified `allocator`.
Expand Down

0 comments on commit f2eb61b

Please sign in to comment.