Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: informer related behavior on startup and in case of errors #1571

Merged
merged 25 commits into from
Oct 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions .github/workflows/pr.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,12 @@ jobs:
driver: 'docker'
- name: Run integration tests
run: ./mvnw ${MAVEN_ARGS} -B package -P no-unit-tests --file pom.xml
- name: Adjust Minikube Min Request Timeout Setting
uses: manusa/[email protected]
with:
minikube version: 'v1.26.0'
kubernetes version: ${{ matrix.kubernetes }}
driver: 'docker'
start args: '--extra-config=apiserver.min-request-timeout=3'
- name: Run Special Integration Tests
run: ./mvnw ${MAVEN_ARGS} -B package -P minimal-watch-timeout-dependent-it --file pom.xml
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.api.config;

import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -152,6 +153,35 @@ default Optional<LeaderElectionConfiguration> getLeaderElectionConfiguration() {
return Optional.empty();
}

/**
* Controls whether informer problems encountered while the operator is starting are fatal.
* <p>
* If true, the operator stops if there are some issues with the informers of
* {@link io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource} or
* {@link io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource}
* on startup. Other event sources may also respect this flag.
* </p>
* <p>
* If false, the startup will ignore recoverable errors, caused for example by RBAC issues, and
* will try to reconnect periodically in the background.
* </p>
*
* @return {@code true} by default, i.e. the operator stops on informer errors during startup
*/
default boolean stopOnInformerErrorDuringStartup() {
return true;
}

/**
* Timeout for the cache sync, in other words the maximum time to wait for an event source to
* start. Note that if "stopOnInformerErrorDuringStartup" is true the operator will stop on
* timeout.
*
* @return the timeout as a {@link Duration}; the default is 2 minutes
*/
default Duration cacheSyncTimeout() {
return Duration.ofMinutes(2);
}

/**
* Handler for an informer stop. An informer stops when it hits a non-recoverable error, for
* example when it receives a resource that cannot be deserialized.
*/
default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
return Optional.of((informer, ex) -> {
if (ex != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.javaoperatorsdk.operator.api.config;

import java.time.Duration;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
Expand Down Expand Up @@ -27,6 +28,8 @@ public class ConfigurationServiceOverrider {
private ExecutorService workflowExecutorService;
private LeaderElectionConfiguration leaderElectionConfiguration;
private InformerStoppedHandler informerStoppedHandler;
private Boolean stopOnInformerErrorDuringStartup;
private Duration cacheSyncTimeout;

ConfigurationServiceOverrider(ConfigurationService original) {
this.original = original;
Expand Down Expand Up @@ -99,6 +102,17 @@ public ConfigurationServiceOverrider withInformerStoppedHandler(InformerStoppedH
return this;
}

/**
* Overrides the value reported by {@code stopOnInformerErrorDuringStartup()} of the
* configuration service produced by {@link #build()}.
*
* @param stopOnInformerErrorDuringStartup the value the built service should report
* @return this overrider, to allow call chaining
*/
public ConfigurationServiceOverrider withStopOnInformerErrorDuringStartup(
boolean stopOnInformerErrorDuringStartup) {
this.stopOnInformerErrorDuringStartup = stopOnInformerErrorDuringStartup;
return this;
}

/**
* Overrides the value reported by {@code cacheSyncTimeout()} of the configuration service
* produced by {@link #build()}.
*
* @param cacheSyncTimeout the timeout the built service should report; a {@code null} value
*        means no override is applied
* @return this overrider, to allow call chaining
*/
public ConfigurationServiceOverrider withCacheSyncTimeout(Duration cacheSyncTimeout) {
this.cacheSyncTimeout = cacheSyncTimeout;
return this;
}

public ConfigurationService build() {
return new BaseConfigurationService(original.getVersion(), cloner, objectMapper) {
@Override
Expand Down Expand Up @@ -171,6 +185,17 @@ public Optional<InformerStoppedHandler> getInformerStoppedHandler() {
return informerStoppedHandler != null ? Optional.of(informerStoppedHandler)
: original.getInformerStoppedHandler();
}

/**
 * Returns the explicitly overridden flag if one was set on the overrider; otherwise delegates to
 * the wrapped {@code original} configuration service, so that a value customized there is not
 * silently replaced by the interface default. This mirrors the fallback used by
 * {@code getInformerStoppedHandler()}.
 */
@Override
public boolean stopOnInformerErrorDuringStartup() {
  return stopOnInformerErrorDuringStartup != null
      ? stopOnInformerErrorDuringStartup
      : original.stopOnInformerErrorDuringStartup();
}

/**
 * Returns the explicitly overridden timeout if one was set on the overrider; otherwise delegates
 * to the wrapped {@code original} configuration service, so that a timeout customized there is
 * not silently replaced by the interface default. This mirrors the fallback used by
 * {@code getInformerStoppedHandler()}.
 */
@Override
public Duration cacheSyncTimeout() {
  return cacheSyncTimeout != null ? cacheSyncTimeout : original.cacheSyncTimeout();
}
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;
Expand All @@ -11,6 +14,7 @@
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.informers.ExceptionHandler;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
Expand All @@ -37,10 +41,9 @@ public InformerWrapper(SharedIndexInformer<T> informer) {
@Override
public void start() throws OperatorException {
try {
informer.run();

var configService = ConfigurationServiceProvider.instance();
// register stopped handler if we have one defined
ConfigurationServiceProvider.instance().getInformerStoppedHandler()
configService.getInformerStoppedHandler()
.ifPresent(ish -> {
final var stopped = informer.stopped();
if (stopped != null) {
Expand All @@ -58,6 +61,27 @@ public void start() throws OperatorException {
+ fullResourceName + "/" + version);
}
});
if (!configService.stopOnInformerErrorDuringStartup()) {
informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t));
}
try {
var start = informer.start();
// Note: the timeout is required here. Without it, if stopOnInformerErrorDuringStartup is
// false and there is an RBAC issue, get() never returns and the operator therefore never
// really starts.
start.toCompletableFuture().get(configService.cacheSyncTimeout().toMillis(),
TimeUnit.MILLISECONDS);
} catch (TimeoutException | ExecutionException e) {
if (configService.stopOnInformerErrorDuringStartup()) {
log.error("Informer startup error. Operator will be stopped. Informer: {}", informer, e);
throw new OperatorException(e);
} else {
log.warn("Informer startup error. Will periodically retry. Informer: {}", informer, e);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
}

} catch (Exception e) {
log.error("Couldn't start informer for " + versionedFullResourceName() + " resources", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,8 @@
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

public class MockKubernetesClient {

Expand All @@ -44,17 +43,21 @@ public static <T extends HasMetadata> KubernetesClient client(Class<T> clazz,
when(resources.inAnyNamespace()).thenReturn(inAnyNamespace);
when(inAnyNamespace.withLabelSelector(nullable(String.class))).thenReturn(filterable);
SharedIndexInformer<T> informer = mock(SharedIndexInformer.class);
CompletableFuture<Void> informerStartRes = new CompletableFuture<>();
informerStartRes.complete(null);
when(informer.start()).thenReturn(informerStartRes);
CompletableFuture<Void> stopped = new CompletableFuture<>();
when(informer.stopped()).thenReturn(stopped);
when(informer.getApiTypeClass()).thenReturn(clazz);
if (informerRunBehavior != null) {
doAnswer(invocation -> {
try {
informerRunBehavior.accept(null);
} catch (Exception e) {
stopped.completeExceptionally(e);
}
return null;
}).when(informer).run();
return stopped;
}).when(informer).start();
}
doAnswer(invocation -> null).when(informer).stop();
Indexer mockIndexer = mock(Indexer.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_NAMESPACES_SET;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.atLeastOnce;
Expand Down Expand Up @@ -255,7 +256,10 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() {
MockKubernetesClient.client(Deployment.class, unused -> {
throw exception;
}));
informerEventSource.start();

// by default informer fails to start if there is an exception in the client on start.
// Throws the exception further.
assertThrows(RuntimeException.class, () -> informerEventSource.start());
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
} finally {
ConfigurationServiceProvider.reset();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,10 @@ public RegisteredController getRegisteredControllerForReconcile(
return registeredControllers.get(getReconcilerOfType(type));
}

/**
* Gives access to the operator instance held by this extension.
*
* @return the value of the {@code operator} field
*/
public Operator getOperator() {
return operator;
}

@SuppressWarnings("unchecked")
@Override
protected void before(ExtensionContext context) {
Expand Down Expand Up @@ -159,12 +163,20 @@ protected void before(ExtensionContext context) {
}

// Instance-level convenience: applies the CRD for the given resource type name using the
// client returned by getKubernetesClient().
private void applyCrd(String resourceTypeName) {
  final var client = getKubernetesClient();
  applyCrd(resourceTypeName, client);
}

/**
 * Applies the CRD belonging to the given resource class with the supplied client.
 *
 * @param resourceClass the resource class whose type name is resolved through
 *        {@code ReconcilerUtils.getResourceTypeName}
 * @param client the client handed on to the name-based {@code applyCrd} overload
 */
public static void applyCrd(Class<? extends HasMetadata> resourceClass, KubernetesClient client) {
  final var resourceTypeName = ReconcilerUtils.getResourceTypeName(resourceClass);
  applyCrd(resourceTypeName, client);
}

public static void applyCrd(String resourceTypeName, KubernetesClient client) {
String path = "/META-INF/fabric8/" + resourceTypeName + "-v1.yml";
try (InputStream is = getClass().getResourceAsStream(path)) {
try (InputStream is = LocallyRunOperatorExtension.class.getResourceAsStream(path)) {
if (is == null) {
throw new IllegalStateException("Cannot find CRD at " + path);
}
final var crd = getKubernetesClient().load(is);
final var crd = client.load(is);
crd.createOrReplace();
Thread.sleep(CRD_READY_WAIT); // readiness is not applicable for CRD, just wait a little
LOGGER.debug("Applied CRD with path: {}", path);
Expand Down
Loading