Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into sanitizers-in-docker
Browse files Browse the repository at this point in the history
  • Loading branch information
alexander-e1off committed Jan 29, 2025
2 parents 891bf9e + c9ee151 commit c5b582e
Show file tree
Hide file tree
Showing 13 changed files with 640 additions and 59 deletions.
11 changes: 10 additions & 1 deletion src/groups/mqb/mqba/mqba_application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -832,13 +832,22 @@ int Application::processCommand(const bslstl::StringRef& source,

mqbcmd::InternalResult cmdResult;

// TODO This boolean is added to address failure during in-progress broker
// rollout from v92.8 -> v93.7. If 93.7 broker tries to re-route an admin
// command to a 92.8 broker, 92.8 broker will fail to handle. Therefore, we
// turn off re-routing until all nodes have been upgraded to 93.7.
//
// Also, once reroute is re-enabled, revert the integration test changes
// introduced in https://github.com/bloomberg/blazingmq/pull/586.
const bool neverReroute = true;

// Note that routed commands should never route again to another node.
// This should always be the "end of the road" for a command.
// Note that this logic is important to prevent a "deadlock" scenario
// where two nodes are waiting on a response from each other to continue.
// Currently commands from reroutes are executed on their own dedicated
// thread.
if (fromReroute) {
if (fromReroute || neverReroute) {
if (0 != executeCommand(command, &cmdResult)) {
// early exit (caused by "dangerous" command)
return rc_EARLY_EXIT; // RETURN
Expand Down
66 changes: 64 additions & 2 deletions src/groups/mqb/mqbc/mqbc_clusterstate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,67 @@ bsl::ostream& ClusterStateQueueInfo::print(bsl::ostream& stream,
return stream;
}

bsl::ostream&
ClusterStateQueueInfo::State::print(bsl::ostream& stream,
ClusterStateQueueInfo::State::Enum value,
int level,
int spacesPerLevel)
{
if (stream.bad()) {
return stream; // RETURN
}

bdlb::Print::indent(stream, level, spacesPerLevel);
stream << ClusterStateQueueInfo::State::toAscii(value);

if (spacesPerLevel >= 0) {
stream << '\n';
}

return stream;
}

const char*
ClusterStateQueueInfo::State::toAscii(ClusterStateQueueInfo::State::Enum value)
{
#define CASE(X) \
case k_##X: return #X;

switch (value) {
CASE(NONE)
CASE(ASSIGNING)
CASE(ASSIGNED)
CASE(UNASSIGNING)
default: return "(* NONE *)";
}

#undef CASE
}

bool ClusterStateQueueInfo::State::fromAscii(
ClusterStateQueueInfo::State::Enum* out,
const bslstl::StringRef& str)
{
#define CHECKVALUE(M) \
if (bdlb::String::areEqualCaseless( \
toAscii(ClusterStateQueueInfo::State::k_##M), \
str.data(), \
static_cast<int>(str.length()))) { \
*out = ClusterStateQueueInfo::State::k_##M; \
return true; \
}

CHECKVALUE(NONE)
CHECKVALUE(ASSIGNING)
CHECKVALUE(ASSIGNED)
CHECKVALUE(UNASSIGNING)

// Invalid string
return false;

#undef CHECKVALUE
}

// --------------------------
// class ClusterStateObserver
// --------------------------
Expand Down Expand Up @@ -352,7 +413,8 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri,
queueIt = domIt->second->queuesInfo().emplace(uri, queueInfo).first;
}
else {
if (queueIt->second->state() == ClusterStateQueueInfo::k_ASSIGNED) {
if (queueIt->second->state() ==
ClusterStateQueueInfo::State::k_ASSIGNED) {
// See 'ClusterStateManager::processQueueAssignmentAdvisory' which
// insists on re-assigning
isNewAssignment = false;
Expand All @@ -364,7 +426,7 @@ bool ClusterState::assignQueue(const bmqt::Uri& uri,
}

// Set the queue as assigned
queueIt->second->setState(ClusterStateQueueInfo::k_ASSIGNED);
queueIt->second->setState(ClusterStateQueueInfo::State::k_ASSIGNED);

updatePartitionQueueMapped(partitionId, 1);

Expand Down
100 changes: 77 additions & 23 deletions src/groups/mqb/mqbc/mqbc_clusterstate.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,16 +162,55 @@ class ClusterStateQueueInfo {
typedef mqbi::ClusterStateManager::AppInfos AppInfos;
typedef mqbi::ClusterStateManager::AppInfosCIter AppInfosCIter;

enum State {
// State of Assignment. In CSL, assignment and unassignment are async,
// hence the need for k_ASSIGNING/k_UNASSIGNING
// Assigning following unassigning is also supported.
// On Replica, the only possible state is k_ASSIGNED.

k_NONE = 0,
k_ASSIGNING = -1,
k_ASSIGNED = -2,
k_UNASSIGNING = -3
struct State {
public:
enum Enum {
// State of Assignment. In CSL, assignment and unassignment are
// async,
// hence the need for k_ASSIGNING/k_UNASSIGNING
// Assigning following unassigning is also supported.
// On Replica, the only possible state is k_ASSIGNED.

k_NONE = 0,
k_ASSIGNING = -1,
k_ASSIGNED = -2,
k_UNASSIGNING = -3
};

/// Write the string representation of the specified enumeration
/// `value`
/// to the specified output `stream`, and return a reference to
/// `stream`. Optionally specify an initial indentation `level`, whose
/// absolute value is incremented recursively for nested objects. If
/// `level` is specified, optionally specify `spacesPerLevel`, whose
/// absolute value indicates the number of spaces per indentation level
/// for this and all of its nested objects. If `level` is negative,
/// suppress indentation of the first line. If `spacesPerLevel` is
/// negative, format the entire output on one line, suppressing all but
/// the initial indentation (as governed by `level`). See `toAscii`
/// for what constitutes the string representation of a
/// `ClusterStateQueueInfo::State` value.
static bsl::ostream& print(bsl::ostream& stream,
ClusterStateQueueInfo::State::Enum value,
int level = 0,
int spacesPerLevel = 4);

/// Return the non-modifiable string representation corresponding to
/// the specified enumeration `value`, if it exists, and a unique
/// (error) string otherwise. The string representation of `value`
/// matches its corresponding enumerator name with the `e_` prefix
/// elided. Note that specifying a `value` that does not match any of
/// the enumerators will result in a string representation that is
/// distinct from any of those corresponding to the enumerators, but is
/// otherwise unspecified.
static const char* toAscii(ClusterStateQueueInfo::State::Enum value);

/// Return true and fills the specified `out` with the enum value
/// corresponding to the specified `str`, if valid, or return false and
/// leave `out` untouched if `str` doesn't correspond to any value of
/// the enum.
static bool fromAscii(ClusterStateQueueInfo::State::Enum* out,
const bslstl::StringRef& str);
};

private:
Expand All @@ -192,7 +231,7 @@ class ClusterStateQueueInfo {
//
// TBD: Should also be added to mqbconfm::Domain

State d_state;
State::Enum d_state;
// Flag indicating whether this queue is in the process of
// being assigned / unassigned.

Expand Down Expand Up @@ -228,7 +267,7 @@ class ClusterStateQueueInfo {

/// Set the corresponding member to the specified `value` and return a
/// reference offering modifiable access to this object.
void setState(State value);
void setState(State::Enum value);

/// Get a modifiable reference to this object's appIdInfos.
AppInfos& appInfos();
Expand All @@ -245,8 +284,8 @@ class ClusterStateQueueInfo {
const AppInfos& appInfos() const;

/// Return the value of the corresponding member of this object.
State state() const;
bool pendingUnassignment() const;
State::Enum state() const;
bool pendingUnassignment() const;

/// Format this object to the specified output `stream` at the (absolute
/// value of) the optionally specified indentation `level` and return a
Expand All @@ -269,6 +308,11 @@ class ClusterStateQueueInfo {
bsl::ostream& operator<<(bsl::ostream& stream,
const ClusterStateQueueInfo& rhs);

/// Format the specified `value` to the specified output `stream` and return
/// a reference to the modifiable `stream`.
bsl::ostream& operator<<(bsl::ostream& stream,
ClusterStateQueueInfo::State::Enum value);

/// Return `true` if the specified `rhs` object contains the value of the
/// same type as contained in the specified `lhs` object and the value
/// itself is the same in both objects, return false otherwise.
Expand Down Expand Up @@ -805,7 +849,7 @@ inline ClusterStateQueueInfo::ClusterStateQueueInfo(
, d_key()
, d_partitionId(mqbs::DataStore::k_INVALID_PARTITION_ID)
, d_appInfos(allocator)
, d_state(k_NONE)
, d_state(State::k_NONE)
{
// NOTHING
}
Expand All @@ -820,7 +864,7 @@ inline ClusterStateQueueInfo::ClusterStateQueueInfo(
, d_key(key)
, d_partitionId(partitionId)
, d_appInfos(appIdInfos, allocator)
, d_state(k_NONE)
, d_state(State::k_NONE)
{
// NOTHING
}
Expand All @@ -839,7 +883,8 @@ inline ClusterStateQueueInfo& ClusterStateQueueInfo::setPartitionId(int value)
return *this;
}

inline void ClusterStateQueueInfo::setState(ClusterStateQueueInfo::State value)
inline void
ClusterStateQueueInfo::setState(ClusterStateQueueInfo::State::Enum value)
{
// k_NONE
// | |
Expand Down Expand Up @@ -895,14 +940,14 @@ ClusterStateQueueInfo::appInfos() const
return d_appInfos;
}

inline ClusterStateQueueInfo::State ClusterStateQueueInfo::state() const
inline ClusterStateQueueInfo::State::Enum ClusterStateQueueInfo::state() const
{
return d_state;
}

inline bool ClusterStateQueueInfo::pendingUnassignment() const
{
return d_state == k_UNASSIGNING;
return d_state == State::k_UNASSIGNING;
}

// ------------------
Expand Down Expand Up @@ -1099,8 +1144,9 @@ ClusterState::getAssigned(const bmqt::Uri& uri) const
{
ClusterStateQueueInfo* queue = getQueueInfo(uri);

return queue ? queue->state() == ClusterStateQueueInfo::k_ASSIGNED ? queue
: 0
return queue ? queue->state() == ClusterStateQueueInfo::State::k_ASSIGNED
? queue
: 0
: 0;
}

Expand All @@ -1110,8 +1156,9 @@ ClusterState::getAssignedOrUnassigning(const bmqt::Uri& uri) const
ClusterStateQueueInfo* queue = getQueueInfo(uri);

return queue
? queue->state() == ClusterStateQueueInfo::k_ASSIGNED ||
queue->state() == ClusterStateQueueInfo::k_UNASSIGNING
? queue->state() == ClusterStateQueueInfo::State::k_ASSIGNED ||
queue->state() ==
ClusterStateQueueInfo::State::k_UNASSIGNING
? queue
: 0
: 0;
Expand Down Expand Up @@ -1172,6 +1219,13 @@ inline bsl::ostream& mqbc::operator<<(bsl::ostream& stream,
return rhs.print(stream, 0, -1);
}

inline bsl::ostream&
mqbc::operator<<(bsl::ostream& stream,
mqbc::ClusterStateQueueInfo::State::Enum value)
{
return mqbc::ClusterStateQueueInfo::State::print(stream, value, 0, -1);
}

inline bool mqbc::operator==(const ClusterStateQueueInfo& lhs,
const ClusterStateQueueInfo& rhs)
{
Expand Down
Loading

0 comments on commit c5b582e

Please sign in to comment.