Skip to content

Commit

Permalink
[Java] Handle multiple PendingServiceMessageTrackers while producing …
Browse files Browse the repository at this point in the history
…consensus module patch.
  • Loading branch information
vyazelenko authored and DarrylGamroth committed Jan 22, 2025
1 parent 4c66f71 commit a863d2f
Show file tree
Hide file tree
Showing 3 changed files with 234 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ final class PendingServiceMessageTracker
this.logPublisher = logPublisher;
this.clusterClock = clusterClock;

logServiceSessionId = ((long)serviceId << 56) | Long.MIN_VALUE;
logServiceSessionId = serviceSessionId(serviceId, Long.MIN_VALUE);
nextServiceSessionId = logServiceSessionId + 1;
}

Expand Down Expand Up @@ -280,4 +280,9 @@ static int serviceId(final long clusterSessionId)
{
return ((int)(clusterSessionId >>> 56)) & 0x7F;
}

static long serviceSessionId(final int serviceId, final long sessionId)
{
return ((long)serviceId << 56) | sessionId;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ public class ConsensusModuleSnapshotPendingServiceMessagesPatch
{
static final int SNAPSHOT_REPLAY_STREAM_ID = 103;
static final int SNAPSHOT_RECORDING_STREAM_ID = 107;
static final String PATCH_CHANNEL = "aeron:ipc?alias=consensus-module-snapshot-patch";
static final String PATCH_CHANNEL = "aeron:ipc?alias=consensus-module-snapshot-patch|term-length=64m";
private final String archiveLocalRequestChannel;
private final int archiveLocalRequestStreamId;

Expand Down Expand Up @@ -111,20 +111,12 @@ public boolean execute(final File clusterDir)
final SnapshotReader snapshotReader = new SnapshotReader();
replayLocalSnapshotRecording(aeron, archive, recordingId, snapshotReader);

final long targetNextServiceSessionId = max(
max(snapshotReader.nextServiceSessionId, snapshotReader.maxClusterSessionId + 1),
snapshotReader.logServiceSessionId + 1 + snapshotReader.pendingServiceMessageCount);
final long targetLogServiceSessionId =
targetNextServiceSessionId - 1 - snapshotReader.pendingServiceMessageCount;

if (targetNextServiceSessionId != snapshotReader.nextServiceSessionId ||
targetLogServiceSessionId != snapshotReader.logServiceSessionId ||
0 != snapshotReader.pendingServiceMessageCount &&
(targetLogServiceSessionId + 1 != snapshotReader.minClusterSessionId ||
targetNextServiceSessionId - 1 != snapshotReader.maxClusterSessionId))
final TargetState[] targetStates =
TargetState.compute(snapshotReader.serviceCount, snapshotReader.pendingMessageTrackers);

if (snapshotIsNotValid(snapshotReader, targetStates))
{
final long tempRecordingId = createNewSnapshotRecording(
aeron, archive, recordingId, targetLogServiceSessionId, targetNextServiceSessionId);
final long tempRecordingId = createNewSnapshotRecording(aeron, archive, recordingId, targetStates);

final long stopPosition = awaitRecordingStopPosition(archive, recordingId);
final long newStopPosition = awaitRecordingStopPosition(archive, tempRecordingId);
Expand Down Expand Up @@ -211,12 +203,29 @@ static void replayLocalSnapshotRecording(
}
}

private static boolean snapshotIsNotValid(final SnapshotReader snapshotReader, final TargetState[] targetStates)
{
for (int i = 0; i < targetStates.length; i++)
{
final TargetState targetState = targetStates[i];
final PendingMessageTrackerState actualState = snapshotReader.pendingMessageTrackers[i];
if (targetState.nextServiceSessionId != actualState.nextServiceSessionId ||
targetState.logServiceSessionId != actualState.logServiceSessionId ||
0 != actualState.pendingServiceMessageCount &&
(targetState.logServiceSessionId + 1 != actualState.minClusterSessionId ||
targetState.nextServiceSessionId - 1 != actualState.maxClusterSessionId))
{
return true;
}
}

final TargetState defaultTracker = targetStates[0];
return defaultTracker.nextServiceSessionId != snapshotReader.nextServiceSessionId ||
defaultTracker.logServiceSessionId != snapshotReader.logServiceSessionId;
}

private static long createNewSnapshotRecording(
final Aeron aeron,
final AeronArchive archive,
final long oldRecordingId,
final long targetLogServiceSessionId,
final long targetNextServiceSessionId)
final Aeron aeron, final AeronArchive archive, final long oldRecordingId, final TargetState[] targetStates)
{
try (ExclusivePublication publication = archive.addRecordedExclusivePublication(
PATCH_CHANNEL, SNAPSHOT_RECORDING_STREAM_ID))
Expand All @@ -229,10 +238,7 @@ private static long createNewSnapshotRecording(
final long newRecordingId = RecordingPos.getRecordingId(countersReader, counterId);

replayLocalSnapshotRecording(
aeron,
archive,
oldRecordingId,
new SnapshotWriter(publication, targetLogServiceSessionId, targetNextServiceSessionId));
aeron, archive, oldRecordingId, new SnapshotWriter(publication, targetStates));

awaitRecordingComplete(countersReader, counterId, publication.position(), newRecordingId);

Expand Down Expand Up @@ -298,13 +304,51 @@ public static void main(final String[] args)
new ConsensusModuleSnapshotPendingServiceMessagesPatch().execute(new File(args[0]));
}

private static final class TargetState
{
final long nextServiceSessionId;
final long logServiceSessionId;
long clusterSessionId;

private TargetState(final long nextServiceSessionId, final long logServiceSessionId)
{
this.nextServiceSessionId = nextServiceSessionId;
this.logServiceSessionId = logServiceSessionId;
clusterSessionId = logServiceSessionId + 1;
}

static TargetState[] compute(final int serviceCount, final PendingMessageTrackerState[] states)
{
final TargetState[] targetStates = new TargetState[serviceCount];
for (int i = 0; i < serviceCount; i++)
{
final PendingMessageTrackerState state = states[i];
final long targetNextServiceSessionId = max(
max(state.nextServiceSessionId, state.maxClusterSessionId + 1),
state.logServiceSessionId + 1 + state.pendingServiceMessageCount);
final long targetLogServiceSessionId =
targetNextServiceSessionId - 1 - state.pendingServiceMessageCount;
targetStates[i] = new TargetState(targetNextServiceSessionId, targetLogServiceSessionId);
}
return targetStates;
}
}

private static final class PendingMessageTrackerState
{
long nextServiceSessionId = Long.MIN_VALUE;
long logServiceSessionId = Long.MIN_VALUE;
long minClusterSessionId = Long.MAX_VALUE;
long maxClusterSessionId = Long.MIN_VALUE;
int pendingServiceMessageCount;
}

private static final class SnapshotReader implements ConsensusModuleSnapshotListener
{
private long nextServiceSessionId = Long.MIN_VALUE;
private long logServiceSessionId = Long.MIN_VALUE;
private long minClusterSessionId = Long.MAX_VALUE;
private long maxClusterSessionId = Long.MIN_VALUE;
private int pendingServiceMessageCount = 0;
long nextServiceSessionId = Long.MIN_VALUE;
long logServiceSessionId = Long.MIN_VALUE;
final PendingMessageTrackerState[] pendingMessageTrackers = new PendingMessageTrackerState[127];
int serviceCount;

public void onLoadBeginSnapshot(
final int appVersion,
Expand All @@ -331,9 +375,11 @@ public void onLoadConsensusModuleState(
public void onLoadPendingMessage(
final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length)
{
pendingServiceMessageCount++;
minClusterSessionId = min(minClusterSessionId, clusterSessionId);
maxClusterSessionId = max(maxClusterSessionId, clusterSessionId);
final int serviceId = PendingServiceMessageTracker.serviceId(clusterSessionId);
final PendingMessageTrackerState trackerState = tracker(serviceId);
trackerState.pendingServiceMessageCount++;
trackerState.minClusterSessionId = min(trackerState.minClusterSessionId, clusterSessionId);
trackerState.maxClusterSessionId = max(trackerState.maxClusterSessionId, clusterSessionId);
}

public void onLoadClusterSession(
Expand Down Expand Up @@ -368,11 +414,26 @@ public void onLoadPendingMessageTracker(
final int offset,
final int length)
{
final PendingMessageTrackerState trackerState = tracker(serviceId);
trackerState.nextServiceSessionId = nextServiceSessionId;
trackerState.logServiceSessionId = logServiceSessionId;
}

public void onLoadEndSnapshot(final DirectBuffer buffer, final int offset, final int length)
{
}

private PendingMessageTrackerState tracker(final int serviceId)
{
PendingMessageTrackerState trackerState = pendingMessageTrackers[serviceId];
if (null == trackerState)
{
trackerState = new PendingMessageTrackerState();
pendingMessageTrackers[serviceId] = trackerState;
serviceCount++;
}
return trackerState;
}
}

private static final class SnapshotWriter implements ConsensusModuleSnapshotListener
Expand All @@ -382,19 +443,14 @@ private static final class SnapshotWriter implements ConsensusModuleSnapshotList
private final SessionMessageHeaderEncoder sessionMessageHeaderEncoder = new SessionMessageHeaderEncoder();
private final PendingMessageTrackerEncoder pendingMessageTrackerEncoder = new PendingMessageTrackerEncoder();
private final ExclusivePublication snapshotPublication;
private final long targetNextServiceSessionId;
private final long targetLogServiceSessionId;
private long nextClusterSessionId;
private final TargetState[] targetStates;

SnapshotWriter(
final ExclusivePublication snapshotPublication,
final long targetLogServiceSessionId,
final long targetNextServiceSessionId)
final TargetState[] targetStates)
{
this.snapshotPublication = snapshotPublication;
this.targetLogServiceSessionId = targetLogServiceSessionId;
this.targetNextServiceSessionId = targetNextServiceSessionId;
nextClusterSessionId = targetLogServiceSessionId + 1;
this.targetStates = targetStates;
}

public void onLoadBeginSnapshot(
Expand All @@ -416,11 +472,13 @@ public void onLoadConsensusModuleState(
final int offset,
final int length)
{
final TargetState defaultTrackerState = targetStates[0];

tempBuffer.putBytes(0, buffer, offset, length);
consensusModuleEncoder
.wrap(tempBuffer, MessageHeaderEncoder.ENCODED_LENGTH)
.logServiceSessionId(targetLogServiceSessionId)
.nextServiceSessionId(targetNextServiceSessionId);
.logServiceSessionId(defaultTrackerState.logServiceSessionId)
.nextServiceSessionId(defaultTrackerState.nextServiceSessionId);

writeToSnapshot(tempBuffer, 0, length);
}
Expand All @@ -431,10 +489,12 @@ public void onLoadPendingMessage(
final int offset,
final int length)
{
final TargetState targetState = targetStates[PendingServiceMessageTracker.serviceId(clusterSessionId)];

tempBuffer.putBytes(0, buffer, offset, length);
sessionMessageHeaderEncoder
.wrap(tempBuffer, MessageHeaderEncoder.ENCODED_LENGTH)
.clusterSessionId(nextClusterSessionId++);
.clusterSessionId(targetState.clusterSessionId++);

writeToSnapshot(tempBuffer, 0, length);
}
Expand Down Expand Up @@ -473,20 +533,15 @@ public void onLoadPendingMessageTracker(
final int offset,
final int length)
{
if (0 == serviceId)
{
tempBuffer.putBytes(0, buffer, offset, length);
pendingMessageTrackerEncoder
.wrap(tempBuffer, MessageHeaderEncoder.ENCODED_LENGTH)
.logServiceSessionId(targetLogServiceSessionId)
.nextServiceSessionId(targetNextServiceSessionId);
final TargetState targetState = targetStates[serviceId];

writeToSnapshot(tempBuffer, 0, length);
}
else
{
writeToSnapshot(buffer, offset, length);
}
tempBuffer.putBytes(0, buffer, offset, length);
pendingMessageTrackerEncoder
.wrap(tempBuffer, MessageHeaderEncoder.ENCODED_LENGTH)
.logServiceSessionId(targetState.logServiceSessionId)
.nextServiceSessionId(targetState.nextServiceSessionId);

writeToSnapshot(tempBuffer, 0, length);
}

public void onLoadEndSnapshot(final DirectBuffer buffer, final int offset, final int length)
Expand Down
Loading

0 comments on commit a863d2f

Please sign in to comment.