Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

removing the deltafifo to simplify the informer code #3061

Merged
merged 4 commits into from
May 12, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
* Fix #3015: Thread interruption in a nominal case (like closing the client) are now logged in debug
* Fix #3057: Removed debug calls for CustomResource during deserialization
* Fix #3050: More enforcement of the informer lifecycle
* Fix #3061: Removed the deltafifo from the informer logic

#### Dependency Upgrade
* Fix #2979: Update Kubernetes Model to v1.21.0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -382,10 +382,6 @@ public void addIndexFunc(String indexName, Function<T, List<String>> indexFunc)
* @return the key
*/
public static <T> String deletionHandlingMetaNamespaceKeyFunc(T object) {
if (object instanceof DeltaFIFO.DeletedFinalStateUnknown) {
DeltaFIFO.DeletedFinalStateUnknown deleteObj = (DeltaFIFO.DeletedFinalStateUnknown) object;
return deleteObj.getKey();
}
return metaNamespaceKeyFunc(object);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.AbstractMap;
import java.util.Deque;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Supplier;

/**
Expand All @@ -48,19 +45,14 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<
*/
private final long fullResyncPeriod;

/**
* Queue stores deltas produced by reflector.
*/
private final DeltaFIFO<T> queue;
private final Store<T> store;

private final ListerWatcher<T, L> listerWatcher;

private Reflector<T, L> reflector;

private final Supplier<Boolean> resyncFunc;

private final Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc;

private final ScheduledExecutorService resyncExecutor;

private ScheduledFuture resyncFuture;
Expand All @@ -71,13 +63,10 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<

private final Class<T> apiTypeClass;

private volatile boolean running;

Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners, ScheduledExecutorService resyncExecutor) {
this.queue = queue;
Controller(Class<T> apiTypeClass, Store<T> store, ListerWatcher<T, L> listerWatcher, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners, ScheduledExecutorService resyncExecutor) {
this.store = store;
this.listerWatcher = listerWatcher;
this.apiTypeClass = apiTypeClass;
this.processFunc = processFunc;
this.resyncFunc = resyncFunc;
if (fullResyncPeriod < 0) {
throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value");
Expand All @@ -86,39 +75,32 @@ public class Controller<T extends HasMetadata, L extends KubernetesResourceList<
this.operationContext = context;
this.eventListeners = eventListeners;

// Starts one daemon thread for resync
this.reflector = new Reflector<>(apiTypeClass, listerWatcher, queue, operationContext);
this.reflector = new Reflector<>(apiTypeClass, listerWatcher, store, operationContext);
this.resyncExecutor = resyncExecutor;
}

public Controller(Class<T> apiTypeClass, DeltaFIFO<T> queue, ListerWatcher<T, L> listerWatcher, Consumer<Deque<AbstractMap.SimpleEntry<DeltaFIFO.DeltaType, Object>>> processFunc, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
this(apiTypeClass, queue, listerWatcher, processFunc, resyncFunc, fullResyncPeriod, context, eventListeners, Executors.newSingleThreadScheduledExecutor());
public Controller(Class<T> apiTypeClass, Store<T> store, ListerWatcher<T, L> listerWatcher, Supplier<Boolean> resyncFunc, long fullResyncPeriod, OperationContext context, ConcurrentLinkedQueue<SharedInformerEventListener> eventListeners) {
this(apiTypeClass, store, listerWatcher, resyncFunc, fullResyncPeriod, context, eventListeners, Executors.newSingleThreadScheduledExecutor());
}

public void run() {
log.info("informer#Controller: ready to run resync and reflector runnable");

scheduleResync();

try {
running = true;
log.info("Started Reflector watch for {}", apiTypeClass);
reflector.listSyncAndWatch();

// Start the process loop
this.processLoop();
} catch (Exception exception) {
log.warn("Reflector list-watching job exiting because the thread-pool is shutting down", exception);
this.eventListeners.forEach(listener -> listener.onException(exception));
} finally {
running = false;
}
}

void scheduleResync() {
// Start the resync runnable
if (fullResyncPeriod > 0) {
ResyncRunnable resyncRunnable = new ResyncRunnable(queue, resyncFunc);
ResyncRunnable resyncRunnable = new ResyncRunnable(store, resyncFunc);
resyncFuture = resyncExecutor.scheduleWithFixedDelay(resyncRunnable, fullResyncPeriod, fullResyncPeriod, TimeUnit.MILLISECONDS);
} else {
log.info("informer#Controller: resync skipped due to 0 full resync period");
Expand All @@ -143,7 +125,7 @@ public void stop() {
* @return boolean value about queue sync status
*/
public boolean hasSynced() {
return this.queue.hasSynced();
return this.store.hasSynced();
}

/**
Expand All @@ -158,30 +140,12 @@ Reflector<T, L> getReflector() {
return reflector;
}

/**
* drains the work queue.
*/
private void processLoop() throws Exception {
while (!Thread.currentThread().isInterrupted()) {
try {
this.queue.pop(this.processFunc);
} catch (InterruptedException t) {
log.debug("DefaultController#processLoop got interrupted: {}", t.getMessage());
Thread.currentThread().interrupt();
return;
} catch (Exception e) {
log.error("DefaultController#processLoop recovered from crashing {} ", e.getMessage(), e);
throw e;
}
}
}

ScheduledExecutorService getResyncExecutor() {
return this.resyncExecutor;
}

public boolean isRunning() {
return running && this.reflector.isRunning();
return this.reflector.isRunning();
}

public long getFullResyncPeriod() {
Expand Down
Loading