Skip to content

Commit

Permalink
fix fabric8io#3159: simplifying the resync logic
Browse files Browse the repository at this point in the history
  • Loading branch information
shawkins committed May 23, 2021
1 parent 5da3a98 commit 7fbf763
Show file tree
Hide file tree
Showing 9 changed files with 47 additions and 142 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,37 +19,26 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.function.BooleanSupplier;

/**
* Calls the resync function of store interface which is always implemented
* by DeltaFIFO.
* Calls the resync function of store interface
*/
public class ResyncRunnable<T> implements Runnable {

private static final Logger log = LoggerFactory.getLogger(ResyncRunnable.class);

private Store<T> store;
private BooleanSupplier shouldResyncFunc;

public ResyncRunnable(Store<T> store, BooleanSupplier shouldResyncFunc) {
public ResyncRunnable(Store<T> store) {
this.store = store;
this.shouldResyncFunc = shouldResyncFunc;
}

public void run() {
if (log.isDebugEnabled()) {
log.debug("ResyncRunnable#resync .. ..");
}

// if there is a concern that this processing could overwhelm the single
// thread, then hand this off to the common pool
if (shouldResyncFunc == null || shouldResyncFunc.getAsBoolean()) {
if (log.isDebugEnabled()) {
log.debug("ResyncRunnable#force resync");
}

this.store.resync();
if (log.isDebugEnabled()) {
log.debug("ResyncRunnable#force resync");
}

this.store.resync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ public interface SharedInformer<T> {

/**
* Adds an event handler to the shared informer using the specified resync period.
* The period may be equal to 0 to disable resync events for this listener. Any other value
* will become the same as the SharedInformer default.
*
* Events to a single handler are delivered sequentially, but there is no
* coordination between different handlers.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

/**
* ProcessorListener implements Runnable interface. It's supposed to run in background
* and actually executes its event handler on notification.
Expand All @@ -35,15 +32,10 @@
*/
public class ProcessorListener<T> {
private static final Logger log = LoggerFactory.getLogger(ProcessorListener.class);
private long resyncPeriodInMillis;
private ZonedDateTime nextResync;
private ResourceEventHandler<T> handler;

public ProcessorListener(ResourceEventHandler<T> handler, long resyncPeriodInMillis) {
this.resyncPeriodInMillis = resyncPeriodInMillis;
public ProcessorListener(ResourceEventHandler<T> handler) {
this.handler = handler;

determineNextResync(ZonedDateTime.now());
}

public void add(Notification<T> notification) {
Expand All @@ -54,14 +46,6 @@ public void add(Notification<T> notification) {
}
}

public void determineNextResync(ZonedDateTime now) {
this.nextResync = now.plus(this.resyncPeriodInMillis, ChronoUnit.MILLIS);
}

public boolean shouldResync(ZonedDateTime now) {
return this.resyncPeriodInMillis != 0 && (now.isAfter(this.nextResync) || now.equals(this.nextResync));
}

public abstract static class Notification<T> {
private final T oldObject;
private final T newObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package io.fabric8.kubernetes.client.informers.cache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -25,6 +28,8 @@
* @param <T>
*/
public class ProcessorStore<T> implements Store<T> {

private static final Logger log = LoggerFactory.getLogger(ProcessorStore.class);

private Store<T> actualStore;
private SharedProcessor<T> processor;
Expand Down Expand Up @@ -106,6 +111,7 @@ public synchronized void replace(List<T> list, String resourceVersion) {

@Override
public void resync() {
log.debug("resync");
this.actualStore.list()
.forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<T>(i, i), true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.client.informers.ResourceEventHandler;

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
Expand All @@ -30,7 +27,7 @@
*
* This has been taken from official-client: https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/informer/cache/SharedProcessor.java
*
* <br>Modified to simplify threading
* <br>Modified to simplify threading and synching
*/
public class SharedProcessor<T> {
private ReadWriteLock lock = new ReentrantReadWriteLock();
Expand All @@ -54,11 +51,13 @@ public SharedProcessor(Executor executor) {
*
* @param processorListener specific processor listener
*/
public void addListener(final ProcessorListener<T> processorListener) {
public void addListener(final ProcessorListener<T> processorListener, boolean synching) {
lock.writeLock().lock();
try {
this.listeners.add(processorListener);
this.syncingListeners.add(processorListener);
if (synching) {
this.syncingListeners.add(processorListener);
}
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -90,26 +89,6 @@ public void distribute(ProcessorListener.Notification<T> obj, boolean isSync) {
});
}

public boolean shouldResync() {
lock.writeLock().lock();
boolean resyncNeeded = false;
try {
this.syncingListeners = new ArrayList<>();

ZonedDateTime now = ZonedDateTime.now();
for (ProcessorListener<T> listener : this.listeners) {
if (listener.shouldResync(now)) {
resyncNeeded = true;
this.syncingListeners.add(listener);
listener.determineNextResync(now);
}
}
} finally {
lock.writeLock().unlock();
}
return resyncNeeded;
}

public void stop() {
lock.writeLock().lock();
try {
Expand All @@ -120,14 +99,4 @@ public void stop() {
}
}

public ProcessorListener<T> addProcessorListener(ResourceEventHandler<T> handler, long resyncPeriodMillis) {
lock.writeLock().lock();
try {
ProcessorListener<T> listener = new ProcessorListener<>(handler, resyncPeriodMillis);
addListener(listener);
return listener;
} finally {
lock.writeLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.ResyncRunnable;
import io.fabric8.kubernetes.client.informers.SerialExecutor;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.SharedScheduler;
Expand All @@ -39,7 +38,6 @@
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.function.Function;

public class DefaultSharedIndexInformer<T extends HasMetadata, L extends KubernetesResourceList<T>> implements SharedIndexInformer<T> {
Expand All @@ -51,11 +49,6 @@ public class DefaultSharedIndexInformer<T extends HasMetadata, L extends Kuberne
// call shouldResync to check if any of our listeners need a resync.
private long resyncCheckPeriodMillis;

// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler(i.e they don't specify one and just want to use the shared informer's default
// value).
private long defaultEventHandlerResyncPeriod;

private final SharedScheduler resyncExecutor;
private final Reflector<T, L> reflector;
private final Class<T> apiTypeClass;
Expand All @@ -74,9 +67,11 @@ public class DefaultSharedIndexInformer<T extends HasMetadata, L extends Kuberne
public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod, OperationContext context, Executor informerExecutor, SharedScheduler resyncExecutor) {
if (resyncPeriod < 0) {
throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value");
} else if (resyncPeriod > 0 && resyncPeriod < MINIMUM_RESYNC_PERIOD_MILLIS) {
log.warn("DefaultSharedIndexInformer#resyncPeriod {} is too small. Changing it to minimal allowed value of {}", resyncPeriod, MINIMUM_RESYNC_PERIOD_MILLIS);
resyncPeriod = MINIMUM_RESYNC_PERIOD_MILLIS;
}
this.resyncCheckPeriodMillis = resyncPeriod;
this.defaultEventHandlerResyncPeriod = resyncPeriod;
this.resyncExecutor = resyncExecutor;
this.apiTypeClass = apiTypeClass;

Expand All @@ -96,7 +91,7 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
*/
@Override
public void addEventHandler(ResourceEventHandler<T> handler) {
addEventHandlerWithResyncPeriod(handler, defaultEventHandlerResyncPeriod);
addEventHandlerWithResyncPeriod(handler, resyncCheckPeriodMillis);
}

@Override
Expand All @@ -106,26 +101,13 @@ public void addEventHandlerWithResyncPeriod(ResourceEventHandler<T> handler, lon
return;
}

if (resyncPeriodMillis > 0) {
if (resyncPeriodMillis < MINIMUM_RESYNC_PERIOD_MILLIS) {
log.warn("DefaultSharedIndexInformer#resyncPeriod {} is too small. Changing it to minimal allowed value of {}", resyncPeriodMillis, MINIMUM_RESYNC_PERIOD_MILLIS);
resyncPeriodMillis = MINIMUM_RESYNC_PERIOD_MILLIS;
}

if (resyncPeriodMillis < this.resyncCheckPeriodMillis) {
if (started.get()) {
log.warn("DefaultSharedIndexInformer#resyncPeriod {} is smaller than resyncCheckPeriod {} and the informer has already started. Changing it to {}", resyncPeriodMillis, resyncCheckPeriodMillis);
resyncPeriodMillis = resyncCheckPeriodMillis;
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod
// update resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all
// the listeners accordingly.
this.resyncCheckPeriodMillis = resyncPeriodMillis;
}
}
if (resyncPeriodMillis != 0 && resyncPeriodMillis != this.resyncCheckPeriodMillis) {
log.warn("The specified resyncPeriod {} was changed to match the informer resync {}", this.resyncCheckPeriodMillis);
resyncPeriodMillis = this.resyncCheckPeriodMillis;
}

ProcessorListener<T> listener = this.processor.addProcessorListener(handler, determineResyncPeriod(resyncPeriodMillis, this.resyncCheckPeriodMillis));
ProcessorListener<T> listener = new ProcessorListener<>(handler);
this.processor.addListener(listener, resyncPeriodMillis != 0);

if (!started.get()) {
return;
Expand Down Expand Up @@ -153,7 +135,7 @@ public void run() {

log.info("informer#Controller: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis);

scheduleResync(processor::shouldResync);
scheduleResync(processorStore::resync);

reflector.listSyncAndWatch();
// stop called while run is called could be ineffective, check for it afterwards
Expand Down Expand Up @@ -194,27 +176,15 @@ public Indexer<T> getIndexer() {
return this.indexer;
}

private long determineResyncPeriod(long desired, long check) {
if (desired == 0) {
return desired;
}

if (check == 0) {
return 0;
}
return desired < check ? check : desired;
}

@Override
public boolean isRunning() {
return !stopped && started.get() && reflector.isRunning();
}

synchronized void scheduleResync(BooleanSupplier resyncFunc) {
synchronized void scheduleResync(Runnable resync) {
// schedule the resync runnable
if (resyncCheckPeriodMillis > 0) {
ResyncRunnable<T> resyncRunnable = new ResyncRunnable<>(processorStore, resyncFunc);
resyncFuture = resyncExecutor.scheduleWithFixedDelay(resyncRunnable, resyncCheckPeriodMillis, resyncCheckPeriodMillis, TimeUnit.MILLISECONDS);
resyncFuture = resyncExecutor.scheduleWithFixedDelay(resync, resyncCheckPeriodMillis, resyncCheckPeriodMillis, TimeUnit.MILLISECONDS);
} else {
log.debug("informer#Controller: resync skipped due to 0 full resync period {}", apiTypeClass);
}
Expand All @@ -224,6 +194,10 @@ public long getFullResyncPeriod() {
return resyncCheckPeriodMillis;
}

void setResyncCheckPeriodMillis(long resyncCheckPeriodMillis) {
this.resyncCheckPeriodMillis = resyncCheckPeriodMillis;
}

ScheduledFuture<?> getResyncFuture() {
return resyncFuture;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void onDelete(Pod obj, boolean deletedFinalStateUnknown) {
assertEquals(pod, obj);
deleteNotificationReceived = true;
}
}, 0);
});

listener.add(new ProcessorListener.AddNotification<>(pod));
listener.add(new ProcessorListener.UpdateNotification<>(null, pod));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ void testListenerAddition() throws InterruptedException {
ExpectingNotificationHandler<Pod> expectUpdateHandler = new ExpectingNotificationHandler<>(updateNotification);
ExpectingNotificationHandler<Pod> expectDeleteHandler = new ExpectingNotificationHandler<>(deleteNotification);

sharedProcessor.addListener(expectAddHandler);
sharedProcessor.addListener(expectUpdateHandler);
sharedProcessor.addListener(expectDeleteHandler);
sharedProcessor.addListener(expectAddHandler, false);
sharedProcessor.addListener(expectUpdateHandler, false);
sharedProcessor.addListener(expectDeleteHandler, false);

sharedProcessor.distribute(addNotification, false);
sharedProcessor.distribute(updateNotification, false);
Expand All @@ -60,12 +60,12 @@ public void onUpdate(T oldObj, T newObj) { }

@Override
public void onDelete(T obj, boolean deletedFinalStateUnknown) { }
}, 0);
});
this.expectingNotification = notification;
}

ExpectingNotificationHandler(ResourceEventHandler<T> handler, long resyncPeriod) {
super(handler, resyncPeriod);
ExpectingNotificationHandler(ResourceEventHandler<T> handler) {
super(handler);
}

private ProcessorListener.Notification<T> expectingNotification;
Expand Down
Loading

0 comments on commit 7fbf763

Please sign in to comment.