diff --git a/CHANGELOG.md b/CHANGELOG.md index f0595c568c9..68a388fbc15 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -20,6 +20,7 @@ * Fix #2910: Move crd-generator tests from kubernetes-itests to kubernetes-tests * Fix #3005: Make it possible to select which CRD version is generated / improve output * Fix #3015: Thread interruption in a nominal case (like closing the client) are now logged in debug +* Fix #3050: More enforcement of the informer lifecycle #### Dependency Upgrade * Fix #2979: Update Kubernetes Model to v1.21.0 diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java index dd08a4b08c7..0a4ecff224d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/SharedInformerFactory.java @@ -267,7 +267,7 @@ public synchronized SharedIndexInformer getExistingSharedIndexInformer(Cl SharedIndexInformer foundSharedIndexInformer = null; for (Map.Entry entry : this.informers.entrySet()) { if (isKeyOfType(entry.getKey(), apiTypeClass)) { - foundSharedIndexInformer = (SharedIndexInformer) entry.getValue(); + foundSharedIndexInformer = entry.getValue(); } } return foundSharedIndexInformer; @@ -284,7 +284,7 @@ public synchronized void startAllRegisteredInformers() { if (!informerExecutor.isShutdown()) { informers.forEach( (informerType, informer) -> - startedInformers.computeIfAbsent(informerType, key -> informerExecutor.submit(informer::run))); + startedInformers.computeIfAbsent(informerType, key -> informerExecutor.submit((Runnable)informer::run))); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java index 67baecdd0a6..cdda0c42505 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Cache.java @@ -28,6 +28,7 @@ import java.util.Map; import java.util.Set; import java.util.function.Function; +import java.util.function.Supplier; /** * It basically saves and indexes all the entries. @@ -49,6 +50,8 @@ public class Cache implements Indexer { // indices stores objects' key by their indices private Map>> indices = new HashMap<>(); + + private Supplier isRunning = () -> false; public Cache() { this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::deletionHandlingMetaNamespaceKeyFunc); @@ -59,6 +62,10 @@ public Cache(String indexName, Function> indexFunc, Function()); } + + public void setIsRunning(Supplier isRunning) { + this.isRunning = isRunning; + } /** * Add objects @@ -85,6 +92,9 @@ public Map>> getIndexers() { @Override public void addIndexers(Map>> indexersNew) { + if (isRunning.get()) { + throw new IllegalStateException("Cannot add indexers to a running informer."); + } if (!items.isEmpty()) { throw new IllegalStateException("Cannot add indexers to a Cache which is not empty"); } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java index e4378466a81..d82868a75ca 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/DefaultSharedIndexInformer.java @@ -52,7 +52,7 @@ public class DefaultSharedIndexInformer indexer; + private Cache indexer; private SharedProcessor processor; @@ -69,6 +69,7 @@ public DefaultSharedIndexInformer(Class apiTypeClass, ListerWatcher lis this.processor = new SharedProcessor<>(); this.indexer = new Cache(); + this.indexer.setIsRunning(this::isRunning); DeltaFIFO fifo = new DeltaFIFO<>(Cache::metaNamespaceKeyFunc, this.indexer); @@ -127,14 +128,14 @@ public void addEventHandlerWithResyncPeriod(ResourceEventHandler handler, lon @Override public String lastSyncResourceVersion() { - if (!started) { - return ""; - } return this.controller.lastSyncResourceVersion(); } @Override public void run() { + if (stopped) { + throw new IllegalStateException("Cannot restart a stopped informer"); + } if (started) { return; } @@ -147,7 +148,7 @@ public void run() { @Override public void stop() { - if (!started) { + if (!started || stopped) { return; } @@ -208,9 +209,6 @@ private void handleDeltas(Deque>> indexers) { - if (started) { - throw new IllegalStateException("Cannot add indexers to a running informer."); - } indexer.addIndexers(indexers); } diff --git a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java index 68f93c09926..23deb335233 100644 --- a/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java +++ b/kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java @@ -67,6 +67,7 @@ import static org.awaitility.Awaitility.await; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @EnableKubernetesMockClient @@ -846,6 +847,13 @@ public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) { assertFalse(podInformer.isRunning()); } + @Test public void testRunAfterStop() { + SharedIndexInformer podInformer = factory.sharedIndexInformerFor(Pod.class, 0); + podInformer.run(); + podInformer.stop(); + assertThrows(IllegalStateException.class, podInformer::run); + } + private KubernetesResource getAnimal(String name, String order, String resourceVersion) { AnimalSpec animalSpec = new AnimalSpec(); animalSpec.setOrder(order);