Skip to content

Commit

Permalink
fix fabric8io#3159: simplifying the resync logic
Browse files Browse the repository at this point in the history
treats the informer resync period as the only meaningful resync
  • Loading branch information
shawkins committed May 24, 2021
1 parent df5d802 commit 58d58a9
Show file tree
Hide file tree
Showing 10 changed files with 78 additions and 137 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,10 @@
#### New Features
* Fix #3133: Add DSL Support for `authorization.openshift.io/v1` resources in OpenShiftClient

#### _**Note**_: Breaking changes in the API
##### DSL Changes:
- #3159 The resync period that an informer is created with is the only relevant period for that informer. Handlers that are added will use the informer resync period unless resync is disabled when the handler is added.

### 5.4.0 (2021-05-19)

#### Bugs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,17 @@ public interface SharedInformer<T> {

/**
* Adds an event handler to the shared informer using the specified resync period.
* The period may be equal to 0 to disable resync events for this listener. Any other value
* will become the same as the SharedInformer default.
*
* Events to a single handler are delivered sequentially, but there is no
* coordination between different handlers.
*
* @param handle the event handler
* @param resyncPeriod the specific resync period
* @deprecated
*/
@Deprecated
void addEventHandlerWithResyncPeriod(ResourceEventHandler<T> handle, long resyncPeriod);

/**
Expand Down Expand Up @@ -71,4 +76,15 @@ public interface SharedInformer<T> {
* Return the class this informer is watching
*/
Class<T> getApiTypeClass();

/**
* Adds an event handler to the shared informer without resync.
*
* Events to a single handler are delivered sequentially, but there is no
* coordination between different handlers.
*
* @param handle the event handler
* @param resyncPeriod the specific resync period
*/
void addEventHandlerWithoutResync(ResourceEventHandler<T> handler);
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ public SharedInformerFactory withName(String name) {
* <b>Note:</b>It watches for events in <b>ALL NAMESPACES</b>.
*
* @param apiTypeClass apiType class
* @param resyncPeriodInMillis resync period in milliseconds
* @param resyncPeriodInMillis resync period in milliseconds. Use 0 to disable resyncs. 1000 is the minimum value.
* @param <T> the type parameter (should extend {@link io.fabric8.kubernetes.api.model.HasMetadata} and implement {@link io.fabric8.kubernetes.api.model.Namespaced}) if Namespace scoped resource
* @return the shared index informer
*/
Expand All @@ -132,7 +132,7 @@ public synchronized <T extends HasMetadata> SharedIndexInformer<T> sharedIndexIn
*
* @param apiTypeClass apiType class
* @param operationContext {@link OperationContext} Operation Context
* @param resyncPeriodInMillis resync period in milliseconds
* @param resyncPeriodInMillis resync period in milliseconds. Use 0 to disable resyncs. 1000 is the minimum value.
* @param <T> the type parameter (should extend {@link io.fabric8.kubernetes.api.model.HasMetadata} and implement {@link io.fabric8.kubernetes.api.model.Namespaced}) if Namespace scoped resource
* @return the shared index informer
*/
Expand All @@ -148,7 +148,7 @@ public synchronized <T extends HasMetadata> SharedIndexInformer<T> sharedIndexIn
* @param customResourceContext basic information about the Custom Resource Definition corresponding to that custom resource
* @param apiTypeClass apiType class
* @param apiListTypeClass api list type class
* @param resyncPeriodInMillis resync period in milliseconds
* @param resyncPeriodInMillis resync period in milliseconds. Use 0 to disable resyncs. 1000 is the minimum value.
* @param <T> the type parameter (should extend {@link io.fabric8.kubernetes.api.model.HasMetadata} and implement {@link io.fabric8.kubernetes.api.model.Namespaced})
* @param <L> the type's list parameter (should extend {@link io.fabric8.kubernetes.api.model.KubernetesResourceList}
* @return the shared index informer
Expand All @@ -172,7 +172,7 @@ public synchronized <T extends HasMetadata> SharedIndexInformer<T> sharedIndexIn
* Constructs and returns a shared index informer with resync period specified for custom resources.
*
* @param apiTypeClass apiType class
* @param resyncPeriodInMillis resync period in milliseconds
* @param resyncPeriodInMillis resync period in milliseconds. Use 0 to disable resyncs. 1000 is the minimum value.
* @param <T> the type parameter (should extend {@link CustomResource} and implement {@link io.fabric8.kubernetes.api.model.Namespaced})
* @return the shared index informer
*/
Expand All @@ -186,7 +186,7 @@ public synchronized <T extends CustomResource<?,?>> SharedIndexInformer<T> share
* POJO
*
* @param apiTypeClass apiType class
* @param resyncPeriodInMillis resync period in milliseconds
* @param resyncPeriodInMillis resync period in milliseconds. Use 0 to disable resyncs.
* @param <T> the type parameter (should extend {@link io.fabric8.kubernetes.api.model.HasMetadata} and implement {@link io.fabric8.kubernetes.api.model.Namespaced})
* @return the shared index informer
*/
Expand All @@ -202,7 +202,7 @@ public synchronized <T extends CustomResource<?,?>> SharedIndexInformer<T> share
*
* @param apiTypeClass apiType class
* @param apiListTypeClass api list type class
* @param resyncPeriodInMillis resync period in milliseconds
* @param resyncPeriodInMillis resync period in milliseconds. Use 0 to disable resyncs. 1000 is the minimum value.
* @param <T> the type parameter (should extend {@link io.fabric8.kubernetes.api.model.HasMetadata} and implement {@link io.fabric8.kubernetes.api.model.Namespaced})
* @param <L> the type's list parameter (should extend {@link io.fabric8.kubernetes.api.model.KubernetesResourceList}
* @return the shared index informer
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.ZonedDateTime;
import java.time.temporal.ChronoUnit;

/**
* ProcessorListener implements Runnable interface. It's supposed to run in background
* and actually executes its event handler on notification.
Expand All @@ -35,15 +32,10 @@
*/
public class ProcessorListener<T> {
private static final Logger log = LoggerFactory.getLogger(ProcessorListener.class);
private long resyncPeriodInMillis;
private ZonedDateTime nextResync;
private ResourceEventHandler<T> handler;

public ProcessorListener(ResourceEventHandler<T> handler, long resyncPeriodInMillis) {
this.resyncPeriodInMillis = resyncPeriodInMillis;
public ProcessorListener(ResourceEventHandler<T> handler) {
this.handler = handler;

determineNextResync(ZonedDateTime.now());
}

public void add(Notification<T> notification) {
Expand All @@ -54,14 +46,6 @@ public void add(Notification<T> notification) {
}
}

public void determineNextResync(ZonedDateTime now) {
this.nextResync = now.plus(this.resyncPeriodInMillis, ChronoUnit.MILLIS);
}

public boolean shouldResync(ZonedDateTime now) {
return this.resyncPeriodInMillis != 0 && (now.isAfter(this.nextResync) || now.equals(this.nextResync));
}

public abstract static class Notification<T> {
private final T oldObject;
private final T newObject;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@

package io.fabric8.kubernetes.client.informers.cache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -25,6 +28,8 @@
* @param <T>
*/
public class ProcessorStore<T> implements Store<T> {

private static final Logger log = LoggerFactory.getLogger(ProcessorStore.class);

private Store<T> actualStore;
private SharedProcessor<T> processor;
Expand Down Expand Up @@ -106,6 +111,7 @@ public synchronized void replace(List<T> list, String resourceVersion) {

@Override
public void resync() {
log.debug("resync");
this.actualStore.list()
.forEach(i -> this.processor.distribute(new ProcessorListener.UpdateNotification<T>(i, i), true));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@
*/
package io.fabric8.kubernetes.client.informers.cache;

import io.fabric8.kubernetes.client.informers.ResourceEventHandler;

import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
Expand All @@ -30,7 +27,7 @@
*
* This has been taken from official-client: https://github.com/kubernetes-client/java/blob/master/util/src/main/java/io/kubernetes/client/informer/cache/SharedProcessor.java
*
* <br>Modified to simplify threading
* <br>Modified to simplify threading and synching
*/
public class SharedProcessor<T> {
private ReadWriteLock lock = new ReentrantReadWriteLock();
Expand All @@ -54,11 +51,13 @@ public SharedProcessor(Executor executor) {
*
* @param processorListener specific processor listener
*/
public void addListener(final ProcessorListener<T> processorListener) {
public void addListener(final ProcessorListener<T> processorListener, boolean synching) {
lock.writeLock().lock();
try {
this.listeners.add(processorListener);
this.syncingListeners.add(processorListener);
if (synching) {
this.syncingListeners.add(processorListener);
}
} finally {
lock.writeLock().unlock();
}
Expand Down Expand Up @@ -90,26 +89,6 @@ public void distribute(ProcessorListener.Notification<T> obj, boolean isSync) {
});
}

public boolean shouldResync() {
lock.writeLock().lock();
boolean resyncNeeded = false;
try {
this.syncingListeners = new ArrayList<>();

ZonedDateTime now = ZonedDateTime.now();
for (ProcessorListener<T> listener : this.listeners) {
if (listener.shouldResync(now)) {
resyncNeeded = true;
this.syncingListeners.add(listener);
listener.determineNextResync(now);
}
}
} finally {
lock.writeLock().unlock();
}
return resyncNeeded;
}

public void stop() {
lock.writeLock().lock();
try {
Expand All @@ -120,14 +99,4 @@ public void stop() {
}
}

public ProcessorListener<T> addProcessorListener(ResourceEventHandler<T> handler, long resyncPeriodMillis) {
lock.writeLock().lock();
try {
ProcessorListener<T> listener = new ProcessorListener<>(handler, resyncPeriodMillis);
addListener(listener);
return listener;
} finally {
lock.writeLock().unlock();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import io.fabric8.kubernetes.client.dsl.base.OperationContext;
import io.fabric8.kubernetes.client.informers.ListerWatcher;
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
import io.fabric8.kubernetes.client.informers.ResyncRunnable;
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
import io.fabric8.kubernetes.client.informers.cache.Cache;
import io.fabric8.kubernetes.client.informers.cache.Indexer;
Expand All @@ -40,22 +39,15 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;

public class DefaultSharedIndexInformer<T extends HasMetadata, L extends KubernetesResourceList<T>> implements SharedIndexInformer<T> {
private static final Logger log = LoggerFactory.getLogger(DefaultSharedIndexInformer.class);

private static final long MINIMUM_RESYNC_PERIOD_MILLIS = 1000L;

// resyncCheckPeriod is how often we want the reflector's resync timer to fire so it can
// call shouldResync to check if any of our listeners need a resync.
private long resyncCheckPeriodMillis;

// defaultEventHandlerResyncPeriod is the default resync period for any handlers added via
// AddEventHandler(i.e they don't specify one and just want to use the shared informer's default
// value).
private final long defaultEventHandlerResyncPeriod;

private final Reflector<T, L> reflector;
private final Class<T> apiTypeClass;
private final ProcessorStore<T> processorStore;
Expand All @@ -71,9 +63,11 @@ public class DefaultSharedIndexInformer<T extends HasMetadata, L extends Kuberne
public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> listerWatcher, long resyncPeriod, OperationContext context, Executor informerExecutor) {
if (resyncPeriod < 0) {
throw new IllegalArgumentException("Invalid resync period provided, It should be a non-negative value");
} else if (resyncPeriod > 0 && resyncPeriod < MINIMUM_RESYNC_PERIOD_MILLIS) {
log.warn("DefaultSharedIndexInformer#resyncPeriod {} is too small. Changing it to minimal allowed value of {}", resyncPeriod, MINIMUM_RESYNC_PERIOD_MILLIS);
resyncPeriod = MINIMUM_RESYNC_PERIOD_MILLIS;
}
this.resyncCheckPeriodMillis = resyncPeriod;
this.defaultEventHandlerResyncPeriod = resyncPeriod;
this.apiTypeClass = apiTypeClass;

this.informerExecutor = informerExecutor;
Expand All @@ -93,37 +87,33 @@ public DefaultSharedIndexInformer(Class<T> apiTypeClass, ListerWatcher<T, L> lis
*/
@Override
public void addEventHandler(ResourceEventHandler<T> handler) {
addEventHandlerWithResyncPeriod(handler, defaultEventHandlerResyncPeriod);
addEventHandlerWithResyncPeriod(handler, resyncCheckPeriodMillis);
}

@Override
public void addEventHandlerWithResyncPeriod(ResourceEventHandler<T> handler, long resyncPeriodMillis) {
if (resyncPeriodMillis != 0 && resyncPeriodMillis != this.resyncCheckPeriodMillis) {
log.warn("The specified resyncPeriod {} was changed to match the informer resync {}", this.resyncCheckPeriodMillis);
resyncPeriodMillis = this.resyncCheckPeriodMillis;
}

addEventHandler(handler, resyncPeriodMillis != 0);
}

@Override
public void addEventHandlerWithoutResync(ResourceEventHandler<T> handler) {
addEventHandler(handler, false);
}

synchronized void addEventHandler(ResourceEventHandler<T> handler, boolean resync) {
if (stopped) {
log.info("DefaultSharedIndexInformer#Handler was not added to shared informer because it has stopped already");
return;
}

if (resyncPeriodMillis > 0) {
if (resyncPeriodMillis < MINIMUM_RESYNC_PERIOD_MILLIS) {
log.warn("DefaultSharedIndexInformer#resyncPeriod {} is too small. Changing it to minimal allowed value of {}", resyncPeriodMillis, MINIMUM_RESYNC_PERIOD_MILLIS);
resyncPeriodMillis = MINIMUM_RESYNC_PERIOD_MILLIS;
}

if (resyncPeriodMillis < this.resyncCheckPeriodMillis) {
if (started.get()) {
log.warn("DefaultSharedIndexInformer#resyncPeriod {} is smaller than resyncCheckPeriod {} and the informer has already started. Changing it to {}", resyncPeriodMillis, resyncCheckPeriodMillis);
resyncPeriodMillis = resyncCheckPeriodMillis;
} else {
// if the event handler's resyncPeriod is smaller than the current resyncCheckPeriod
// update resyncCheckPeriod to match resyncPeriod and adjust the resync periods of all
// the listeners accordingly.
this.resyncCheckPeriodMillis = resyncPeriodMillis;
}
}
}

ProcessorListener<T> listener = this.processor.addProcessorListener(handler, determineResyncPeriod(resyncPeriodMillis, this.resyncCheckPeriodMillis));

ProcessorListener<T> listener = new ProcessorListener<>(handler);
this.processor.addListener(listener, resync);

if (!started.get()) {
return;
}
Expand All @@ -150,8 +140,8 @@ public void run() {

log.info("informer#Controller: ready to run resync and reflector for {} with resync {}", apiTypeClass, resyncCheckPeriodMillis);

scheduleResync(processor::shouldResync);

scheduleResync(processorStore::resync);
reflector.listSyncAndWatch();
// stop called while run is called could be ineffective, check for it afterwards
synchronized (this) {
Expand Down Expand Up @@ -191,27 +181,15 @@ public Indexer<T> getIndexer() {
return this.indexer;
}

private long determineResyncPeriod(long desired, long check) {
if (desired == 0) {
return desired;
}

if (check == 0) {
return 0;
}
return desired < check ? check : desired;
}

@Override
public boolean isRunning() {
return !stopped && started.get() && reflector.isRunning();
}

synchronized void scheduleResync(Supplier<Boolean> resyncFunc) {
synchronized void scheduleResync(Runnable resyncRunnable) {
// schedule the resync runnable
if (resyncCheckPeriodMillis > 0) {
ResyncRunnable<T> resyncRunnable = new ResyncRunnable<>(processorStore, resyncFunc);
resyncFuture = Utils.scheduleAtFixedRate(informerExecutor, resyncRunnable, resyncCheckPeriodMillis, resyncCheckPeriodMillis, TimeUnit.MILLISECONDS);
resyncFuture = Utils.scheduleAtFixedRate(informerExecutor, resyncRunnable, resyncCheckPeriodMillis, resyncCheckPeriodMillis, TimeUnit.MILLISECONDS);
} else {
log.debug("informer#Controller: resync skipped due to 0 full resync period {}", apiTypeClass);
}
Expand All @@ -220,7 +198,11 @@ synchronized void scheduleResync(Supplier<Boolean> resyncFunc) {
public long getFullResyncPeriod() {
return resyncCheckPeriodMillis;
}


void setResyncCheckPeriodMillis(long resyncCheckPeriodMillis) {
this.resyncCheckPeriodMillis = resyncCheckPeriodMillis;
}

ScheduledFuture<?> getResyncFuture() {
return resyncFuture;
}
Expand Down
Loading

0 comments on commit 58d58a9

Please sign in to comment.