Skip to content

Commit 82db146

Browse files
committed
fixup: fixing rpc mode - in-process still buggy on reconnect
Signed-off-by: Simon Schrottner <[email protected]>
1 parent 0fe3c4f commit 82db146

File tree

14 files changed

+118
-85
lines changed

14 files changed

+118
-85
lines changed

providers/flagd/pom.xml

+5-1
Original file line numberDiff line numberDiff line change
@@ -155,7 +155,11 @@
155155
<version>1.20.4</version>
156156
<scope>test</scope>
157157
</dependency>
158-
158+
<dependency>
159+
<groupId>org.slf4j</groupId>
160+
<artifactId>slf4j-simple</artifactId>
161+
<version>2.0.16</version>
162+
</dependency>
159163
</dependencies>
160164

161165
<dependencyManagement>

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ConnectionEvent.java

+9
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,15 @@ public boolean isConnected() {
114114
return this.connected == ConnectionState.CONNECTED;
115115
}
116116

117+
/**
118+
* Indicates
119+
* whether the current connection state is disconnected.
120+
*
121+
* @return {@code true} if disconnected, otherwise {@code false}.
122+
*/
123+
public boolean isDisconnected() {
124+
return this.connected == ConnectionState.DISCONNECTED;
125+
}
117126
/**
118127
* Indicates
119128
* whether the current connection state is stale.

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/GrpcConnector.java

+2-3
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ public GrpcConnector(
101101
ManagedChannel channel) {
102102

103103
this.channel = channel;
104-
this.serviceStub = stub.apply(channel);
105-
this.blockingStub = blockingStub.apply(channel);
104+
this.serviceStub = stub.apply(channel).withWaitForReady();
105+
this.blockingStub = blockingStub.apply(channel).withWaitForReady();
106106
this.deadline = options.getDeadline();
107107
this.streamDeadlineMs = options.getStreamDeadlineMs();
108108
this.onConnectionEvent = onConnectionEvent;
@@ -190,7 +190,6 @@ private synchronized void onReady() {
190190
log.debug("Reconnection task cancelled as connection became READY.");
191191
}
192192
restartStream();
193-
this.onConnectionEvent.accept(new ConnectionEvent(true));
194193
}
195194

196195
/**
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.grpc;
22

33
import com.google.protobuf.Value;
4+
import dev.openfeature.contrib.providers.flagd.resolver.common.ConnectionEvent;
45
import dev.openfeature.contrib.providers.flagd.resolver.grpc.cache.Cache;
56
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
67
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
@@ -10,6 +11,7 @@
1011
import java.util.List;
1112
import java.util.Map;
1213
import java.util.function.BiConsumer;
14+
import java.util.function.Consumer;
1315
import lombok.extern.slf4j.Slf4j;
1416

1517
/**
@@ -20,25 +22,18 @@
2022
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
2123
class EventStreamObserver implements StreamObserver<EventStreamResponse> {
2224

23-
/**
24-
* A consumer to handle connection events with a flag indicating success and a list of changed flags.
25-
*/
26-
private final BiConsumer<Boolean, List<String>> onConnectionEvent;
2725

28-
/**
29-
* The cache to update based on received events.
30-
*/
31-
private final Cache cache;
26+
private final Consumer<List<String>> onConfigurationChange;
27+
private final Consumer<ConnectionEvent> onReady;
3228

3329
/**
3430
* Constructs a new {@code EventStreamObserver} instance.
3531
*
36-
* @param cache the cache to update based on received events
3732
* @param onConnectionEvent a consumer to handle connection events with a boolean and a list of changed flags
3833
*/
39-
EventStreamObserver(Cache cache, BiConsumer<Boolean, List<String>> onConnectionEvent) {
40-
this.cache = cache;
41-
this.onConnectionEvent = onConnectionEvent;
34+
EventStreamObserver(Consumer<List<String>> onConfigurationChange, Consumer<ConnectionEvent> onReady) {
35+
this.onConfigurationChange = onConfigurationChange;
36+
this.onReady = onReady;
4237
}
4338

4439
/**
@@ -60,27 +55,14 @@ public void onNext(EventStreamResponse value) {
6055
}
6156
}
6257

63-
/**
64-
* Called when an error occurs in the stream.
65-
*
66-
* @param throwable the error that occurred
67-
*/
6858
@Override
6959
public void onError(Throwable throwable) {
70-
if (this.cache.getEnabled().equals(Boolean.TRUE)) {
71-
this.cache.clear();
72-
}
60+
7361
}
7462

75-
/**
76-
* Called when the stream is completed.
77-
*/
7863
@Override
7964
public void onCompleted() {
80-
if (this.cache.getEnabled().equals(Boolean.TRUE)) {
81-
this.cache.clear();
82-
}
83-
this.onConnectionEvent.accept(false, Collections.emptyList());
65+
8466
}
8567

8668
/**
@@ -90,33 +72,22 @@ public void onCompleted() {
9072
*/
9173
private void handleConfigurationChangeEvent(EventStreamResponse value) {
9274
List<String> changedFlags = new ArrayList<>();
93-
boolean cachingEnabled = this.cache.getEnabled();
9475

9576
Map<String, Value> data = value.getData().getFieldsMap();
9677
Value flagsValue = data.get(Constants.FLAGS_KEY);
97-
if (flagsValue == null) {
98-
if (cachingEnabled) {
99-
this.cache.clear();
100-
}
101-
} else {
78+
if (flagsValue != null) {
10279
Map<String, Value> flags = flagsValue.getStructValue().getFieldsMap();
103-
for (String flagKey : flags.keySet()) {
104-
changedFlags.add(flagKey);
105-
if (cachingEnabled) {
106-
this.cache.remove(flagKey);
107-
}
108-
}
80+
changedFlags.addAll(flags.keySet());
10981
}
11082

111-
this.onConnectionEvent.accept(true, changedFlags);
83+
onConfigurationChange.accept(changedFlags);
11284
}
11385

11486
/**
11587
* Handles provider readiness events by clearing the cache (if enabled) and notifying listeners of readiness.
11688
*/
11789
private void handleProviderReadyEvent() {
118-
if (this.cache.getEnabled().equals(Boolean.TRUE)) {
119-
this.cache.clear();
120-
}
90+
log.info("Received provider ready event");
91+
onReady.accept(new ConnectionEvent(true));
12192
}
12293
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java

+15-5
Original file line numberDiff line numberDiff line change
@@ -67,13 +67,23 @@ public GrpcResolver(
6767
options,
6868
ServiceGrpc::newStub,
6969
ServiceGrpc::newBlockingStub,
70-
onConnectionEvent,
70+
(event) -> {
71+
if( cache != null && event.isDisconnected()) {
72+
cache.clear();
73+
}
74+
onConnectionEvent.accept(event);
75+
},
7176
stub -> stub.eventStream(
7277
Evaluation.EventStreamRequest.getDefaultInstance(),
7378
new EventStreamObserver(
74-
cache,
75-
(k, e) ->
76-
onConnectionEvent.accept(new ConnectionEvent(ConnectionState.CONNECTED, e)))));
79+
(flags) -> {
80+
if( cache != null) {
81+
flags.forEach(cache::remove);
82+
}
83+
onConnectionEvent.accept(new ConnectionEvent(ConnectionState.CONNECTED, flags));
84+
},
85+
onConnectionEvent
86+
)));
7787
}
7888

7989
/**
@@ -207,7 +217,7 @@ private <T> Boolean isEvaluationCacheable(ProviderEvaluation<T> evaluation) {
207217
}
208218

209219
private Boolean cacheAvailable() {
210-
return this.cache.getEnabled() && this.connector.isConnected();
220+
return this.cache.getEnabled();
211221
}
212222

213223
private static ImmutableMetadata metadataFromResponse(Message response) {

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunConfigCucumberTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,5 +18,5 @@
1818
@IncludeEngines("cucumber")
1919
@SelectFile("test-harness/gherkin/config.feature")
2020
@ConfigurationParameter(key = PLUGIN_PROPERTY_NAME, value = "pretty")
21-
@ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps.config")
21+
@ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps")
2222
public class RunConfigCucumberTest {}

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunInProcessTest.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
@ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps")
2727
@ConfigurationParameter(key = OBJECT_FACTORY_PROPERTY_NAME, value = "io.cucumber.picocontainer.PicoFactory")
2828
@IncludeTags("in-process")
29-
@ExcludeTags({"unixsocket", "customCert"})
29+
@ExcludeTags({"unixsocket", "customCert", "targetURI"})
3030
@Testcontainers
3131
public class RunInProcessTest {
3232

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/RunRpcTest.java

+5-4
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import org.junit.platform.suite.api.ExcludeTags;
1212
import org.junit.platform.suite.api.IncludeEngines;
1313
import org.junit.platform.suite.api.IncludeTags;
14+
import org.junit.platform.suite.api.SelectDirectories;
1415
import org.junit.platform.suite.api.SelectFile;
1516
import org.junit.platform.suite.api.Suite;
1617
import org.testcontainers.junit.jupiter.Testcontainers;
@@ -21,13 +22,13 @@
2122
@Order(value = Integer.MAX_VALUE)
2223
@Suite
2324
@IncludeEngines("cucumber")
24-
// @SelectDirectories("test-harness/gherkin")
25-
@SelectFile("test-harness/gherkin/connection.feature")
25+
@SelectDirectories("test-harness/gherkin")
26+
//@SelectFile("test-harness/gherkin/rpc-caching.feature")
2627
@ConfigurationParameter(key = PLUGIN_PROPERTY_NAME, value = "pretty")
2728
@ConfigurationParameter(key = GLUE_PROPERTY_NAME, value = "dev.openfeature.contrib.providers.flagd.e2e.steps")
2829
@ConfigurationParameter(key = OBJECT_FACTORY_PROPERTY_NAME, value = "io.cucumber.picocontainer.PicoFactory")
29-
@IncludeTags({"rpc", "reconnect"})
30-
@ExcludeTags({"targetURI", "customCert", "unixsocket"})
30+
@IncludeTags({"rpc"})
31+
@ExcludeTags({ "unixsocket", "targetURI"})
3132
@Testcontainers
3233
public class RunRpcTest {
3334

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/ConfigSteps.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import io.cucumber.java.en.Given;
88
import io.cucumber.java.en.Then;
99
import io.cucumber.java.en.When;
10+
import java.io.IOException;
1011
import java.lang.reflect.InvocationTargetException;
1112
import java.lang.reflect.Method;
1213
import java.util.ArrayList;
@@ -57,7 +58,7 @@ public void we_initialize_a_config_for(String string) {
5758

5859
@Given("an option {string} of type {string} with value {string}")
5960
public void we_have_an_option_of_type_with_value(String option, String type, String value)
60-
throws ClassNotFoundException, NoSuchMethodException, InvocationTargetException, IllegalAccessException {
61+
throws Throwable {
6162
if (IGNORED_FOR_NOW.contains(option)) {
6263
LOG.error("option '{}' is not supported", option);
6364
return;

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/steps/FlagSteps.java

+12-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package dev.openfeature.contrib.providers.flagd.e2e.steps;
22

3+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
34
import static org.assertj.core.api.Assertions.assertThat;
5+
import static org.awaitility.Awaitility.await;
46

57
import dev.openfeature.contrib.providers.flagd.e2e.State;
68
import dev.openfeature.sdk.FlagEvaluationDetails;
@@ -18,12 +20,12 @@ public FlagSteps(State state) {
1820
}
1921

2022
@Given("a {}-flag with key {string} and a default value {string}")
21-
public void givenAFlag(String type, String name, String defaultValue) throws ClassNotFoundException {
23+
public void givenAFlag(String type, String name, String defaultValue) throws Throwable {
2224
state.flag = new Flag(type, name, Utils.convert(defaultValue, type));
2325
}
2426

2527
@When("the flag was evaluated with details")
26-
public void the_flag_was_evaluated_with_details() {
28+
public void the_flag_was_evaluated_with_details() throws InterruptedException {
2729
FlagEvaluationDetails details;
2830
switch (state.flag.type) {
2931
case "String":
@@ -52,8 +54,8 @@ public void the_flag_was_evaluated_with_details() {
5254
state.evaluation = details;
5355
}
5456

55-
@Then("the resolved details value should be {string}")
56-
public void the_resolved_details_value_should_be(String value) throws ClassNotFoundException {
57+
@Then("the resolved details value should be \"{}\"")
58+
public void the_resolved_details_value_should_be(String value) throws Throwable {
5759
assertThat(state.evaluation.getValue()).isEqualTo(Utils.convert(value, state.flag.type));
5860
}
5961

@@ -66,13 +68,14 @@ public void the_reason_should_be(String reason) {
6668
public void the_variant_should_be(String variant) {
6769
assertThat(state.evaluation.getVariant()).isEqualTo(variant);
6870
}
69-
71+
@Then("the flag should be part of the event payload")
7072
@Then("the flag was modified")
7173
public void the_flag_was_modified() {
72-
assertThat(state.lastEvent).isPresent().hasValueSatisfying((event) -> {
73-
assertThat(event.type).isEqualTo("change");
74-
assertThat(event.details.getFlagsChanged()).contains(state.flag.name);
75-
});
74+
await().atMost(5000, MILLISECONDS)
75+
.until(() -> state.events.stream().anyMatch(event -> event.type.equals("change") && event.details.getFlagsChanged().contains(state.flag.name)));
76+
state.lastEvent = state.events.stream()
77+
.filter(event -> event.type.equals("change") && event.details.getFlagsChanged().contains(state.flag.name))
78+
.findFirst();
7679
}
7780

7881
public class Flag {

0 commit comments

Comments
 (0)