From e619d6968ac1fd3816651a710cc16581e1c11636 Mon Sep 17 00:00:00 2001 From: dorjesinpo <129227380+dorjesinpo@users.noreply.github.com> Date: Tue, 26 Dec 2023 15:15:32 -0500 Subject: [PATCH] fix: mix of strong and weak consistencies (#174) * fix: mix of strong and weak consistencies Signed-off-by: Vitaly Dzhitenov * mqbs::ReplicatedStorage::isStrongConsistency() Signed-off-by: Vitaly Dzhitenov --------- Signed-off-by: Vitaly Dzhitenov --- .../mqb/mqbblp/mqbblp_storagemanager.cpp | 25 +++++++++++++++---- src/groups/mqb/mqbs/mqbs_datastore.h | 8 ++++++ src/groups/mqb/mqbs/mqbs_filebackedstorage.h | 7 ++++++ src/groups/mqb/mqbs/mqbs_filestore.cpp | 13 ++++------ src/groups/mqb/mqbs/mqbs_filestore.h | 14 ++++++++++- src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp | 5 ++++ src/groups/mqb/mqbs/mqbs_inmemorystorage.h | 4 ++- src/groups/mqb/mqbs/mqbs_replicatedstorage.h | 3 +++ 8 files changed, 64 insertions(+), 15 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index 3d345dc0c5..836bcabc2e 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -964,6 +964,9 @@ void StorageManager::recoveredQueuesCbImpl( mqbs::FileStoreIterator fsIt(fs); DataStoreRecordHandles recordsToPurge; + bsls::Types::Uint64 lastStrongConsistencySequenceNum = 0; + unsigned int lastStrongConsistencyPrimaryLeaseId = 0; + while (fsIt.next()) { mqbu::StorageKey appKey; mqbu::StorageKey queueKey; @@ -1002,6 +1005,9 @@ void StorageManager::recoveredQueuesCbImpl( QueueKeyStorageMapIterator storageMapIt = queueKeyStorageMap.find( queueKey); + + const mqbs::DataStoreRecordHandle& handle = fsIt.handle(); + // If queue is either not recovered or belongs to an unrecognized // domain. if (storageMapIt == queueKeyStorageMap.end()) { @@ -1020,7 +1026,7 @@ void StorageManager::recoveredQueuesCbImpl( domIt->second.find(uri); BSLS_ASSERT_SAFE(countMapIt != domIt->second.end()); ++countMapIt->second; - recordsToPurge.push_back(fsIt.handle()); + recordsToPurge.push_back(handle); continue; // CONTINUE } @@ -1037,6 +1043,8 @@ void StorageManager::recoveredQueuesCbImpl( mqbs::ReplicatedStorage* rs = storageMapIt->second; BSLS_ASSERT_SAFE(rs); + const bool isStrongConsistency = rs->isStrongConsistency(); + if (mqbs::RecordType::e_QUEUE_OP != fsIt.type()) { // It's one of MESSAGE/CONFIRM/DELETION records, which means it // must be a file-backed storage. @@ -1060,7 +1068,7 @@ void StorageManager::recoveredQueuesCbImpl( BSLS_ASSERT_SAFE(guid.isUnset()); BSLS_ASSERT_SAFE(mqbs::QueueOpType::e_UNDEFINED != queueOpType); - rs->addQueueOpRecordHandle(fsIt.handle()); + rs->addQueueOpRecordHandle(handle); if (mqbs::QueueOpType::e_PURGE == queueOpType && !appKey.isNull()) { @@ -1087,9 +1095,13 @@ void StorageManager::recoveredQueuesCbImpl( else if (mqbs::RecordType::e_MESSAGE == fsIt.type()) { BSLS_ASSERT_SAFE(false == guid.isUnset()); rs->processMessageRecord(guid, - fs->getMessageLenRaw(fsIt.handle()), + fs->getMessageLenRaw(handle), refCount, - fsIt.handle()); + handle); + if (isStrongConsistency) { + lastStrongConsistencySequenceNum = handle.sequenceNum(); + lastStrongConsistencyPrimaryLeaseId = handle.primaryLeaseId(); + } } else if (mqbs::RecordType::e_CONFIRM == fsIt.type()) { BSLS_ASSERT_SAFE(false == guid.isUnset()); @@ -1111,7 +1123,7 @@ void StorageManager::recoveredQueuesCbImpl( << "]. Dropping this record." << MWCTSK_ALARMLOG_END; continue; // CONTINUE } - rs->processConfirmRecord(guid, appKey, fsIt.handle()); + rs->processConfirmRecord(guid, appKey, handle); } else { BSLS_ASSERT(false); @@ -1124,6 +1136,9 @@ void StorageManager::recoveredQueuesCbImpl( ++it) { fs->removeRecordRaw(*it); } + + fs->setLastStrongConsistency(lastStrongConsistencyPrimaryLeaseId, + lastStrongConsistencySequenceNum); } void StorageManager::recoveredQueuesCb(int partitionId, diff --git a/src/groups/mqb/mqbs/mqbs_datastore.h b/src/groups/mqb/mqbs/mqbs_datastore.h index 4118e0ef0e..013fbc9f4f 100644 --- a/src/groups/mqb/mqbs/mqbs_datastore.h +++ b/src/groups/mqb/mqbs/mqbs_datastore.h @@ -516,6 +516,8 @@ class DataStoreRecordHandle { /// Return the Primary LeaseId which created the record. unsigned int primaryLeaseId() const; + + bsls::Types::Uint64 sequenceNum() const; }; // FREE OPERATORS @@ -1164,6 +1166,12 @@ inline unsigned int DataStoreRecordHandle::primaryLeaseId() const return d_iterator->first.d_primaryLeaseId; } +inline bsls::Types::Uint64 DataStoreRecordHandle::sequenceNum() const +{ + BSLS_ASSERT_SAFE(isValid()); + return d_iterator->first.d_sequenceNum; +} + } // close package namespace // ------------------------- diff --git a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h index ac4a17fd5e..b543820918 100644 --- a/src/groups/mqb/mqbs/mqbs_filebackedstorage.h +++ b/src/groups/mqb/mqbs/mqbs_filebackedstorage.h @@ -501,6 +501,8 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { virtual const RecordHandles& queueOpRecordHandles() const BSLS_KEYWORD_OVERRIDE; + + virtual bool isStrongConsistency() const BSLS_KEYWORD_OVERRIDE; }; // =============================== @@ -750,6 +752,11 @@ FileBackedStorage::queueOpRecordHandles() const return d_queueOpRecordHandles; } +inline bool FileBackedStorage::isStrongConsistency() const +{ + return !d_hasReceipts; +} + inline int FileBackedStorage::numVirtualStorages() const { return d_virtualStorageCatalog.numVirtualStorages(); diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index 73773c928e..be553bbcdc 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -2498,11 +2498,6 @@ int FileStore::recoverMessages(QueueKeyInfoMap* queueKeyInfoMap, record.d_messagePropertiesInfo = bmqp::MessagePropertiesInfo( *dataHeader); - if (d_lastRecoveredMessage < key) { - // This will be used as Implicit Receipt - d_lastRecoveredMessage = key; - } - // Update in-memory record mapping. d_records.rinsert(bsl::make_pair(key, record)); @@ -5130,6 +5125,7 @@ FileStore::FileStore(const DataStoreConfig& config, , d_unreceipted(d_allocators.get("UnreceiptedRecords")) , d_replicationFactor(replicationFactor) , d_nodes(allocator) +, d_lastRecoveredStrongConsistency() , d_fileSets(allocator) , d_cluster_p(cluster) , d_miscWorkThreadPool_p(miscWorkThreadPool) @@ -6528,14 +6524,15 @@ void FileStore::setPrimary(mqbnet::ClusterNode* primaryNode, d_config.scheduler()->cancelEvent( &d_partitionHighwatermarkEventHandle); - if (d_lastRecoveredMessage.d_primaryLeaseId == d_primaryLeaseId) { + if (d_lastRecoveredStrongConsistency.d_primaryLeaseId == + d_primaryLeaseId) { BALL_LOG_INFO << partitionDesc() << "Issuing Implicit Receipt [" << "primaryLeaseId = " << d_primaryLeaseId << ", d_sequenceNum = " << d_sequenceNum << "]."; issueReceipt(primaryNode, - d_lastRecoveredMessage.d_primaryLeaseId, - d_lastRecoveredMessage.d_sequenceNum); + d_lastRecoveredStrongConsistency.d_primaryLeaseId, + d_lastRecoveredStrongConsistency.d_sequenceNum); } return; // RETURN } diff --git a/src/groups/mqb/mqbs/mqbs_filestore.h b/src/groups/mqb/mqbs/mqbs_filestore.h index aa76df5824..4f126d8fd2 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.h +++ b/src/groups/mqb/mqbs/mqbs_filestore.h @@ -329,7 +329,7 @@ class FileStore : public DataStore { NodeReceiptContexts d_nodes; - DataStoreRecordKey d_lastRecoveredMessage; + DataStoreRecordKey d_lastRecoveredStrongConsistency; FileSets d_fileSets; // List of file sets. File set at @@ -898,6 +898,10 @@ class FileStore : public DataStore { /// set this to true during testing. void setIgnoreCrc32c(bool value); + // This will be used as Implicit Receipt + void setLastStrongConsistency(unsigned int primaryLeaseId, + bsls::Types::Uint64 sequenceNum); + /// Load into the specified `storages` the list of queue storages for /// which all filters from the specified `filters` are returning true. void getStorages(StorageList* storages, @@ -1174,6 +1178,14 @@ inline void FileStore::setIgnoreCrc32c(bool value) d_ignoreCrc32c = value; } +inline void +FileStore::setLastStrongConsistency(unsigned int primaryLeaseId, + bsls::Types::Uint64 sequenceNum) +{ + d_lastRecoveredStrongConsistency.d_primaryLeaseId = primaryLeaseId; + d_lastRecoveredStrongConsistency.d_sequenceNum = sequenceNum; +} + // ACCESSORS inline const mqbi::DispatcherClientData& FileStore::dispatcherClientData() const diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp index 17a2d58b37..ec69dfd78c 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp @@ -584,6 +584,11 @@ InMemoryStorage::queueOpRecordHandles() const return d_queueOpRecordHandles; } +bool InMemoryStorage::isStrongConsistency() const +{ + return false; +} + // ----------------------------- // class InMemoryStorageIterator // ----------------------------- diff --git a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h index 2e170cd61d..60c9e37016 100644 --- a/src/groups/mqb/mqbs/mqbs_inmemorystorage.h +++ b/src/groups/mqb/mqbs/mqbs_inmemorystorage.h @@ -517,6 +517,8 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage { virtual const RecordHandles& queueOpRecordHandles() const BSLS_KEYWORD_OVERRIDE; + + virtual bool isStrongConsistency() const BSLS_KEYWORD_OVERRIDE; }; // ============================= @@ -807,7 +809,7 @@ inline bool InMemoryStorage::hasVirtualStorage(const bsl::string& appId, return d_virtualStorageCatalog.hasVirtualStorage(appId, appKey); } -inline bool InMemoryStorage::hasReceipt(const bmqt::MessageGUID& msgGUID) const +inline bool InMemoryStorage::hasReceipt(const bmqt::MessageGUID&) const { return true; } diff --git a/src/groups/mqb/mqbs/mqbs_replicatedstorage.h b/src/groups/mqb/mqbs/mqbs_replicatedstorage.h index 61cdd5e806..d95a94a367 100644 --- a/src/groups/mqb/mqbs/mqbs_replicatedstorage.h +++ b/src/groups/mqb/mqbs/mqbs_replicatedstorage.h @@ -100,6 +100,9 @@ class ReplicatedStorage : public mqbi::Storage { /// Return a non-modifiable list of handles of all QUEUEOP records /// associated with this storage. virtual const RecordHandles& queueOpRecordHandles() const = 0; + + // Return 'true' if the storage is of the strong consistency + virtual bool isStrongConsistency() const = 0; }; // ============================================================================