From d5cbcc6f8bfbd4d67546dd979753d7f9c0f8edf1 Mon Sep 17 00:00:00 2001 From: Vitaly Dzhitenov Date: Wed, 20 Dec 2023 11:36:03 -0500 Subject: [PATCH] fix: mix of strong and weak consistencies Signed-off-by: Vitaly Dzhitenov --- .../mqb/mqbblp/mqbblp_storagemanager.cpp | 25 +++++++++++++++---- src/groups/mqb/mqbs/mqbs_datastore.h | 8 ++++++ src/groups/mqb/mqbs/mqbs_filestore.cpp | 13 ++++------ src/groups/mqb/mqbs/mqbs_filestore.h | 15 ++++++++++- 4 files changed, 47 insertions(+), 14 deletions(-) diff --git a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp index 3d345dc0c5..63c98f2ccc 100644 --- a/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp +++ b/src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp @@ -964,6 +964,9 @@ void StorageManager::recoveredQueuesCbImpl( mqbs::FileStoreIterator fsIt(fs); DataStoreRecordHandles recordsToPurge; + bsls::Types::Uint64 lastStrongConsistencySequenceNum = 0; + unsigned int lastStrongConsistencyPrimaryLeaseId = 0; + while (fsIt.next()) { mqbu::StorageKey appKey; mqbu::StorageKey queueKey; @@ -1002,6 +1005,9 @@ void StorageManager::recoveredQueuesCbImpl( QueueKeyStorageMapIterator storageMapIt = queueKeyStorageMap.find( queueKey); + + const mqbs::DataStoreRecordHandle& handle = fsIt.handle(); + // If queue is either not recovered or belongs to an unrecognized // domain. if (storageMapIt == queueKeyStorageMap.end()) { @@ -1020,7 +1026,7 @@ void StorageManager::recoveredQueuesCbImpl( domIt->second.find(uri); BSLS_ASSERT_SAFE(countMapIt != domIt->second.end()); ++countMapIt->second; - recordsToPurge.push_back(fsIt.handle()); + recordsToPurge.push_back(handle); continue; // CONTINUE } @@ -1037,6 +1043,8 @@ void StorageManager::recoveredQueuesCbImpl( mqbs::ReplicatedStorage* rs = storageMapIt->second; BSLS_ASSERT_SAFE(rs); + const bool isStrongConsistency = !rs->hasReceipt(bmqt::MessageGUID()); + if (mqbs::RecordType::e_QUEUE_OP != fsIt.type()) { // It's one of MESSAGE/CONFIRM/DELETION records, which means it // must be a file-backed storage. @@ -1060,7 +1068,7 @@ void StorageManager::recoveredQueuesCbImpl( BSLS_ASSERT_SAFE(guid.isUnset()); BSLS_ASSERT_SAFE(mqbs::QueueOpType::e_UNDEFINED != queueOpType); - rs->addQueueOpRecordHandle(fsIt.handle()); + rs->addQueueOpRecordHandle(handle); if (mqbs::QueueOpType::e_PURGE == queueOpType && !appKey.isNull()) { @@ -1087,9 +1095,13 @@ void StorageManager::recoveredQueuesCbImpl( else if (mqbs::RecordType::e_MESSAGE == fsIt.type()) { BSLS_ASSERT_SAFE(false == guid.isUnset()); rs->processMessageRecord(guid, - fs->getMessageLenRaw(fsIt.handle()), + fs->getMessageLenRaw(handle), refCount, - fsIt.handle()); + handle); + if (isStrongConsistency) { + lastStrongConsistencySequenceNum = handle.sequenceNum(); + lastStrongConsistencyPrimaryLeaseId = handle.primaryLeaseId(); + } } else if (mqbs::RecordType::e_CONFIRM == fsIt.type()) { BSLS_ASSERT_SAFE(false == guid.isUnset()); @@ -1111,7 +1123,7 @@ void StorageManager::recoveredQueuesCbImpl( << "]. Dropping this record." << MWCTSK_ALARMLOG_END; continue; // CONTINUE } - rs->processConfirmRecord(guid, appKey, fsIt.handle()); + rs->processConfirmRecord(guid, appKey, handle); } else { BSLS_ASSERT(false); @@ -1124,6 +1136,9 @@ void StorageManager::recoveredQueuesCbImpl( ++it) { fs->removeRecordRaw(*it); } + + fs->setLastStrongConsistency(lastStrongConsistencyPrimaryLeaseId, + lastStrongConsistencySequenceNum); } void StorageManager::recoveredQueuesCb(int partitionId, diff --git a/src/groups/mqb/mqbs/mqbs_datastore.h b/src/groups/mqb/mqbs/mqbs_datastore.h index 4118e0ef0e..013fbc9f4f 100644 --- a/src/groups/mqb/mqbs/mqbs_datastore.h +++ b/src/groups/mqb/mqbs/mqbs_datastore.h @@ -516,6 +516,8 @@ class DataStoreRecordHandle { /// Return the Primary LeaseId which created the record. unsigned int primaryLeaseId() const; + + bsls::Types::Uint64 sequenceNum() const; }; // FREE OPERATORS @@ -1164,6 +1166,12 @@ inline unsigned int DataStoreRecordHandle::primaryLeaseId() const return d_iterator->first.d_primaryLeaseId; } +inline bsls::Types::Uint64 DataStoreRecordHandle::sequenceNum() const +{ + BSLS_ASSERT_SAFE(isValid()); + return d_iterator->first.d_sequenceNum; +} + } // close package namespace // ------------------------- diff --git a/src/groups/mqb/mqbs/mqbs_filestore.cpp b/src/groups/mqb/mqbs/mqbs_filestore.cpp index 73773c928e..679a6c8a06 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.cpp +++ b/src/groups/mqb/mqbs/mqbs_filestore.cpp @@ -2498,11 +2498,6 @@ int FileStore::recoverMessages(QueueKeyInfoMap* queueKeyInfoMap, record.d_messagePropertiesInfo = bmqp::MessagePropertiesInfo( *dataHeader); - if (d_lastRecoveredMessage < key) { - // This will be used as Implicit Receipt - d_lastRecoveredMessage = key; - } - // Update in-memory record mapping. d_records.rinsert(bsl::make_pair(key, record)); @@ -5130,6 +5125,8 @@ FileStore::FileStore(const DataStoreConfig& config, , d_unreceipted(d_allocators.get("UnreceiptedRecords")) , d_replicationFactor(replicationFactor) , d_nodes(allocator) +, d_lastStrongConsistencySequenceNum(0) +, d_lastStrongConsistencyPrimaryLeaseId(0) , d_fileSets(allocator) , d_cluster_p(cluster) , d_miscWorkThreadPool_p(miscWorkThreadPool) @@ -6528,14 +6525,14 @@ void FileStore::setPrimary(mqbnet::ClusterNode* primaryNode, d_config.scheduler()->cancelEvent( &d_partitionHighwatermarkEventHandle); - if (d_lastRecoveredMessage.d_primaryLeaseId == d_primaryLeaseId) { + if (d_lastStrongConsistencyPrimaryLeaseId == d_primaryLeaseId) { BALL_LOG_INFO << partitionDesc() << "Issuing Implicit Receipt [" << "primaryLeaseId = " << d_primaryLeaseId << ", d_sequenceNum = " << d_sequenceNum << "]."; issueReceipt(primaryNode, - d_lastRecoveredMessage.d_primaryLeaseId, - d_lastRecoveredMessage.d_sequenceNum); + d_lastStrongConsistencyPrimaryLeaseId, + d_lastStrongConsistencySequenceNum); } return; // RETURN } diff --git a/src/groups/mqb/mqbs/mqbs_filestore.h b/src/groups/mqb/mqbs/mqbs_filestore.h index aa76df5824..5bade3043f 100644 --- a/src/groups/mqb/mqbs/mqbs_filestore.h +++ b/src/groups/mqb/mqbs/mqbs_filestore.h @@ -329,7 +329,8 @@ class FileStore : public DataStore { NodeReceiptContexts d_nodes; - DataStoreRecordKey d_lastRecoveredMessage; + bsls::Types::Uint64 d_lastStrongConsistencySequenceNum; + unsigned int d_lastStrongConsistencyPrimaryLeaseId; FileSets d_fileSets; // List of file sets. File set at @@ -898,6 +899,10 @@ class FileStore : public DataStore { /// set this to true during testing. void setIgnoreCrc32c(bool value); + // This will be used as Implicit Receipt + void setLastStrongConsistency(unsigned int primaryLeaseId, + bsls::Types::Uint64 sequenceNum); + /// Load into the specified `storages` the list of queue storages for /// which all filters from the specified `filters` are returning true. void getStorages(StorageList* storages, @@ -1174,6 +1179,14 @@ inline void FileStore::setIgnoreCrc32c(bool value) d_ignoreCrc32c = value; } +inline void +FileStore::setLastStrongConsistency(unsigned int primaryLeaseId, + bsls::Types::Uint64 sequenceNum) +{ + d_lastStrongConsistencyPrimaryLeaseId = primaryLeaseId; + d_lastStrongConsistencySequenceNum = sequenceNum; +} + // ACCESSORS inline const mqbi::DispatcherClientData& FileStore::dispatcherClientData() const