Skip to content

Commit bddf91d

Browse files
committed
feat: expose sync-metadata on provider
Signed-off-by: Todd Baert <[email protected]>
1 parent 9ae4c0c commit bddf91d

20 files changed

+730
-518
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdProvider.java

+21-3
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package dev.openfeature.contrib.providers.flagd;
22

3+
import java.util.Collections;
34
import java.util.List;
5+
import java.util.Map;
46

57
import dev.openfeature.contrib.providers.flagd.resolver.Resolver;
68
import dev.openfeature.contrib.providers.flagd.resolver.grpc.GrpcResolver;
@@ -24,6 +26,7 @@ public class FlagdProvider extends EventProvider {
2426
private final Resolver flagResolver;
2527
private volatile boolean initialized = false;
2628
private volatile boolean connected = false;
29+
private volatile Map<String, Object> syncMetadata = Collections.emptyMap();
2730

2831
private EvaluationContext evaluationContext;
2932

@@ -47,13 +50,13 @@ public FlagdProvider(final FlagdOptions options) {
4750
switch (options.getResolverType().asString()) {
4851
case Config.RESOLVER_IN_PROCESS:
4952
this.flagResolver = new InProcessResolver(options, this::isConnected,
50-
this::onResolverConnectionChanged);
53+
this::onConnectionEvent);
5154
break;
5255
case Config.RESOLVER_RPC:
5356
this.flagResolver = new GrpcResolver(options,
5457
new Cache(options.getCacheType(), options.getMaxCacheSize()),
5558
this::isConnected,
56-
this::onResolverConnectionChanged);
59+
this::onConnectionEvent);
5760
break;
5861
default:
5962
throw new IllegalStateException(
@@ -117,6 +120,19 @@ public ProviderEvaluation<Value> getObjectEvaluation(String key, Value defaultVa
117120
return this.flagResolver.objectEvaluation(key, defaultValue, mergeContext(ctx));
118121
}
119122

123+
/**
124+
* An unmodifiable view of an object map representing the latest result of the
125+
* SyncMetadata.
126+
* Set on initial connection and updated with every reconnection.
127+
* see:
128+
* https://buf.build/open-feature/flagd/docs/main:flagd.sync.v1#flagd.sync.v1.FlagSyncService.GetMetadata
129+
*
130+
* @return Object map representing sync metadata
131+
*/
132+
protected Map<String, Object> getSyncMetadata() {
133+
return Collections.unmodifiableMap(syncMetadata);
134+
}
135+
120136
private EvaluationContext mergeContext(final EvaluationContext clientCallCtx) {
121137
if (this.evaluationContext != null) {
122138
return evaluationContext.merge(clientCallCtx);
@@ -129,10 +145,12 @@ private boolean isConnected() {
129145
return this.connected;
130146
}
131147

132-
private void onResolverConnectionChanged(boolean newConnectedState, List<String> changedFlagKeys) {
148+
private void onConnectionEvent(boolean newConnectedState, List<String> changedFlagKeys,
149+
Map<String, Object> syncMetadata) {
133150
boolean previous = connected;
134151
boolean current = newConnectedState;
135152
this.connected = newConnectedState;
153+
this.syncMetadata = syncMetadata;
136154

137155
// configuration changed
138156
if (initialized && previous && current) {

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/Resolver.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import dev.openfeature.sdk.Value;
66

77
/**
8-
* A generic flag resolving contract for flagd.
8+
* Abstraction that resolves flag values in from some source.
99
*/
1010
public interface Resolver {
1111
void init() throws Exception;

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/EventStreamObserver.java

+10-10
Original file line numberDiff line numberDiff line change
@@ -20,25 +20,25 @@
2020
@Slf4j
2121
@SuppressFBWarnings(justification = "cache needs to be read and write by multiple objects")
2222
class EventStreamObserver implements StreamObserver<EventStreamResponse> {
23-
private final BiConsumer<Boolean, List<String>> stateConsumer;
23+
private final BiConsumer<Boolean, List<String>> onConnectionEvent;
2424
private final Object sync;
2525
private final Cache cache;
2626

27-
private static final String CONFIGURATION_CHANGE = "configuration_change";
28-
private static final String PROVIDER_READY = "provider_ready";
27+
public static final String CONFIGURATION_CHANGE = "configuration_change";
28+
public static final String PROVIDER_READY = "provider_ready";
2929
static final String FLAGS_KEY = "flags";
3030

3131
/**
3232
* Create a gRPC stream that get notified about flag changes.
3333
*
3434
* @param sync synchronization object from caller
3535
* @param cache cache to update
36-
* @param stateConsumer lambda to call for setting the state
36+
* @param onResponse lambda to call to handle the response
3737
*/
38-
EventStreamObserver(Object sync, Cache cache, BiConsumer<Boolean, List<String>> stateConsumer) {
38+
EventStreamObserver(Object sync, Cache cache, BiConsumer<Boolean, List<String>> onResponse) {
3939
this.sync = sync;
4040
this.cache = cache;
41-
this.stateConsumer = stateConsumer;
41+
this.onConnectionEvent = onResponse;
4242
}
4343

4444
@Override
@@ -61,7 +61,7 @@ public void onError(Throwable t) {
6161
if (this.cache.getEnabled()) {
6262
this.cache.clear();
6363
}
64-
this.stateConsumer.accept(false, Collections.emptyList());
64+
this.onConnectionEvent.accept(false, Collections.emptyList());
6565

6666
// handle last call of this stream
6767
handleEndOfStream();
@@ -72,7 +72,7 @@ public void onCompleted() {
7272
if (this.cache.getEnabled()) {
7373
this.cache.clear();
7474
}
75-
this.stateConsumer.accept(false, Collections.emptyList());
75+
this.onConnectionEvent.accept(false, Collections.emptyList());
7676

7777
// handle last call of this stream
7878
handleEndOfStream();
@@ -99,11 +99,11 @@ private void handleConfigurationChangeEvent(EventStreamResponse value) {
9999
}
100100
}
101101

102-
this.stateConsumer.accept(true, changedFlags);
102+
this.onConnectionEvent.accept(true, changedFlags);
103103
}
104104

105105
private void handleProviderReadyEvent() {
106-
this.stateConsumer.accept(true, Collections.emptyList());
106+
this.onConnectionEvent.accept(true, Collections.emptyList());
107107
if (this.cache.getEnabled()) {
108108
this.cache.clear();
109109
}

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnector.java

+15-14
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,9 @@
22

33
import java.util.Collections;
44
import java.util.List;
5+
import java.util.Map;
56
import java.util.Random;
67
import java.util.concurrent.TimeUnit;
7-
import java.util.function.BiConsumer;
88
import java.util.function.Supplier;
99

1010
import dev.openfeature.contrib.providers.flagd.FlagdOptions;
@@ -14,6 +14,7 @@
1414
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamRequest;
1515
import dev.openfeature.flagd.grpc.evaluation.Evaluation.EventStreamResponse;
1616
import dev.openfeature.flagd.grpc.evaluation.ServiceGrpc;
17+
import dev.openfeature.sdk.internal.TriConsumer;
1718
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1819
import io.grpc.ManagedChannel;
1920
import io.grpc.stub.StreamObserver;
@@ -37,7 +38,7 @@ public class GrpcConnector {
3738
private final long deadline;
3839

3940
private final Cache cache;
40-
private final BiConsumer<Boolean, List<String>> stateConsumer;
41+
private final TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent;
4142
private final Supplier<Boolean> connectedSupplier;
4243

4344
private int eventStreamAttempt = 1;
@@ -48,23 +49,23 @@ public class GrpcConnector {
4849

4950
/**
5051
* GrpcConnector creates an abstraction over gRPC communication.
51-
*
52-
* @param options options to build the gRPC channel.
53-
* @param cache cache to use.
54-
* @param stateConsumer lambda to call for setting the state.
52+
*
53+
* @param options flagd options
54+
* @param cache cache to use
55+
* @param connectedSupplier lambda providing current connection status from caller
56+
* @param onConnectionEvent lambda which handles changes in the connection/stream
5557
*/
5658
public GrpcConnector(final FlagdOptions options, final Cache cache, final Supplier<Boolean> connectedSupplier,
57-
BiConsumer<Boolean, List<String>> stateConsumer) {
59+
TriConsumer<Boolean, List<String>, Map<String, Object>> onConnectionEvent) {
5860
this.channel = ChannelBuilder.nettyChannel(options);
5961
this.serviceStub = ServiceGrpc.newStub(channel);
6062
this.serviceBlockingStub = ServiceGrpc.newBlockingStub(channel);
61-
6263
this.maxEventStreamRetries = options.getMaxEventStreamRetries();
6364
this.startEventStreamRetryBackoff = options.getRetryBackoffMs();
6465
this.eventStreamRetryBackoff = options.getRetryBackoffMs();
6566
this.deadline = options.getDeadline();
6667
this.cache = cache;
67-
this.stateConsumer = stateConsumer;
68+
this.onConnectionEvent = onConnectionEvent;
6869
this.connectedSupplier = connectedSupplier;
6970
}
7071

@@ -104,7 +105,7 @@ public void shutdown() throws Exception {
104105
this.channel.awaitTermination(this.deadline, TimeUnit.MILLISECONDS);
105106
log.warn(String.format("Unable to shut down channel by %d deadline", this.deadline));
106107
}
107-
this.stateConsumer.accept(false, Collections.emptyList());
108+
this.onConnectionEvent.accept(false, Collections.emptyList(), Collections.emptyMap());
108109
}
109110
}
110111

@@ -124,7 +125,7 @@ public ServiceGrpc.ServiceBlockingStub getResolver() {
124125
private void observeEventStream() {
125126
while (this.eventStreamAttempt <= this.maxEventStreamRetries) {
126127
final StreamObserver<EventStreamResponse> responseObserver = new EventStreamObserver(sync, this.cache,
127-
this::grpcStateConsumer);
128+
this::grpconConnectionEvent);
128129
this.serviceStub.eventStream(EventStreamRequest.getDefaultInstance(), responseObserver);
129130

130131
try {
@@ -155,16 +156,16 @@ private void observeEventStream() {
155156
}
156157

157158
log.error("failed to connect to event stream, exhausted retries");
158-
this.grpcStateConsumer(false, null);
159+
this.grpconConnectionEvent(false, Collections.emptyList());
159160
}
160161

161-
private void grpcStateConsumer(final boolean connected, final List<String> changedFlags) {
162+
private void grpconConnectionEvent(final boolean connected, final List<String> changedFlags) {
162163
// reset reconnection states
163164
if (connected) {
164165
this.eventStreamAttempt = 1;
165166
this.eventStreamRetryBackoff = this.startEventStreamRetryBackoff;
166167
}
167168
// chain to initiator
168-
this.stateConsumer.accept(connected, changedFlags);
169+
this.onConnectionEvent.accept(connected, changedFlags, Collections.emptyMap());
169170
}
170171
}

0 commit comments

Comments
 (0)