From 303933e5af036da3a0b8599b515f16695dfdec22 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Mon, 17 Feb 2025 13:53:23 +0200 Subject: [PATCH 1/2] [improve][meta] Stop election operations when LeaderElectionImpl gets closed --- .../coordination/impl/LeaderElectionImpl.java | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java index ab35eb7040c10..58277873658d5 100644 --- a/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java +++ b/pulsar-metadata/src/main/java/org/apache/pulsar/metadata/coordination/impl/LeaderElectionImpl.java @@ -104,6 +104,9 @@ public synchronized CompletableFuture elect(T proposedValue } private synchronized CompletableFuture elect() { + if (internalState == InternalState.Closed) { + return FutureUtils.exception(new AlreadyClosedException("The leader election was already closed")); + } // First check if there's already a leader elected internalState = InternalState.ElectionInProgress; return store.get(path).thenCompose(optLock -> { @@ -113,6 +116,11 @@ private synchronized CompletableFuture elect() { return tryToBecomeLeader(); } }).thenCompose(leaderElectionState -> { + synchronized (this) { + if (internalState == InternalState.Closed) { + return FutureUtils.exception(new AlreadyClosedException("The leader election was already closed")); + } + } // make sure that the cache contains the current leader // so that getLeaderValueIfPresent works on all brokers cache.refresh(path); @@ -122,6 +130,9 @@ private synchronized CompletableFuture elect() { } private synchronized CompletableFuture handleExistingLeaderValue(GetResult res) { + if (internalState == InternalState.Closed) { + return FutureUtils.exception(new AlreadyClosedException("The leader election was already closed")); + } T existingValue; try { existingValue = serde.deserialize(path, res.getValue(), res.getStat()); @@ -163,6 +174,9 @@ private synchronized CompletableFuture handleExistingLeader } private synchronized void changeState(LeaderElectionState les) { + if (internalState == InternalState.Closed) { + return; + } internalState = InternalState.LeaderIsPresent; if (this.leaderElectionState != les) { this.leaderElectionState = les; @@ -175,6 +189,9 @@ private synchronized void changeState(LeaderElectionState les) { } private synchronized CompletableFuture tryToBecomeLeader() { + if (internalState == InternalState.Closed) { + return FutureUtils.exception(new AlreadyClosedException("The leader election was already closed")); + } T value = proposedValue.get(); byte[] payload; try { @@ -192,6 +209,11 @@ private synchronized CompletableFuture tryToBecomeLeader() cache.get(path) .thenRun(() -> { synchronized (LeaderElectionImpl.this) { + if (internalState == InternalState.Closed) { + result.completeExceptionally(new AlreadyClosedException( + "The leader election was already closed")); + return; + } log.info("Acquired leadership on {} with {}", path, value); internalState = InternalState.LeaderIsPresent; if (leaderElectionState != LeaderElectionState.Leading) { @@ -254,7 +276,6 @@ private synchronized CompletableFuture tryToBecomeLeader() @Override public void close() throws Exception { - updateCachedValueFuture.cancel(true); try { asyncClose().join(); } catch (CompletionException e) { @@ -268,6 +289,7 @@ public synchronized CompletableFuture asyncClose() { return CompletableFuture.completedFuture(null); } + updateCachedValueFuture.cancel(true); internalState = InternalState.Closed; if (leaderElectionState != LeaderElectionState.Leading) { From a62e10a64aca0f7fa6ece050cd4a2b4f44f4bcf9 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Tue, 18 Feb 2025 11:02:14 +0200 Subject: [PATCH 2/2] Don't close LeaderElection in ServiceUnitStateChannelTest --- .../channel/ServiceUnitStateChannelTest.java | 32 ++++++++++--------- 1 file changed, 17 insertions(+), 15 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java index b6e38d4f6956c..4b9d48a00330d 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/channel/ServiceUnitStateChannelTest.java @@ -35,9 +35,7 @@ import static org.apache.pulsar.metadata.api.extended.SessionEvent.Reconnected; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionLost; import static org.apache.pulsar.metadata.api.extended.SessionEvent.SessionReestablished; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.eq; @@ -258,10 +256,18 @@ public void channelOwnerTest() throws Exception { assertEquals(channelOwner1, channelOwner2); LeaderElectionService leaderElectionService1 = (LeaderElectionService) FieldUtils.readDeclaredField( channel1, "leaderElectionService", true); - leaderElectionService1.close(); - waitUntilNewChannelOwner(channel2, channelOwner1); - leaderElectionService1.start(); - waitUntilNewChannelOwner(channel1, channelOwner1); + + // delete the leader node to trigger a new leader election + pulsar1.getLocalMetadataStore().delete("/loadbalance/leader", Optional.of(-1L)).get(); + + // wait for leader to lose leadership + Thread.sleep(500); + + // wait for leader election to happen + Awaitility.await().untilAsserted(() -> { + assertThat(channel1.getChannelOwnerAsync().get()).isPresent(); + assertThat(channel2.getChannelOwnerAsync().get()).isPresent(); + }); var newChannelOwner1 = channel1.getChannelOwnerAsync().get(2, TimeUnit.SECONDS); var newChannelOwner2 = channel2.getChannelOwnerAsync().get(2, TimeUnit.SECONDS); @@ -698,8 +704,7 @@ public void handleMetadataSessionEventTest() throws IllegalAccessException { var lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1); assertEquals(SessionReestablished, lastMetadataSessionEvent); - assertThat(lastMetadataSessionEventTimestamp, - greaterThanOrEqualTo(ts)); + assertThat(lastMetadataSessionEventTimestamp).isGreaterThanOrEqualTo(ts); ts = System.currentTimeMillis(); channel1.handleMetadataSessionEvent(SessionLost); @@ -707,8 +712,7 @@ public void handleMetadataSessionEventTest() throws IllegalAccessException { lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1); assertEquals(SessionLost, lastMetadataSessionEvent); - assertThat(lastMetadataSessionEventTimestamp, - greaterThanOrEqualTo(ts)); + assertThat(lastMetadataSessionEventTimestamp).isGreaterThanOrEqualTo(ts); ts = System.currentTimeMillis(); channel1.handleMetadataSessionEvent(ConnectionLost); @@ -716,8 +720,7 @@ public void handleMetadataSessionEventTest() throws IllegalAccessException { lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1); assertEquals(SessionLost, lastMetadataSessionEvent); - assertThat(lastMetadataSessionEventTimestamp, - lessThanOrEqualTo(ts)); + assertThat(lastMetadataSessionEventTimestamp).isLessThanOrEqualTo(ts); ts = System.currentTimeMillis(); channel1.handleMetadataSessionEvent(Reconnected); @@ -725,8 +728,7 @@ public void handleMetadataSessionEventTest() throws IllegalAccessException { lastMetadataSessionEventTimestamp = getLastMetadataSessionEventTimestamp(channel1); assertEquals(SessionLost, lastMetadataSessionEvent); - assertThat(lastMetadataSessionEventTimestamp, - lessThanOrEqualTo(ts)); + assertThat(lastMetadataSessionEventTimestamp).isLessThanOrEqualTo(ts); }