Skip to content

Commit

Permalink
fix: #3050 adding more enforcement of the informer lifecycle
Browse files Browse the repository at this point in the history
also removing the empty string return for last version when not started
- it complicates checking the last resource version as you have to look
for null and empty string to check for invalid values.
  • Loading branch information
shawkins committed Apr 28, 2021
1 parent 5fe1b0d commit ff57558
Show file tree
Hide file tree
Showing 5 changed files with 27 additions and 10 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
* Fix #2910: Move crd-generator tests from kubernetes-itests to kubernetes-tests
* Fix #3005: Make it possible to select which CRD version is generated / improve output
* Fix #3015: Thread interruption in a nominal case (like closing the client) are now logged in debug
* Fix #3050: More enforcement of the informer lifecycle

#### Dependency Upgrade
* Fix #2979: Update Kubernetes Model to v1.21.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,7 +267,7 @@ public synchronized <T> SharedIndexInformer<T> getExistingSharedIndexInformer(Cl
SharedIndexInformer<T> foundSharedIndexInformer = null;
for (Map.Entry<String, SharedIndexInformer> entry : this.informers.entrySet()) {
if (isKeyOfType(entry.getKey(), apiTypeClass)) {
foundSharedIndexInformer = (SharedIndexInformer<T>) entry.getValue();
foundSharedIndexInformer = entry.getValue();
}
}
return foundSharedIndexInformer;
Expand All @@ -284,7 +284,7 @@ public synchronized void startAllRegisteredInformers() {
if (!informerExecutor.isShutdown()) {
informers.forEach(
(informerType, informer) ->
startedInformers.computeIfAbsent(informerType, key -> informerExecutor.submit(informer::run)));
startedInformers.computeIfAbsent(informerType, key -> informerExecutor.submit((Runnable)informer::run)));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Supplier;

/**
* It basically saves and indexes all the entries.
Expand All @@ -49,6 +50,8 @@ public class Cache<T> implements Indexer<T> {

// indices stores objects' key by their indices
private Map<String, Map<String, Set<String>>> indices = new HashMap<>();

private Supplier<Boolean> isRunning = () -> false;

public Cache() {
this(NAMESPACE_INDEX, Cache::metaNamespaceIndexFunc, Cache::deletionHandlingMetaNamespaceKeyFunc);
Expand All @@ -59,6 +62,10 @@ public Cache(String indexName, Function<T, List<String>> indexFunc, Function<T,
this.keyFunc = keyFunc;
this.indices.put(indexName, new HashMap<>());
}

public void setIsRunning(Supplier<Boolean> isRunning) {
this.isRunning = isRunning;
}

/**
* Add objects
Expand All @@ -85,6 +92,9 @@ public Map<String, Function<T, List<String>>> getIndexers() {

@Override
public void addIndexers(Map<String, Function<T, List<String>>> indexersNew) {
if (isRunning.get()) {
throw new IllegalStateException("Cannot add indexers to a running informer.");
}
if (!items.isEmpty()) {
throw new IllegalStateException("Cannot add indexers to a Cache which is not empty");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public class DefaultSharedIndexInformer<T extends HasMetadata, L extends Kuberne
// value).
private long defaultEventHandlerResyncPeriod;

private Indexer<T> indexer;
private Cache<T> indexer;

private SharedProcessor<T> processor;

Expand All @@ -69,6 +69,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis

this.processor = new SharedProcessor<>();
this.indexer = new Cache();
this.indexer.setIsRunning(this::isRunning);

DeltaFIFO<T> fifo = new DeltaFIFO<>(Cache::metaNamespaceKeyFunc, this.indexer);

Expand Down Expand Up @@ -127,14 +128,14 @@ public void addEventHandlerWithResyncPeriod(ResourceEventHandler<T> handler, lon

@Override
public String lastSyncResourceVersion() {
if (!started) {
return "";
}
return this.controller.lastSyncResourceVersion();
}

@Override
public void run() {
if (stopped) {
throw new IllegalStateException("Cannot restart a stopped informer");
}
if (started) {
return;
}
Expand All @@ -147,7 +148,7 @@ public void run() {

@Override
public void stop() {
if (!started) {
if (!started || stopped) {
return;
}

Expand Down Expand Up @@ -208,9 +209,6 @@ private void handleDeltas(Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Obj

@Override
public void addIndexers(Map<String, Function<T, List<String>>> indexers) {
if (started) {
throw new IllegalStateException("Cannot add indexers to a running informer.");
}
indexer.addIndexers(indexers);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import static org.awaitility.Awaitility.await;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

@EnableKubernetesMockClient
Expand Down Expand Up @@ -846,6 +847,13 @@ public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) {
assertFalse(podInformer.isRunning());
}

@Test public void testRunAfterStop() {
SharedIndexInformer<Pod> podInformer = factory.sharedIndexInformerFor(Pod.class, 0);
podInformer.run();
podInformer.stop();
assertThrows(IllegalStateException.class, podInformer::run);
}

private KubernetesResource getAnimal(String name, String order, String resourceVersion) {
AnimalSpec animalSpec = new AnimalSpec();
animalSpec.setOrder(order);
Expand Down

0 comments on commit ff57558

Please sign in to comment.