Skip to content

Commit

Permalink
fix test code
Browse files Browse the repository at this point in the history
  • Loading branch information
thetumbled committed May 8, 2024
1 parent 845122d commit 3e2e3a4
Showing 1 changed file with 6 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1095,20 +1095,20 @@ public void testCancelTxnTimeout() throws Exception{
}

@Test
public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot() throws Exception {
public void testNotChangeMaxReadPositionCountWhenCheckIfNoSnapshot() throws Exception {
PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0)
.getBrokerService()
.getTopic(NAMESPACE1 + "/changeMaxReadPositionAndAddAbortTimes" + UUID.randomUUID(), true)
.getTopic(NAMESPACE1 + "/changeMaxReadPositionCount" + UUID.randomUUID(), true)
.get().get();
TransactionBuffer buffer = persistentTopic.getTransactionBuffer();
Field processorField = TopicTransactionBuffer.class.getDeclaredField("snapshotAbortedTxnProcessor");
processorField.setAccessible(true);

AbortedTxnProcessor abortedTxnProcessor = (AbortedTxnProcessor) processorField.get(buffer);
Field changeTimeField = TopicTransactionBuffer
.class.getDeclaredField("changeMaxReadPositionAndAddAbortTimes");
.class.getDeclaredField("changeMaxReadPositionCount");
changeTimeField.setAccessible(true);
AtomicLong changeMaxReadPositionAndAddAbortTimes = (AtomicLong) changeTimeField.get(buffer);
AtomicLong changeMaxReadPositionCount = (AtomicLong) changeTimeField.get(buffer);

Field field1 = TopicTransactionBufferState.class.getDeclaredField("state");
field1.setAccessible(true);
Expand All @@ -1117,10 +1117,10 @@ public void testNotChangeMaxReadPositionAndAddAbortTimesWhenCheckIfNoSnapshot()
TopicTransactionBufferState.State state = (TopicTransactionBufferState.State) field1.get(buffer);
Assert.assertEquals(state, TopicTransactionBufferState.State.NoSnapshot);
});
Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);

buffer.syncMaxReadPositionForNormalPublish(new PositionImpl(1, 1));
Assert.assertEquals(changeMaxReadPositionAndAddAbortTimes.get(), 0L);
Assert.assertEquals(changeMaxReadPositionCount.get(), 0L);

}

Expand Down

0 comments on commit 3e2e3a4

Please sign in to comment.