Skip to content

Commit

Permalink
Don't close LeaderElection in ServiceUnitStateChannelTest
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Feb 18, 2025
1 parent 303933e commit a62e10a
Showing 1 changed file with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@
import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost;
import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
Expand Down Expand Up @@ -258,10 +256,18 @@ public void channelOwnerTest() throws Exception {
assertEquals(channelOwner1, channelOwner2);
LeaderElectionService leaderElectionService1 = (LeaderElectionService) FieldUtils.readDeclaredField(
channel1, "leaderElectionService", true);
leaderElectionService1.close();
waitUntilNewChannelOwner(channel2, channelOwner1);
leaderElectionService1.start();
waitUntilNewChannelOwner(channel1, channelOwner1);

// delete the leader node to trigger a new leader election
pulsar1.getLocalMetadataStore().delete("/loadbalance/leader", Optional.of(-1L)).get();

// wait for leader to lose leadership
Thread.sleep(500);

// wait for leader election to happen
Awaitility.await().untilAsserted(() -> {
assertThat(channel1.getChannelOwnerAsync().get()).isPresent();
assertThat(channel2.getChannelOwnerAsync().get()).isPresent();
});

var newChannelOwner1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS);
var newChannelOwner2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS);
Expand Down Expand Up @@ -698,35 +704,31 @@ public void handleMetadataSessionEventTest() throws IllegalAccessException {
var lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1);

assertEquals(SessionReestablished, lastMetadataSessionEvent);
assertThat(lastMetadataSessionEventTimestamp,
greaterThanOrEqualTo(ts));
assertThat(lastMetadataSessionEventTimestamp).isGreaterThanOrEqualTo(ts);

ts = System.currentTimeMillis();
channel1.handleMetadataSessionEvent(SessionLost);
lastMetadataSessionEvent = getLastMetadataSessionEvent(channel1);
lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1);

assertEquals(SessionLost, lastMetadataSessionEvent);
assertThat(lastMetadataSessionEventTimestamp,
greaterThanOrEqualTo(ts));
assertThat(lastMetadataSessionEventTimestamp).isGreaterThanOrEqualTo(ts);

ts = System.currentTimeMillis();
channel1.handleMetadataSessionEvent(ConnectionLost);
lastMetadataSessionEvent = getLastMetadataSessionEvent(channel1);
lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1);

assertEquals(SessionLost, lastMetadataSessionEvent);
assertThat(lastMetadataSessionEventTimestamp,
lessThanOrEqualTo(ts));
assertThat(lastMetadataSessionEventTimestamp).isLessThanOrEqualTo(ts);

ts = System.currentTimeMillis();
channel1.handleMetadataSessionEvent(Reconnected);
lastMetadataSessionEvent = getLastMetadataSessionEvent(channel1);
lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1);

assertEquals(SessionLost, lastMetadataSessionEvent);
assertThat(lastMetadataSessionEventTimestamp,
lessThanOrEqualTo(ts));
assertThat(lastMetadataSessionEventTimestamp).isLessThanOrEqualTo(ts);

}

Expand Down

0 comments on commit a62e10a

Please sign in to comment.