Skip to content

Commit

Permalink
mqbc::RecoveryManager: Apply clang format
Browse files Browse the repository at this point in the history
Signed-off-by: Yuan Jing Vincent Yan <[email protected]>
  • Loading branch information
kaikulimu committed Aug 23, 2024
1 parent 3bcd80b commit 39fa4e7
Showing 1 changed file with 50 additions and 68 deletions.
118 changes: 50 additions & 68 deletions src/groups/mqb/mqbc/mqbc_recoverymanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,7 @@ void RecoveryManager::deprecateFileSet(int partitionId)
if (rc != 0) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to truncate journal file ["
<< partitionId << "]: " << "Failed to truncate journal file ["
<< recoveryCtx.d_recoveryFileSet.journalFile()
<< "], rc: " << rc << ", error: " << errorDesc.str()
<< MWCTSK_ALARMLOG_END;
Expand All @@ -189,8 +188,7 @@ void RecoveryManager::deprecateFileSet(int partitionId)
if (rc != 0) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to flush journal file ["
<< partitionId << "]: " << "Failed to flush journal file ["
<< recoveryCtx.d_recoveryFileSet.journalFile()
<< "], rc: " << rc << ", error: " << errorDesc.str()
<< MWCTSK_ALARMLOG_END;
Expand All @@ -201,8 +199,7 @@ void RecoveryManager::deprecateFileSet(int partitionId)
if (rc != 0) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to close journal file ["
<< partitionId << "]: " << "Failed to close journal file ["
<< recoveryCtx.d_recoveryFileSet.journalFile()
<< "], rc: " << rc << MWCTSK_ALARMLOG_END;
}
Expand All @@ -213,8 +210,7 @@ void RecoveryManager::deprecateFileSet(int partitionId)
if (0 != rc) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to move file ["
<< partitionId << "]: " << "Failed to move file ["
<< recoveryCtx.d_recoveryFileSet.journalFile() << "] "
<< "to location [" << d_dataStoreConfig.archiveLocation()
<< "] rc: " << rc << MWCTSK_ALARMLOG_END;
Expand All @@ -228,8 +224,7 @@ void RecoveryManager::deprecateFileSet(int partitionId)
if (rc != 0) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to truncate data file ["
<< partitionId << "]: " << "Failed to truncate data file ["
<< recoveryCtx.d_recoveryFileSet.dataFile() << "], rc: " << rc
<< ", error: " << errorDesc.str() << MWCTSK_ALARMLOG_END;
errorDesc.reset();
Expand All @@ -241,8 +236,7 @@ void RecoveryManager::deprecateFileSet(int partitionId)
if (rc != 0) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to flush data file ["
<< partitionId << "]: " << "Failed to flush data file ["
<< recoveryCtx.d_recoveryFileSet.dataFile() << "], rc: " << rc
<< ", error: " << errorDesc.str() << MWCTSK_ALARMLOG_END;
errorDesc.reset();
Expand All @@ -252,8 +246,7 @@ void RecoveryManager::deprecateFileSet(int partitionId)
if (rc != 0) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to close data file ["
<< partitionId << "]: " << "Failed to close data file ["
<< recoveryCtx.d_recoveryFileSet.dataFile() << "], rc: " << rc
<< MWCTSK_ALARMLOG_END;
}
Expand All @@ -263,8 +256,7 @@ void RecoveryManager::deprecateFileSet(int partitionId)
if (0 != rc) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to move file ["
<< partitionId << "]: " << "Failed to move file ["
<< recoveryCtx.d_recoveryFileSet.dataFile() << "] "
<< "to location [" << d_dataStoreConfig.archiveLocation()
<< "] rc: " << rc << MWCTSK_ALARMLOG_END;
Expand Down Expand Up @@ -324,12 +316,12 @@ void RecoveryManager::setExpectedDataChunkRange(

BALL_LOG_INFO_BLOCK
{
BALL_LOG_OUTPUT_STREAM << d_clusterData.identity().description()
<< " Partition [" << partitionId << "]: "
<< "Got notification to expect data chunks "
<< "of range " << beginSeqNum << " to "
<< endSeqNum << " from "
<< source->nodeDescription() << ".";
BALL_LOG_OUTPUT_STREAM
<< d_clusterData.identity().description() << " Partition ["
<< partitionId
<< "]: " << "Got notification to expect data chunks "
<< "of range " << beginSeqNum << " to " << endSeqNum << " from "
<< source->nodeDescription() << ".";
if (requestId != -1) {
BALL_LOG_OUTPUT_STREAM << " Recovery requestId is " << requestId
<< ".";
Expand Down Expand Up @@ -552,9 +544,8 @@ int RecoveryManager::processSendDataChunks(
}

BALL_LOG_INFO << d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Sent data chunks from " << beginSeqNum << " to "
<< endSeqNum
<< partitionId << "]: " << "Sent data chunks from "
<< beginSeqNum << " to " << endSeqNum
<< " to node: " << destination->nodeDescription() << ".";

return rc_SUCCESS;
Expand Down Expand Up @@ -594,8 +585,8 @@ int RecoveryManager::processReceiveDataChunks(
if (!receiveDataCtx.d_expectChunks) {
MWCTSK_ALARMLOG_ALARM("RECOVERY")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Received partition-sync event from node "
<< partitionId
<< "]: " << "Received partition-sync event from node "
<< source->nodeDescription()
<< ", but self is not expecting data chunks. "
<< "Ignoring this event." << MWCTSK_ALARMLOG_END;
Expand All @@ -606,8 +597,8 @@ int RecoveryManager::processReceiveDataChunks(
if (receiveDataCtx.d_recoveryDataSource_p->nodeId() != source->nodeId()) {
MWCTSK_ALARMLOG_ALARM("RECOVERY")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Received partition-sync event from node "
<< partitionId
<< "]: " << "Received partition-sync event from node "
<< source->nodeDescription()
<< ", which is not identified as recovery peer node "
<< receiveDataCtx.d_recoveryDataSource_p->nodeDescription()
Expand Down Expand Up @@ -681,8 +672,8 @@ int RecoveryManager::processReceiveDataChunks(
if (recordSeqNum <= receiveDataCtx.d_currSeqNum) {
MWCTSK_ALARMLOG_ALARM("REPLICATION")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Received partition sync msg of type "
<< partitionId
<< "]: " << "Received partition sync msg of type "
<< header.messageType() << " with sequenceNumber "
<< recordSeqNum
<< ", smaller than or equal to self current sequence number: "
Expand All @@ -696,8 +687,8 @@ int RecoveryManager::processReceiveDataChunks(
if (recordSeqNum > receiveDataCtx.d_endSeqNum) {
MWCTSK_ALARMLOG_ALARM("REPLICATION")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Received partition sync msg of type "
<< partitionId
<< "]: " << "Received partition sync msg of type "
<< header.messageType() << " with sequenceNumber "
<< recordSeqNum
<< ", larger than self's expected ending sequence number of "
Expand Down Expand Up @@ -725,9 +716,8 @@ int RecoveryManager::processReceiveDataChunks(

MWCTSK_ALARMLOG_ALARM("REPLICATION")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Received journal record of type [" << header.messageType()
<< "] with journal offset mismatch. "
<< partitionId << "]: " << "Received journal record of type ["
<< header.messageType() << "] with journal offset mismatch. "
<< "Source's journal offset: " << sourceJournalOffset
<< ", self journal offset: " << journalPos
<< ", msg sequence number (" << recHeader->primaryLeaseId()
Expand Down Expand Up @@ -861,10 +851,10 @@ int RecoveryManager::processReceiveDataChunks(
recordOffset);
if (mqbs::QueueOpType::e_CREATION != queueRec->type() &&
mqbs::QueueOpType::e_ADDITION != queueRec->type()) {
BALL_LOG_ERROR
<< d_clusterData.identity().description()
<< " Partition [" << partitionId << "]: "
<< " Unexpected QueueOpType: " << queueRec->type();
BALL_LOG_ERROR << d_clusterData.identity().description()
<< " Partition [" << partitionId
<< "]: " << " Unexpected QueueOpType: "
<< queueRec->type();
return rc_INVALID_QUEUE_RECORD; // RETURN
}
}
Expand Down Expand Up @@ -924,8 +914,8 @@ int RecoveryManager::createRecoveryFileSet(bsl::ostream& errorDescription,
BSLS_ASSERT_SAFE(recoveryCtx.d_mappedDataFd.isValid());

BALL_LOG_INFO << d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Created recovery data file store set: "
<< partitionId
<< "]: " << "Created recovery data file store set: "
<< recoveryCtx.d_recoveryFileSet
<< ", journal file position: "
<< recoveryCtx.d_journalFilePosition
Expand Down Expand Up @@ -1067,8 +1057,7 @@ int RecoveryManager::openRecoveryFileSet(bsl::ostream& errorDescription,
}

BALL_LOG_INFO << d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Opened recovery file set: "
<< partitionId << "]: " << "Opened recovery file set: "
<< recoveryCtx.d_recoveryFileSet
<< ", journal file position: "
<< recoveryCtx.d_journalFilePosition
Expand Down Expand Up @@ -1105,8 +1094,7 @@ int RecoveryManager::closeRecoveryFileSet(int partitionId)
if (rc != 0) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to truncate journal file ["
<< partitionId << "]: " << "Failed to truncate journal file ["
<< recoveryCtx.d_recoveryFileSet.journalFile()
<< "], rc: " << rc << ", error: " << errorDesc.str()
<< MWCTSK_ALARMLOG_END;
Expand All @@ -1120,8 +1108,7 @@ int RecoveryManager::closeRecoveryFileSet(int partitionId)
if (rc != 0) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to flush journal file ["
<< partitionId << "]: " << "Failed to flush journal file ["
<< recoveryCtx.d_recoveryFileSet.journalFile()
<< "], rc: " << rc << ", error: " << errorDesc.str()
<< MWCTSK_ALARMLOG_END;
Expand All @@ -1132,16 +1119,15 @@ int RecoveryManager::closeRecoveryFileSet(int partitionId)
if (rc != 0) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to close journal file ["
<< partitionId << "]: " << "Failed to close journal file ["
<< recoveryCtx.d_recoveryFileSet.journalFile()
<< "], rc: " << rc << MWCTSK_ALARMLOG_END;
return rc * 10 + rc_JOURNAL_FD_CLOSE_FAILURE; // RETURN
}

BALL_LOG_INFO << d_clusterData.identity().description()
<< " Partition [" << partitionId << "]: "
<< "Closed journal file in recovery file set; "
<< " Partition [" << partitionId
<< "]: " << "Closed journal file in recovery file set; "
<< "journal file position was "
<< recoveryCtx.d_journalFilePosition;
}
Expand All @@ -1154,8 +1140,7 @@ int RecoveryManager::closeRecoveryFileSet(int partitionId)
if (rc != 0) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to truncate data file ["
<< partitionId << "]: " << "Failed to truncate data file ["
<< recoveryCtx.d_recoveryFileSet.dataFile() << "], rc: " << rc
<< ", error: " << errorDesc.str() << MWCTSK_ALARMLOG_END;
errorDesc.reset();
Expand All @@ -1167,8 +1152,7 @@ int RecoveryManager::closeRecoveryFileSet(int partitionId)
if (rc != 0) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to flush data file ["
<< partitionId << "]: " << "Failed to flush data file ["
<< recoveryCtx.d_recoveryFileSet.dataFile() << "], rc: " << rc
<< ", error: " << errorDesc.str() << MWCTSK_ALARMLOG_END;
errorDesc.reset();
Expand All @@ -1178,16 +1162,15 @@ int RecoveryManager::closeRecoveryFileSet(int partitionId)
if (rc != 0) {
MWCTSK_ALARMLOG_ALARM("FILE_IO")
<< d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Failed to close data file ["
<< partitionId << "]: " << "Failed to close data file ["
<< recoveryCtx.d_recoveryFileSet.dataFile() << "], rc: " << rc
<< MWCTSK_ALARMLOG_END;
return rc * 10 + rc_DATA_FD_CLOSE_FAILURE; // RETURN
}

BALL_LOG_INFO << d_clusterData.identity().description()
<< " Partition [" << partitionId << "]: "
<< "Closed data file in recovery file set; "
<< " Partition [" << partitionId
<< "]: " << "Closed data file in recovery file set; "
<< "data file position was "
<< recoveryCtx.d_dataFilePosition;
}
Expand Down Expand Up @@ -1244,8 +1227,8 @@ int RecoveryManager::recoverSeqNum(
const mqbs::RecordHeader& lastRecordHeader = jit.lastRecordHeader();

BALL_LOG_INFO << d_clusterData.identity().description()
<< " Partition [" << partitionId << "]: "
<< "Recovered Sequence Number "
<< " Partition [" << partitionId
<< "]: " << "Recovered Sequence Number "
<< lastRecordHeader.partitionSequenceNumber()
<< " from journal file ["
<< recoveryCtx.d_recoveryFileSet.journalFile() << "].";
Expand Down Expand Up @@ -1277,8 +1260,7 @@ void RecoveryManager::setLiveDataSource(mqbnet::ClusterNode* source,
RecoveryContext& recoveryCtx = d_recoveryContextVec[partitionId];

BALL_LOG_INFO << d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Setting live data source from "
<< partitionId << "]: " << "Setting live data source from "
<< (recoveryCtx.d_liveDataSource_p
? recoveryCtx.d_liveDataSource_p->nodeDescription()
: "** NULL **")
Expand All @@ -1305,8 +1287,8 @@ void RecoveryManager::bufferStorageEvent(
BSLS_ASSERT_SAFE(recoveryCtx.d_liveDataSource_p);
if (recoveryCtx.d_liveDataSource_p->nodeId() != source->nodeId()) {
BALL_LOG_ERROR << d_clusterData.identity().description()
<< " Partition [" << partitionId << "]: "
<< "Storage event from node "
<< " Partition [" << partitionId
<< "]: " << "Storage event from node "
<< source->nodeDescription() << "cannot be buffered, "
<< "because it is different from the expected live "
<< "data source node "
Expand All @@ -1319,8 +1301,8 @@ void RecoveryManager::bufferStorageEvent(
recoveryCtx.d_bufferedEvents.push_back(blob);

BALL_LOG_INFO << d_clusterData.identity().description() << " Partition ["
<< partitionId << "]: "
<< "Buffered a storage event from primary node "
<< partitionId
<< "]: " << "Buffered a storage event from primary node "
<< source->nodeDescription()
<< " as self is still healing the partition.";
}
Expand Down

0 comments on commit 39fa4e7

Please sign in to comment.