Skip to content

Commit 494c72a

Browse files
committed
fixup: remove stream deadline
Signed-off-by: Todd Baert <[email protected]>
1 parent ac1078a commit 494c72a

File tree

6 files changed

+31
-12
lines changed

6 files changed

+31
-12
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnector.java

+1-2
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,7 @@ static void observeEventStream(final BlockingQueue<QueuePayload> writeTo,
135135
syncRequest.setSelector(selector);
136136
}
137137

138-
serviceStub.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).syncFlags(syncRequest.build(),
139-
new GrpcStreamHandler(streamReceiver));
138+
serviceStub.syncFlags(syncRequest.build(), new GrpcStreamHandler(streamReceiver));
140139
try {
141140
GetMetadataResponse metadataResponse = serviceBlockingStub
142141
.withDeadlineAfter(deadline, TimeUnit.MILLISECONDS).getMetadata(metadataRequest.build());

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/process/FlagdInProcessSetup.java

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ public static void setup() throws InterruptedException {
2626
flagdContainer.start();
2727
FlagdInProcessSetup.provider = new FlagdProvider(FlagdOptions.builder()
2828
.resolverType(Config.Resolver.IN_PROCESS)
29+
// set a generous deadline, to prevent timeouts in actions
2930
.deadline(3000)
3031
.port(flagdContainer.getFirstMappedPort())
3132
.build());

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/reconnect/process/FlagdInProcessSetup.java

+2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@ public static void setup() throws InterruptedException {
2424
FeatureProvider workingProvider = new FlagdProvider(FlagdOptions.builder()
2525
.resolverType(Config.Resolver.IN_PROCESS)
2626
.port(flagdContainer.getFirstMappedPort())
27+
// set a generous deadline, to prevent timeouts in actions
28+
.deadline(3000)
2729
.build());
2830
StepDefinitions.setUnstableProvider(workingProvider);
2931

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/e2e/reconnect/rpc/FlagdRpcSetup.java

+2
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ public static void setup() throws InterruptedException {
2828
FeatureProvider workingProvider = new FlagdProvider(FlagdOptions.builder()
2929
.resolverType(Config.Resolver.RPC)
3030
.port(flagdContainer.getFirstMappedPort())
31+
// set a generous deadline, to prevent timeouts in actions
32+
.deadline(3000)
3133
.cacheType(CacheType.DISABLED.getValue())
3234
.build());
3335
StepDefinitions.setUnstableProvider(workingProvider);

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/grpc/GrpcConnectorTest.java

+25-7
Original file line numberDiff line numberDiff line change
@@ -130,15 +130,33 @@ void initialization_fail_with_timeout() throws Exception {
130130
final Cache cache = new Cache("disabled", 0);
131131
final ServiceGrpc.ServiceStub mockStub = mock(ServiceGrpc.ServiceStub.class);
132132
Consumer<ConnectionEvent> onConnectionEvent = mock(Consumer.class);
133-
doAnswer(invocation -> null).when(mockStub).eventStream(any(), any());
133+
doAnswer((InvocationOnMock invocation) -> {
134+
EventStreamObserver eventStreamObserver = (EventStreamObserver) invocation.getArgument(1);
135+
eventStreamObserver
136+
.onError(new Exception("fake"));
137+
return null;
138+
}).when(mockStub).eventStream(any(), any());
139+
140+
try (MockedStatic<ServiceGrpc> mockStaticService = mockStatic(ServiceGrpc.class)) {
141+
mockStaticService.when(() -> ServiceGrpc.newStub(any()))
142+
.thenReturn(mockStub);
143+
144+
// pass true in connected lambda
145+
final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> {
146+
try {
147+
Thread.sleep(100);
148+
return true;
149+
} catch (Exception e) {
150+
}
151+
return false;
134152

135-
final GrpcConnector connector = new GrpcConnector(FlagdOptions.builder().build(), cache, () -> false,
136-
onConnectionEvent);
153+
},
154+
onConnectionEvent);
137155

138-
// assert throws
139-
assertThrows(RuntimeException.class, connector::initialize);
140-
// assert that onConnectionEvent is not connected
141-
verify(onConnectionEvent).accept(argThat(arg -> !arg.isConnected()));
156+
assertDoesNotThrow(connector::initialize);
157+
// assert that onConnectionEvent is connected
158+
verify(onConnectionEvent).accept(argThat(arg -> !arg.isConnected()));
159+
}
142160
}
143161

144162
@Test

providers/flagd/src/test/java/dev/openfeature/contrib/providers/flagd/resolver/process/storage/connector/grpc/GrpcStreamConnectorTest.java

-3
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,6 @@ public void connectionParameters() throws Throwable {
4949
final FlagSyncServiceBlockingStub blockingStubMock = mockBlockingStubAndReturn(connector);
5050
final SyncFlagsRequest[] request = new SyncFlagsRequest[1];
5151

52-
when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock);
5352
doAnswer(invocation -> {
5453
request[0] = invocation.getArgument(0, SyncFlagsRequest.class);
5554
return null;
@@ -58,7 +57,6 @@ public void connectionParameters() throws Throwable {
5857
// when
5958
connector.init();
6059
verify(stubMock, timeout(MAX_WAIT_MS.toMillis()).times(1)).syncFlags(any(), any());
61-
verify(stubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS);
6260
verify(blockingStubMock).withDeadlineAfter(1337, TimeUnit.MILLISECONDS);
6361

6462
// then
@@ -87,7 +85,6 @@ public void grpcConnectionStatus() throws Throwable {
8785

8886
final GrpcStreamHandler[] injectedHandler = new GrpcStreamHandler[1];
8987

90-
when(stubMock.withDeadlineAfter(anyLong(), any())).thenReturn(stubMock);
9188
doAnswer(invocation -> {
9289
injectedHandler[0] = invocation.getArgument(1, GrpcStreamHandler.class);
9390
return null;

0 commit comments

Comments
 (0)