Skip to content

Commit

Permalink
fix: mix of strong and weak consistencies (#174)
Browse files Browse the repository at this point in the history
* fix: mix of strong and weak consistencies

Signed-off-by: Vitaly Dzhitenov <[email protected]>

* mqbs::ReplicatedStorage::isStrongConsistency()

Signed-off-by: Vitaly Dzhitenov <[email protected]>

---------

Signed-off-by: Vitaly Dzhitenov <[email protected]>
  • Loading branch information
dorjesinpo authored Dec 26, 2023
1 parent 5f65a2a commit e619d69
Show file tree
Hide file tree
Showing 8 changed files with 64 additions and 15 deletions.
25 changes: 20 additions & 5 deletions src/groups/mqb/mqbblp/mqbblp_storagemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -964,6 +964,9 @@ void StorageManager::recoveredQueuesCbImpl(
mqbs::FileStoreIterator fsIt(fs);
DataStoreRecordHandles recordsToPurge;

bsls::Types::Uint64 lastStrongConsistencySequenceNum = 0;
unsigned int lastStrongConsistencyPrimaryLeaseId = 0;

while (fsIt.next()) {
mqbu::StorageKey appKey;
mqbu::StorageKey queueKey;
Expand Down Expand Up @@ -1002,6 +1005,9 @@ void StorageManager::recoveredQueuesCbImpl(

QueueKeyStorageMapIterator storageMapIt = queueKeyStorageMap.find(
queueKey);

const mqbs::DataStoreRecordHandle& handle = fsIt.handle();

// If queue is either not recovered or belongs to an unrecognized
// domain.
if (storageMapIt == queueKeyStorageMap.end()) {
Expand All @@ -1020,7 +1026,7 @@ void StorageManager::recoveredQueuesCbImpl(
domIt->second.find(uri);
BSLS_ASSERT_SAFE(countMapIt != domIt->second.end());
++countMapIt->second;
recordsToPurge.push_back(fsIt.handle());
recordsToPurge.push_back(handle);
continue; // CONTINUE
}

Expand All @@ -1037,6 +1043,8 @@ void StorageManager::recoveredQueuesCbImpl(
mqbs::ReplicatedStorage* rs = storageMapIt->second;
BSLS_ASSERT_SAFE(rs);

const bool isStrongConsistency = rs->isStrongConsistency();

if (mqbs::RecordType::e_QUEUE_OP != fsIt.type()) {
// It's one of MESSAGE/CONFIRM/DELETION records, which means it
// must be a file-backed storage.
Expand All @@ -1060,7 +1068,7 @@ void StorageManager::recoveredQueuesCbImpl(
BSLS_ASSERT_SAFE(guid.isUnset());
BSLS_ASSERT_SAFE(mqbs::QueueOpType::e_UNDEFINED != queueOpType);

rs->addQueueOpRecordHandle(fsIt.handle());
rs->addQueueOpRecordHandle(handle);

if (mqbs::QueueOpType::e_PURGE == queueOpType &&
!appKey.isNull()) {
Expand All @@ -1087,9 +1095,13 @@ void StorageManager::recoveredQueuesCbImpl(
else if (mqbs::RecordType::e_MESSAGE == fsIt.type()) {
BSLS_ASSERT_SAFE(false == guid.isUnset());
rs->processMessageRecord(guid,
fs->getMessageLenRaw(fsIt.handle()),
fs->getMessageLenRaw(handle),
refCount,
fsIt.handle());
handle);
if (isStrongConsistency) {
lastStrongConsistencySequenceNum = handle.sequenceNum();
lastStrongConsistencyPrimaryLeaseId = handle.primaryLeaseId();
}
}
else if (mqbs::RecordType::e_CONFIRM == fsIt.type()) {
BSLS_ASSERT_SAFE(false == guid.isUnset());
Expand All @@ -1111,7 +1123,7 @@ void StorageManager::recoveredQueuesCbImpl(
<< "]. Dropping this record." << MWCTSK_ALARMLOG_END;
continue; // CONTINUE
}
rs->processConfirmRecord(guid, appKey, fsIt.handle());
rs->processConfirmRecord(guid, appKey, handle);
}
else {
BSLS_ASSERT(false);
Expand All @@ -1124,6 +1136,9 @@ void StorageManager::recoveredQueuesCbImpl(
++it) {
fs->removeRecordRaw(*it);
}

fs->setLastStrongConsistency(lastStrongConsistencyPrimaryLeaseId,
lastStrongConsistencySequenceNum);
}

void StorageManager::recoveredQueuesCb(int partitionId,
Expand Down
8 changes: 8 additions & 0 deletions src/groups/mqb/mqbs/mqbs_datastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -516,6 +516,8 @@ class DataStoreRecordHandle {

/// Return the Primary LeaseId which created the record.
unsigned int primaryLeaseId() const;

bsls::Types::Uint64 sequenceNum() const;
};

// FREE OPERATORS
Expand Down Expand Up @@ -1164,6 +1166,12 @@ inline unsigned int DataStoreRecordHandle::primaryLeaseId() const
return d_iterator->first.d_primaryLeaseId;
}

inline bsls::Types::Uint64 DataStoreRecordHandle::sequenceNum() const
{
BSLS_ASSERT_SAFE(isValid());
return d_iterator->first.d_sequenceNum;
}

} // close package namespace

// -------------------------
Expand Down
7 changes: 7 additions & 0 deletions src/groups/mqb/mqbs/mqbs_filebackedstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -501,6 +501,8 @@ class FileBackedStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {

virtual const RecordHandles&
queueOpRecordHandles() const BSLS_KEYWORD_OVERRIDE;

virtual bool isStrongConsistency() const BSLS_KEYWORD_OVERRIDE;
};

// ===============================
Expand Down Expand Up @@ -750,6 +752,11 @@ FileBackedStorage::queueOpRecordHandles() const
return d_queueOpRecordHandles;
}

inline bool FileBackedStorage::isStrongConsistency() const
{
return !d_hasReceipts;
}

inline int FileBackedStorage::numVirtualStorages() const
{
return d_virtualStorageCatalog.numVirtualStorages();
Expand Down
13 changes: 5 additions & 8 deletions src/groups/mqb/mqbs/mqbs_filestore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2498,11 +2498,6 @@ int FileStore::recoverMessages(QueueKeyInfoMap* queueKeyInfoMap,
record.d_messagePropertiesInfo = bmqp::MessagePropertiesInfo(
*dataHeader);

if (d_lastRecoveredMessage < key) {
// This will be used as Implicit Receipt
d_lastRecoveredMessage = key;
}

// Update in-memory record mapping.
d_records.rinsert(bsl::make_pair(key, record));

Expand Down Expand Up @@ -5130,6 +5125,7 @@ FileStore::FileStore(const DataStoreConfig& config,
, d_unreceipted(d_allocators.get("UnreceiptedRecords"))
, d_replicationFactor(replicationFactor)
, d_nodes(allocator)
, d_lastRecoveredStrongConsistency()
, d_fileSets(allocator)
, d_cluster_p(cluster)
, d_miscWorkThreadPool_p(miscWorkThreadPool)
Expand Down Expand Up @@ -6528,14 +6524,15 @@ void FileStore::setPrimary(mqbnet::ClusterNode* primaryNode,
d_config.scheduler()->cancelEvent(
&d_partitionHighwatermarkEventHandle);

if (d_lastRecoveredMessage.d_primaryLeaseId == d_primaryLeaseId) {
if (d_lastRecoveredStrongConsistency.d_primaryLeaseId ==
d_primaryLeaseId) {
BALL_LOG_INFO << partitionDesc() << "Issuing Implicit Receipt ["
<< "primaryLeaseId = " << d_primaryLeaseId
<< ", d_sequenceNum = " << d_sequenceNum << "].";

issueReceipt(primaryNode,
d_lastRecoveredMessage.d_primaryLeaseId,
d_lastRecoveredMessage.d_sequenceNum);
d_lastRecoveredStrongConsistency.d_primaryLeaseId,
d_lastRecoveredStrongConsistency.d_sequenceNum);
}
return; // RETURN
}
Expand Down
14 changes: 13 additions & 1 deletion src/groups/mqb/mqbs/mqbs_filestore.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ class FileStore : public DataStore {

NodeReceiptContexts d_nodes;

DataStoreRecordKey d_lastRecoveredMessage;
DataStoreRecordKey d_lastRecoveredStrongConsistency;

FileSets d_fileSets;
// List of file sets. File set at
Expand Down Expand Up @@ -898,6 +898,10 @@ class FileStore : public DataStore {
/// set this to true during testing.
void setIgnoreCrc32c(bool value);

// This will be used as Implicit Receipt
void setLastStrongConsistency(unsigned int primaryLeaseId,
bsls::Types::Uint64 sequenceNum);

/// Load into the specified `storages` the list of queue storages for
/// which all filters from the specified `filters` are returning true.
void getStorages(StorageList* storages,
Expand Down Expand Up @@ -1174,6 +1178,14 @@ inline void FileStore::setIgnoreCrc32c(bool value)
d_ignoreCrc32c = value;
}

inline void
FileStore::setLastStrongConsistency(unsigned int primaryLeaseId,
bsls::Types::Uint64 sequenceNum)
{
d_lastRecoveredStrongConsistency.d_primaryLeaseId = primaryLeaseId;
d_lastRecoveredStrongConsistency.d_sequenceNum = sequenceNum;
}

// ACCESSORS
inline const mqbi::DispatcherClientData&
FileStore::dispatcherClientData() const
Expand Down
5 changes: 5 additions & 0 deletions src/groups/mqb/mqbs/mqbs_inmemorystorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -584,6 +584,11 @@ InMemoryStorage::queueOpRecordHandles() const
return d_queueOpRecordHandles;
}

bool InMemoryStorage::isStrongConsistency() const
{
return false;
}

// -----------------------------
// class InMemoryStorageIterator
// -----------------------------
Expand Down
4 changes: 3 additions & 1 deletion src/groups/mqb/mqbs/mqbs_inmemorystorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,8 @@ class InMemoryStorage BSLS_KEYWORD_FINAL : public ReplicatedStorage {

virtual const RecordHandles&
queueOpRecordHandles() const BSLS_KEYWORD_OVERRIDE;

virtual bool isStrongConsistency() const BSLS_KEYWORD_OVERRIDE;
};

// =============================
Expand Down Expand Up @@ -807,7 +809,7 @@ inline bool InMemoryStorage::hasVirtualStorage(const bsl::string& appId,
return d_virtualStorageCatalog.hasVirtualStorage(appId, appKey);
}

inline bool InMemoryStorage::hasReceipt(const bmqt::MessageGUID& msgGUID) const
inline bool InMemoryStorage::hasReceipt(const bmqt::MessageGUID&) const
{
return true;
}
Expand Down
3 changes: 3 additions & 0 deletions src/groups/mqb/mqbs/mqbs_replicatedstorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ class ReplicatedStorage : public mqbi::Storage {
/// Return a non-modifiable list of handles of all QUEUEOP records
/// associated with this storage.
virtual const RecordHandles& queueOpRecordHandles() const = 0;

// Return 'true' if the storage is of the strong consistency
virtual bool isStrongConsistency() const = 0;
};

// ============================================================================
Expand Down

0 comments on commit e619d69

Please sign in to comment.