Skip to content

Commit

Permalink
fix: mix of strong and weak consistencies
Browse files Browse the repository at this point in the history
Signed-off-by: Vitaly Dzhitenov <[email protected]>
  • Loading branch information
dorjesinpo committed Dec 20, 2023
1 parent 5f65a2a commit d5cbcc6
Show file tree
Hide file tree
Showing 4 changed files with 47 additions and 14 deletions.
25 changes: 20 additions & 5 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,9 @@ void StorageManager::recoveredQueuesCbImpl(
mqbs::FileStoreIterator fsIt(fs);
DataStoreRecordHandles recordsToPurge;

bsls::Types::Uint64 lastStrongConsistencySequenceNum = 0;
unsigned int lastStrongConsistencyPrimaryLeaseId = 0;

while (fsIt.next()) {
mqbu::StorageKey appKey;
mqbu::StorageKey queueKey;
Expand Down Expand Up @@ -1002,6 +1005,9 @@ void StorageManager::recoveredQueuesCbImpl(

QueueKeyStorageMapIterator storageMapIt = queueKeyStorageMap.find(
queueKey);

const mqbs::DataStoreRecordHandle& handle = fsIt.handle();

// If queue is either not recovered or belongs to an unrecognized
// domain.
if (storageMapIt == queueKeyStorageMap.end()) {
Expand All @@ -1020,7 +1026,7 @@ void StorageManager::recoveredQueuesCbImpl(
domIt->second.find(uri);
BSLS_ASSERT_SAFE(countMapIt != domIt->second.end());
++countMapIt->second;
recordsToPurge.push_back(fsIt.handle());
recordsToPurge.push_back(handle);
continue; // CONTINUE
}

Expand All @@ -1037,6 +1043,8 @@ void StorageManager::recoveredQueuesCbImpl(
mqbs::ReplicatedStorage* rs = storageMapIt->second;
BSLS_ASSERT_SAFE(rs);

const bool isStrongConsistency = !rs->hasReceipt(bmqt::MessageGUID());

if (mqbs::RecordType::e_QUEUE_OP != fsIt.type()) {
// It's one of MESSAGE/CONFIRM/DELETION records, which means it
// must be a file-backed storage.
Expand All @@ -1060,7 +1068,7 @@ void StorageManager::recoveredQueuesCbImpl(
BSLS_ASSERT_SAFE(guid.isUnset());
BSLS_ASSERT_SAFE(mqbs::QueueOpType::e_UNDEFINED != queueOpType);

rs->addQueueOpRecordHandle(fsIt.handle());
rs->addQueueOpRecordHandle(handle);

if (mqbs::QueueOpType::e_PURGE == queueOpType &&
!appKey.isNull()) {
Expand All @@ -1087,9 +1095,13 @@ void StorageManager::recoveredQueuesCbImpl(
else if (mqbs::RecordType::e_MESSAGE == fsIt.type()) {
BSLS_ASSERT_SAFE(false == guid.isUnset());
rs->processMessageRecord(guid,
fs->getMessageLenRaw(fsIt.handle()),
fs->getMessageLenRaw(handle),
refCount,
fsIt.handle());
handle);
if (isStrongConsistency) {
lastStrongConsistencySequenceNum = handle.sequenceNum();
lastStrongConsistencyPrimaryLeaseId = handle.primaryLeaseId();
}
}
else if (mqbs::RecordType::e_CONFIRM == fsIt.type()) {
BSLS_ASSERT_SAFE(false == guid.isUnset());
Expand All @@ -1111,7 +1123,7 @@ void StorageManager::recoveredQueuesCbImpl(
<< "]. Dropping this record." << MWCTSK_ALARMLOG_END;
continue; // CONTINUE
}
rs->processConfirmRecord(guid, appKey, fsIt.handle());
rs->processConfirmRecord(guid, appKey, handle);
}
else {
BSLS_ASSERT(false);
Expand All @@ -1124,6 +1136,9 @@ void StorageManager::recoveredQueuesCbImpl(
++it) {
fs->removeRecordRaw(*it);
}

fs->setLastStrongConsistency(lastStrongConsistencyPrimaryLeaseId,
lastStrongConsistencySequenceNum);
}

void StorageManager::recoveredQueuesCb(int partitionId,
Expand Down
8 changes: 8 additions & 0 deletions src/groups/mqb/mqbs/mqbs_datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ class DataStoreRecordHandle {

/// Return the Primary LeaseId which created the record.
unsigned int primaryLeaseId() const;

bsls::Types::Uint64 sequenceNum() const;
};

// FREE OPERATORS
Expand Down Expand Up @@ -1164,6 +1166,12 @@ inline unsigned int DataStoreRecordHandle::primaryLeaseId() const
return d_iterator->first.d_primaryLeaseId;
}

inline bsls::Types::Uint64 DataStoreRecordHandle::sequenceNum() const
{
BSLS_ASSERT_SAFE(isValid());
return d_iterator->first.d_sequenceNum;
}

} // close package namespace

// -------------------------
Expand Down
13 changes: 5 additions & 8 deletions src/groups/mqb/mqbs/mqbs_filestore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2498,11 +2498,6 @@ int FileStore::recoverMessages(QueueKeyInfoMap* queueKeyInfoMap,
record.d_messagePropertiesInfo = bmqp::MessagePropertiesInfo(
*dataHeader);

if (d_lastRecoveredMessage < key) {
// This will be used as Implicit Receipt
d_lastRecoveredMessage = key;
}

// Update in-memory record mapping.
d_records.rinsert(bsl::make_pair(key, record));

Expand Down Expand Up @@ -5130,6 +5125,8 @@ FileStore::FileStore(const DataStoreConfig& config,
, d_unreceipted(d_allocators.get("UnreceiptedRecords"))
, d_replicationFactor(replicationFactor)
, d_nodes(allocator)
, d_lastStrongConsistencySequenceNum(0)
, d_lastStrongConsistencyPrimaryLeaseId(0)
, d_fileSets(allocator)
, d_cluster_p(cluster)
, d_miscWorkThreadPool_p(miscWorkThreadPool)
Expand Down Expand Up @@ -6528,14 +6525,14 @@ void FileStore::setPrimary(mqbnet::ClusterNode* primaryNode,
d_config.scheduler()->cancelEvent(
&d_partitionHighwatermarkEventHandle);

if (d_lastRecoveredMessage.d_primaryLeaseId == d_primaryLeaseId) {
if (d_lastStrongConsistencyPrimaryLeaseId == d_primaryLeaseId) {
BALL_LOG_INFO << partitionDesc() << "Issuing Implicit Receipt ["
<< "primaryLeaseId = " << d_primaryLeaseId
<< ", d_sequenceNum = " << d_sequenceNum << "].";

issueReceipt(primaryNode,
d_lastRecoveredMessage.d_primaryLeaseId,
d_lastRecoveredMessage.d_sequenceNum);
d_lastStrongConsistencyPrimaryLeaseId,
d_lastStrongConsistencySequenceNum);
}
return; // RETURN
}
Expand Down
15 changes: 14 additions & 1 deletion src/groups/mqb/mqbs/mqbs_filestore.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,8 @@ class FileStore : public DataStore {

NodeReceiptContexts d_nodes;

DataStoreRecordKey d_lastRecoveredMessage;
bsls::Types::Uint64 d_lastStrongConsistencySequenceNum;
unsigned int d_lastStrongConsistencyPrimaryLeaseId;

FileSets d_fileSets;
// List of file sets. File set at
Expand Down Expand Up @@ -898,6 +899,10 @@ class FileStore : public DataStore {
/// set this to true during testing.
void setIgnoreCrc32c(bool value);

// This will be used as Implicit Receipt
void setLastStrongConsistency(unsigned int primaryLeaseId,
bsls::Types::Uint64 sequenceNum);

/// Load into the specified `storages` the list of queue storages for
/// which all filters from the specified `filters` are returning true.
void getStorages(StorageList* storages,
Expand Down Expand Up @@ -1174,6 +1179,14 @@ inline void FileStore::setIgnoreCrc32c(bool value)
d_ignoreCrc32c = value;
}

inline void
FileStore::setLastStrongConsistency(unsigned int primaryLeaseId,
bsls::Types::Uint64 sequenceNum)
{
d_lastStrongConsistencyPrimaryLeaseId = primaryLeaseId;
d_lastStrongConsistencySequenceNum = sequenceNum;
}

// ACCESSORS
inline const mqbi::DispatcherClientData&
FileStore::dispatcherClientData() const
Expand Down

0 comments on commit d5cbcc6

Please sign in to comment.