Skip to content

Commit 49340ef

Browse files
authored
fix: await shutdown in in-process mode (#445)
Signed-off-by: Todd Baert <[email protected]>
1 parent 3b5092f commit 49340ef

File tree

13 files changed

+104
-50
lines changed

13 files changed

+104
-50
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java

+5-3
Original file line numberDiff line numberDiff line change
@@ -89,14 +89,16 @@ public void shutdown() throws Exception {
8989
}
9090

9191
try {
92-
if (this.channel != null) {
92+
if (this.channel != null && !this.channel.isShutdown()) {
9393
this.channel.shutdown();
94-
this.channel.awaitTermination(5, TimeUnit.SECONDS);
94+
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
9595
}
9696
} finally {
9797
this.cache.clear();
98-
if (this.channel != null) {
98+
if (this.channel != null && !this.channel.isShutdown()) {
9999
this.channel.shutdownNow();
100+
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
101+
log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
100102
}
101103
}
102104
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/InProcessResolver.java

+3-2
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,9 @@ public void init() throws Exception {
8686

8787
/**
8888
* Shutdown in-process resolver.
89-
*/
90-
public void shutdown() {
89+
* @throws InterruptedException if stream can't be closed within deadline.
90+
*/
91+
public void shutdown() throws InterruptedException {
9192
flagStore.shutdown();
9293
this.connected.set(false);
9394
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStore.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,9 @@ public void init() {
5656

5757
/**
5858
* Shutdown storage layer.
59+
* @throws InterruptedException if stream can't be closed within deadline.
5960
*/
60-
public void shutdown() {
61+
public void shutdown() throws InterruptedException {
6162
if (shutdown.getAndSet(true)) {
6263
return;
6364
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/Storage.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
public interface Storage {
1111
void init();
1212

13-
void shutdown();
13+
void shutdown() throws InterruptedException;
1414

1515
FeatureFlag getFlag(final String key);
1616

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/Connector.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,5 @@ public interface Connector {
1111

1212
BlockingQueue<StreamPayload> getStream();
1313

14-
void shutdown();
14+
void shutdown() throws InterruptedException;
1515
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java

+28-8
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,12 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;
22

3+
import java.util.Random;
4+
import java.util.concurrent.BlockingQueue;
5+
import java.util.concurrent.LinkedBlockingQueue;
6+
import java.util.concurrent.TimeUnit;
7+
import java.util.concurrent.atomic.AtomicBoolean;
8+
import java.util.logging.Level;
9+
310
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
411
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
512
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.Connector;
@@ -11,12 +18,6 @@
1118
import io.grpc.ManagedChannel;
1219
import lombok.extern.java.Log;
1320

14-
import java.util.Random;
15-
import java.util.concurrent.BlockingQueue;
16-
import java.util.concurrent.LinkedBlockingQueue;
17-
import java.util.concurrent.atomic.AtomicBoolean;
18-
import java.util.logging.Level;
19-
2021
/**
2122
* Implements the {@link Connector} contract and emit flags obtained from flagd sync gRPC contract.
2223
*/
@@ -36,10 +37,17 @@ public class GrpcStreamConnector implements Connector {
3637

3738
private final ManagedChannel channel;
3839
private final FlagSyncServiceGrpc.FlagSyncServiceStub serviceStub;
40+
private final int deadline;
3941

42+
/**
43+
* Construct a new GrpcStreamConnector.
44+
*
45+
* @param options flagd options
46+
*/
4047
public GrpcStreamConnector(final FlagdOptions options) {
4148
channel = ChannelBuilder.nettyChannel(options);
4249
serviceStub = FlagSyncServiceGrpc.newStub(channel);
50+
this.deadline = options.getDeadline();
4351
}
4452

4553
/**
@@ -67,13 +75,25 @@ public BlockingQueue<StreamPayload> getStream() {
6775

6876
/**
6977
* Shutdown gRPC stream connector.
78+
* @throws InterruptedException if stream can't be closed within deadline.
7079
*/
71-
public void shutdown() {
80+
public void shutdown() throws InterruptedException {
7281
if (shutdown.getAndSet(true)) {
7382
return;
7483
}
7584

76-
channel.shutdown();
85+
try {
86+
if (this.channel != null && !this.channel.isShutdown()) {
87+
this.channel.shutdown();
88+
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
89+
}
90+
} finally {
91+
if (this.channel != null && !this.channel.isShutdown()) {
92+
this.channel.shutdownNow();
93+
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
94+
log.warning(String.format("Unable to shut down channel by %d deadline", this.deadline));
95+
}
96+
}
7797
}
7898

7999
/**

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunFlagdInProcessCucumberTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package dev.openfeature.contrib.providers.flagd.e2e;
22

3+
import org.junit.jupiter.api.Order;
34
import org.junit.platform.suite.api.ConfigurationParameter;
45
import org.junit.platform.suite.api.IncludeEngines;
56
import org.junit.platform.suite.api.SelectClasspathResource;
@@ -8,6 +9,7 @@
89
import static io.cucumber.junit.platform.engine.Constants.PLUGIN_PROPERTY_NAME;
910
import static io.cucumber.junit.platform.engine.Constants.GLUE_PROPERTY_NAME;
1011

12+
@Order(value = Integer.MAX_VALUE)
1113
@Suite
1214
@IncludeEngines("cucumber")
1315
@SelectClasspathResource("features/evaluation.feature")

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunFlagdRpcCucumberTest.java

+2
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
package dev.openfeature.contrib.providers.flagd.e2e;
22

3+
import org.apache.logging.log4j.core.config.Order;
34
import org.junit.platform.suite.api.ConfigurationParameter;
45
import org.junit.platform.suite.api.IncludeEngines;
56
import org.junit.platform.suite.api.SelectClasspathResource;
@@ -8,6 +9,7 @@
89
import static io.cucumber.junit.platform.engine.Constants.PLUGIN_PROPERTY_NAME;
910
import static io.cucumber.junit.platform.engine.Constants.GLUE_PROPERTY_NAME;
1011

12+
@Order(value = Integer.MAX_VALUE)
1113
@Suite
1214
@IncludeEngines("cucumber")
1315
@SelectClasspathResource("features/evaluation.feature")
Original file line numberDiff line numberDiff line change
@@ -1,24 +1,29 @@
11
package dev.openfeature.contrib.providers.flagd.e2e.process;
22

3+
import org.junit.jupiter.api.Order;
4+
import org.junit.jupiter.api.parallel.Isolated;
5+
36
import dev.openfeature.contrib.providers.flagd.Config;
47
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
58
import dev.openfeature.contrib.providers.flagd.FlagdProvider;
69
import dev.openfeature.contrib.providers.flagd.e2e.steps.StepDefinitions;
7-
import dev.openfeature.sdk.Client;
8-
import dev.openfeature.sdk.OpenFeatureAPI;
10+
import dev.openfeature.sdk.FeatureProvider;
911
import io.cucumber.java.BeforeAll;
1012

13+
@Isolated()
14+
@Order(value = Integer.MAX_VALUE)
1115
public class FlagdInProcessSetup {
16+
17+
private static FeatureProvider provider;
1218

1319
@BeforeAll()
1420
public static void setup() throws InterruptedException {
15-
FlagdProvider provider = new FlagdProvider(FlagdOptions.builder()
21+
FlagdInProcessSetup.provider = new FlagdProvider(FlagdOptions.builder()
1622
.resolverType(Config.Evaluator.IN_PROCESS)
23+
.deadline(3000)
1724
.host("localhost")
1825
.port(9090)
1926
.build());
20-
OpenFeatureAPI.getInstance().setProviderAndWait("process", provider);
21-
Client client = OpenFeatureAPI.getInstance().getClient("process");
22-
StepDefinitions.setClient(client);
27+
StepDefinitions.setProvider(provider);
2328
}
2429
}
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,28 @@
11
package dev.openfeature.contrib.providers.flagd.e2e.rpc;
22

3+
import org.junit.jupiter.api.Order;
4+
import org.junit.jupiter.api.parallel.Isolated;
5+
36
import dev.openfeature.contrib.providers.flagd.Config;
47
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
58
import dev.openfeature.contrib.providers.flagd.FlagdProvider;
69
import dev.openfeature.contrib.providers.flagd.e2e.steps.StepDefinitions;
7-
import dev.openfeature.sdk.Client;
8-
import dev.openfeature.sdk.OpenFeatureAPI;
10+
import dev.openfeature.sdk.FeatureProvider;
911
import io.cucumber.java.BeforeAll;
1012

11-
13+
@Isolated()
14+
@Order(value = Integer.MAX_VALUE)
1215
public class FlagdRpcSetup {
1316

17+
private static FeatureProvider provider;
18+
1419
@BeforeAll()
1520
public static void setup() {
16-
FlagdProvider provider = new FlagdProvider(FlagdOptions.builder()
21+
FlagdRpcSetup.provider = new FlagdProvider(FlagdOptions.builder()
1722
.resolverType(Config.Evaluator.RPC)
1823
// set a generous deadline, to prevent timeouts in actions
1924
.deadline(3000)
2025
.build());
21-
OpenFeatureAPI.getInstance().setProvider("rpc", provider);
22-
Client client = OpenFeatureAPI.getInstance().getClient("rpc");
23-
StepDefinitions.setClient(client);
26+
StepDefinitions.setProvider(provider);
2427
}
2528
}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/StepDefinitions.java

+24-7
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,22 @@
44

55
import java.util.HashMap;
66
import java.util.Map;
7-
import java.util.concurrent.locks.ReentrantReadWriteLock;
7+
8+
import org.junit.jupiter.api.BeforeEach;
9+
import org.junit.jupiter.api.Order;
10+
import org.junit.jupiter.api.parallel.Isolated;
811

912
import dev.openfeature.sdk.Client;
1013
import dev.openfeature.sdk.EvaluationContext;
14+
import dev.openfeature.sdk.FeatureProvider;
1115
import dev.openfeature.sdk.FlagEvaluationDetails;
1216
import dev.openfeature.sdk.ImmutableContext;
1317
import dev.openfeature.sdk.ImmutableStructure;
18+
import dev.openfeature.sdk.OpenFeatureAPI;
1419
import dev.openfeature.sdk.Reason;
1520
import dev.openfeature.sdk.Structure;
1621
import dev.openfeature.sdk.Value;
17-
import io.cucumber.java.BeforeAll;
22+
import io.cucumber.java.AfterAll;
1823
import io.cucumber.java.en.And;
1924
import io.cucumber.java.en.Given;
2025
import io.cucumber.java.en.Then;
@@ -23,10 +28,12 @@
2328
/**
2429
* Common test suite used by both RPC and in-process flagd providers.
2530
*/
31+
@Isolated()
32+
@Order(value = Integer.MAX_VALUE)
2633
public class StepDefinitions {
2734

28-
private static final ReentrantReadWriteLock sync = new ReentrantReadWriteLock();
2935
private static Client client;
36+
private static FeatureProvider provider;
3037

3138
private String booleanFlagKey;
3239
private String stringFlagKey;
@@ -63,15 +70,25 @@ public class StepDefinitions {
6370
*
6471
* @param client client to inject into test.
6572
*/
66-
public static void setClient(Client client) {
67-
StepDefinitions.client = client;
73+
public static void setProvider(FeatureProvider provider) {
74+
StepDefinitions.provider = provider;
6875
}
6976

70-
@BeforeAll()
77+
@BeforeEach()
7178
@Given("a provider is registered")
7279
@Given("a flagd provider is set")
7380
public static void setup() {
74-
// this is handled by the "Setup" files
81+
if (StepDefinitions.client == null) {
82+
OpenFeatureAPI.getInstance().setProviderAndWait("e2e", provider);
83+
StepDefinitions.client = OpenFeatureAPI.getInstance().getClient("e2e");
84+
}
85+
}
86+
87+
@AfterAll()
88+
public static void cleanUp() throws InterruptedException {
89+
StepDefinitions.provider.shutdown();
90+
StepDefinitions.provider = null;
91+
StepDefinitions.client = null;
7592
}
7693

7794
/*

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/FlagStoreTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
class FlagStoreTest {
1919

2020
@Test
21-
public void connectorHandling() {
21+
public void connectorHandling() throws InterruptedException {
2222
final int maxDelay = 500;
2323

2424
final BlockingQueue<StreamPayload> payload = new LinkedBlockingQueue<>();

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java

+14-13
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,5 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.grpc;
22

3-
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
4-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
5-
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
6-
import dev.openfeature.flagd.sync.FlagSyncServiceGrpc;
7-
import dev.openfeature.flagd.sync.SyncService;
8-
import org.junit.jupiter.api.Test;
9-
import org.mockito.Mockito;
10-
11-
import java.lang.reflect.Field;
12-
import java.time.Duration;
13-
import java.util.concurrent.BlockingQueue;
14-
import java.util.concurrent.TimeUnit;
15-
163
import static org.junit.jupiter.api.Assertions.assertEquals;
174
import static org.junit.jupiter.api.Assertions.assertNotNull;
185
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -21,6 +8,20 @@
218
import static org.mockito.Mockito.times;
229
import static org.mockito.Mockito.verify;
2310

11+
import java.lang.reflect.Field;
12+
import java.time.Duration;
13+
import java.util.concurrent.BlockingQueue;
14+
import java.util.concurrent.TimeUnit;
15+
16+
import org.junit.jupiter.api.Test;
17+
import org.mockito.Mockito;
18+
19+
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
20+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayload;
21+
import dev.openfeature.contrib.providers.flagd.resolver.process.storage.connector.StreamPayloadType;
22+
import dev.openfeature.flagd.sync.FlagSyncServiceGrpc;
23+
import dev.openfeature.flagd.sync.SyncService;
24+
2425
class GrpcStreamConnectorTest {
2526

2627
private static final Duration MAX_WAIT_MS = Duration.ofMillis(500);

0 commit comments

Comments
 (0)