From a863d2f3292ee4b7a88ca2b1880ca0a068ea4ef9 Mon Sep 17 00:00:00 2001 From: Dmytro Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Thu, 16 Jan 2025 14:37:04 +0100 Subject: [PATCH] [Java] Handle multiple PendingServiceMessageTrackers while producing consensus module patch. --- .../cluster/PendingServiceMessageTracker.java | 7 +- ...leSnapshotPendingServiceMessagesPatch.java | 165 +++++++++++------ ...apshotPendingServiceMessagesPatchTest.java | 172 ++++++++++++------ 3 files changed, 234 insertions(+), 110 deletions(-) diff --git a/aeron-cluster/src/main/java/io/aeron/cluster/PendingServiceMessageTracker.java b/aeron-cluster/src/main/java/io/aeron/cluster/PendingServiceMessageTracker.java index 11133e00a4..b050b42638 100644 --- a/aeron-cluster/src/main/java/io/aeron/cluster/PendingServiceMessageTracker.java +++ b/aeron-cluster/src/main/java/io/aeron/cluster/PendingServiceMessageTracker.java @@ -60,7 +60,7 @@ final class PendingServiceMessageTracker this.logPublisher = logPublisher; this.clusterClock = clusterClock; - logServiceSessionId = ((long)serviceId << 56) | Long.MIN_VALUE; + logServiceSessionId = serviceSessionId(serviceId, Long.MIN_VALUE); nextServiceSessionId = logServiceSessionId + 1; } @@ -280,4 +280,9 @@ static int serviceId(final long clusterSessionId) { return ((int)(clusterSessionId >>> 56)) & 0x7F; } + + static long serviceSessionId(final int serviceId, final long sessionId) + { + return ((long)serviceId << 56) | sessionId; + } } diff --git a/aeron-samples/src/main/java/io/aeron/cluster/ConsensusModuleSnapshotPendingServiceMessagesPatch.java b/aeron-samples/src/main/java/io/aeron/cluster/ConsensusModuleSnapshotPendingServiceMessagesPatch.java index e77bf7282d..358510c693 100644 --- a/aeron-samples/src/main/java/io/aeron/cluster/ConsensusModuleSnapshotPendingServiceMessagesPatch.java +++ b/aeron-samples/src/main/java/io/aeron/cluster/ConsensusModuleSnapshotPendingServiceMessagesPatch.java @@ -53,7 +53,7 @@ public class ConsensusModuleSnapshotPendingServiceMessagesPatch { static final int SNAPSHOT_REPLAY_STREAM_ID = 103; static final int SNAPSHOT_RECORDING_STREAM_ID = 107; - static final String PATCH_CHANNEL = "aeron:ipc?alias=consensus-module-snapshot-patch"; + static final String PATCH_CHANNEL = "aeron:ipc?alias=consensus-module-snapshot-patch|term-length=64m"; private final String archiveLocalRequestChannel; private final int archiveLocalRequestStreamId; @@ -111,20 +111,12 @@ public boolean execute(final File clusterDir) final SnapshotReader snapshotReader = new SnapshotReader(); replayLocalSnapshotRecording(aeron, archive, recordingId, snapshotReader); - final long targetNextServiceSessionId = max( - max(snapshotReader.nextServiceSessionId, snapshotReader.maxClusterSessionId + 1), - snapshotReader.logServiceSessionId + 1 + snapshotReader.pendingServiceMessageCount); - final long targetLogServiceSessionId = - targetNextServiceSessionId - 1 - snapshotReader.pendingServiceMessageCount; - - if (targetNextServiceSessionId != snapshotReader.nextServiceSessionId || - targetLogServiceSessionId != snapshotReader.logServiceSessionId || - 0 != snapshotReader.pendingServiceMessageCount && - (targetLogServiceSessionId + 1 != snapshotReader.minClusterSessionId || - targetNextServiceSessionId - 1 != snapshotReader.maxClusterSessionId)) + final TargetState[] targetStates = + TargetState.compute(snapshotReader.serviceCount, snapshotReader.pendingMessageTrackers); + + if (snapshotIsNotValid(snapshotReader, targetStates)) { - final long tempRecordingId = createNewSnapshotRecording( - aeron, archive, recordingId, targetLogServiceSessionId, targetNextServiceSessionId); + final long tempRecordingId = createNewSnapshotRecording(aeron, archive, recordingId, targetStates); final long stopPosition = awaitRecordingStopPosition(archive, recordingId); final long newStopPosition = awaitRecordingStopPosition(archive, tempRecordingId); @@ -211,12 +203,29 @@ static void replayLocalSnapshotRecording( } } + private static boolean snapshotIsNotValid(final SnapshotReader snapshotReader, final TargetState[] targetStates) + { + for (int i = 0; i < targetStates.length; i++) + { + final TargetState targetState = targetStates[i]; + final PendingMessageTrackerState actualState = snapshotReader.pendingMessageTrackers[i]; + if (targetState.nextServiceSessionId != actualState.nextServiceSessionId || + targetState.logServiceSessionId != actualState.logServiceSessionId || + 0 != actualState.pendingServiceMessageCount && + (targetState.logServiceSessionId + 1 != actualState.minClusterSessionId || + targetState.nextServiceSessionId - 1 != actualState.maxClusterSessionId)) + { + return true; + } + } + + final TargetState defaultTracker = targetStates[0]; + return defaultTracker.nextServiceSessionId != snapshotReader.nextServiceSessionId || + defaultTracker.logServiceSessionId != snapshotReader.logServiceSessionId; + } + private static long createNewSnapshotRecording( - final Aeron aeron, - final AeronArchive archive, - final long oldRecordingId, - final long targetLogServiceSessionId, - final long targetNextServiceSessionId) + final Aeron aeron, final AeronArchive archive, final long oldRecordingId, final TargetState[] targetStates) { try (ExclusivePublication publication = archive.addRecordedExclusivePublication( PATCH_CHANNEL, SNAPSHOT_RECORDING_STREAM_ID)) @@ -229,10 +238,7 @@ private static long createNewSnapshotRecording( final long newRecordingId = RecordingPos.getRecordingId(countersReader, counterId); replayLocalSnapshotRecording( - aeron, - archive, - oldRecordingId, - new SnapshotWriter(publication, targetLogServiceSessionId, targetNextServiceSessionId)); + aeron, archive, oldRecordingId, new SnapshotWriter(publication, targetStates)); awaitRecordingComplete(countersReader, counterId, publication.position(), newRecordingId); @@ -298,13 +304,51 @@ public static void main(final String[] args) new ConsensusModuleSnapshotPendingServiceMessagesPatch().execute(new File(args[0])); } + private static final class TargetState + { + final long nextServiceSessionId; + final long logServiceSessionId; + long clusterSessionId; + + private TargetState(final long nextServiceSessionId, final long logServiceSessionId) + { + this.nextServiceSessionId = nextServiceSessionId; + this.logServiceSessionId = logServiceSessionId; + clusterSessionId = logServiceSessionId + 1; + } + + static TargetState[] compute(final int serviceCount, final PendingMessageTrackerState[] states) + { + final TargetState[] targetStates = new TargetState[serviceCount]; + for (int i = 0; i < serviceCount; i++) + { + final PendingMessageTrackerState state = states[i]; + final long targetNextServiceSessionId = max( + max(state.nextServiceSessionId, state.maxClusterSessionId + 1), + state.logServiceSessionId + 1 + state.pendingServiceMessageCount); + final long targetLogServiceSessionId = + targetNextServiceSessionId - 1 - state.pendingServiceMessageCount; + targetStates[i] = new TargetState(targetNextServiceSessionId, targetLogServiceSessionId); + } + return targetStates; + } + } + + private static final class PendingMessageTrackerState + { + long nextServiceSessionId = Long.MIN_VALUE; + long logServiceSessionId = Long.MIN_VALUE; + long minClusterSessionId = Long.MAX_VALUE; + long maxClusterSessionId = Long.MIN_VALUE; + int pendingServiceMessageCount; + } + private static final class SnapshotReader implements ConsensusModuleSnapshotListener { - private long nextServiceSessionId = Long.MIN_VALUE; - private long logServiceSessionId = Long.MIN_VALUE; - private long minClusterSessionId = Long.MAX_VALUE; - private long maxClusterSessionId = Long.MIN_VALUE; - private int pendingServiceMessageCount = 0; + long nextServiceSessionId = Long.MIN_VALUE; + long logServiceSessionId = Long.MIN_VALUE; + final PendingMessageTrackerState[] pendingMessageTrackers = new PendingMessageTrackerState[127]; + int serviceCount; public void onLoadBeginSnapshot( final int appVersion, @@ -331,9 +375,11 @@ public void onLoadConsensusModuleState( public void onLoadPendingMessage( final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length) { - pendingServiceMessageCount++; - minClusterSessionId = min(minClusterSessionId, clusterSessionId); - maxClusterSessionId = max(maxClusterSessionId, clusterSessionId); + final int serviceId = PendingServiceMessageTracker.serviceId(clusterSessionId); + final PendingMessageTrackerState trackerState = tracker(serviceId); + trackerState.pendingServiceMessageCount++; + trackerState.minClusterSessionId = min(trackerState.minClusterSessionId, clusterSessionId); + trackerState.maxClusterSessionId = max(trackerState.maxClusterSessionId, clusterSessionId); } public void onLoadClusterSession( @@ -368,11 +414,26 @@ public void onLoadPendingMessageTracker( final int offset, final int length) { + final PendingMessageTrackerState trackerState = tracker(serviceId); + trackerState.nextServiceSessionId = nextServiceSessionId; + trackerState.logServiceSessionId = logServiceSessionId; } public void onLoadEndSnapshot(final DirectBuffer buffer, final int offset, final int length) { } + + private PendingMessageTrackerState tracker(final int serviceId) + { + PendingMessageTrackerState trackerState = pendingMessageTrackers[serviceId]; + if (null == trackerState) + { + trackerState = new PendingMessageTrackerState(); + pendingMessageTrackers[serviceId] = trackerState; + serviceCount++; + } + return trackerState; + } } private static final class SnapshotWriter implements ConsensusModuleSnapshotListener @@ -382,19 +443,14 @@ private static final class SnapshotWriter implements ConsensusModuleSnapshotList private final SessionMessageHeaderEncoder sessionMessageHeaderEncoder = new SessionMessageHeaderEncoder(); private final PendingMessageTrackerEncoder pendingMessageTrackerEncoder = new PendingMessageTrackerEncoder(); private final ExclusivePublication snapshotPublication; - private final long targetNextServiceSessionId; - private final long targetLogServiceSessionId; - private long nextClusterSessionId; + private final TargetState[] targetStates; SnapshotWriter( final ExclusivePublication snapshotPublication, - final long targetLogServiceSessionId, - final long targetNextServiceSessionId) + final TargetState[] targetStates) { this.snapshotPublication = snapshotPublication; - this.targetLogServiceSessionId = targetLogServiceSessionId; - this.targetNextServiceSessionId = targetNextServiceSessionId; - nextClusterSessionId = targetLogServiceSessionId + 1; + this.targetStates = targetStates; } public void onLoadBeginSnapshot( @@ -416,11 +472,13 @@ public void onLoadConsensusModuleState( final int offset, final int length) { + final TargetState defaultTrackerState = targetStates[0]; + tempBuffer.putBytes(0, buffer, offset, length); consensusModuleEncoder .wrap(tempBuffer, MessageHeaderEncoder.ENCODED_LENGTH) - .logServiceSessionId(targetLogServiceSessionId) - .nextServiceSessionId(targetNextServiceSessionId); + .logServiceSessionId(defaultTrackerState.logServiceSessionId) + .nextServiceSessionId(defaultTrackerState.nextServiceSessionId); writeToSnapshot(tempBuffer, 0, length); } @@ -431,10 +489,12 @@ public void onLoadPendingMessage( final int offset, final int length) { + final TargetState targetState = targetStates[PendingServiceMessageTracker.serviceId(clusterSessionId)]; + tempBuffer.putBytes(0, buffer, offset, length); sessionMessageHeaderEncoder .wrap(tempBuffer, MessageHeaderEncoder.ENCODED_LENGTH) - .clusterSessionId(nextClusterSessionId++); + .clusterSessionId(targetState.clusterSessionId++); writeToSnapshot(tempBuffer, 0, length); } @@ -473,20 +533,15 @@ public void onLoadPendingMessageTracker( final int offset, final int length) { - if (0 == serviceId) - { - tempBuffer.putBytes(0, buffer, offset, length); - pendingMessageTrackerEncoder - .wrap(tempBuffer, MessageHeaderEncoder.ENCODED_LENGTH) - .logServiceSessionId(targetLogServiceSessionId) - .nextServiceSessionId(targetNextServiceSessionId); + final TargetState targetState = targetStates[serviceId]; - writeToSnapshot(tempBuffer, 0, length); - } - else - { - writeToSnapshot(buffer, offset, length); - } + tempBuffer.putBytes(0, buffer, offset, length); + pendingMessageTrackerEncoder + .wrap(tempBuffer, MessageHeaderEncoder.ENCODED_LENGTH) + .logServiceSessionId(targetState.logServiceSessionId) + .nextServiceSessionId(targetState.nextServiceSessionId); + + writeToSnapshot(tempBuffer, 0, length); } public void onLoadEndSnapshot(final DirectBuffer buffer, final int offset, final int length) diff --git a/aeron-samples/src/test/java/io/aeron/cluster/ConsensusModuleSnapshotPendingServiceMessagesPatchTest.java b/aeron-samples/src/test/java/io/aeron/cluster/ConsensusModuleSnapshotPendingServiceMessagesPatchTest.java index dc7ba92030..638dae951c 100644 --- a/aeron-samples/src/test/java/io/aeron/cluster/ConsensusModuleSnapshotPendingServiceMessagesPatchTest.java +++ b/aeron-samples/src/test/java/io/aeron/cluster/ConsensusModuleSnapshotPendingServiceMessagesPatchTest.java @@ -24,6 +24,8 @@ import io.aeron.cluster.codecs.ConsensusModuleEncoder; import io.aeron.cluster.codecs.MessageHeaderDecoder; import io.aeron.cluster.codecs.MessageHeaderEncoder; +import io.aeron.cluster.codecs.PendingMessageTrackerDecoder; +import io.aeron.cluster.codecs.PendingMessageTrackerEncoder; import io.aeron.cluster.codecs.SessionMessageHeaderDecoder; import io.aeron.cluster.codecs.SessionMessageHeaderEncoder; import io.aeron.samples.archive.RecordingReplicator; @@ -63,6 +65,7 @@ import static io.aeron.CommonContext.IPC_CHANNEL; import static io.aeron.CommonContext.NULL_SESSION_ID; import static io.aeron.cluster.ConsensusModuleSnapshotPendingServiceMessagesPatch.replayLocalSnapshotRecording; +import static io.aeron.cluster.PendingServiceMessageTracker.*; import static io.aeron.test.cluster.TestCluster.aCluster; import static java.nio.ByteOrder.LITTLE_ENDIAN; import static org.agrona.BitUtil.SIZE_OF_INT; @@ -175,27 +178,27 @@ void executeIsANoOpIfTheSnapshotIsValid() final MutableLong mutableLogServiceSessionId = new MutableLong(NULL_SESSION_ID); final LongArrayList pendingMessageClusterSessionIds = new LongArrayList(); final ConsensusModuleSnapshotListener stateReader = new NoOpConsensusModuleSnapshotListener() + { + public void onLoadConsensusModuleState( + final long nextSessionId, + final long nextServiceSessionId, + final long logServiceSessionId, + final int pendingMessageCapacity, + final DirectBuffer buffer, + final int offset, + final int length) { - public void onLoadConsensusModuleState( - final long nextSessionId, - final long nextServiceSessionId, - final long logServiceSessionId, - final int pendingMessageCapacity, - final DirectBuffer buffer, - final int offset, - final int length) - { - mutableNextSessionId.set(nextSessionId); - mutableNextServiceSessionId.set(nextServiceSessionId); - mutableLogServiceSessionId.set(logServiceSessionId); - } + mutableNextSessionId.set(nextSessionId); + mutableNextServiceSessionId.set(nextServiceSessionId); + mutableLogServiceSessionId.set(logServiceSessionId); + } - public void onLoadPendingMessage( - final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length) - { - pendingMessageClusterSessionIds.add(clusterSessionId); - } - }; + public void onLoadPendingMessage( + final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length) + { + pendingMessageClusterSessionIds.add(clusterSessionId); + } + }; readSnapshotRecording(leader, leaderSnapshot.recordingId, stateReader); @@ -290,7 +293,12 @@ void executeShouldPatchTheStateOfTheLeaderSnapshot( final MutableLong mutableNextServiceSessionId = new MutableLong(); final MutableLong mutableLogServiceSessionId = new MutableLong(); final MutableInteger consensusModuleStateOffset = new MutableInteger(); - final IntArrayList pendingMessageOffsets = new IntArrayList(); + final IntArrayList pendingMessageTrackerOffsets = new IntArrayList(); + final IntArrayList[] pendingMessageOffsets = new IntArrayList[serviceCount]; + for (int i = 0; i < serviceCount; i++) + { + pendingMessageOffsets[i] = new IntArrayList(); + } readSnapshotRecording( leader, leaderSnapshot.recordingId, @@ -312,18 +320,37 @@ public void onLoadConsensusModuleState( assertEquals(MessageHeaderDecoder.ENCODED_LENGTH + ConsensusModuleDecoder.BLOCK_LENGTH, length); } + public void onLoadPendingMessageTracker( + final long nextServiceSessionId, + final long logServiceSessionId, + final int pendingMessageCapacity, + final int serviceId, + final DirectBuffer buffer, + final int offset, + final int length) + { + pendingMessageTrackerOffsets.add(offset); + assertEquals( + MessageHeaderDecoder.ENCODED_LENGTH + PendingMessageTrackerDecoder.BLOCK_LENGTH, + length); + } + public void onLoadPendingMessage( final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length) { - pendingMessageOffsets.addInt(offset); + final int serviceId = serviceId(clusterSessionId); + pendingMessageOffsets[serviceId].addInt(offset); assertEquals( MessageHeaderDecoder.ENCODED_LENGTH + SessionMessageHeaderDecoder.BLOCK_LENGTH + SIZE_OF_INT, length); } }); assertNotEquals(0, consensusModuleStateOffset.get()); - final int numPendingMessages = pendingMessageOffsets.size(); - assertNotEquals(0, numPendingMessages); + assertEquals(serviceCount, pendingMessageTrackerOffsets.size()); + for (int i = 0; i < serviceCount; i++) + { + assertNotEquals(0, pendingMessageOffsets[i].size()); + } final long expectedNextSessionId = mutableNextSessionId.get(); assertNotEquals(Long.toString(mutableLogServiceSessionId.get()), baseLogServiceSessionId); @@ -333,6 +360,7 @@ public void onLoadPendingMessage( leader, leaderSnapshot, consensusModuleStateOffset, + pendingMessageTrackerOffsets, pendingMessageOffsets, baseLogServiceSessionId, baseNextServiceSessionId, @@ -346,13 +374,17 @@ public void onLoadPendingMessage( assertTrue(snapshotPatch.execute(leaderClusterDir)); final MutableBoolean hasLoadedConsensusModuleState = new MutableBoolean(); - final MutableInteger onLoadPendingMessageCount = new MutableInteger(); + final MutableInteger[] onLoadPendingMessageCounters = new MutableInteger[serviceCount]; + for (int i = 0; i < serviceCount; i++) + { + onLoadPendingMessageCounters[i] = new MutableInteger(); + } readSnapshotRecording( leader, leaderSnapshot.recordingId, new NoOpConsensusModuleSnapshotListener() { - long nextClusterSessionId; + final long[] nextClusterSessionIds = new long[serviceCount]; public void onLoadConsensusModuleState( final long nextSessionId, @@ -364,6 +396,7 @@ public void onLoadConsensusModuleState( final int length) { assertEquals(expectedNextSessionId, nextSessionId); + final int numPendingMessages = pendingMessageOffsets[0].size(); switch (mode) { @@ -394,19 +427,35 @@ public void onLoadConsensusModuleState( } } - nextClusterSessionId = logServiceSessionId + 1; hasLoadedConsensusModuleState.set(true); } + public void onLoadPendingMessageTracker( + final long nextServiceSessionId, + final long logServiceSessionId, + final int pendingMessageCapacity, + final int serviceId, + final DirectBuffer buffer, + final int offset, + final int length) + { + nextClusterSessionIds[serviceId] = logServiceSessionId + 1; + } + public void onLoadPendingMessage( final long clusterSessionId, final DirectBuffer buffer, final int offset, final int length) { - assertEquals(nextClusterSessionId++, clusterSessionId, "Invalid pending message header!"); - onLoadPendingMessageCount.increment(); + final int serviceId = serviceId(clusterSessionId); + onLoadPendingMessageCounters[serviceId].increment(); + assertEquals( + nextClusterSessionIds[serviceId]++, clusterSessionId, "Invalid pending message header!"); } }); assertTrue(hasLoadedConsensusModuleState.get()); - assertEquals(numPendingMessages, onLoadPendingMessageCount.get()); + for (int i = 0; i < serviceCount; i++) + { + assertEquals(pendingMessageOffsets[i].size(), onLoadPendingMessageCounters[i].get()); + } for (int i = 0; i < 3; i++) { @@ -469,13 +518,14 @@ private static void modifySnapshot( final TestNode leader, final RecordingLog.Entry leaderSnapshot, final MutableInteger consensusModuleStateOffset, - final IntArrayList pendingMessageOffsets, + final IntArrayList pendingMessageTrackerOffsets, + final IntArrayList[] pendingMessageOffsets, final String baseLogServiceSessionId, final String baseNextServiceSessionId, final long clusterSessionIdLowerBound, final long clusterSessionIdUpperBound) { - final int numberOfPendingMessages = pendingMessageOffsets.size(); + final int numberOfPendingMessages = pendingMessageOffsets[0].size(); final long logServiceSessionId; if ("$compute".equals(baseLogServiceSessionId)) { @@ -503,6 +553,7 @@ private static void modifySnapshot( { final UnsafeBuffer snapshotBuffer = new UnsafeBuffer(mappedByteBuffer); final MessageHeaderEncoder messageHeaderEncoder = new MessageHeaderEncoder(); + final MessageHeaderDecoder messageHeaderDecoder = new MessageHeaderDecoder(); // Set the ConsensusModuleState values final ConsensusModuleEncoder consensusModuleEncoder = new ConsensusModuleEncoder(); @@ -511,33 +562,46 @@ private static void modifySnapshot( .logServiceSessionId(logServiceSessionId) .nextServiceSessionId(nextServiceSessionId); + final PendingMessageTrackerEncoder pendingMessageTrackerEncoder = new PendingMessageTrackerEncoder(); + final int serviceCount = pendingMessageTrackerOffsets.size(); + for (int i = 0; i < serviceCount; i++) + { + final int offset = pendingMessageTrackerOffsets.get(i); + pendingMessageTrackerEncoder + .wrapAndApplyHeader(snapshotBuffer, offset, messageHeaderEncoder) + .logServiceSessionId(serviceSessionId(i, logServiceSessionId)) + .nextServiceSessionId(serviceSessionId(i, nextServiceSessionId)); + } + // Now randomize clusterSessionId of every pending service message final SessionMessageHeaderEncoder sessionMessageHeaderEncoder = new SessionMessageHeaderEncoder(); + final SessionMessageHeaderDecoder sessionMessageHeaderDecoder = new SessionMessageHeaderDecoder(); final MutableInteger count = new MutableInteger(); - pendingMessageOffsets.forEachInt( - (offset) -> - { - final long clusterSessionId; - switch (count.getAndIncrement()) + for (int i = 0; i < serviceCount; i++) + { + final IntArrayList offsets = pendingMessageOffsets[i]; + offsets.forEachInt( + (offset) -> { - case 0: - clusterSessionId = clusterSessionIdLowerBound; - break; - - case 1: - clusterSessionId = clusterSessionIdUpperBound; - break; - - default: - clusterSessionId = ThreadLocalRandom.current() - .nextLong(clusterSessionIdLowerBound + 1, clusterSessionIdUpperBound); - break; - } - - sessionMessageHeaderEncoder - .wrapAndApplyHeader(snapshotBuffer, offset, messageHeaderEncoder) - .clusterSessionId(clusterSessionId); - }); + final long clusterSessionId = switch (count.getAndIncrement()) + { + case 0 -> clusterSessionIdLowerBound; + case 1 -> clusterSessionIdUpperBound; + default -> ThreadLocalRandom.current().nextLong( + clusterSessionIdLowerBound + 1, clusterSessionIdUpperBound); + }; + + final long originalClusterSessionId = sessionMessageHeaderDecoder + .wrapAndApplyHeader(snapshotBuffer, offset, messageHeaderDecoder) + .clusterSessionId(); + final int serviceId = serviceId(originalClusterSessionId); + final long newClusterSessionId = serviceSessionId(serviceId, clusterSessionId); + + sessionMessageHeaderEncoder + .wrapAndApplyHeader(snapshotBuffer, offset, messageHeaderEncoder) + .clusterSessionId(newClusterSessionId); + }); + } } finally {