Skip to content

Commit d3de874

Browse files
UtkarshSharma2612utkarshtoddbaert
authored
feat: emit changed flags in configuration change event (#925)
Signed-off-by: utkarsh <[email protected]> Signed-off-by: Todd Baert <[email protected]> Co-authored-by: utkarsh <[email protected]> Co-authored-by: Todd Baert <[email protected]>
1 parent f53b898 commit d3de874

18 files changed

+279
-144
lines changed

providers/flagd/lombok.config

+5
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# This file is needed to avoid errors throw by findbugs when working with lombok.
2+
lombok.addSuppressWarnings = true
3+
lombok.addLombokGeneratedAnnotation = true
4+
config.stopBubbling = true
5+
lombok.extern.findbugs.addSuppressFBWarnings = true

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

+8-7
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,14 @@
66
import dev.openfeature.contrib.providers.flagd.resolver.process.InProcessResolver;
77
import dev.openfeature.sdk.EvaluationContext;
88
import dev.openfeature.sdk.EventProvider;
9-
import dev.openfeature.sdk.FeatureProvider;
109
import dev.openfeature.sdk.Metadata;
1110
import dev.openfeature.sdk.ProviderEvaluation;
1211
import dev.openfeature.sdk.ProviderEventDetails;
1312
import dev.openfeature.sdk.ProviderState;
1413
import dev.openfeature.sdk.Value;
1514
import lombok.extern.slf4j.Slf4j;
1615

16+
import java.util.List;
1717
import java.util.concurrent.locks.Lock;
1818
import java.util.concurrent.locks.ReadWriteLock;
1919
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -23,7 +23,7 @@
2323
*/
2424
@Slf4j
2525
@SuppressWarnings({"PMD.TooManyStaticImports", "checkstyle:NoFinalizer"})
26-
public class FlagdProvider extends EventProvider implements FeatureProvider {
26+
public class FlagdProvider extends EventProvider {
2727
private static final String FLAGD_PROVIDER = "flagD Provider";
2828

2929
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@@ -142,7 +142,7 @@ private EvaluationContext mergeContext(final EvaluationContext clientCallCtx) {
142142
return clientCallCtx;
143143
}
144144

145-
private void setState(ProviderState newState) {
145+
private void setState(ProviderState newState, List<String> changedFlagsKeys) {
146146
ProviderState oldState;
147147
Lock l = this.lock.writeLock();
148148
try {
@@ -152,17 +152,17 @@ private void setState(ProviderState newState) {
152152
} finally {
153153
l.unlock();
154154
}
155-
this.handleStateTransition(oldState, newState);
155+
this.handleStateTransition(oldState, newState, changedFlagsKeys);
156156
}
157157

158-
private void handleStateTransition(ProviderState oldState, ProviderState newState) {
158+
private void handleStateTransition(ProviderState oldState, ProviderState newState, List<String> changedFlagKeys) {
159159
// we got initialized
160160
if (ProviderState.NOT_READY.equals(oldState) && ProviderState.READY.equals(newState)) {
161161
// nothing to do, the SDK emits the events
162162
log.debug("Init completed");
163163
return;
164164
}
165-
// we got shutdown, not checking oldState as behavior remains the same for shutdown
165+
// we got shutdown, not checking oldState as behavior remains the same for shutdown
166166
if (ProviderState.NOT_READY.equals(newState)) {
167167
// nothing to do
168168
log.debug("shutdown completed");
@@ -171,7 +171,8 @@ private void handleStateTransition(ProviderState oldState, ProviderState newStat
171171
// configuration changed
172172
if (ProviderState.READY.equals(oldState) && ProviderState.READY.equals(newState)) {
173173
log.debug("Configuration changed");
174-
ProviderEventDetails details = ProviderEventDetails.builder().message("configuration changed").build();
174+
ProviderEventDetails details = ProviderEventDetails.builder().flagsChanged(changedFlagKeys)
175+
.message("configuration changed").build();
175176
this.emitProviderConfigurationChanged(details);
176177
return;
177178
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java

+28-19
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,19 @@
88
import io.grpc.stub.StreamObserver;
99
import lombok.extern.slf4j.Slf4j;
1010

11+
import java.util.ArrayList;
12+
import java.util.Collections;
13+
import java.util.List;
1114
import java.util.Map;
12-
import java.util.function.Consumer;
15+
import java.util.function.BiConsumer;
1316

1417
/**
1518
* EventStreamObserver handles events emitted by flagd.
1619
*/
1720
@Slf4j
1821
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
1922
class EventStreamObserver implements StreamObserver<EventStreamResponse> {
20-
private final Consumer<ProviderState> stateConsumer;
23+
private final BiConsumer<ProviderState, List<String>> stateConsumer;
2124
private final Object sync;
2225
private final Cache cache;
2326

@@ -28,11 +31,11 @@ class EventStreamObserver implements StreamObserver<EventStreamResponse> {
2831
/**
2932
* Create a gRPC stream that get notified about flag changes.
3033
*
31-
* @param sync synchronization object from caller
32-
* @param cache cache to update
33-
* @param stateConsumer lambda to call for setting the state
34+
* @param sync synchronization object from caller
35+
* @param cache cache to update
36+
* @param stateConsumer lambda to call for setting the state
3437
*/
35-
EventStreamObserver(Object sync, Cache cache, Consumer<ProviderState> stateConsumer) {
38+
EventStreamObserver(Object sync, Cache cache, BiConsumer<ProviderState, List<String>> stateConsumer) {
3639
this.sync = sync;
3740
this.cache = cache;
3841
this.stateConsumer = stateConsumer;
@@ -58,7 +61,7 @@ public void onError(Throwable t) {
5861
if (this.cache.getEnabled()) {
5962
this.cache.clear();
6063
}
61-
this.stateConsumer.accept(ProviderState.ERROR);
64+
this.stateConsumer.accept(ProviderState.ERROR, Collections.emptyList());
6265

6366
// handle last call of this stream
6467
handleEndOfStream();
@@ -69,32 +72,38 @@ public void onCompleted() {
6972
if (this.cache.getEnabled()) {
7073
this.cache.clear();
7174
}
72-
this.stateConsumer.accept(ProviderState.ERROR);
75+
this.stateConsumer.accept(ProviderState.ERROR, Collections.emptyList());
7376

7477
// handle last call of this stream
7578
handleEndOfStream();
7679
}
7780

7881
private void handleConfigurationChangeEvent(EventStreamResponse value) {
79-
this.stateConsumer.accept(ProviderState.READY);
80-
if (!this.cache.getEnabled()) {
81-
return;
82-
}
82+
List<String> changedFlags = new ArrayList<>();
83+
boolean cachingEnabled = this.cache.getEnabled();
84+
8385
Map<String, Value> data = value.getData().getFieldsMap();
8486
Value flagsValue = data.get(FLAGS_KEY);
8587
if (flagsValue == null) {
86-
this.cache.clear();
87-
return;
88+
if (cachingEnabled) {
89+
this.cache.clear();
90+
}
91+
} else {
92+
Map<String, Value> flags = flagsValue.getStructValue().getFieldsMap();
93+
this.cache.getEnabled();
94+
for (String flagKey : flags.keySet()) {
95+
changedFlags.add(flagKey);
96+
if (cachingEnabled) {
97+
this.cache.remove(flagKey);
98+
}
99+
}
88100
}
89101

90-
Map<String, Value> flags = flagsValue.getStructValue().getFieldsMap();
91-
for (String flagKey : flags.keySet()) {
92-
this.cache.remove(flagKey);
93-
}
102+
this.stateConsumer.accept(ProviderState.READY, changedFlags);
94103
}
95104

96105
private void handleProviderReadyEvent() {
97-
this.stateConsumer.accept(ProviderState.READY);
106+
this.stateConsumer.accept(ProviderState.READY, Collections.emptyList());
98107
if (this.cache.getEnabled()) {
99108
this.cache.clear();
100109
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java

+22-15
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
package dev.openfeature.contrib.providers.flagd.resolver.grpc;
22

3+
import java.util.Collections;
4+
import java.util.List;
35
import java.util.Random;
46
import java.util.concurrent.TimeUnit;
57
import java.util.concurrent.atomic.AtomicBoolean;
6-
import java.util.function.Consumer;
7-
8+
import java.util.function.BiConsumer;
89
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
910
import dev.openfeature.contrib.providers.flagd.resolver.common.ChannelBuilder;
1011
import dev.openfeature.contrib.providers.flagd.resolver.common.Util;
@@ -37,7 +38,7 @@ public class GrpcConnector {
3738
private final long deadline;
3839

3940
private final Cache cache;
40-
private final Consumer<ProviderState> stateConsumer;
41+
private final BiConsumer<ProviderState, List<String>> stateConsumer;
4142

4243
private int eventStreamAttempt = 1;
4344
private int eventStreamRetryBackoff;
@@ -52,7 +53,8 @@ public class GrpcConnector {
5253
* @param cache cache to use.
5354
* @param stateConsumer lambda to call for setting the state.
5455
*/
55-
public GrpcConnector(final FlagdOptions options, final Cache cache, Consumer<ProviderState> stateConsumer) {
56+
public GrpcConnector(final FlagdOptions options, final Cache cache,
57+
BiConsumer<ProviderState, List<String>> stateConsumer) {
5658
this.channel = ChannelBuilder.nettyChannel(options);
5759
this.serviceStub = ServiceGrpc.newStub(channel);
5860
this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel);
@@ -80,7 +82,8 @@ public void initialize() throws Exception {
8082
/**
8183
* Shuts down all gRPC resources.
8284
*
83-
* @throws Exception is something goes wrong while terminating the communication.
85+
* @throws Exception is something goes wrong while terminating the
86+
* communication.
8487
*/
8588
public void shutdown() throws Exception {
8689
// first shutdown the event listener
@@ -100,7 +103,7 @@ public void shutdown() throws Exception {
100103
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
101104
log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
102105
}
103-
this.stateConsumer.accept(ProviderState.NOT_READY);
106+
this.stateConsumer.accept(ProviderState.NOT_READY, Collections.emptyList());
104107
}
105108
}
106109

@@ -114,21 +117,24 @@ public ServiceGrpc.ServiceBlockingStub getResolver() {
114117
}
115118

116119
/**
117-
* Event stream observer logic. This contains blocking mechanisms, hence must be run in a dedicated thread.
120+
* Event stream observer logic. This contains blocking mechanisms, hence must be
121+
* run in a dedicated thread.
118122
*/
119123
private void observeEventStream() {
120124
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
121-
final StreamObserver<EventStreamResponse> responseObserver =
122-
new EventStreamObserver(sync, this.cache, this::grpcStateConsumer);
125+
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache,
126+
this::grpcStateConsumer);
123127
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);
124128

125129
try {
126130
synchronized (sync) {
127131
sync.wait();
128132
}
129133
} catch (InterruptedException e) {
130-
// Interruptions are considered end calls for this observer, hence log and return
131-
// Note - this is the most common interruption when shutdown, hence the log level debug
134+
// Interruptions are considered end calls for this observer, hence log and
135+
// return
136+
// Note - this is the most common interruption when shutdown, hence the log
137+
// level debug
132138
log.debug("interruption while waiting for condition", e);
133139
Thread.currentThread().interrupt();
134140
}
@@ -140,17 +146,18 @@ private void observeEventStream() {
140146
try {
141147
Thread.sleep(this.eventStreamRetryBackoff);
142148
} catch (InterruptedException e) {
143-
// Interruptions are considered end calls for this observer, hence log and return
149+
// Interruptions are considered end calls for this observer, hence log and
150+
// return
144151
log.warn("interrupted while restarting gRPC Event Stream");
145152
Thread.currentThread().interrupt();
146153
}
147154
}
148155

149156
log.error("failed to connect to event stream, exhausted retries");
150-
this.grpcStateConsumer(ProviderState.ERROR);
157+
this.grpcStateConsumer(ProviderState.ERROR, null);
151158
}
152159

153-
private void grpcStateConsumer(final ProviderState state) {
160+
private void grpcStateConsumer(final ProviderState state, final List<String> changedFlags) {
154161
// check for readiness
155162
if (ProviderState.READY.equals(state)) {
156163
this.eventStreamAttempt = 1;
@@ -162,6 +169,6 @@ private void grpcStateConsumer(final ProviderState state) {
162169
}
163170

164171
// chain to initiator
165-
this.stateConsumer.accept(state);
172+
this.stateConsumer.accept(state, changedFlags);
166173
}
167174
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcResolver.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import java.util.HashMap;
66
import java.util.List;
77
import java.util.Map;
8-
import java.util.function.Consumer;
8+
import java.util.function.BiConsumer;
99
import java.util.function.Function;
1010
import java.util.function.Supplier;
1111
import java.util.stream.Collectors;
@@ -62,7 +62,7 @@ public final class GrpcResolver implements Resolver {
6262
* @param stateConsumer lambda to communicate back the state.
6363
*/
6464
public GrpcResolver(final FlagdOptions options, final Cache cache, final Supplier<ProviderState> stateSupplier,
65-
final Consumer<ProviderState> stateConsumer) {
65+
final BiConsumer<ProviderState,List<String>> stateConsumer) {
6666
this.cache = cache;
6767
this.stateSupplier = stateSupplier;
6868

0 commit comments

Comments
 (0)