From 51f04cbc9bf852a3ed9b14ef9904083018172bfa Mon Sep 17 00:00:00 2001 From: Bernd Warmuth Date: Tue, 7 Jan 2025 12:06:17 +0100 Subject: [PATCH] cleanup Signed-off-by: Bernd Warmuth --- .../flagd/resolver/common/ChannelMonitor.java | 77 ------------------- .../resolver/grpc/EventStreamObserver.java | 1 - .../e2e/RunFlagdRpcReconnectCucumberTest.java | 9 +-- .../grpc/EventStreamObserverTest.java | 12 --- 4 files changed, 3 insertions(+), 96 deletions(-) diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitor.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitor.java index 2b59bd6ffd..0878ce9107 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitor.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelMonitor.java @@ -4,10 +4,7 @@ import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import lombok.extern.slf4j.Slf4j; /** @@ -98,78 +95,4 @@ private static void waitForDesiredState( "Deadline exceeded. Condition did not complete within the %d " + "deadline", timeout)); } } - - /** - * Polls the state of a gRPC channel at regular intervals and triggers callbacks upon state changes. - * - * @param executor the ScheduledExecutorService used for polling. - * @param channel the ManagedChannel to monitor. - * @param onConnectionReady callback invoked when the channel transitions to a READY state. - * @param onConnectionLost callback invoked when the channel transitions to a FAILURE or SHUTDOWN state. - * @param pollIntervalMs the polling interval in milliseconds. - */ - public static void pollChannelState( - ScheduledExecutorService executor, - ManagedChannel channel, - Runnable onConnectionReady, - Runnable onConnectionLost, - long pollIntervalMs) { - - AtomicReference lastState = new AtomicReference<>(ConnectivityState.READY); - - Runnable pollTask = () -> { - ConnectivityState currentState = channel.getState(true); - if (currentState != lastState.get()) { - if (currentState == ConnectivityState.READY) { - log.debug("gRPC connection became READY"); - onConnectionReady.run(); - } else if (currentState == ConnectivityState.TRANSIENT_FAILURE - || currentState == ConnectivityState.SHUTDOWN) { - log.debug("gRPC connection became TRANSIENT_FAILURE"); - onConnectionLost.run(); - } - lastState.set(currentState); - } - }; - executor.scheduleAtFixedRate(pollTask, 0, pollIntervalMs, TimeUnit.MILLISECONDS); - } - - /** - * Polls the channel state at fixed intervals and waits for the channel to reach a desired state within a timeout - * period. - * - * @param executor the ScheduledExecutorService used for polling. - * @param channel the ManagedChannel to monitor. - * @param desiredState the ConnectivityState to wait for. - * @param connectCallback callback invoked when the desired state is reached. - * @param timeout the maximum amount of time to wait. - * @param unit the time unit of the timeout. - * @return {@code true} if the desired state was reached within the timeout period, {@code false} otherwise. - * @throws InterruptedException if the current thread is interrupted while waiting. - */ - public static boolean pollForDesiredState( - ScheduledExecutorService executor, - ManagedChannel channel, - ConnectivityState desiredState, - Runnable connectCallback, - long timeout, - TimeUnit unit) - throws InterruptedException { - CountDownLatch latch = new CountDownLatch(1); - - Runnable waitForStateTask = () -> { - ConnectivityState currentState = channel.getState(true); - if (currentState == desiredState) { - connectCallback.run(); - latch.countDown(); - } - }; - - ScheduledFuture scheduledFuture = - executor.scheduleWithFixedDelay(waitForStateTask, 0, 100, TimeUnit.MILLISECONDS); - - boolean success = latch.await(timeout, unit); - scheduledFuture.cancel(true); - return success; - } } diff --git a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java index 504b762715..db89931e5e 100644 --- a/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java +++ b/providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java @@ -115,7 +115,6 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) { * Handles provider readiness events by clearing the cache (if enabled) and notifying listeners of readiness. */ private void handleProviderReadyEvent() { - this.onConnectionEvent.accept(true, Collections.emptyList()); // TODO: check if this is needed if (this.cache.getEnabled().equals(Boolean.TRUE)) { this.cache.clear(); } diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunFlagdRpcReconnectCucumberTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunFlagdRpcReconnectCucumberTest.java index c0baf47bb7..436ebf00fa 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunFlagdRpcReconnectCucumberTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunFlagdRpcReconnectCucumberTest.java @@ -6,18 +6,15 @@ import org.apache.logging.log4j.core.config.Order; import org.junit.platform.suite.api.ConfigurationParameter; import org.junit.platform.suite.api.IncludeEngines; -import org.junit.platform.suite.api.SelectClasspathResource; +import org.junit.platform.suite.api.SelectFile; import org.junit.platform.suite.api.Suite; import org.testcontainers.junit.jupiter.Testcontainers; -/** - * Class for running the reconnection tests for the RPC provider - */ +/** Class for running the reconnection tests for the RPC provider */ @Order(value = Integer.MAX_VALUE) @Suite @IncludeEngines("cucumber") -@SelectClasspathResource("features/flagd-reconnect.feature") -@SelectClasspathResource("features/events.feature") +@SelectFile("test-harness/gherkin/flagd-reconnect.feature") @ConfigurationParameter(key = PLUGIN_PROPERTY_NAME, value = "pretty") @ConfigurationParameter( key = GLUE_PROPERTY_NAME, diff --git a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserverTest.java b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserverTest.java index f794ddccad..9370f821a7 100644 --- a/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserverTest.java +++ b/providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserverTest.java @@ -56,18 +56,6 @@ public void change() { verify(cache, atLeast(1)).clear(); } - @Test - public void ready() { - EventStreamResponse resp = mock(EventStreamResponse.class); - when(resp.getType()).thenReturn("provider_ready"); - stream.onNext(resp); - // we notify that we are ready - assertEquals(1, states.size()); - assertTrue(states.get(0)); - // cache was cleaned - verify(cache, atLeast(1)).clear(); - } - @Test public void cacheBustingForKnownKeys() { final String key1 = "myKey1";