Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

overhauling all threading concerns related to informers #3108

Merged
merged 9 commits into from
May 24, 2021
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
#### Bugs

#### Improvements
* Fix #3135 added mock crud support for patch status, and will return exceptions for unsupported patch types
* Fix #3135: added mock crud support for patch status, and will return exceptions for unsupported patch types
* Fix #3072: various changes to refine how threads are handled by informers. Note that the SharedInformer.run call is now blocking when starting the informer.
* Fix #3143: a new SharedInformerEventListener.onException(SharedIndexInformer, Exception) method is available to determine which informer could not start.

#### Dependency Upgrade

Expand Down
4 changes: 4 additions & 0 deletions kubernetes-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,10 @@
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,12 @@
import io.fabric8.kubernetes.client.extended.run.RunOperations;
import io.fabric8.kubernetes.client.informers.SharedInformerFactory;
import io.fabric8.kubernetes.client.utils.Serialization;

import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ForkJoinPool;

import okhttp3.OkHttpClient;

/**
Expand Down Expand Up @@ -574,7 +575,7 @@ public AutoscalingAPIGroupDSL autoscaling() {
*/
@Override
public SharedInformerFactory informers() {
return new SharedInformerFactory(ForkJoinPool.commonPool(), httpClient, getConfiguration());
return new SharedInformerFactory(httpClient, getConfiguration());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,17 @@
package io.fabric8.kubernetes.client.dsl.internal;

import io.fabric8.kubernetes.api.model.ListOptions;
import io.fabric8.kubernetes.client.KubernetesClientException;
import io.fabric8.kubernetes.client.Watch;
import io.fabric8.kubernetes.client.Watcher;
import io.fabric8.kubernetes.client.WatcherException;
import io.fabric8.kubernetes.client.utils.Utils;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.WebSocket;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -45,11 +44,12 @@ public abstract class AbstractWatchManager<T> implements Watch {
private final int reconnectInterval;
private final int maxIntervalExponent;
final AtomicInteger currentReconnectAttempt;
private final ScheduledExecutorService executorService;
private ScheduledFuture<?> reconnectAttempt;

private final RequestBuilder requestBuilder;
protected ClientRunner runner;

private final AtomicBoolean reconnectPending = new AtomicBoolean(false);

AbstractWatchManager(
Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder
Expand All @@ -61,11 +61,6 @@ public abstract class AbstractWatchManager<T> implements Watch {
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
this.currentReconnectAttempt = new AtomicInteger(0);
this.forceClosed = new AtomicBoolean();
this.executorService = Executors.newSingleThreadScheduledExecutor(r -> {
Thread ret = new Thread(r, "Executor for Watch " + System.identityHashCode(AbstractWatchManager.this));
ret.setDaemon(true);
return ret;
});

this.requestBuilder = requestBuilder;
}
Expand Down Expand Up @@ -93,30 +88,35 @@ final void closeEvent() {
watcher.onClose();
}

final void closeExecutorService() {
if (executorService != null && !executorService.isShutdown()) {
logger.debug("Closing ExecutorService");
try {
executorService.shutdown();
if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) {
logger.warn("Executor didn't terminate in time after shutdown in close(), killing it.");
executorService.shutdownNow();
}
} catch (Exception t) {
throw KubernetesClientException.launderThrowable(t);
}
final synchronized void cancelReconnect() {
if (reconnectAttempt != null) {
reconnectAttempt.cancel(true);
}
}

void submit(Runnable task) {
if (!executorService.isShutdown()) {
executorService.submit(task);

void scheduleReconnect(Runnable command, boolean shouldBackoff) {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}
}

void schedule(Runnable command, long delay, TimeUnit timeUnit) {
if (!executorService.isShutdown()) {
executorService.schedule(command, delay, timeUnit);

logger.debug("Scheduling reconnect task");

long delay = shouldBackoff
? nextReconnectInterval()
: 0;

synchronized (this) {
reconnectAttempt = Utils.schedule(Utils.getCommonExecutorSerive(), () -> {
try {
command.run();
} finally {
reconnectPending.set(false);
}
}, delay, TimeUnit.MILLISECONDS);
if (forceClosed.get()) {
cancelReconnect();
}
}
}

Expand Down Expand Up @@ -182,7 +182,7 @@ public void close() {
logger.debug("Force closing the watch {}", this);
closeEvent();
runner.close();
closeExecutorService();
cancelReconnect();
}

@FunctionalInterface
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,7 @@
import java.io.IOException;
import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.api.model.KubernetesResource;
Expand Down Expand Up @@ -96,7 +94,6 @@ OkHttpClient cloneAndCustomize(OkHttpClient client) {

private abstract static class HTTPClientRunner<T extends HasMetadata> extends AbstractWatchManager.ClientRunner {
private final AbstractWatchManager<T> manager;
private final AtomicBoolean reconnectPending = new AtomicBoolean(false);

public HTTPClientRunner(OkHttpClient client, AbstractWatchManager<T> manager) {
super(client);
Expand Down Expand Up @@ -155,43 +152,16 @@ private void scheduleReconnect(boolean shouldBackoff) {
return;
}

logger.debug("Submitting reconnect task to the executor");

// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor.
manager.submit(() -> {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}
manager.scheduleReconnect(() -> {
try {
// actual reconnect only after the back-off time has passed, without
// blocking the thread
logger.debug("Scheduling reconnect task");

long delay = shouldBackoff
? manager.nextReconnectInterval()
: 0;

manager.schedule(() -> {
try {
manager.runWatch();
reconnectPending.set(false);
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close();
manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e));
}
}, delay, TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
// This is a standard exception if we close the scheduler. We should not print it
if (!manager.isForceClosed()) {
logger.error("Exception in reconnect", e);
}
reconnectPending.set(false);
manager.runWatch();
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
close();
manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e));
}
});
}, shouldBackoff);
}

public void onMessage(String messageSource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,6 @@
package io.fabric8.kubernetes.client.dsl.internal;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -43,7 +41,7 @@ abstract class WatcherWebSocketListener<T> extends WebSocketListener {
* True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started.
*/
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean reconnectPending = new AtomicBoolean(false);

/**
* Blocking queue for startup exceptions.
*/
Expand Down Expand Up @@ -152,40 +150,17 @@ public void onClosed(WebSocket webSocket, int code, String reason) {
}

private void scheduleReconnect() {
logger.debug("Submitting reconnect task to the executor");
// make sure that whichever thread calls this method, the tasks are
// performed serially in the executor
manager.submit(new NamedRunnable("scheduleReconnect") {
@Override
public void execute() {
if (!reconnectPending.compareAndSet(false, true)) {
logger.debug("Reconnect already scheduled");
return;
}
webSocketRef.set(null);
manager.scheduleReconnect(() -> {
try {
manager.runWatch();
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
webSocketRef.set(null);
try {
// actual reconnect only after the back-off time has passed, without
// blocking the thread
logger.debug("Scheduling reconnect task");
manager.schedule(new NamedRunnable("reconnectAttempt") {
@Override
public void execute() {
try {
manager.runWatch();
reconnectPending.set(false);
} catch (Exception e) {
// An unexpected error occurred and we didn't even get an onFailure callback.
logger.error("Exception in reconnect", e);
webSocketRef.set(null);
manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e));
manager.close();
}
}
}, manager.nextReconnectInterval(), TimeUnit.MILLISECONDS);
} catch (RejectedExecutionException e) {
reconnectPending.set(false);
}
manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e));
manager.close();
}
});
}, true);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,5 @@ public interface ResourceEventHandler<T> {
* @param deletedFinalStateUnknown get final state of item if it is known or not.
*/
void onDelete(T obj, boolean deletedFinalStateUnknown);

}
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ public interface SharedInformer<T> {

/**
* Starts the shared informer, which will be stopped until stop() is called.
*
* <br>If the informer is not already running, this is a blocking call
*/
void run();

Expand All @@ -56,12 +58,17 @@ public interface SharedInformer<T> {
* The value returned is not synchronized with access to the underlying store
* and is not thread-safe.
*
* @return string value
* @return string value or null if never synced
*/
String lastSyncResourceVersion();

/**
* Return true if the informer is running
*/
boolean isRunning();

/**
* Return the class this informer is watching
*/
Class<T> getApiTypeClass();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,14 @@
package io.fabric8.kubernetes.client.informers;

public interface SharedInformerEventListener {

/**
* @deprecated Use {@link #onException(SharedIndexInformer, Exception)} instead
*/
@Deprecated
void onException(Exception exception);

default void onException(SharedIndexInformer<?> informer, Exception e) {
onException(e);
}
}
Loading