From ff174cc31fcf2c6d01396d7342103253ec6a2172 Mon Sep 17 00:00:00 2001 From: shawkins Date: Wed, 12 May 2021 06:49:56 -0400 Subject: [PATCH 1/9] overhauling all threading concerns related to informers using a shared resync scheduler allowing handlers to not have dedicated threads making run blocking --- .../client/DefaultKubernetesClient.java | 7 +- .../dsl/internal/AbstractWatchManager.java | 7 +- .../informers/ResourceEventHandler.java | 8 + .../client/informers/ResyncRunnable.java | 10 +- .../client/informers/SharedInformer.java | 4 +- .../informers/SharedInformerFactory.java | 6 +- .../client/informers/SharedScheduler.java | 73 +++++ .../client/informers/cache/Controller.java | 154 --------- .../informers/cache/ProcessorListener.java | 35 +- .../client/informers/cache/Reflector.java | 2 +- .../informers/cache/SharedProcessor.java | 82 ++--- .../impl/DefaultSharedIndexInformer.java | 119 +++++-- .../kubernetes/client/utils/Utils.java | 20 ++ .../client/informers/SharedSchedulerTest.java | 52 +++ .../informers/cache/ControllerTest.java | 307 ------------------ .../cache/ProcessorListenerTest.java | 7 - .../informers/cache/SharedProcessorTest.java | 8 +- .../DefaultSharedIndexInformerResyncTest.java | 174 ++++++++++ .../kubernetes/client/internal/UtilsTest.java | 10 + .../mock/DefaultSharedIndexInformerTest.java | 1 - 20 files changed, 490 insertions(+), 596 deletions(-) create mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java delete mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java create mode 100644 kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java delete mode 100644 kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java create mode 100644 kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/DefaultKubernetesClient.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/DefaultKubernetesClient.java index a650c62168f..a807bee5931 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/DefaultKubernetesClient.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/DefaultKubernetesClient.java @@ -122,11 +122,14 @@ import io.fabric8.kubernetes.client.extended.run.RunOperations; import io.fabric8.kubernetes.client.informers.SharedInformerFactory; import io.fabric8.kubernetes.client.utils.Serialization; +import io.fabric8.kubernetes.client.utils.Utils; + import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.Executors; + import okhttp3.OkHttpClient; /** @@ -574,7 +577,7 @@ public AutoscalingAPIGroupDSL autoscaling() { */ @Override public SharedInformerFactory informers() { - return new SharedInformerFactory(ForkJoinPool.commonPool(), httpClient, getConfiguration()); + return new SharedInformerFactory(Executors.newCachedThreadPool(Utils.daemonThreadFactory(this)), httpClient, getConfiguration()); } /** diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index a68a98b9a2c..0ebf076dafd 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -20,6 +20,7 @@ import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.WebSocket; @@ -61,11 +62,7 @@ public abstract class AbstractWatchManager implements Watch { this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); this.currentReconnectAttempt = new AtomicInteger(0); this.forceClosed = new AtomicBoolean(); - this.executorService = Executors.newSingleThreadScheduledExecutor(r -> { - Thread ret = new Thread(r, "Executor for Watch " + System.identityHashCode(AbstractWatchManager.this)); - ret.setDaemon(true); - return ret; - }); + this.executorService = Executors.newSingleThreadScheduledExecutor(Utils.daemonThreadFactory(AbstractWatchManager.this)); this.requestBuilder = requestBuilder; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResourceEventHandler.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResourceEventHandler.java index 47e9f9db60a..39cbf005a64 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResourceEventHandler.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResourceEventHandler.java @@ -54,4 +54,12 @@ public interface ResourceEventHandler { * @param deletedFinalStateUnknown get final state of item if it is known or not. */ void onDelete(T obj, boolean deletedFinalStateUnknown); + + /** + * If the event handler should be called in a separate thread + * @return + */ + default boolean isCalledAsync() { + return true; + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java index bd0730862b9..90fc3843224 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java @@ -19,7 +19,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.function.Supplier; +import java.util.function.BooleanSupplier; /** * Calls the resync function of store interface which is always implemented @@ -30,9 +30,9 @@ public class ResyncRunnable implements Runnable { private static final Logger log = LoggerFactory.getLogger(ResyncRunnable.class); private Store store; - private Supplier shouldResyncFunc; + private BooleanSupplier shouldResyncFunc; - public ResyncRunnable(Store store, Supplier shouldResyncFunc) { + public ResyncRunnable(Store store, BooleanSupplier shouldResyncFunc) { this.store = store; this.shouldResyncFunc = shouldResyncFunc; } @@ -42,7 +42,9 @@ public void run() { log.debug("ResyncRunnable#resync .. .."); } - if (shouldResyncFunc == null || shouldResyncFunc.get()) { + // TODO if there is a concern that this processing could overwhelm the single + // thread, then hand this off to the common pool + if (shouldResyncFunc == null || shouldResyncFunc.getAsBoolean()) { if (log.isDebugEnabled()) { log.debug("ResyncRunnable#force resync"); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java index 1cdc8f76257..8785dbb567f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java @@ -41,6 +41,8 @@ public interface SharedInformer { /** * Starts the shared informer, which will be stopped until stop() is called. + * + *
If the informer is not already running, this is a blocking call */ void run(); @@ -56,7 +58,7 @@ public interface SharedInformer { * The value returned is not synchronized with access to the underlying store * and is not thread-safe. * - * @return string value + * @return string value or null if never synced */ String lastSyncResourceVersion(); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java index dd08a4b08c7..0963777e34d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java @@ -53,6 +53,7 @@ public class SharedInformerFactory extends BaseOperation { private final Map startedInformers = new HashMap<>(); private final ExecutorService informerExecutor; + private final SharedScheduler resyncExecutor = new SharedScheduler(); private final BaseOperation baseOperation; @@ -230,7 +231,7 @@ private synchronized context = context.withIsNamespaceConfiguredFromGlobalConfig(false); } } - SharedIndexInformer informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, eventListeners); + SharedIndexInformer informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, eventListeners, resyncExecutor); this.informers.put(getInformerKey(context), informer); return informer; } @@ -274,7 +275,8 @@ public synchronized SharedIndexInformer getExistingSharedIndexInformer(Cl } /** - * Starts all registered informers. + * Starts all registered informers in an asynchronous fashion. + *
use {@link #addSharedInformerEventListener(SharedInformerEventListener)} to receive startup errors. */ public synchronized void startAllRegisteredInformers() { if (informers.isEmpty()) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java new file mode 100644 index 00000000000..82eeb2607a5 --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.informers; + +import io.fabric8.kubernetes.client.utils.Utils; + +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * Maintains a single thread daemon scheduler. + * It is not intended for long-running tasks + */ +public class SharedScheduler { + + public static long DEFAULT_TTL_MILLIS = TimeUnit.SECONDS.toMillis(10); + + private final long ttlMillis; + private ScheduledThreadPoolExecutor executor; + + public SharedScheduler() { + this(DEFAULT_TTL_MILLIS); + } + + public SharedScheduler(long ttlMillis) { + this.ttlMillis = ttlMillis; + } + + public synchronized ScheduledFuture scheduleWithFixedDelay(Runnable command, + long initialDelay, + long delay, + TimeUnit unit) { + if (executor == null) { + // start the executor and add a ttl task + executor = new ScheduledThreadPoolExecutor(1, Utils.daemonThreadFactory(this)); + executor.setRemoveOnCancelPolicy(true); + executor.scheduleWithFixedDelay(this::shutdownCheck, ttlMillis, ttlMillis, TimeUnit.MILLISECONDS); + } + return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + /** + * if the queue is empty since we're locked that means there's nothing pending. + * since there is only a single thread, that means this running task is holding + * it, so we can shut ourselves down + */ + private synchronized void shutdownCheck() { + if (executor.getQueue().isEmpty()) { + executor.shutdownNow(); + executor = null; + } + } + + synchronized boolean hasExecutor() { + return executor != null; + } + +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java deleted file mode 100644 index 57f92047aa7..00000000000 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Controller.java +++ /dev/null @@ -1,154 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.informers.cache; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.api.model.KubernetesResourceList; -import io.fabric8.kubernetes.client.dsl.base.OperationContext; -import io.fabric8.kubernetes.client.informers.ListerWatcher; -import io.fabric8.kubernetes.client.informers.ResyncRunnable; -import io.fabric8.kubernetes.client.informers.SharedInformerEventListener; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.TimeUnit; -import java.util.function.Supplier; - -/** - * Controller is a generic controller framework. - * - * This is taken from https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/informer/cache/Controller.java - * which has been ported from official go client: https://github.com/kubernetes/client-go/blob/master/tools/cache/controller.go - */ -public class Controller> { - private static final Logger log = LoggerFactory.getLogger(Controller.class); - - /** - * resync fifo internals in millis - */ - private final long fullResyncPeriod; - - private final Store store; - - private final ListerWatcher listerWatcher; - - private Reflector reflector; - - private final Supplier resyncFunc; - - private final ScheduledExecutorService resyncExecutor; - - private ScheduledFuture resyncFuture; - - private final OperationContext operationContext; - - private final ConcurrentLinkedQueue eventListeners; - - private final Class apiTypeClass; - - Controller(Class apiTypeClass, Store store, ListerWatcher listerWatcher, Supplier resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue eventListeners, ScheduledExecutorService resyncExecutor) { - this.store = store; - this.listerWatcher = listerWatcher; - this.apiTypeClass = apiTypeClass; - this.resyncFunc = resyncFunc; - if (fullResyncPeriod < 0) { - throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value"); - } - this.fullResyncPeriod = fullResyncPeriod; - this.operationContext = context; - this.eventListeners = eventListeners; - - this.reflector = new Reflector<>(apiTypeClass, listerWatcher, store, operationContext); - this.resyncExecutor = resyncExecutor; - } - - public Controller(Class apiTypeClass, Store store, ListerWatcher listerWatcher, Supplier resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue eventListeners) { - this(apiTypeClass, store, listerWatcher, resyncFunc, fullResyncPeriod, context, eventListeners, Executors.newSingleThreadScheduledExecutor()); - } - - public void run() { - log.info("informer#Controller: ready to run resync and reflector runnable"); - - scheduleResync(); - - try { - log.info("Started Reflector watch for {}", apiTypeClass); - reflector.listSyncAndWatch(); - } catch (Exception exception) { - log.warn("Reflector list-watching job exiting because the thread-pool is shutting down", exception); - this.eventListeners.forEach(listener -> listener.onException(exception)); - } - } - - void scheduleResync() { - // Start the resync runnable - if (fullResyncPeriod > 0) { - ResyncRunnable resyncRunnable = new ResyncRunnable(store, resyncFunc); - resyncFuture = resyncExecutor.scheduleWithFixedDelay(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS); - } else { - log.info("informer#Controller: resync skipped due to 0 full resync period"); - } - } - - /** - * Stops the resync thread pool first, then stops the reflector. - */ - public void stop() { - synchronized (this) { - reflector.stop(); - if (resyncFuture != null) { - resyncFuture.cancel(true); - } - resyncExecutor.shutdown(); - } - } - - /** - * Returns true if the queue has been resynced - * @return boolean value about queue sync status - */ - public boolean hasSynced() { - return this.store.hasSynced(); - } - - /** - * Returns the latest resource version watched by controller. - * @return latest resource version - */ - public String lastSyncResourceVersion() { - return reflector.getLastSyncResourceVersion(); - } - - Reflector getReflector() { - return reflector; - } - - ScheduledExecutorService getResyncExecutor() { - return this.resyncExecutor; - } - - public boolean isRunning() { - return this.reflector.isRunning(); - } - - public long getFullResyncPeriod() { - return fullResyncPeriod; - } -} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java index bf8c4ade3b5..a6bc8f2eb75 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java @@ -21,9 +21,7 @@ import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; -import java.util.Optional; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.Executor; /** * ProcessorListener implements Runnable interface. It's supposed to run in background @@ -31,41 +29,38 @@ * * This has been taken from official client: https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/informer/cache/ProcessorListener.java * which has been ported from official go client: https://github.com/kubernetes/client-go/blob/master/tools/cache/shared_informer.go#L570 + * + *
Modified to execute loosely coupled from its processing thread * * @param type of ProcessorListener */ -public class ProcessorListener implements Runnable { +public class ProcessorListener { private static final Logger log = LoggerFactory.getLogger(ProcessorListener.class); private long resyncPeriodInMillis; private ZonedDateTime nextResync; - private BlockingQueue> queue; private ResourceEventHandler handler; - + private Executor executor; + public ProcessorListener(ResourceEventHandler handler, long resyncPeriodInMillis) { + this(handler, resyncPeriodInMillis, Runnable::run); + } + + public ProcessorListener(ResourceEventHandler handler, long resyncPeriodInMillis, Executor executorService) { this.resyncPeriodInMillis = resyncPeriodInMillis; this.handler = handler; - this.queue = new LinkedBlockingQueue<>(); + this.executor = executorService; determineNextResync(ZonedDateTime.now()); } - @Override - public void run() { - while (true) { + public void add(Notification notification) { + executor.execute(() -> { try { - queue.take().handle(handler); - } catch(InterruptedException ex) { - log.warn("Processor thread interrupted: {}", ex.getMessage()); - Thread.currentThread().interrupt(); - return; + notification.handle(handler); } catch (Exception ex) { log.error("Failed invoking {} event handler: {}", handler, ex.getMessage(), ex); } - } - } - - public void add(Notification obj) { - Optional.ofNullable(obj).ifPresent(this.queue::add); + }); } public void determineNextResync(ZonedDateTime now) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java index 287ff2f5334..25ab1c0dca3 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java @@ -74,7 +74,7 @@ private synchronized void stopWatcher() { *
Starts the watch with a fresh store state. *
Should be called only at start and when HttpGone is seen. */ - void listSyncAndWatch() { + public void listSyncAndWatch() { store.isPopulated(false); final L list = getList(); final String latestResourceVersion = list.getMetadata().getResourceVersion(); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java index f0dd5cbcfad..11a0ac2864e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java @@ -15,9 +15,13 @@ */ package io.fabric8.kubernetes.client.informers.cache; +import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.utils.Utils; + import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.ReadWriteLock; @@ -28,80 +32,36 @@ * notifications. * * This has been taken from official-client: https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/informer/cache/SharedProcessor.java + * + *
Modified to not submit tasks directly */ public class SharedProcessor { private ReadWriteLock lock = new ReentrantReadWriteLock(); + private List toClose = new ArrayList<>(); private List> listeners; private List> syncingListeners; - private ExecutorService executorService; - public SharedProcessor() { - this(Executors.newCachedThreadPool()); - } - - public SharedProcessor(ExecutorService threadPool) { this.listeners = new ArrayList<>(); this.syncingListeners = new ArrayList<>(); - this.executorService = threadPool; } /** - * First adds the specific processorListener then starts the listener - * with executor. - * - * @param processorListener specific processor listener - */ - public void addAndStartListener(final ProcessorListener processorListener) { - lock.writeLock().lock(); - try { - addListenerLocked(processorListener); - - executorService.execute(processorListener); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Adds the specific processorListener, but not start it. + * Adds the specific processorListener * * @param processorListener specific processor listener */ public void addListener(final ProcessorListener processorListener) { lock.writeLock().lock(); try { - addListenerLocked(processorListener); + this.listeners.add(processorListener); + this.syncingListeners.add(processorListener); } finally { lock.writeLock().unlock(); } } - private void addListenerLocked(final ProcessorListener processorListener) { - this.listeners.add(processorListener); - this.syncingListeners.add(processorListener); - } - - /** - * Starts the processor listeners. - */ - public void run() { - lock.readLock().lock(); - try { - if (listeners == null || listeners.isEmpty()) { - return; - } - for (ProcessorListener listener : listeners) { - if (!executorService.isShutdown()) { - executorService.submit(listener); - } - } - } finally { - lock.readLock().unlock(); - } - } - /** * Distribute the object amount listeners. * @@ -148,10 +108,30 @@ public boolean shouldResync() { public void stop() { lock.writeLock().lock(); try { + toClose.forEach(c->c.run()); + toClose.clear(); + syncingListeners = null; listeners = null; } finally { lock.writeLock().unlock(); } - executorService.shutdownNow(); + } + + public ProcessorListener addProcessorListener(ResourceEventHandler handler, long resyncPeriodMillis) { + lock.writeLock().lock(); + try { + Executor executor = Runnable::run; + if (handler.isCalledAsync()) { + ExecutorService service = Executors.newSingleThreadExecutor(Utils.daemonThreadFactory(handler)); + executor = service; + toClose.add(service::shutdownNow); + } + + ProcessorListener listener = new ProcessorListener<>(handler, resyncPeriodMillis, executor); + addListener(listener); + return listener; + } finally { + lock.writeLock().unlock(); + } } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index f7db0fa725f..2cb6306a5ce 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -20,20 +20,26 @@ import io.fabric8.kubernetes.client.dsl.base.OperationContext; import io.fabric8.kubernetes.client.informers.ListerWatcher; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; +import io.fabric8.kubernetes.client.informers.ResyncRunnable; +import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.SharedInformerEventListener; +import io.fabric8.kubernetes.client.informers.SharedScheduler; import io.fabric8.kubernetes.client.informers.cache.Cache; -import io.fabric8.kubernetes.client.informers.cache.Controller; import io.fabric8.kubernetes.client.informers.cache.Indexer; import io.fabric8.kubernetes.client.informers.cache.ProcessorListener; import io.fabric8.kubernetes.client.informers.cache.ProcessorStore; +import io.fabric8.kubernetes.client.informers.cache.Reflector; import io.fabric8.kubernetes.client.informers.cache.SharedProcessor; -import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collection; import java.util.List; import java.util.Map; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.BooleanSupplier; import java.util.function.Function; public class DefaultSharedIndexInformer> implements SharedIndexInformer { @@ -49,30 +55,39 @@ public class DefaultSharedIndexInformer eventListeners; + private final SharedScheduler resyncExecutor; + private final Reflector reflector; + private final Class apiTypeClass; + + private final ProcessorStore processorStore; + private Cache indexer; private SharedProcessor processor; - private Controller controller; - - private Thread controllerThread; - - private volatile boolean started = false; + private AtomicBoolean started = new AtomicBoolean(); private volatile boolean stopped = false; - public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, ConcurrentLinkedQueue eventListeners) { + private ScheduledFuture resyncFuture; + + public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, Collection eventListeners, SharedScheduler resyncExecutor) { + if (resyncPeriod < 0) { + throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value"); + } this.resyncCheckPeriodMillis = resyncPeriod; this.defaultEventHandlerResyncPeriod = resyncPeriod; + this.eventListeners = eventListeners; + this.resyncExecutor = resyncExecutor; + this.apiTypeClass = apiTypeClass; this.processor = new SharedProcessor<>(); this.indexer = new Cache<>(); this.indexer.setIsRunning(this::isRunning); - ProcessorStore processorStore = new ProcessorStore<>(this.indexer, this.processor); - - this.controller = new Controller<>(apiTypeClass, processorStore, listerWatcher, processor::shouldResync, resyncCheckPeriodMillis, context, eventListeners); - controllerThread = new Thread(controller::run, "informer-controller-" + apiTypeClass.getSimpleName()); + processorStore = new ProcessorStore<>(this.indexer, this.processor); + this.reflector = new Reflector<>(apiTypeClass, listerWatcher, processorStore, context); } /** @@ -99,7 +114,7 @@ public void addEventHandlerWithResyncPeriod(ResourceEventHandler handler, lon } if (resyncPeriodMillis < this.resyncCheckPeriodMillis) { - if (started) { + if (started.get()) { log.warn("DefaultSharedIndexInformer#resyncPeriod {} is smaller than resyncCheckPeriod {} and the informer has already started. Changing it to {}", resyncPeriodMillis, resyncCheckPeriodMillis); resyncPeriodMillis = resyncCheckPeriodMillis; } else { @@ -110,14 +125,13 @@ public void addEventHandlerWithResyncPeriod(ResourceEventHandler handler, lon } } } - - ProcessorListener listener = new ProcessorListener(handler, determineResyncPeriod(resyncPeriodMillis, this.resyncCheckPeriodMillis)); - if (!started) { - this.processor.addListener(listener); + + ProcessorListener listener = this.processor.addProcessorListener(handler, determineResyncPeriod(resyncPeriodMillis, this.resyncCheckPeriodMillis)); + + if (!started.get()) { return; } - this.processor.addAndStartListener(listener); List objectList = this.indexer.list(); for (Object item : objectList) { listener.add(new ProcessorListener.AddNotification(item)); @@ -126,7 +140,7 @@ public void addEventHandlerWithResyncPeriod(ResourceEventHandler handler, lon @Override public String lastSyncResourceVersion() { - return this.controller.lastSyncResourceVersion(); + return this.reflector.getLastSyncResourceVersion(); } @Override @@ -134,32 +148,47 @@ public void run() { if (stopped) { throw new IllegalStateException("Cannot restart a stopped informer"); } - if (started) { + if (!started.compareAndSet(false, true)) { return; } - started = true; + try { + log.info("informer#Controller: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis); - this.processor.run(); - controllerThread.start(); + scheduleResync(processor::shouldResync); + + reflector.listSyncAndWatch(); + } catch (RuntimeException exception) { + log.warn("Informer start did not complete", exception); + this.eventListeners.forEach(listener -> listener.onException(exception)); + throw exception; + } + // stop called while run is called could be ineffective, check for it afterwards + synchronized (this) { + if (stopped) { + stop(); + } + } } @Override - public void stop() { - if (!started || stopped) { - return; - } - + public synchronized void stop() { stopped = true; - controller.stop(); - controllerThread.interrupt(); - + reflector.stop(); + stopResync(); processor.stop(); } + private synchronized void stopResync() { + if (resyncFuture != null) { + resyncFuture.cancel(true); + resyncFuture = null; + } + } + @Override public boolean hasSynced() { - return controller != null && this.controller.hasSynced(); + return this.processorStore.hasSynced(); } @Override @@ -167,7 +196,6 @@ public void addIndexers(Map>> indexers) { indexer.addIndexers(indexers); } - @Override public Indexer getIndexer() { return this.indexer; @@ -186,6 +214,25 @@ private long determineResyncPeriod(long desired, long check) { @Override public boolean isRunning() { - return !stopped && started && controller.isRunning(); + return !stopped && started.get() && reflector.isRunning(); + } + + synchronized void scheduleResync(BooleanSupplier resyncFunc) { + // schedule the resync runnable + if (resyncCheckPeriodMillis > 0) { + ResyncRunnable resyncRunnable = new ResyncRunnable<>(processorStore, resyncFunc); + resyncFuture = resyncExecutor.scheduleWithFixedDelay(resyncRunnable, resyncCheckPeriodMillis, resyncCheckPeriodMillis, TimeUnit.MILLISECONDS); + } else { + log.debug("informer#Controller: resync skipped due to 0 full resync period {}", apiTypeClass); + } } + + public long getFullResyncPeriod() { + return resyncCheckPeriodMillis; + } + + ScheduledFuture getResyncFuture() { + return resyncFuture; + } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java index 5d6b26dce9b..87db484be71 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java @@ -42,6 +42,8 @@ import java.util.Random; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Function; import java.util.stream.Stream; @@ -450,5 +452,23 @@ public static List getCommandPlatformPrefix() { private static String getOperatingSystemFromSystemProperty() { return System.getProperty(OS_NAME); } + + /** + * Create a {@link ThreadFactory} with daemon threads and a thread + * name based upon the object passed in. + */ + public static ThreadFactory daemonThreadFactory(Object forObject) { + return new ThreadFactory() { + ThreadFactory threadFactory = Executors.defaultThreadFactory(); + + @Override + public Thread newThread(Runnable r) { + Thread ret = threadFactory.newThread(r); + ret.setName(forObject.getClass().getSimpleName() + "-" + System.identityHashCode(forObject) + "-" + ret.getName()); + ret.setDaemon(true); + return ret; + } + }; + } } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java new file mode 100644 index 00000000000..2963d7f399b --- /dev/null +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java @@ -0,0 +1,52 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.informers; + +import org.junit.jupiter.api.Test; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SharedSchedulerTest { + + @Test + void testAutoShutdown() throws InterruptedException { + SharedScheduler scheduler = new SharedScheduler(50); + assertFalse(scheduler.hasExecutor()); + CountDownLatch latch = new CountDownLatch(2); + ScheduledFuture future = scheduler.scheduleWithFixedDelay(()->latch.countDown(), 50, 50, TimeUnit.MILLISECONDS); + assertTrue(scheduler.hasExecutor()); + latch.await(); + assertFalse(future.isDone()); + future.cancel(true); + for (int i = 0; i < 10; i++) { + if (!scheduler.hasExecutor()) { + break; + } + Thread.sleep(100); + } + // should be shutdown + assertFalse(scheduler.hasExecutor()); + // should start again + future = scheduler.scheduleWithFixedDelay(()->latch.countDown(), 50, 50, TimeUnit.MILLISECONDS); + assertTrue(scheduler.hasExecutor()); + } +} diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java deleted file mode 100644 index 3e77ba04c35..00000000000 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ControllerTest.java +++ /dev/null @@ -1,307 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.informers.cache; - -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; -import static org.junit.jupiter.api.Assertions.assertThrows; - -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -import org.junit.jupiter.api.DisplayName; -import org.junit.jupiter.api.Test; -import org.junit.jupiter.api.function.Executable; -import org.mockito.Mockito; - -import io.fabric8.kubernetes.api.model.Pod; -import io.fabric8.kubernetes.api.model.PodList; -import io.fabric8.kubernetes.client.dsl.base.OperationContext; -import io.fabric8.kubernetes.client.informers.ListerWatcher; -import io.fabric8.kubernetes.client.informers.SharedInformerEventListener; - -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.eq; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -class ControllerTest { - private final Store deltaFIFO = Mockito.mock(Store.class, Mockito.RETURNS_DEEP_STUBS); - private abstract static class AbstractPodListerWatcher implements ListerWatcher {}; - private static final Long WAIT_TIME = 500L; - private final ListerWatcher listerWatcher = Mockito.mock(AbstractPodListerWatcher.class, Mockito.RETURNS_DEEP_STUBS); - private final OperationContext operationContext = Mockito.mock(OperationContext.class, Mockito.RETURNS_DEEP_STUBS); - private final ConcurrentLinkedQueue eventListeners = Mockito.mock(ConcurrentLinkedQueue.class, Mockito.RETURNS_DEEP_STUBS); - - @Test - @DisplayName("Controller initialized with resync period greater than zero should use provided resync period") - void testControllerCreationWithResyncPeriodMoreThanZero() { - // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> true, - 1000L, operationContext, eventListeners); - - // Then - assertEquals(1000L, controller.getFullResyncPeriod()); - } - - @Test - @DisplayName("Controller initialized with resync period less than zero should throw exception") - void testControllerCreationWithResyncPeriodLessThanZero() { - assertThrows(IllegalArgumentException.class, () -> new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> true, - -1000L, operationContext, eventListeners)); - } - - @Test - @DisplayName("Controller initialized with resync period 0 should use provided resync period") - void testControllerCreationWithResyncPeriodZero() { - // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> true, - 0L, operationContext, eventListeners); - - // Then - assertEquals(0L, controller.getFullResyncPeriod()); - } - - @Test - @DisplayName("Controller stop shut downs/cancels all executor services") - void testStop() { - // Given - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> true, - 1000L, operationContext, eventListeners); - - // When - controller.stop(); - // Then - assertThat(controller.getResyncExecutor().isShutdown()).isTrue(); - } - - @Test - @DisplayName("Controller initialized with resync period should have synced") - void testControllerHasSync() { - // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> true, - 10L, operationContext, eventListeners); - Thread controllerThread = newControllerThread(controller); - controllerThread.start(); - // Then - ScheduledExecutorService resyncExecutor = controller.getResyncExecutor(); - assertNotNull(resyncExecutor); - assertThat(controller.hasSynced()).isFalse(); - assertThat(controller.lastSyncResourceVersion()).isNull(); - } - - @Test - @DisplayName("Controller with interrupted thread should not shutdown resyncExecutor") - void testControllerRunWithInterruptedThread() throws InterruptedException { - // Given + When - // used to be able to interrupt the thread in the lambda - ThreadWrapper controllerThreadWrapper = new ThreadWrapper(); - long fullResyncPeriod = 1L; - int numberOfResyncs = 1; - final CountDownLatch countDown = new CountDownLatch(numberOfResyncs); - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> { - controllerThreadWrapper.interrupt(); - return true; - }, - fullResyncPeriod, operationContext, eventListeners); - Thread controllerThread = newControllerThread(controller); - controllerThreadWrapper.thread = controllerThread; // put the thread in the wrapper, so the lamba can interrupt it - controllerThread.start(); - countDown.await(WAIT_TIME, TimeUnit.MILLISECONDS); // a too short value does not allow the processLoop to start. - // Then - ScheduledExecutorService resyncExecutor = controller.getResyncExecutor(); - assertNotNull(resyncExecutor); - assertThat(resyncExecutor.isShutdown()).isFalse(); - } - - @Test - @DisplayName("Controller initialized with resync period should initialize resyncExecutor") - void testControllerRunWithResyncPeriodGreaterThanZero() throws InterruptedException { - // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> true, - 1L, operationContext, eventListeners); - Thread controllerThread = newControllerThread(controller); - controllerThread.start(); - controller.stop(); - // Then - ScheduledExecutorService resyncExecutor = controller.getResyncExecutor(); - assertNotNull(resyncExecutor); - assertThat(resyncExecutor.isShutdown()).isTrue(); - } - - @Test - @DisplayName("Controller with resync function throwing exception") - void testControllerRunsResyncFunctionThrowingException() throws InterruptedException { - // Given + When - long fullResyncPeriod = 10L; - int numberOfResyncs = 10; - final CountDownLatch countDown = new CountDownLatch(numberOfResyncs); - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> { - countDown.countDown(); - if( countDown.getCount() == 2 ) { - throw new RuntimeException("make it fail"); - } - return true; - }, - fullResyncPeriod, operationContext, eventListeners); - - Executable controllerRun = newControllerRun(controller); - assertDoesNotThrow(controllerRun); - countDown.await(WAIT_TIME, TimeUnit.MILLISECONDS); - controller.stop(); - // Then - assertThat(countDown.getCount()).isLessThanOrEqualTo(2); - } - - @Test - @DisplayName("Controller initialized with resync period should initialize resyncExecutor") - void testControllerRunWithResyncPeriodGreaterThanZeroAndExecutorNotShutdown() throws InterruptedException { - // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> true, - 1L, operationContext, eventListeners); - Executable controllerRun = newControllerRun(controller); - assertDoesNotThrow(controllerRun); - ScheduledExecutorService resyncExecutor = controller.getResyncExecutor(); - // Then - assertNotNull(resyncExecutor); - assertThat(resyncExecutor.isShutdown()).isFalse(); - } - - @Test - @DisplayName("Controller initialized with resync period should initialize resyncExecutor") - void testControllerRunWithResyncPeriodGreaterThanZeroAndExecutorForcedShutdown() throws InterruptedException { - // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> true, - 1L, operationContext, eventListeners); - Executable controllerRun = newControllerRun(controller); - assertDoesNotThrow(controllerRun); - ScheduledExecutorService resyncExecutor = controller.getResyncExecutor(); - // Then - assertNotNull(resyncExecutor); - resyncExecutor.shutdown(); - assertThat(resyncExecutor.isShutdown()).isTrue(); - } - - @Test - @DisplayName("Controller initialized with resync period to 0 should initialize resyncExecutor") - void testControllerRunWithResyncPeriodToZero() throws InterruptedException { - // Given + When - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> true, - 0L, operationContext, eventListeners); - Thread controllerThread = newControllerThread(controller); - controllerThread.start(); - controller.stop(); - // Then - ScheduledExecutorService resyncExecutor = controller.getResyncExecutor(); - assertNotNull(resyncExecutor); - assertThat(resyncExecutor.isShutdown()).isTrue(); - assertThat(resyncExecutor.isTerminated()).isTrue(); - } - - @Test - @DisplayName("Controller initialized with resync period should run, initialize resyncExecutor and resync at least a given number of times") - void testControllerRunsReyncFunctionExpectedNumberOfTime() throws InterruptedException { - // Given + When - long fullResyncPeriod = 10L; - int numberOfResyncs = 10; - final CountDownLatch countDown = new CountDownLatch(numberOfResyncs); - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> {countDown.countDown(); return true;}, - fullResyncPeriod, operationContext, eventListeners); - - Executable controllerRun = newControllerRun(controller); - assertDoesNotThrow(controllerRun); - // We give an extra cycle to avoid clock inaccurracy interruptions - countDown.await(WAIT_TIME, TimeUnit.MILLISECONDS); - controller.stop(); - // Then - ScheduledExecutorService resyncExecutor = controller.getResyncExecutor(); - assertNotNull(resyncExecutor); - assertThat(resyncExecutor.isShutdown()).isTrue(); - assertThat(countDown.getCount()).isLessThanOrEqualTo(1); - } - - @Test - @DisplayName("Controller initialized with resync period to 0 should run but never resync") - void testControllerNeverRunsReyncFunctionWhenPeriodIsZero() throws InterruptedException { - // Given + When - int count = 10; - final CountDownLatch countDown = new CountDownLatch(count); - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> {countDown.countDown(); return true;}, - 0, operationContext, eventListeners); - Executable controllerRun = newControllerRun(controller); - assertDoesNotThrow(controllerRun); - countDown.await(1000, TimeUnit.MILLISECONDS); - controller.stop(); - // Then - ScheduledExecutorService resyncExecutor = controller.getResyncExecutor(); - assertNotNull(resyncExecutor); - assertThat(resyncExecutor.isShutdown()).isTrue(); - assertThat(countDown.getCount()).isEqualTo(count); - } - - private static class ThreadWrapper{ - public Thread thread; - public void interrupt() { - if( thread != null) { - thread.interrupt(); - } - } - } - - private Executable newControllerRun(Controller controller) { - return () -> { - Thread controllerThread = newControllerThread(controller); - controllerThread.start(); - }; - } - - private Thread newControllerThread(Controller controller) { - return new Thread(controller::run); - } - - @Test - @DisplayName("Controller schedules resync tasks with fixed delay") - void testControllerRunSchedulesResyncTaskWithFixedDelay() { - // Given - ScheduledExecutorService scheduledExecutorService = Mockito.mock(ScheduledExecutorService.class, Mockito.RETURNS_DEEP_STUBS); - Controller controller = new Controller<>(Pod.class, deltaFIFO, listerWatcher, - () -> true, - 1L, operationContext, eventListeners, scheduledExecutorService); - - // When - controller.scheduleResync(); - - // Then - verify(scheduledExecutorService, times(1)).scheduleWithFixedDelay(any(), eq(1L), eq(1L), any()); - } -} diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListenerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListenerTest.java index 3edb5fd6527..9c9ebddb51e 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListenerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListenerTest.java @@ -54,13 +54,6 @@ public void onDelete(Pod obj, boolean deletedFinalStateUnknown) { listener.add(new ProcessorListener.UpdateNotification<>(null, pod)); listener.add(new ProcessorListener.DeleteNotification<>(pod)); - Thread listenerThread = new Thread(listener::run); - listenerThread.setDaemon(true); - listenerThread.start(); - - // Sleep 1 second for consuming notifications from queue - Thread.sleep(1000); - assertTrue(addNotificationReceived); assertTrue(updateNotificationReceived); assertTrue(deleteNotificationReceived); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessorTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessorTest.java index 728f89066a7..5816bb076e1 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessorTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessorTest.java @@ -36,15 +36,13 @@ void testListenerAddition() throws InterruptedException { ExpectingNotificationHandler expectUpdateHandler = new ExpectingNotificationHandler<>(updateNotification); ExpectingNotificationHandler expectDeleteHandler = new ExpectingNotificationHandler<>(deleteNotification); - sharedProcessor.addAndStartListener(expectAddHandler); - sharedProcessor.addAndStartListener(expectUpdateHandler); - sharedProcessor.addAndStartListener(expectDeleteHandler); + sharedProcessor.addListener(expectAddHandler); + sharedProcessor.addListener(expectUpdateHandler); + sharedProcessor.addListener(expectDeleteHandler); sharedProcessor.distribute(addNotification, false); sharedProcessor.distribute(updateNotification, false); sharedProcessor.distribute(deleteNotification, false); - // Sleep for 1 second for distributing notifications - Thread.sleep(1000); assertTrue(expectAddHandler.isSatisfied()); assertTrue(expectUpdateHandler.isSatisfied()); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java new file mode 100644 index 00000000000..770d53e28ec --- /dev/null +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java @@ -0,0 +1,174 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.fabric8.kubernetes.client.informers.impl; + +import io.fabric8.kubernetes.api.model.Pod; +import io.fabric8.kubernetes.api.model.PodList; +import io.fabric8.kubernetes.client.dsl.base.OperationContext; +import io.fabric8.kubernetes.client.informers.ListerWatcher; +import io.fabric8.kubernetes.client.informers.SharedScheduler; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.DisplayName; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; +import static org.junit.Assert.assertNull; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; + +class DefaultSharedIndexInformerResyncTest { + private abstract static class AbstractPodListerWatcher implements ListerWatcher {}; + private static final Long WAIT_TIME = 500L; + private final ListerWatcher listerWatcher = Mockito.mock(AbstractPodListerWatcher.class, Mockito.RETURNS_DEEP_STUBS); + private final OperationContext operationContext = Mockito.mock(OperationContext.class, Mockito.RETURNS_DEEP_STUBS); + private SharedScheduler sharedScheduler = new SharedScheduler(); + DefaultSharedIndexInformer defaultSharedIndexInformer; + + private DefaultSharedIndexInformer createDefaultSharedIndexInformer(long resyncPeriod) { + defaultSharedIndexInformer = new DefaultSharedIndexInformer<>(Pod.class, listerWatcher, resyncPeriod, operationContext, Collections.emptyList(), sharedScheduler); + return defaultSharedIndexInformer; + } + + @AfterEach + void afterEach() { + if (defaultSharedIndexInformer != null) { + defaultSharedIndexInformer.stop(); + } + } + + @Test + @DisplayName("Controller initialized with resync period greater than zero should use provided resync period") + void testControllerCreationWithResyncPeriodMoreThanZero() { + // Given + When + DefaultSharedIndexInformer controller = createDefaultSharedIndexInformer(1000L); + + // Then + assertEquals(1000L, controller.getFullResyncPeriod()); + } + + @Test + @DisplayName("Controller initialized with resync period less than zero should throw exception") + void testControllerCreationWithResyncPeriodLessThanZero() { + assertThrows(IllegalArgumentException.class, () -> createDefaultSharedIndexInformer(-1)); + } + + @Test + @DisplayName("Controller initialized with resync period 0 should use provided resync period") + void testControllerCreationWithResyncPeriodZero() { + // Given + When + DefaultSharedIndexInformer controller = createDefaultSharedIndexInformer(0); + + // Then + assertEquals(0L, controller.getFullResyncPeriod()); + } + + @Test + @DisplayName("Controller stop shut downs/cancels all executor services") + void testStop() { + // Given + DefaultSharedIndexInformer controller = createDefaultSharedIndexInformer(0); + controller.run(); + // When + controller.stop(); + // Then + assertThat(controller.isRunning()).isFalse(); + } + + @Test + @DisplayName("Controller initialized with resync period should initialize resyncExecutor") + void testControllerRunWithResyncPeriodGreaterThanZero() throws InterruptedException { + // Given + When + DefaultSharedIndexInformer controller = createDefaultSharedIndexInformer(1); + controller.run(); + + assertNotNull(controller.getResyncFuture()); + + controller.stop(); + + assertNull(controller.getResyncFuture()); + } + + @Test + @DisplayName("Controller with resync function throwing exception") + void testControllerRunsResyncFunctionThrowingException() throws InterruptedException { + // Given + When + long fullResyncPeriod = 10L; + int numberOfResyncs = 10; + final CountDownLatch countDown = new CountDownLatch(numberOfResyncs); + DefaultSharedIndexInformer controller = createDefaultSharedIndexInformer(fullResyncPeriod); + controller.scheduleResync(() -> { + countDown.countDown(); + if( countDown.getCount() == 2 ) { + throw new RuntimeException("make it fail"); + } + return true; + }); + countDown.await(WAIT_TIME, TimeUnit.MILLISECONDS); + controller.stop(); + // Then + assertThat(countDown.getCount()).isLessThanOrEqualTo(2); + } + + @Test + @DisplayName("Controller initialized with resync period to 0 should not initialize resyncExecutor") + void testControllerRunWithResyncPeriodToZero() throws InterruptedException { + // Given + When + DefaultSharedIndexInformer controller = createDefaultSharedIndexInformer(0); + controller.run(); + + // Then + assertNull(controller.getResyncFuture()); + } + + @Test + @DisplayName("Controller initialized with resync period should run, initialize resyncExecutor and resync at least a given number of times") + void testControllerRunsReyncFunctionExpectedNumberOfTime() throws InterruptedException { + // Given + When + long fullResyncPeriod = 10L; + int numberOfResyncs = 10; + final CountDownLatch countDown = new CountDownLatch(numberOfResyncs); + DefaultSharedIndexInformer controller = createDefaultSharedIndexInformer(fullResyncPeriod); + controller.scheduleResync(() -> {countDown.countDown(); return true;}); + // We give an extra cycle to avoid clock inaccurracy interruptions + countDown.await(WAIT_TIME, TimeUnit.MILLISECONDS); + controller.stop(); + // Then + assertNull(controller.getResyncFuture()); + assertThat(countDown.getCount()).isLessThanOrEqualTo(1); + } + + @Test + @DisplayName("Controller initialized with resync period to 0 should run but never resync") + void testControllerNeverRunsReyncFunctionWhenPeriodIsZero() throws InterruptedException { + // Given + When + int count = 10; + final CountDownLatch countDown = new CountDownLatch(count); + DefaultSharedIndexInformer controller = createDefaultSharedIndexInformer(0); + controller.scheduleResync(() -> {countDown.countDown(); return true;}); + countDown.await(1000, TimeUnit.MILLISECONDS); + controller.stop(); + // Then + assertNull(controller.getResyncFuture()); + assertThat(countDown.getCount()).isEqualTo(count); + } + +} diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/internal/UtilsTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/internal/UtilsTest.java index d253ac45cb7..98985454aa1 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/internal/UtilsTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/internal/UtilsTest.java @@ -78,6 +78,8 @@ import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; +import java.util.concurrent.ThreadFactory; + import org.apache.commons.lang.SystemUtils; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.DisplayName; @@ -372,4 +374,12 @@ void testGetCommandPlatformPrefix() { assertEquals("-c", commandPrefix.get(1)); } } + + @Test + void testDaemonThreadFactory() { + ThreadFactory tf = Utils.daemonThreadFactory(this); + Thread t = tf.newThread(()->{}); + assertTrue(t.isDaemon()); + assertTrue(t.getName().startsWith(UtilsTest.class.getSimpleName())); + } } diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index 6a4b5fa1911..6fc6febf70b 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -852,7 +852,6 @@ public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) { @Test void testRunAfterStop() { SharedIndexInformer podInformer = factory.sharedIndexInformerFor(Pod.class, 0); - podInformer.run(); podInformer.stop(); assertThrows(IllegalStateException.class, podInformer::run); } From b4b6e3ab74e9727940b5cc7e5f9f6e490857d532 Mon Sep 17 00:00:00 2001 From: Steven Hawkins Date: Wed, 12 May 2021 14:35:44 -0400 Subject: [PATCH 2/9] Update kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java Co-authored-by: Rohan Kumar --- .../kubernetes/client/informers/cache/SharedProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java index 11a0ac2864e..c1c3062ef82 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java @@ -108,7 +108,7 @@ public boolean shouldResync() { public void stop() { lock.writeLock().lock(); try { - toClose.forEach(c->c.run()); + toClose.forEach(Runnable::run); toClose.clear(); syncingListeners = null; listeners = null; From 82b8932a04471121e953293544ecf93b5eca7454 Mon Sep 17 00:00:00 2001 From: Steven Hawkins Date: Wed, 12 May 2021 14:36:08 -0400 Subject: [PATCH 3/9] Update kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java Co-authored-by: Rohan Kumar --- .../kubernetes/client/informers/SharedSchedulerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java index 2963d7f399b..2af6d506d42 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java @@ -32,7 +32,7 @@ void testAutoShutdown() throws InterruptedException { SharedScheduler scheduler = new SharedScheduler(50); assertFalse(scheduler.hasExecutor()); CountDownLatch latch = new CountDownLatch(2); - ScheduledFuture future = scheduler.scheduleWithFixedDelay(()->latch.countDown(), 50, 50, TimeUnit.MILLISECONDS); + ScheduledFuture future = scheduler.scheduleWithFixedDelay(latch::countDown, 50, 50, TimeUnit.MILLISECONDS); assertTrue(scheduler.hasExecutor()); latch.await(); assertFalse(future.isDone()); From 750c9fa455c297b7eaaf3b746592658e906c9fa8 Mon Sep 17 00:00:00 2001 From: shawkins Date: Wed, 12 May 2021 14:49:48 -0400 Subject: [PATCH 4/9] switching to Awaitility --- kubernetes-client/pom.xml | 4 ++++ .../client/informers/SharedSchedulerTest.java | 13 ++++--------- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/kubernetes-client/pom.xml b/kubernetes-client/pom.xml index de37286fca3..2c703ed5031 100644 --- a/kubernetes-client/pom.xml +++ b/kubernetes-client/pom.xml @@ -255,6 +255,10 @@ assertj-core test + + org.awaitility + awaitility + diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java index 2af6d506d42..f08fb55f94f 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java @@ -25,6 +25,8 @@ import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.awaitility.Awaitility.await; + public class SharedSchedulerTest { @Test @@ -37,16 +39,9 @@ void testAutoShutdown() throws InterruptedException { latch.await(); assertFalse(future.isDone()); future.cancel(true); - for (int i = 0; i < 10; i++) { - if (!scheduler.hasExecutor()) { - break; - } - Thread.sleep(100); - } - // should be shutdown - assertFalse(scheduler.hasExecutor()); + await().atMost(1, TimeUnit.SECONDS).until(()->!scheduler.hasExecutor()); // should start again - future = scheduler.scheduleWithFixedDelay(()->latch.countDown(), 50, 50, TimeUnit.MILLISECONDS); + future = scheduler.scheduleWithFixedDelay(latch::countDown, 50, 50, TimeUnit.MILLISECONDS); assertTrue(scheduler.hasExecutor()); } } From ae2f1e461260e41f3468ddb623868ad1e3b10e94 Mon Sep 17 00:00:00 2001 From: shawkins Date: Wed, 12 May 2021 15:08:03 -0400 Subject: [PATCH 5/9] removing code smells --- .../io/fabric8/kubernetes/client/informers/ResyncRunnable.java | 2 +- .../io/fabric8/kubernetes/client/informers/SharedScheduler.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java index 90fc3843224..d4c78da48c9 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java @@ -42,7 +42,7 @@ public void run() { log.debug("ResyncRunnable#resync .. .."); } - // TODO if there is a concern that this processing could overwhelm the single + // if there is a concern that this processing could overwhelm the single // thread, then hand this off to the common pool if (shouldResyncFunc == null || shouldResyncFunc.getAsBoolean()) { if (log.isDebugEnabled()) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java index 82eeb2607a5..bc280bd4246 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java @@ -28,7 +28,7 @@ */ public class SharedScheduler { - public static long DEFAULT_TTL_MILLIS = TimeUnit.SECONDS.toMillis(10); + public static final long DEFAULT_TTL_MILLIS = TimeUnit.SECONDS.toMillis(10); private final long ttlMillis; private ScheduledThreadPoolExecutor executor; From ae0ff7c810d81c60283bdd1ea96d21ee5b0914e0 Mon Sep 17 00:00:00 2001 From: shawkins Date: Fri, 21 May 2021 09:47:08 -0400 Subject: [PATCH 6/9] rebased and updated the handler threading to reuse the informer executor --- .../informers/ResourceEventHandler.java | 7 --- .../client/informers/SerialExecutor.java | 57 +++++++++++++++++++ .../informers/SharedInformerFactory.java | 2 +- .../client/informers/SharedScheduler.java | 6 +- .../informers/cache/ProcessorListener.java | 19 ++----- .../informers/cache/SharedProcessor.java | 40 ++++++------- .../impl/DefaultSharedIndexInformer.java | 7 ++- .../DefaultSharedIndexInformerResyncTest.java | 2 +- 8 files changed, 92 insertions(+), 48 deletions(-) create mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResourceEventHandler.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResourceEventHandler.java index 39cbf005a64..16f4a66bc4f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResourceEventHandler.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResourceEventHandler.java @@ -55,11 +55,4 @@ public interface ResourceEventHandler { */ void onDelete(T obj, boolean deletedFinalStateUnknown); - /** - * If the event handler should be called in a separate thread - * @return - */ - default boolean isCalledAsync() { - return true; - } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java new file mode 100644 index 00000000000..c079b9e3df7 --- /dev/null +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java @@ -0,0 +1,57 @@ +/** + * Copyright (C) 2015 Red Hat, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.fabric8.kubernetes.client.informers; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.Executor; + +/** + * See {@link Executor} docs + * + *
Effectively creates a derived single thread executor + */ +public class SerialExecutor implements Executor { + final Queue tasks = new ArrayDeque(); + final Executor executor; + Runnable active; + + public SerialExecutor(Executor executor) { + this.executor = executor; + } + + public synchronized void execute(final Runnable r) { + tasks.offer(new Runnable() { + public void run() { + try { + r.run(); + } finally { + scheduleNext(); + } + } + }); + if (active == null) { + scheduleNext(); + } + } + + protected synchronized void scheduleNext() { + if ((active = tasks.poll()) != null) { + executor.execute(active); + } + } +} \ No newline at end of file diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java index 0963777e34d..d60f0699ee9 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java @@ -231,7 +231,7 @@ private synchronized context = context.withIsNamespaceConfiguredFromGlobalConfig(false); } } - SharedIndexInformer informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, eventListeners, resyncExecutor); + SharedIndexInformer informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, eventListeners, informerExecutor, resyncExecutor); this.informers.put(getInformerKey(context), informer); return informer; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java index bc280bd4246..b5a3a786872 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java @@ -24,7 +24,11 @@ /** * Maintains a single thread daemon scheduler. - * It is not intended for long-running tasks + * + *
It is not intended for long-running tasks, + * but it does not assume the task can be handed off to the common pool + * + *
This is very similar to the CompletableFuture.Delayer, but provides a scheduler method */ public class SharedScheduler { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java index a6bc8f2eb75..9744b594ff5 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/ProcessorListener.java @@ -21,7 +21,6 @@ import java.time.ZonedDateTime; import java.time.temporal.ChronoUnit; -import java.util.concurrent.Executor; /** * ProcessorListener implements Runnable interface. It's supposed to run in background @@ -39,28 +38,20 @@ public class ProcessorListener { private long resyncPeriodInMillis; private ZonedDateTime nextResync; private ResourceEventHandler handler; - private Executor executor; public ProcessorListener(ResourceEventHandler handler, long resyncPeriodInMillis) { - this(handler, resyncPeriodInMillis, Runnable::run); - } - - public ProcessorListener(ResourceEventHandler handler, long resyncPeriodInMillis, Executor executorService) { this.resyncPeriodInMillis = resyncPeriodInMillis; this.handler = handler; - this.executor = executorService; determineNextResync(ZonedDateTime.now()); } public void add(Notification notification) { - executor.execute(() -> { - try { - notification.handle(handler); - } catch (Exception ex) { - log.error("Failed invoking {} event handler: {}", handler, ex.getMessage(), ex); - } - }); + try { + notification.handle(handler); + } catch (Exception ex) { + log.error("Failed invoking {} event handler: {}", handler, ex.getMessage(), ex); + } } public void determineNextResync(ZonedDateTime now) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java index c1c3062ef82..c8c37ebedca 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/SharedProcessor.java @@ -16,14 +16,11 @@ package io.fabric8.kubernetes.client.informers.cache; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; -import io.fabric8.kubernetes.client.utils.Utils; import java.time.ZonedDateTime; import java.util.ArrayList; import java.util.List; import java.util.concurrent.Executor; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -33,18 +30,23 @@ * * This has been taken from official-client: https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/informer/cache/SharedProcessor.java * - *
Modified to not submit tasks directly + *
Modified to simplify threading */ public class SharedProcessor { private ReadWriteLock lock = new ReentrantReadWriteLock(); - private List toClose = new ArrayList<>(); private List> listeners; private List> syncingListeners; - + private final Executor executor; + public SharedProcessor() { + this(Runnable::run); + } + + public SharedProcessor(Executor executor) { this.listeners = new ArrayList<>(); this.syncingListeners = new ArrayList<>(); + this.executor = executor; } /** @@ -69,20 +71,23 @@ public void addListener(final ProcessorListener processorListener) { * @param isSync whether in sync or not */ public void distribute(ProcessorListener.Notification obj, boolean isSync) { + // obtain the list to call outside before submitting lock.readLock().lock(); + List> toCall; try { if (isSync) { - for (ProcessorListener listener : syncingListeners) { - listener.add(obj); - } + toCall = new ArrayList<>(syncingListeners); } else { - for (ProcessorListener listener : listeners) { - listener.add(obj); - } + toCall = new ArrayList<>(listeners); } } finally { lock.readLock().unlock(); } + executor.execute(() -> { + for (ProcessorListener listener : toCall) { + listener.add(obj); + } + }); } public boolean shouldResync() { @@ -108,8 +113,6 @@ public boolean shouldResync() { public void stop() { lock.writeLock().lock(); try { - toClose.forEach(Runnable::run); - toClose.clear(); syncingListeners = null; listeners = null; } finally { @@ -120,14 +123,7 @@ public void stop() { public ProcessorListener addProcessorListener(ResourceEventHandler handler, long resyncPeriodMillis) { lock.writeLock().lock(); try { - Executor executor = Runnable::run; - if (handler.isCalledAsync()) { - ExecutorService service = Executors.newSingleThreadExecutor(Utils.daemonThreadFactory(handler)); - executor = service; - toClose.add(service::shutdownNow); - } - - ProcessorListener listener = new ProcessorListener<>(handler, resyncPeriodMillis, executor); + ProcessorListener listener = new ProcessorListener<>(handler, resyncPeriodMillis); addListener(listener); return listener; } finally { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 2cb6306a5ce..6e3d28cad95 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -21,6 +21,7 @@ import io.fabric8.kubernetes.client.informers.ListerWatcher; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.ResyncRunnable; +import io.fabric8.kubernetes.client.informers.SerialExecutor; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; import io.fabric8.kubernetes.client.informers.SharedInformerEventListener; import io.fabric8.kubernetes.client.informers.SharedScheduler; @@ -36,6 +37,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -72,7 +74,7 @@ public class DefaultSharedIndexInformer resyncFuture; - public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, Collection eventListeners, SharedScheduler resyncExecutor) { + public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, Collection eventListeners, Executor informerExecutor, SharedScheduler resyncExecutor) { if (resyncPeriod < 0) { throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value"); } @@ -82,7 +84,8 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis this.resyncExecutor = resyncExecutor; this.apiTypeClass = apiTypeClass; - this.processor = new SharedProcessor<>(); + // reuse the informer executor, but ensure serial processing + this.processor = new SharedProcessor<>(new SerialExecutor(informerExecutor)); this.indexer = new Cache<>(); this.indexer.setIsRunning(this::isRunning); diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java index 770d53e28ec..5f3cc09b982 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java @@ -44,7 +44,7 @@ private abstract static class AbstractPodListerWatcher implements ListerWatcher< DefaultSharedIndexInformer defaultSharedIndexInformer; private DefaultSharedIndexInformer createDefaultSharedIndexInformer(long resyncPeriod) { - defaultSharedIndexInformer = new DefaultSharedIndexInformer<>(Pod.class, listerWatcher, resyncPeriod, operationContext, Collections.emptyList(), sharedScheduler); + defaultSharedIndexInformer = new DefaultSharedIndexInformer<>(Pod.class, listerWatcher, resyncPeriod, operationContext, Collections.emptyList(), Runnable::run, sharedScheduler); return defaultSharedIndexInformer; } From 5da3a98b37307176a9ecbb2658435ded760293c2 Mon Sep 17 00:00:00 2001 From: shawkins Date: Fri, 21 May 2021 10:16:57 -0400 Subject: [PATCH 7/9] fix #3143 adding a more general listener onException also adding a method to get the api class from the informer --- CHANGELOG.md | 4 ++- .../client/informers/SerialExecutor.java | 14 ++++----- .../client/informers/SharedInformer.java | 5 +++ .../SharedInformerEventListener.java | 4 +++ .../informers/SharedInformerFactory.java | 16 ++++++++-- .../impl/DefaultSharedIndexInformer.java | 31 ++++++++----------- .../client/informers/SharedSchedulerTest.java | 2 +- .../DefaultSharedIndexInformerResyncTest.java | 3 +- 8 files changed, 46 insertions(+), 33 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index f3a7f1213f7..006a151acfa 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,7 +5,9 @@ #### Bugs #### Improvements -* Fix #3135 added mock crud support for patch status, and will return exceptions for unsupported patch types +* Fix #3135: added mock crud support for patch status, and will return exceptions for unsupported patch types +* Fix #3072: various changes to refine how threads are handled by informers. Note that the SharedInformer.run call is now blocking when starting the informer. +* Fix #3143: a new SharedInformerEventListener.onException(SharedIndexInformer, Exception) method is available to determine which informer could not start. #### Dependency Upgrade diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java index c079b9e3df7..3aee8c27a2e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java @@ -26,7 +26,7 @@ *
Effectively creates a derived single thread executor */ public class SerialExecutor implements Executor { - final Queue tasks = new ArrayDeque(); + final Queue tasks = new ArrayDeque<>(); final Executor executor; Runnable active; @@ -35,13 +35,11 @@ public SerialExecutor(Executor executor) { } public synchronized void execute(final Runnable r) { - tasks.offer(new Runnable() { - public void run() { - try { - r.run(); - } finally { - scheduleNext(); - } + tasks.offer(() -> { + try { + r.run(); + } finally { + scheduleNext(); } }); if (active == null) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java index 8785dbb567f..03becd2c52a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformer.java @@ -66,4 +66,9 @@ public interface SharedInformer { * Return true if the informer is running */ boolean isRunning(); + + /** + * Return the class this informer is watching + */ + Class getApiTypeClass(); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerEventListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerEventListener.java index 2b345bf6ac9..0b4702e3dd0 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerEventListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerEventListener.java @@ -17,4 +17,8 @@ public interface SharedInformerEventListener { void onException(Exception exception); + + default void onException(SharedIndexInformer informer, Exception e) { + onException(e); + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java index d60f0699ee9..dc37d1cf7c1 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java @@ -40,6 +40,8 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import okhttp3.OkHttpClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * SharedInformerFactory class constructs and caches informers for api types. @@ -48,6 +50,7 @@ * which is ported from offical go client https://github.com/kubernetes/client-go/blob/master/informers/factory.go */ public class SharedInformerFactory extends BaseOperation { + private static final Logger log = LoggerFactory.getLogger(SharedInformerFactory.class); private final Map informers = new HashMap<>(); private final Map startedInformers = new HashMap<>(); @@ -231,7 +234,7 @@ private synchronized context = context.withIsNamespaceConfiguredFromGlobalConfig(false); } } - SharedIndexInformer informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, eventListeners, informerExecutor, resyncExecutor); + SharedIndexInformer informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, informerExecutor, resyncExecutor); this.informers.put(getInformerKey(context), informer); return informer; } @@ -285,8 +288,15 @@ public synchronized void startAllRegisteredInformers() { if (!informerExecutor.isShutdown()) { informers.forEach( - (informerType, informer) -> - startedInformers.computeIfAbsent(informerType, key -> informerExecutor.submit(informer::run))); + (informerType, informer) -> startedInformers.computeIfAbsent(informerType, + key -> informerExecutor.submit(() -> { + try { + informer.run(); + } catch (RuntimeException e) { + this.eventListeners.forEach(listener -> listener.onException(informer, e)); + log.warn("Informer start did not complete", e); + } + }))); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 6e3d28cad95..2ce034e32ff 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -23,7 +23,6 @@ import io.fabric8.kubernetes.client.informers.ResyncRunnable; import io.fabric8.kubernetes.client.informers.SerialExecutor; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.fabric8.kubernetes.client.informers.SharedInformerEventListener; import io.fabric8.kubernetes.client.informers.SharedScheduler; import io.fabric8.kubernetes.client.informers.cache.Cache; import io.fabric8.kubernetes.client.informers.cache.Indexer; @@ -34,7 +33,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.Collection; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -58,7 +56,6 @@ public class DefaultSharedIndexInformer eventListeners; private final SharedScheduler resyncExecutor; private final Reflector reflector; private final Class apiTypeClass; @@ -74,13 +71,12 @@ public class DefaultSharedIndexInformer resyncFuture; - public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, Collection eventListeners, Executor informerExecutor, SharedScheduler resyncExecutor) { + public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, Executor informerExecutor, SharedScheduler resyncExecutor) { if (resyncPeriod < 0) { throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value"); } this.resyncCheckPeriodMillis = resyncPeriod; this.defaultEventHandlerResyncPeriod = resyncPeriod; - this.eventListeners = eventListeners; this.resyncExecutor = resyncExecutor; this.apiTypeClass = apiTypeClass; @@ -136,8 +132,8 @@ public void addEventHandlerWithResyncPeriod(ResourceEventHandler handler, lon } List objectList = this.indexer.list(); - for (Object item : objectList) { - listener.add(new ProcessorListener.AddNotification(item)); + for (T item : objectList) { + listener.add(new ProcessorListener.AddNotification<>(item)); } } @@ -155,17 +151,11 @@ public void run() { return; } - try { - log.info("informer#Controller: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis); + log.info("informer#Controller: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis); - scheduleResync(processor::shouldResync); - - reflector.listSyncAndWatch(); - } catch (RuntimeException exception) { - log.warn("Informer start did not complete", exception); - this.eventListeners.forEach(listener -> listener.onException(exception)); - throw exception; - } + scheduleResync(processor::shouldResync); + + reflector.listSyncAndWatch(); // stop called while run is called could be ineffective, check for it afterwards synchronized (this) { if (stopped) { @@ -200,7 +190,7 @@ public void addIndexers(Map>> indexers) { } @Override - public Indexer getIndexer() { + public Indexer getIndexer() { return this.indexer; } @@ -237,5 +227,10 @@ public long getFullResyncPeriod() { ScheduledFuture getResyncFuture() { return resyncFuture; } + + @Override + public Class getApiTypeClass() { + return apiTypeClass; + } } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java index f08fb55f94f..4cd419c1179 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java @@ -27,7 +27,7 @@ import static org.awaitility.Awaitility.await; -public class SharedSchedulerTest { +class SharedSchedulerTest { @Test void testAutoShutdown() throws InterruptedException { diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java index 5f3cc09b982..ee1b0875805 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java @@ -25,7 +25,6 @@ import org.junit.jupiter.api.Test; import org.mockito.Mockito; -import java.util.Collections; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -44,7 +43,7 @@ private abstract static class AbstractPodListerWatcher implements ListerWatcher< DefaultSharedIndexInformer defaultSharedIndexInformer; private DefaultSharedIndexInformer createDefaultSharedIndexInformer(long resyncPeriod) { - defaultSharedIndexInformer = new DefaultSharedIndexInformer<>(Pod.class, listerWatcher, resyncPeriod, operationContext, Collections.emptyList(), Runnable::run, sharedScheduler); + defaultSharedIndexInformer = new DefaultSharedIndexInformer<>(Pod.class, listerWatcher, resyncPeriod, operationContext, Runnable::run, sharedScheduler); return defaultSharedIndexInformer; } From 76c6c9d4973ea7d1429e0d2835735772ba5b889a Mon Sep 17 00:00:00 2001 From: shawkins Date: Mon, 24 May 2021 08:27:03 -0400 Subject: [PATCH 8/9] further consolidating threading concerns moved some logic into util, creating a common scheduler and thread pool --- .../client/DefaultKubernetesClient.java | 4 +- .../dsl/internal/AbstractWatchManager.java | 57 +++++++------- .../client/dsl/internal/NamedRunnable.java | 46 ------------ .../client/dsl/internal/WatchHTTPManager.java | 46 ++---------- .../internal/WatcherWebSocketListener.java | 47 +++--------- .../client/informers/ResyncRunnable.java | 10 +-- .../informers/SharedInformerFactory.java | 15 +++- .../impl/DefaultSharedIndexInformer.java | 23 +++--- .../CachedSingleThreadScheduler.java} | 26 +++++-- .../{informers => utils}/SerialExecutor.java | 4 +- .../kubernetes/client/utils/Utils.java | 36 ++++++++- .../internal/AbstractWatchManagerTest.java | 75 ------------------- .../DefaultSharedIndexInformerResyncTest.java | 4 +- .../CachedSingleThreadSchedulerTest.java} | 9 +-- 14 files changed, 136 insertions(+), 266 deletions(-) delete mode 100644 kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamedRunnable.java rename kubernetes-client/src/main/java/io/fabric8/kubernetes/client/{informers/SharedScheduler.java => utils/CachedSingleThreadScheduler.java} (80%) rename kubernetes-client/src/main/java/io/fabric8/kubernetes/client/{informers => utils}/SerialExecutor.java (96%) rename kubernetes-client/src/test/java/io/fabric8/kubernetes/client/{informers/SharedSchedulerTest.java => utils/CachedSingleThreadSchedulerTest.java} (90%) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/DefaultKubernetesClient.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/DefaultKubernetesClient.java index a807bee5931..6b6016d9a1a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/DefaultKubernetesClient.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/DefaultKubernetesClient.java @@ -122,13 +122,11 @@ import io.fabric8.kubernetes.client.extended.run.RunOperations; import io.fabric8.kubernetes.client.informers.SharedInformerFactory; import io.fabric8.kubernetes.client.utils.Serialization; -import io.fabric8.kubernetes.client.utils.Utils; import java.io.InputStream; import java.util.ArrayList; import java.util.Collection; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import okhttp3.OkHttpClient; @@ -577,7 +575,7 @@ public AutoscalingAPIGroupDSL autoscaling() { */ @Override public SharedInformerFactory informers() { - return new SharedInformerFactory(Executors.newCachedThreadPool(Utils.daemonThreadFactory(this)), httpClient, getConfiguration()); + return new SharedInformerFactory(httpClient, getConfiguration()); } /** diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 0ebf076dafd..e54f54a9689 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -16,7 +16,6 @@ package io.fabric8.kubernetes.client.dsl.internal; import io.fabric8.kubernetes.api.model.ListOptions; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; @@ -27,8 +26,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -46,11 +44,12 @@ public abstract class AbstractWatchManager implements Watch { private final int reconnectInterval; private final int maxIntervalExponent; final AtomicInteger currentReconnectAttempt; - private final ScheduledExecutorService executorService; + private ScheduledFuture reconnectAttempt; private final RequestBuilder requestBuilder; protected ClientRunner runner; + private final AtomicBoolean reconnectPending = new AtomicBoolean(false); AbstractWatchManager( Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder @@ -62,7 +61,6 @@ public abstract class AbstractWatchManager implements Watch { this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); this.currentReconnectAttempt = new AtomicInteger(0); this.forceClosed = new AtomicBoolean(); - this.executorService = Executors.newSingleThreadScheduledExecutor(Utils.daemonThreadFactory(AbstractWatchManager.this)); this.requestBuilder = requestBuilder; } @@ -90,30 +88,35 @@ final void closeEvent() { watcher.onClose(); } - final void closeExecutorService() { - if (executorService != null && !executorService.isShutdown()) { - logger.debug("Closing ExecutorService"); - try { - executorService.shutdown(); - if (!executorService.awaitTermination(1, TimeUnit.SECONDS)) { - logger.warn("Executor didn't terminate in time after shutdown in close(), killing it."); - executorService.shutdownNow(); - } - } catch (Exception t) { - throw KubernetesClientException.launderThrowable(t); - } + final synchronized void cancelReconnect() { + if (reconnectAttempt != null) { + reconnectAttempt.cancel(true); } } - - void submit(Runnable task) { - if (!executorService.isShutdown()) { - executorService.submit(task); + + void scheduleReconnect(Runnable command, boolean shouldBackoff) { + if (!reconnectPending.compareAndSet(false, true)) { + logger.debug("Reconnect already scheduled"); + return; } - } - - void schedule(Runnable command, long delay, TimeUnit timeUnit) { - if (!executorService.isShutdown()) { - executorService.schedule(command, delay, timeUnit); + + logger.debug("Scheduling reconnect task"); + + long delay = shouldBackoff + ? nextReconnectInterval() + : 0; + + synchronized (this) { + reconnectAttempt = Utils.schedule(Utils.getCommonExecutorSerive(), () -> { + try { + command.run(); + } finally { + reconnectPending.set(false); + } + }, delay, TimeUnit.MILLISECONDS); + if (forceClosed.get()) { + cancelReconnect(); + } } } @@ -179,7 +182,7 @@ public void close() { logger.debug("Force closing the watch {}", this); closeEvent(); runner.close(); - closeExecutorService(); + cancelReconnect(); } @FunctionalInterface diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamedRunnable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamedRunnable.java deleted file mode 100644 index 3928cde14d1..00000000000 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamedRunnable.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.dsl.internal; - -import java.util.Objects; - -abstract class NamedRunnable implements Runnable { - private final String name; - - NamedRunnable(String name) { - this.name = Objects.requireNonNull(name); - } - - private void tryToSetName(String value) { - try { - Thread.currentThread().setName(value); - } catch (SecurityException ignored) { - // Ignored - } - } - - public final void run() { - String oldName = Thread.currentThread().getName(); - tryToSetName(this.name + "|" + oldName); - try { - execute(); - } finally { - tryToSetName(oldName); - } - } - - protected abstract void execute(); -} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index 0ab76a81497..a21bb6e4858 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -18,9 +18,7 @@ import java.io.IOException; import java.net.MalformedURLException; import java.util.List; -import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResource; @@ -96,7 +94,6 @@ OkHttpClient cloneAndCustomize(OkHttpClient client) { private abstract static class HTTPClientRunner extends AbstractWatchManager.ClientRunner { private final AbstractWatchManager manager; - private final AtomicBoolean reconnectPending = new AtomicBoolean(false); public HTTPClientRunner(OkHttpClient client, AbstractWatchManager manager) { super(client); @@ -155,43 +152,16 @@ private void scheduleReconnect(boolean shouldBackoff) { return; } - logger.debug("Submitting reconnect task to the executor"); - - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor. - manager.submit(() -> { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } + manager.scheduleReconnect(() -> { try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - - long delay = shouldBackoff - ? manager.nextReconnectInterval() - : 0; - - manager.schedule(() -> { - try { - manager.runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - close(); - manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e)); - } - }, delay, TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - // This is a standard exception if we close the scheduler. We should not print it - if (!manager.isForceClosed()) { - logger.error("Exception in reconnect", e); - } - reconnectPending.set(false); + manager.runWatch(); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); + close(); + manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e)); } - }); + }, shouldBackoff); } public void onMessage(String messageSource) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index b42dccab410..617cdb1d9ea 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -16,8 +16,6 @@ package io.fabric8.kubernetes.client.dsl.internal; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -43,7 +41,7 @@ abstract class WatcherWebSocketListener extends WebSocketListener { * True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. */ private final AtomicBoolean started = new AtomicBoolean(false); - private final AtomicBoolean reconnectPending = new AtomicBoolean(false); + /** * Blocking queue for startup exceptions. */ @@ -152,40 +150,17 @@ public void onClosed(WebSocket webSocket, int code, String reason) { } private void scheduleReconnect() { - logger.debug("Submitting reconnect task to the executor"); - // make sure that whichever thread calls this method, the tasks are - // performed serially in the executor - manager.submit(new NamedRunnable("scheduleReconnect") { - @Override - public void execute() { - if (!reconnectPending.compareAndSet(false, true)) { - logger.debug("Reconnect already scheduled"); - return; - } + webSocketRef.set(null); + manager.scheduleReconnect(() -> { + try { + manager.runWatch(); + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); webSocketRef.set(null); - try { - // actual reconnect only after the back-off time has passed, without - // blocking the thread - logger.debug("Scheduling reconnect task"); - manager.schedule(new NamedRunnable("reconnectAttempt") { - @Override - public void execute() { - try { - manager.runWatch(); - reconnectPending.set(false); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - webSocketRef.set(null); - manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); - manager.close(); - } - } - }, manager.nextReconnectInterval(), TimeUnit.MILLISECONDS); - } catch (RejectedExecutionException e) { - reconnectPending.set(false); - } + manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); + manager.close(); } - }); + }, true); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java index d4c78da48c9..bd0730862b9 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/ResyncRunnable.java @@ -19,7 +19,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.function.BooleanSupplier; +import java.util.function.Supplier; /** * Calls the resync function of store interface which is always implemented @@ -30,9 +30,9 @@ public class ResyncRunnable implements Runnable { private static final Logger log = LoggerFactory.getLogger(ResyncRunnable.class); private Store store; - private BooleanSupplier shouldResyncFunc; + private Supplier shouldResyncFunc; - public ResyncRunnable(Store store, BooleanSupplier shouldResyncFunc) { + public ResyncRunnable(Store store, Supplier shouldResyncFunc) { this.store = store; this.shouldResyncFunc = shouldResyncFunc; } @@ -42,9 +42,7 @@ public void run() { log.debug("ResyncRunnable#resync .. .."); } - // if there is a concern that this processing could overwhelm the single - // thread, then hand this off to the common pool - if (shouldResyncFunc == null || shouldResyncFunc.getAsBoolean()) { + if (shouldResyncFunc == null || shouldResyncFunc.get()) { if (log.isDebugEnabled()) { log.debug("ResyncRunnable#force resync"); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java index dc37d1cf7c1..f2bf14ca1d6 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java @@ -56,12 +56,21 @@ public class SharedInformerFactory extends BaseOperation { private final Map startedInformers = new HashMap<>(); private final ExecutorService informerExecutor; - private final SharedScheduler resyncExecutor = new SharedScheduler(); private final BaseOperation baseOperation; private final ConcurrentLinkedQueue eventListeners = new ConcurrentLinkedQueue<>(); + private boolean allowShutdown = true; + + public SharedInformerFactory(OkHttpClient okHttpClient, Config configuration) { + // ideally this should be bounded. The current implication is that there + // can be 1 thread used (not dedicated to) per informer - which + // could be problematic for a large number of informers. however + // there is already a superceding issue there of thread utilization by okhttp + this(Utils.getCommonExecutorSerive(), okHttpClient, configuration); + this.allowShutdown = false; + } /** * Constructor with thread pool specified. * @@ -234,7 +243,7 @@ private synchronized context = context.withIsNamespaceConfiguredFromGlobalConfig(false); } } - SharedIndexInformer informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, informerExecutor, resyncExecutor); + SharedIndexInformer informer = new DefaultSharedIndexInformer<>(apiTypeClass, listerWatcher, resyncPeriodInMillis, context, informerExecutor); this.informers.put(getInformerKey(context), informer); return informer; } @@ -322,7 +331,7 @@ public synchronized void stopAllRegisteredInformers(boolean shutDownThreadPool) informer.stop(); } }); - if (shutDownThreadPool) { + if (shutDownThreadPool && allowShutdown) { informerExecutor.shutdown(); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index 2ce034e32ff..e44bf825633 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -21,15 +21,15 @@ import io.fabric8.kubernetes.client.informers.ListerWatcher; import io.fabric8.kubernetes.client.informers.ResourceEventHandler; import io.fabric8.kubernetes.client.informers.ResyncRunnable; -import io.fabric8.kubernetes.client.informers.SerialExecutor; import io.fabric8.kubernetes.client.informers.SharedIndexInformer; -import io.fabric8.kubernetes.client.informers.SharedScheduler; import io.fabric8.kubernetes.client.informers.cache.Cache; import io.fabric8.kubernetes.client.informers.cache.Indexer; import io.fabric8.kubernetes.client.informers.cache.ProcessorListener; import io.fabric8.kubernetes.client.informers.cache.ProcessorStore; import io.fabric8.kubernetes.client.informers.cache.Reflector; import io.fabric8.kubernetes.client.informers.cache.SharedProcessor; +import io.fabric8.kubernetes.client.utils.SerialExecutor; +import io.fabric8.kubernetes.client.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,8 +39,8 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.function.BooleanSupplier; import java.util.function.Function; +import java.util.function.Supplier; public class DefaultSharedIndexInformer> implements SharedIndexInformer { private static final Logger log = LoggerFactory.getLogger(DefaultSharedIndexInformer.class); @@ -56,30 +56,27 @@ public class DefaultSharedIndexInformer reflector; private final Class apiTypeClass; - private final ProcessorStore processorStore; - - private Cache indexer; - - private SharedProcessor processor; + private final Cache indexer; + private final SharedProcessor processor; + private final Executor informerExecutor; private AtomicBoolean started = new AtomicBoolean(); private volatile boolean stopped = false; private ScheduledFuture resyncFuture; - public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, Executor informerExecutor, SharedScheduler resyncExecutor) { + public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, Executor informerExecutor) { if (resyncPeriod < 0) { throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value"); } this.resyncCheckPeriodMillis = resyncPeriod; this.defaultEventHandlerResyncPeriod = resyncPeriod; - this.resyncExecutor = resyncExecutor; this.apiTypeClass = apiTypeClass; + this.informerExecutor = informerExecutor; // reuse the informer executor, but ensure serial processing this.processor = new SharedProcessor<>(new SerialExecutor(informerExecutor)); this.indexer = new Cache<>(); @@ -210,11 +207,11 @@ public boolean isRunning() { return !stopped && started.get() && reflector.isRunning(); } - synchronized void scheduleResync(BooleanSupplier resyncFunc) { + synchronized void scheduleResync(Supplier resyncFunc) { // schedule the resync runnable if (resyncCheckPeriodMillis > 0) { ResyncRunnable resyncRunnable = new ResyncRunnable<>(processorStore, resyncFunc); - resyncFuture = resyncExecutor.scheduleWithFixedDelay(resyncRunnable, resyncCheckPeriodMillis, resyncCheckPeriodMillis, TimeUnit.MILLISECONDS); + resyncFuture = Utils.scheduleAtFixedRate(informerExecutor, resyncRunnable, resyncCheckPeriodMillis, resyncCheckPeriodMillis, TimeUnit.MILLISECONDS); } else { log.debug("informer#Controller: resync skipped due to 0 full resync period {}", apiTypeClass); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/CachedSingleThreadScheduler.java similarity index 80% rename from kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java rename to kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/CachedSingleThreadScheduler.java index b5a3a786872..c813f76fef2 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedScheduler.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/CachedSingleThreadScheduler.java @@ -14,34 +14,33 @@ * limitations under the License. */ -package io.fabric8.kubernetes.client.informers; - -import io.fabric8.kubernetes.client.utils.Utils; +package io.fabric8.kubernetes.client.utils; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; /** - * Maintains a single thread daemon scheduler. + * Maintains a single thread daemon scheduler, which will terminate the thread + * when not in use. * *
It is not intended for long-running tasks, * but it does not assume the task can be handed off to the common pool * *
This is very similar to the CompletableFuture.Delayer, but provides a scheduler method */ -public class SharedScheduler { +public class CachedSingleThreadScheduler { public static final long DEFAULT_TTL_MILLIS = TimeUnit.SECONDS.toMillis(10); private final long ttlMillis; private ScheduledThreadPoolExecutor executor; - public SharedScheduler() { + public CachedSingleThreadScheduler() { this(DEFAULT_TTL_MILLIS); } - public SharedScheduler(long ttlMillis) { + public CachedSingleThreadScheduler(long ttlMillis) { this.ttlMillis = ttlMillis; } @@ -49,13 +48,24 @@ public synchronized ScheduledFuture scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) { + startExecutor(); + return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); + } + + public synchronized ScheduledFuture schedule(Runnable command, + long delay, + TimeUnit unit) { + startExecutor(); + return executor.schedule(command, delay, unit); + } + + private void startExecutor() { if (executor == null) { // start the executor and add a ttl task executor = new ScheduledThreadPoolExecutor(1, Utils.daemonThreadFactory(this)); executor.setRemoveOnCancelPolicy(true); executor.scheduleWithFixedDelay(this::shutdownCheck, ttlMillis, ttlMillis, TimeUnit.MILLISECONDS); } - return executor.scheduleWithFixedDelay(command, initialDelay, delay, unit); } /** diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/SerialExecutor.java similarity index 96% rename from kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java rename to kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/SerialExecutor.java index 3aee8c27a2e..a167361f64f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SerialExecutor.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/SerialExecutor.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.fabric8.kubernetes.client.informers; +package io.fabric8.kubernetes.client.utils; import java.util.ArrayDeque; import java.util.Queue; @@ -52,4 +52,4 @@ protected synchronized void scheduleNext() { executor.execute(active); } } -} \ No newline at end of file +} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java index 87db484be71..26fb4d556c7 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java @@ -41,8 +41,10 @@ import java.util.Optional; import java.util.Random; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.function.Function; @@ -59,6 +61,9 @@ public class Utils { public static final String PATH_WINDOWS = "Path"; public static final String PATH_UNIX = "PATH"; private static final Random random = new Random(); + + private static final ExecutorService SHARED_POOL = Executors.newCachedThreadPool(); + private static final CachedSingleThreadScheduler SHARED_SCHEDULER = new CachedSingleThreadScheduler(); private Utils() { } @@ -458,17 +463,46 @@ private static String getOperatingSystemFromSystemProperty() { * name based upon the object passed in. */ public static ThreadFactory daemonThreadFactory(Object forObject) { + String name = forObject.getClass().getSimpleName() + "-" + System.identityHashCode(forObject); + return daemonThreadFactory(name); + } + + static ThreadFactory daemonThreadFactory(String name) { return new ThreadFactory() { ThreadFactory threadFactory = Executors.defaultThreadFactory(); @Override public Thread newThread(Runnable r) { Thread ret = threadFactory.newThread(r); - ret.setName(forObject.getClass().getSimpleName() + "-" + System.identityHashCode(forObject) + "-" + ret.getName()); + ret.setName(name + "-" + ret.getName()); ret.setDaemon(true); return ret; } }; } + + /** + * Schedule a task to run in the given {@link Executor} - which should run the task in a different thread as to not + * hold the scheduling thread + */ + public static ScheduledFuture schedule(Executor executor, Runnable command, long delay, TimeUnit unit) { + return SHARED_SCHEDULER.schedule(() -> executor.execute(command), delay, unit); + } + /** + * Schedule a repeated task to run in the given {@link Executor} - which should run the task in a different thread as to not + * hold the scheduling thread + */ + public static ScheduledFuture scheduleAtFixedRate(Executor executor, Runnable command, long initialDelay, long delay, TimeUnit unit) { + // because of the hand-off to the other executor, there's no difference between rate and delay + return SHARED_SCHEDULER.scheduleWithFixedDelay(() -> executor.execute(command), initialDelay, delay, unit); + } + + /** + * Get the common executor service - callers should not shutdown this service + */ + public static ExecutorService getCommonExecutorSerive() { + return SHARED_POOL; + } + } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java index a1a482b2ab8..ecd15b2a5fa 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java @@ -29,18 +29,15 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; class AbstractWatchManagerTest { @@ -98,78 +95,6 @@ void closeWebSocket() { verify(webSocket, times(1)).close(1000, null); } - @Test - @DisplayName("closeExecutorService, with graceful termination") - void closeExecutorServiceGracefully() throws InterruptedException{ - // Given - final WatchManager awm = withDefaultWatchManager(null); - when(executorService.awaitTermination(1, TimeUnit.SECONDS)).thenReturn(true); - // When - awm.closeExecutorService(); - // Then - verify(executorService, times(1)).shutdown(); - verify(executorService, times(0)).shutdownNow(); - } - - @Test - @DisplayName("closeExecutorService, with shutdownNow") - void closeExecutorServiceNow() throws InterruptedException { - // Given - final WatchManager awm = withDefaultWatchManager(null); - when(executorService.awaitTermination(1, TimeUnit.SECONDS)).thenReturn(false); - // When - awm.closeExecutorService(); - // Then - verify(executorService, times(1)).shutdown(); - verify(executorService, times(1)).shutdownNow(); - } - - @Test - @DisplayName("submit, executor not shutdown, should submit") - void submitWhenIsNotShutdown() { - // Given - final WatchManager awm = withDefaultWatchManager(null); - // When - awm.submit(() -> {}); - // Then - verify(executorService, times(1)).submit(any(Runnable.class)); - } - - @Test - @DisplayName("submit, executor shutdown, should NOT submit") - void submitWhenIsShutdown() { - // Given - final WatchManager awm = withDefaultWatchManager(null); - when(executorService.isShutdown()).thenReturn(true); - // When - awm.submit(() -> {}); - // Then - verify(executorService, times(0)).submit(any(Runnable.class)); - } - - @Test - @DisplayName("schedule, executor not shutdown, should submit") - void scheduleWhenIsNotShutdown() { - // Given - final WatchManager awm = withDefaultWatchManager(null); - // When - awm.schedule(() -> {}, 0, TimeUnit.SECONDS); - // Then - verify(executorService, times(1)).schedule(any(Runnable.class), anyLong(), any()); - } - - @Test - @DisplayName("schedule, executor shutdown, should NOT submit") - void scheduleWhenIsShutdown() { - // Given - final WatchManager awm = withDefaultWatchManager(null); - when(executorService.isShutdown()).thenReturn(true); - // When - awm.schedule(() -> {}, 0, TimeUnit.SECONDS); - // Then - verify(executorService, times(0)).schedule(any(Runnable.class), anyLong(), any()); - } - @Test @DisplayName("nextReconnectInterval, returns exponential interval values up to the provided limit") void nextReconnectInterval() { diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java index ee1b0875805..fa438e99eb1 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformerResyncTest.java @@ -19,7 +19,6 @@ import io.fabric8.kubernetes.api.model.PodList; import io.fabric8.kubernetes.client.dsl.base.OperationContext; import io.fabric8.kubernetes.client.informers.ListerWatcher; -import io.fabric8.kubernetes.client.informers.SharedScheduler; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -39,11 +38,10 @@ private abstract static class AbstractPodListerWatcher implements ListerWatcher< private static final Long WAIT_TIME = 500L; private final ListerWatcher listerWatcher = Mockito.mock(AbstractPodListerWatcher.class, Mockito.RETURNS_DEEP_STUBS); private final OperationContext operationContext = Mockito.mock(OperationContext.class, Mockito.RETURNS_DEEP_STUBS); - private SharedScheduler sharedScheduler = new SharedScheduler(); DefaultSharedIndexInformer defaultSharedIndexInformer; private DefaultSharedIndexInformer createDefaultSharedIndexInformer(long resyncPeriod) { - defaultSharedIndexInformer = new DefaultSharedIndexInformer<>(Pod.class, listerWatcher, resyncPeriod, operationContext, Runnable::run, sharedScheduler); + defaultSharedIndexInformer = new DefaultSharedIndexInformer<>(Pod.class, listerWatcher, resyncPeriod, operationContext, Runnable::run); return defaultSharedIndexInformer; } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/CachedSingleThreadSchedulerTest.java similarity index 90% rename from kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java rename to kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/CachedSingleThreadSchedulerTest.java index 4cd419c1179..7915b3b0e73 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/SharedSchedulerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/utils/CachedSingleThreadSchedulerTest.java @@ -14,7 +14,7 @@ * limitations under the License. */ -package io.fabric8.kubernetes.client.informers; +package io.fabric8.kubernetes.client.utils; import org.junit.jupiter.api.Test; @@ -22,16 +22,15 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; +import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; -import static org.awaitility.Awaitility.await; - -class SharedSchedulerTest { +class CachedSingleThreadSchedulerTest { @Test void testAutoShutdown() throws InterruptedException { - SharedScheduler scheduler = new SharedScheduler(50); + CachedSingleThreadScheduler scheduler = new CachedSingleThreadScheduler(50); assertFalse(scheduler.hasExecutor()); CountDownLatch latch = new CountDownLatch(2); ScheduledFuture future = scheduler.scheduleWithFixedDelay(latch::countDown, 50, 50, TimeUnit.MILLISECONDS); From 5b24b95dcd65ffe7adcac759cf0a92e9192c28d8 Mon Sep 17 00:00:00 2001 From: manusa Date: Mon, 24 May 2021 17:39:12 +0200 Subject: [PATCH 9/9] test: Update outdated tests --- .../SharedInformerEventListener.java | 5 + .../impl/DefaultSharedIndexInformer.java | 26 ++--- .../internal/AbstractWatchManagerTest.java | 104 +++++++++++------- .../mock/DefaultSharedIndexInformerTest.java | 8 +- 4 files changed, 90 insertions(+), 53 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerEventListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerEventListener.java index 0b4702e3dd0..6bda783673f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerEventListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerEventListener.java @@ -16,6 +16,11 @@ package io.fabric8.kubernetes.client.informers; public interface SharedInformerEventListener { + + /** + * @deprecated Use {@link #onException(SharedIndexInformer, Exception)} instead + */ + @Deprecated void onException(Exception exception); default void onException(SharedIndexInformer informer, Exception e) { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index e44bf825633..1e58746b48b 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -54,8 +54,8 @@ public class DefaultSharedIndexInformer reflector; private final Class apiTypeClass; private final ProcessorStore processorStore; @@ -63,11 +63,11 @@ public class DefaultSharedIndexInformer processor; private final Executor informerExecutor; - private AtomicBoolean started = new AtomicBoolean(); + private final AtomicBoolean started = new AtomicBoolean(); private volatile boolean stopped = false; private ScheduledFuture resyncFuture; - + public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher listerWatcher, long resyncPeriod, OperationContext context, Executor informerExecutor) { if (resyncPeriod < 0) { throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value"); @@ -81,7 +81,7 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis this.processor = new SharedProcessor<>(new SerialExecutor(informerExecutor)); this.indexer = new Cache<>(); this.indexer.setIsRunning(this::isRunning); - + processorStore = new ProcessorStore<>(this.indexer, this.processor); this.reflector = new Reflector<>(apiTypeClass, listerWatcher, processorStore, context); } @@ -121,9 +121,9 @@ public void addEventHandlerWithResyncPeriod(ResourceEventHandler handler, lon } } } - + ProcessorListener listener = this.processor.addProcessorListener(handler, determineResyncPeriod(resyncPeriodMillis, this.resyncCheckPeriodMillis)); - + if (!started.get()) { return; } @@ -151,7 +151,7 @@ public void run() { log.info("informer#Controller: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis); scheduleResync(processor::shouldResync); - + reflector.listSyncAndWatch(); // stop called while run is called could be ineffective, check for it afterwards synchronized (this) { @@ -204,9 +204,9 @@ private long determineResyncPeriod(long desired, long check) { @Override public boolean isRunning() { - return !stopped && started.get() && reflector.isRunning(); + return !stopped && started.get() && reflector.isRunning(); } - + synchronized void scheduleResync(Supplier resyncFunc) { // schedule the resync runnable if (resyncCheckPeriodMillis > 0) { @@ -216,15 +216,15 @@ synchronized void scheduleResync(Supplier resyncFunc) { log.debug("informer#Controller: resync skipped due to 0 full resync period {}", apiTypeClass); } } - + public long getFullResyncPeriod() { return resyncCheckPeriodMillis; } - + ScheduledFuture getResyncFuture() { return resyncFuture; } - + @Override public Class getApiTypeClass() { return apiTypeClass; diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java index ecd15b2a5fa..bfce1d4357e 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java @@ -18,44 +18,27 @@ import io.fabric8.kubernetes.api.model.ListOptions; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; -import okhttp3.OkHttpClient; -import okhttp3.Request; +import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.WebSocket; -import org.junit.jupiter.api.AfterEach; -import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; import org.mockito.MockedStatic; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.atomic.AtomicInteger; import static org.assertj.core.api.Assertions.assertThat; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.Mockito.RETURNS_DEEP_STUBS; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; class AbstractWatchManagerTest { - private MockedStatic executors; - private ScheduledExecutorService executorService; - - @BeforeEach - void setUp() { - executorService = mock(ScheduledExecutorService.class, RETURNS_DEEP_STUBS); - executors = mockStatic(Executors.class); - executors.when(() -> Executors.newSingleThreadScheduledExecutor(any())).thenReturn(executorService); - } - - @AfterEach - void tearDown() { - executors.close(); - } - @Test @DisplayName("closeEvent, is idempotent, multiple calls only close watcher once") void closeEventIsIdempotent() { @@ -71,7 +54,7 @@ void closeEventIsIdempotent() { } @Test - @DisplayName("closeEvent with Exception, is idempotent, multiple calls only close watcher once") + @DisplayName("closeEvent, with Exception, is idempotent, multiple calls only close watcher once") void closeEventWithExceptionIsIdempotent() { // Given final WatcherAdapter watcher = new WatcherAdapter<>(); @@ -100,7 +83,7 @@ void closeWebSocket() { void nextReconnectInterval() { // Given final WatchManager awm = new WatchManager<>( - null, mock(ListOptions.class), 0, 10, 5, null); + null, mock(ListOptions.class), 0, 10, 5); // When-Then assertThat(awm.nextReconnectInterval()).isEqualTo(10); assertThat(awm.nextReconnectInterval()).isEqualTo(20); @@ -111,10 +94,65 @@ void nextReconnectInterval() { assertThat(awm.nextReconnectInterval()).isEqualTo(320); } + @Test + @DisplayName("cancelReconnect, with null attempt, should do nothing") + void cancelReconnectNullAttempt() { + // Given + final ScheduledFuture sf = spy(ScheduledFuture.class); + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); + // When + awm.cancelReconnect(); + // Then + verify(sf, times(0)).cancel(true); + } + + @Test + @DisplayName("cancelReconnect, with non-null attempt, should cancel") + void cancelReconnectNonNullAttempt() { + // Given + final ScheduledFuture sf = mock(ScheduledFuture.class); + final MockedStatic utils = mockStatic(Utils.class); + utils.when(() -> Utils.schedule(any(), any(), anyLong(), any())).thenReturn(sf); + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); + awm.scheduleReconnect(null, false); + // When + awm.cancelReconnect(); + // Then + verify(sf, times(1)).cancel(true); + } + + @Test + @DisplayName("isClosed, after close invocation, should return true") + void isForceClosedWhenClosed() { + // Given + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); + awm.initRunner(mock(AbstractWatchManager.ClientRunner.class)); + // When + awm.close(); + // Then + assertThat(awm.isForceClosed()).isTrue(); + } + + @Test + @DisplayName("close, after close invocation, should return true") + void closeWithNonNullRunnerShouldCancelRunner() { + // Given + final AbstractWatchManager.ClientRunner clientRunner = mock(AbstractWatchManager.ClientRunner.class); + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); + awm.initRunner(clientRunner); + // When + awm.close(); + // Then + verify(clientRunner, times(1)).close(); + } + private static WatchManager withDefaultWatchManager(Watcher watcher) { return new WatchManager<>( - watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 0, 0, 0, - mock(OkHttpClient.class)); + watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 0, 0, 0); } private static class WatcherAdapter implements Watcher { @@ -122,7 +160,7 @@ private static class WatcherAdapter implements Watcher { @Override public void eventReceived(Action action, T resource) {} - + @Override public void onClose(WatcherException cause) { closeCount.addAndGet(1); @@ -136,20 +174,8 @@ public void onClose() { private static final class WatchManager extends AbstractWatchManager { - public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, OkHttpClient clonedClient) { + public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) { super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, resourceVersion -> null); - initRunner(new ClientRunner(clonedClient) { - @Override - void run(Request request) {} - - @Override - OkHttpClient cloneAndCustomize(OkHttpClient client) { - return clonedClient; - } - }); - } - @Override - public void close() { } } } diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index 6fc6febf70b..b19015b057b 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -64,6 +64,7 @@ import java.util.function.BiFunction; import java.util.function.Function; +import static org.assertj.core.api.Assertions.assertThat; import static org.awaitility.Awaitility.await; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -851,9 +852,14 @@ public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) { @Test void testRunAfterStop() { + // Given SharedIndexInformer podInformer = factory.sharedIndexInformerFor(Pod.class, 0); podInformer.stop(); - assertThrows(IllegalStateException.class, podInformer::run); + // When + final IllegalStateException result = assertThrows(IllegalStateException.class, podInformer::run); + // Then + assertThat(result) + .hasMessage("Cannot restart a stopped informer"); } private KubernetesResource getAnimal(String name, String order, String resourceVersion) {