diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index 33bfa259ade..71e4eb25106 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -15,26 +15,42 @@ */ package io.fabric8.kubernetes.client.dsl.internal; +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.api.model.KubernetesResource; +import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ListOptions; import io.fabric8.kubernetes.client.utils.ExponentialBackoffIntervalCalculator; +import io.fabric8.kubernetes.api.model.Status; +import io.fabric8.kubernetes.api.model.WatchEvent; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watch; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; +import io.fabric8.kubernetes.client.Watcher.Action; +import io.fabric8.kubernetes.client.utils.Serialization; import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.OkHttpClient; import okhttp3.Request; -import okhttp3.WebSocket; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.List; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; -public abstract class AbstractWatchManager implements Watch { +import static java.net.HttpURLConnection.HTTP_GONE; +public abstract class AbstractWatchManager implements Watch { + + @FunctionalInterface + interface RequestBuilder { + Request build(final String resourceVersion); + } + private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class); final Watcher watcher; @@ -47,12 +63,12 @@ public abstract class AbstractWatchManager implements Watch { private ScheduledFuture reconnectAttempt; private final RequestBuilder requestBuilder; - protected ClientRunner runner; + protected final OkHttpClient client; private final AtomicBoolean reconnectPending = new AtomicBoolean(false); AbstractWatchManager( - Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder + Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder, Supplier clientSupplier ) { this.watcher = watcher; this.reconnectLimit = reconnectLimit; @@ -60,23 +76,26 @@ public abstract class AbstractWatchManager implements Watch { this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion()); this.currentReconnectAttempt = new AtomicInteger(0); this.forceClosed = new AtomicBoolean(); - this.requestBuilder = requestBuilder; + this.client = clientSupplier.get(); + + runWatch(); } - protected void initRunner(ClientRunner runner) { - if (this.runner != null) { - throw new IllegalStateException("ClientRunner has already been initialized"); - } - this.runner = runner; - } - - final void closeEvent(WatcherException cause) { + protected abstract void run(Request request); + + protected abstract void closeRequest(); + + final void close(WatcherException cause) { + // proactively close the request (it will be called again in close) + // for reconnecting watchers, we may not complete onClose for a while + closeRequest(); if (!watcher.reconnecting() && forceClosed.getAndSet(true)) { logger.debug("Ignoring duplicate firing of onClose event"); - return; + } else { + watcher.onClose(cause); } - watcher.onClose(cause); + close(); } final void closeEvent() { @@ -93,27 +112,42 @@ final synchronized void cancelReconnect() { } } - void scheduleReconnect(Runnable command, boolean shouldBackoff) { + void scheduleReconnect() { if (!reconnectPending.compareAndSet(false, true)) { logger.debug("Reconnect already scheduled"); return; } + if (isForceClosed()) { + logger.warn("Ignoring error for already closed/closing connection"); + return; + } + + if (cannotReconnect()) { + close(new WatcherException("Exhausted reconnects")); + return; + } + logger.debug("Scheduling reconnect task"); - long delay = shouldBackoff - ? nextReconnectInterval() - : 0; + long delay = nextReconnectInterval(); synchronized (this) { reconnectAttempt = Utils.schedule(Utils.getCommonExecutorSerive(), () -> { try { - command.run(); + runWatch(); + if (isForceClosed()) { + closeRequest(); + } + } catch (Exception e) { + // An unexpected error occurred and we didn't even get an onFailure callback. + logger.error("Exception in reconnect", e); + close(new WatcherException("Unhandled exception in reconnect attempt", e)); } finally { reconnectPending.set(false); } }, delay, TimeUnit.MILLISECONDS); - if (forceClosed.get()) { + if (isForceClosed()) { cancelReconnect(); } } @@ -142,10 +176,6 @@ void eventReceived(Watcher.Action action, T resource) { watcher.eventReceived(action, resource); } - void onClose(WatcherException cause) { - watcher.onClose(cause); - } - void updateResourceVersion(final String newResourceVersion) { resourceVersion.set(newResourceVersion); } @@ -154,52 +184,89 @@ protected void runWatch() { final Request request = requestBuilder.build(resourceVersion.get()); logger.debug("Watching {}...", request.url()); - runner.run(request); - } - - public void waitUntilReady() { - runner.waitUntilReady(); - } - - static void closeWebSocket(WebSocket webSocket) { - if (webSocket != null) { - logger.debug("Closing websocket {}", webSocket); - try { - if (!webSocket.close(1000, null)) { - logger.warn("Failed to close websocket"); - } - } catch (IllegalStateException e) { - logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage()); - } - } + closeRequest(); // only one can be active at a time + run(request); } @Override public void close() { logger.debug("Force closing the watch {}", this); closeEvent(); - runner.close(); + closeRequest(); cancelReconnect(); } - @FunctionalInterface - interface RequestBuilder { - Request build(final String resourceVersion); + protected WatchEvent readWatchEvent(String messageSource) { + WatchEvent event = Serialization.unmarshal(messageSource, WatchEvent.class); + KubernetesResource object = null; + if (event != null) { + object = event.getObject(); + } + // when watching API Groups we don't get a WatchEvent resource + // so the object will be null + // so lets try parse the message as a KubernetesResource + // as it will probably be a list of resources like a BuildList + if (object == null) { + object = Serialization.unmarshal(messageSource, KubernetesResource.class); + if (event == null) { + event = new WatchEvent(object, "MODIFIED"); + } else { + event.setObject(object); + } + } + if (event.getType() == null) { + event.setType("MODIFIED"); + } + return event; } - abstract static class ClientRunner { - private final OkHttpClient client; - - protected ClientRunner(OkHttpClient client) { - this.client = cloneAndCustomize(client); + protected void onMessage(String message) { + try { + WatchEvent event = readWatchEvent(message); + Object object = event.getObject(); + if (object instanceof HasMetadata) { + @SuppressWarnings("unchecked") + T obj = (T) object; + updateResourceVersion(obj.getMetadata().getResourceVersion()); + Action action = Action.valueOf(event.getType()); + eventReceived(action, obj); + } else if (object instanceof KubernetesResourceList) { + // Dirty cast - should always be valid though + KubernetesResourceList list = (KubernetesResourceList) object; + updateResourceVersion(list.getMetadata().getResourceVersion()); + Action action = Action.valueOf(event.getType()); + List items = list.getItems(); + if (items != null) { + for (HasMetadata item : items) { + eventReceived(action, (T) item); + } + } + } else if (object instanceof Status) { + Status status = (Status) object; + + onStatus(status); + } else { + logger.error("Unknown message received: {}", message); + } + } catch (ClassCastException e) { + logger.error("Received wrong type of object for watch", e); + } catch (IllegalArgumentException e) { + logger.error("Invalid event type", e); + } catch (Exception e) { + logger.error("Unhandled exception encountered in watcher event handler", e); } + } - abstract void run(Request request); - void close() {} - void waitUntilReady() {} - abstract OkHttpClient cloneAndCustomize(OkHttpClient client); - OkHttpClient client() { - return client; + protected boolean onStatus(Status status) { + // The resource version no longer exists - this has to be handled by the caller. + if (status.getCode() == HTTP_GONE) { + close(new WatcherException(status.getMessage(), new KubernetesClientException(status))); + return true; } + + eventReceived(Action.ERROR, null); + logger.error("Error received: {}", status); + return false; } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java index d0ee13a1088..f90ae37bd10 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java @@ -38,7 +38,7 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -81,8 +81,7 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final InputStreamPumper pumper; - private final AtomicBoolean started = new AtomicBoolean(false); - private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1); + private final CompletableFuture startedFuture = new CompletableFuture<>(); private final ExecListener listener; private final AtomicBoolean explicitlyClosed = new AtomicBoolean(false); @@ -185,7 +184,7 @@ private void closeWebSocketOnce(int code, String reason) { } public void waitUntilReady() { - Utils.waitUntilReady(queue, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS); + Utils.waitUntilReadyOrFail(startedFuture, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS); } @Override @@ -207,11 +206,10 @@ public void onOpen(WebSocket webSocket, Response response) { webSocketRef.set(webSocket); if (!executorService.isShutdown()) { executorService.submit(pumper); - started.set(true); - queue.add(true); + startedFuture.complete(null); } } catch (IOException e) { - queue.add(new KubernetesClientException(OperationSupport.createStatus(response))); + startedFuture.completeExceptionally(new KubernetesClientException(OperationSupport.createStatus(response))); } finally { if (listener != null) { listener.onOpen(response); @@ -234,10 +232,7 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { try { Status status = OperationSupport.createStatus(response); LOGGER.error("Exec Failure: HTTP:" + status.getCode() + ". Message:" + status.getMessage(), t); - //We only need to queue startup failures. - if (!started.get()) { - queue.add(new KubernetesClientException(status)); - } + startedFuture.completeExceptionally(new KubernetesClientException(status)); cleanUpOnce(); } finally { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java index c8cd4e4ea1b..79890b78471 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java @@ -35,7 +35,7 @@ import java.io.PipedOutputStream; import java.util.LinkedHashSet; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -53,8 +53,7 @@ public class LogWatchCallback implements LogWatch, Callback, AutoCloseable { private final PipedInputStream output; private final Set toClose = new LinkedHashSet<>(); - private final AtomicBoolean started = new AtomicBoolean(false); - private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1); + private final CompletableFuture startedFuture = new CompletableFuture<>(); private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final AtomicBoolean closed = new AtomicBoolean(false); @@ -116,7 +115,7 @@ private void cleanUp() { } public void waitUntilReady() { - if (!Utils.waitUntilReady(queue, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) { + if (!Utils.waitUntilReady(startedFuture, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) { if (LOGGER.isDebugEnabled()) { LOGGER.warn("Log watch request has not been opened within: " + config.getRequestTimeout() + " millis."); } @@ -136,10 +135,7 @@ public void onFailure(Call call, IOException ioe) { LOGGER.error("Log Callback Failure.", ioe); cleanUp(); - //We only need to queue startup failures. - if (!started.get()) { - queue.add(ioe); - } + startedFuture.completeExceptionally(ioe); } @Override @@ -157,8 +153,7 @@ public void onResponse(Call call, final Response response) throws IOException { if (!executorService.isShutdown()) { executorService.submit(pumper); - started.set(true); - queue.add(true); + startedFuture.complete(null); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 97a450116cc..b263ce0fdb7 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -15,49 +15,56 @@ */ package io.fabric8.kubernetes.client.dsl.internal; -import java.net.MalformedURLException; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ListOptions; -import io.fabric8.kubernetes.api.model.Status; -import io.fabric8.kubernetes.api.model.WatchEvent; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.Watcher.Action; -import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.BaseOperation; import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; import okhttp3.WebSocket; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; -import static io.fabric8.kubernetes.client.dsl.internal.WatchHTTPManager.readWatchEvent; -import static java.net.HttpURLConnection.HTTP_GONE; +import java.net.MalformedURLException; +import java.util.concurrent.TimeUnit; +/** + * Manages a WebSocket and listener per request + */ public class WatchConnectionManager> extends AbstractWatchManager { - public WatchConnectionManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout, int maxIntervalExponent) throws MalformedURLException { - super( - watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new BaseOperationRequestBuilder<>(baseOperation, listOptions) - ); - - initRunner(new WebSocketClientRunner(client) { - @Override - WatcherWebSocketListener newListener(BlockingQueue queue, AtomicReference webSocketRef) { - return new TypedWatcherWebSocketListener<>(WatchConnectionManager.this, queue, webSocketRef); - } + private static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class); + + protected WatcherWebSocketListener listener; + private WebSocket websocket; - @Override - OkHttpClient cloneAndCustomize(OkHttpClient client) { - return client.newBuilder() - .readTimeout(websocketTimeout, TimeUnit.MILLISECONDS) - .build(); + static void closeWebSocket(WebSocket webSocket) { + if (webSocket != null) { + logger.debug("Closing websocket {}", webSocket); + try { + if (!webSocket.close(1000, null)) { + logger.debug("Websocket already closed {}", webSocket); + } + } catch (IllegalStateException e) { + logger.error("invalid code for websocket: {} {}", e.getClass(), e.getMessage()); } - }); - runWatch(); + } + } + + static void closeBody(Response response) { + if (response != null && response.body() != null) { + response.body().close(); + } + } + + public WatchConnectionManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout, int maxIntervalExponent) throws MalformedURLException { + super( + watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, + new BaseOperationRequestBuilder<>(baseOperation, listOptions), () -> client.newBuilder() + .readTimeout(websocketTimeout, TimeUnit.MILLISECONDS) + .build()); } public WatchConnectionManager(final OkHttpClient client, final BaseOperation baseOperation, final ListOptions listOptions, final Watcher watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout) throws MalformedURLException { @@ -65,58 +72,24 @@ public WatchConnectionManager(final OkHttpClient client, final BaseOperation extends WatcherWebSocketListener { - public TypedWatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef) { - super(manager, queue, webSocketRef); - } - - @Override - public void onMessage(WebSocket webSocket, String message) { - try { - WatchEvent event = readWatchEvent(message); - Object object = event.getObject(); - if (object instanceof HasMetadata) { - @SuppressWarnings("unchecked") - T obj = (T) object; - manager.updateResourceVersion(obj.getMetadata().getResourceVersion()); - Action action = Action.valueOf(event.getType()); - manager.eventReceived(action, obj); - } else if (object instanceof KubernetesResourceList) { - // Dirty cast - should always be valid though - KubernetesResourceList list = (KubernetesResourceList) object; - manager.updateResourceVersion(list.getMetadata().getResourceVersion()); - Action action = Action.valueOf(event.getType()); - List items = list.getItems(); - if (items != null) { - for (HasMetadata item : items) { - manager.eventReceived(action, (T) item); - } - } - } else if (object instanceof Status) { - Status status = (Status) object; - - // The resource version no longer exists - this has to be handled by the caller. - if (status.getCode() == HTTP_GONE) { - webSocketRef.set(null); // lose the ref: closing in close() would only generate a Broken pipe - // exception - // shut down executor, etc. - manager.closeEvent(new WatcherException(status.getMessage(), new KubernetesClientException(status))); - manager.close(); - return; - } - - manager.eventReceived(Action.ERROR, null); - logger.error("Error received: {}", status); - } else { - logger.error("Unknown message received: {}", message); - } - } catch (ClassCastException e) { - logger.error("Received wrong type of object for watch", e); - } catch (IllegalArgumentException e) { - logger.error("Invalid event type", e); - } catch (Throwable e) { - logger.error("Unhandled exception encountered in watcher event handler", e); - } - } + @Override + protected synchronized void closeRequest() { + closeWebSocket(websocket); + websocket = null; + } + + public void waitUntilReady() { + getListener().waitUntilReady(); + } + + synchronized WatcherWebSocketListener getListener() { + return listener; + } + + @Override + protected synchronized void run(Request request) { + this.listener = new WatcherWebSocketListener<>(this); + this.websocket = client.newWebSocket(request, this.listener); } + } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java index a21bb6e4858..35a92dc494f 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java @@ -17,22 +17,14 @@ import java.io.IOException; import java.net.MalformedURLException; -import java.util.List; import java.util.concurrent.TimeUnit; import io.fabric8.kubernetes.api.model.HasMetadata; -import io.fabric8.kubernetes.api.model.KubernetesResource; import io.fabric8.kubernetes.api.model.KubernetesResourceList; import io.fabric8.kubernetes.api.model.ListOptions; -import io.fabric8.kubernetes.api.model.Status; -import io.fabric8.kubernetes.api.model.WatchEvent; -import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.Watcher; -import io.fabric8.kubernetes.client.Watcher.Action; -import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.BaseOperation; import io.fabric8.kubernetes.client.dsl.base.OperationSupport; -import io.fabric8.kubernetes.client.utils.Serialization; import okhttp3.Call; import okhttp3.Callback; import okhttp3.Interceptor; @@ -44,11 +36,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static java.net.HttpURLConnection.HTTP_GONE; - public class WatchHTTPManager> extends AbstractWatchManager { private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class); - + private Call call; public WatchHTTPManager(final OkHttpClient client, final BaseOperation baseOperation, @@ -66,175 +56,67 @@ public WatchHTTPManager(final OkHttpClient client, throws MalformedURLException { super( - watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, new BaseOperationRequestBuilder<>(baseOperation, listOptions) - ); - - initRunner(new HTTPClientRunner(client, this) { - @Override - OkHttpClient cloneAndCustomize(OkHttpClient client) { - final OkHttpClient clonedClient = client.newBuilder() - .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) - .readTimeout(0, TimeUnit.MILLISECONDS) - .cache(null) - .build(); - // If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does - // not let us stream responses from the server. - for (Interceptor i : clonedClient.networkInterceptors()) { - if (i instanceof HttpLoggingInterceptor) { - HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor) i; - interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC); + watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, + new BaseOperationRequestBuilder<>(baseOperation, listOptions), () -> { + final OkHttpClient clonedClient = client.newBuilder() + .connectTimeout(connectTimeout, TimeUnit.MILLISECONDS) + .readTimeout(0, TimeUnit.MILLISECONDS) + .cache(null) + .build(); + // If we set the HttpLoggingInterceptor's logging level to Body (as it is by default), it does + // not let us stream responses from the server. + for (Interceptor i : clonedClient.networkInterceptors()) { + if (i instanceof HttpLoggingInterceptor) { + HttpLoggingInterceptor interceptor = (HttpLoggingInterceptor) i; + interceptor.setLevel(HttpLoggingInterceptor.Level.BASIC); + } } - } - return clonedClient; - } - }); + return clonedClient; + }); - runWatch(); } - private abstract static class HTTPClientRunner extends AbstractWatchManager.ClientRunner { - private final AbstractWatchManager manager; - - public HTTPClientRunner(OkHttpClient client, AbstractWatchManager manager) { - super(client); - this.manager = manager; - } - - @Override - void run(Request request) { - client().newCall(request).enqueue(new Callback() { - @Override - public void onFailure(Call call, IOException e) { - logger.info("Watch connection failed. reason: {}", e.getMessage()); - scheduleReconnect(true); - } - - @Override - public void onResponse(Call call, Response response) throws IOException { - if (!response.isSuccessful()) { - onStatus(OperationSupport.createStatus(response.code(), response.message())); - } - - boolean shouldBackoff = true; - - try { - BufferedSource source = response.body().source(); - while (!source.exhausted()) { - String message = source.readUtf8LineStrict(); - onMessage(message); - } - // the normal operation of a long poll get is to return once a response is available. - // in that case we should reconnect immediately. - shouldBackoff = false; - } catch (Exception e) { - logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage()); - } - - // if we get here, the source is exhausted, so, we have lost our "watch". - // we must reconnect. - if (response != null) { - response.body().close(); - } - - scheduleReconnect(shouldBackoff); - } - }); - } - - private void scheduleReconnect(boolean shouldBackoff) { - if (manager.isForceClosed()) { - logger.warn("Ignoring error for already closed/closing connection"); - return; - } - - if (manager.cannotReconnect()) { - manager.onClose(new WatcherException("Connection unexpectedly closed")); - return; + @Override + protected synchronized void run(Request request) { + call = client.newCall(request); + call.enqueue(new Callback() { + @Override + public void onFailure(Call call, IOException e) { + logger.info("Watch connection failed. reason: {}", e.getMessage()); + scheduleReconnect(); } - manager.scheduleReconnect(() -> { + @Override + public void onResponse(Call call, Response response) throws IOException { try { - manager.runWatch(); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - close(); - manager.onClose(new WatcherException("Unhandled exception in reconnect attempt", e)); - } - }, shouldBackoff); - } - - public void onMessage(String messageSource) { - try { - WatchEvent event = readWatchEvent(messageSource); - KubernetesResource object = event.getObject(); - if (object instanceof HasMetadata) { - // Dirty cast - should always be valid though - @SuppressWarnings("unchecked") - T obj = (T) object; - manager.updateResourceVersion(obj.getMetadata().getResourceVersion()); - Watcher.Action action = Watcher.Action.valueOf(event.getType()); - manager.eventReceived(action, obj); - } else if (object instanceof KubernetesResourceList) { - KubernetesResourceList list = (KubernetesResourceList) object; - // Dirty cast - should always be valid though - manager.updateResourceVersion(list.getMetadata().getResourceVersion()); - Watcher.Action action = Watcher.Action.valueOf(event.getType()); - List items = list.getItems(); - if (items != null) { - for (HasMetadata item : items) { - manager.eventReceived(action, (T) item); - } + resetReconnectAttempts(); + if (!response.isSuccessful() + && onStatus(OperationSupport.createStatus(response.code(), response.message()))) { + return; + } + BufferedSource source = response.body().source(); + while (!source.exhausted()) { + String message = source.readUtf8LineStrict(); + onMessage(message); } - } else if (object instanceof Status) { - onStatus((Status) object); - } else { - logger.error("Unknown message received: {}", messageSource); + } catch (Exception e) { + logger.info("Watch terminated unexpectedly. reason: {}", e.getMessage()); + } finally { + WatchConnectionManager.closeBody(response); } - } catch (ClassCastException e) { - logger.error("Received wrong type of object for watch", e); - } catch (IllegalArgumentException e) { - logger.error("Invalid event type", e); - } - } - - private void onStatus(Status status) { - // The resource version no longer exists - this has to be handled by the caller. - if (status.getCode() == HTTP_GONE) { - // exception - // shut down executor, etc. - close(); - manager.onClose(new WatcherException(status.getMessage(), new KubernetesClientException(status))); - return; + + // if we get here, the source is exhausted, so, we have lost our "watch". + // we must reconnect. + scheduleReconnect(); } - - manager.eventReceived(Action.ERROR, null); - logger.error("Error received: {}", status.toString()); - } + }); } - - protected static WatchEvent readWatchEvent(String messageSource) { - WatchEvent event = Serialization.unmarshal(messageSource, WatchEvent.class); - KubernetesResource object = null; - if (event != null) { - object = event.getObject(); - } - // when watching API Groups we don't get a WatchEvent resource - // so the object will be null - // so lets try parse the message as a KubernetesResource - // as it will probably be a list of resources like a BuildList - if (object == null) { - object = Serialization.unmarshal(messageSource, KubernetesResource.class); - if (event == null) { - event = new WatchEvent(object, "MODIFIED"); - } else { - event.setObject(object); - } - } - if (event.getType() == null) { - event.setType("MODIFIED"); + @Override + protected synchronized void closeRequest() { + if (call != null) { + call.cancel(); + call = null; } - return event; } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index 617cdb1d9ea..d7eb8c99afc 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -15,14 +15,12 @@ */ package io.fabric8.kubernetes.client.dsl.internal; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.OperationSupport; +import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.Response; import okhttp3.WebSocket; import okhttp3.WebSocketListener; @@ -30,97 +28,74 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; + import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; -abstract class WatcherWebSocketListener extends WebSocketListener { +class WatcherWebSocketListener extends WebSocketListener { protected static final Logger logger = LoggerFactory.getLogger(WatcherWebSocketListener.class); - protected final AtomicReference webSocketRef; - /** - * True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. - */ - private final AtomicBoolean started = new AtomicBoolean(false); - - /** - * Blocking queue for startup exceptions. - */ - private final BlockingQueue queue; + private final CompletableFuture startedFuture = new CompletableFuture<>(); protected final AbstractWatchManager manager; - protected WatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef) { + protected WatcherWebSocketListener(AbstractWatchManager manager) { this.manager = manager; - this.queue = queue; - this.webSocketRef = webSocketRef; } @Override public void onOpen(final WebSocket webSocket, Response response) { - if (response != null && response.body() != null) { - response.body().close(); - } + WatchConnectionManager.closeBody(response); logger.debug("WebSocket successfully opened"); - webSocketRef.set(webSocket); manager.resetReconnectAttempts(); - started.set(true); - queue.clear(); - queue.add(true); + startedFuture.complete(null); } @Override public void onFailure(WebSocket webSocket, Throwable t, Response response) { - if (manager.isForceClosed()) { - logger.debug("Ignoring onFailure for already closed/closing websocket", t); - // avoid resource leak though - if (response != null && response.body() != null) { - response.body().close(); - } - return; - } - - if (response != null) { - final int code = response.code(); - // We do not expect a 200 in response to the websocket connection. If it occurs, we throw - // an exception and try the watch via a persistent HTTP Get. - // Newer Kubernetes might also return 503 Service Unavailable in case WebSockets are not supported - if (HTTP_OK == code || HTTP_UNAVAILABLE == code) { - pushException(new KubernetesClientException("Received " + code + " on websocket", code, null)); - closeBody(response); + try { + if (manager.isForceClosed()) { + logger.debug("Ignoring onFailure for already closed/closing websocket", t); return; - } else { - // We only need to queue startup failures. - Status status = OperationSupport.createStatus(response); - closeBody(response); - logger.warn("Exec Failure: HTTP {}, Status: {} - {}", code, status.getCode(), status.getMessage(), t); - if (!started.get()) { - pushException(new KubernetesClientException(status)); - } } - } else { - logger.warn("Exec Failure", t); - if (!started.get()) { + + if (response != null) { + final int code = response.code(); + // We do not expect a 200 in response to the websocket connection. If it occurs, we throw + // an exception and try the watch via a persistent HTTP Get. + // Newer Kubernetes might also return 503 Service Unavailable in case WebSockets are not supported + if (HTTP_OK == code || HTTP_UNAVAILABLE == code) { + pushException(new KubernetesClientException("Received " + code + " on websocket", code, null)); + return; + } + Status status = OperationSupport.createStatus(response); + logger.warn("Exec Failure: HTTP {}, Status: {} - {}", code, status.getCode(), status.getMessage()); + pushException(new KubernetesClientException(status)); + } else { + logger.warn("Exec Failure {} {}", t.getClass().getName(), t.getMessage()); pushException(new KubernetesClientException("Failed to start websocket", t)); } + } finally { + WatchConnectionManager.closeBody(response); } if (manager.cannotReconnect()) { - manager.closeEvent(new WatcherException("Connection failure", t)); + manager.close(new WatcherException("Connection failure", t)); return; } - scheduleReconnect(); + manager.scheduleReconnect(); } - private void pushException(KubernetesClientException exception) { - queue.clear(); - if (!queue.offer(exception)) { - logger.debug("Couldn't add exception {} to queue", exception.getLocalizedMessage()); - } + @Override + public void onMessage(WebSocket webSocket, String text) { + manager.onMessage(text); } - private void closeBody(Response response) { - if (response.body() != null) { - response.body().close(); + private void pushException(KubernetesClientException exception) { + if (!startedFuture.completeExceptionally(exception)) { + logger.debug("Couldn't report exception", exception); } } @@ -138,29 +113,10 @@ public void onClosing(WebSocket webSocket, int code, String reason) { @Override public void onClosed(WebSocket webSocket, int code, String reason) { logger.debug("WebSocket close received. code: {}, reason: {}", code, reason); - if (manager.isForceClosed()) { - logger.debug("Ignoring onClose for already closed/closing websocket"); - return; - } - if (manager.cannotReconnect()) { - manager.closeEvent(new WatcherException("Connection unexpectedly closed")); - return; - } - scheduleReconnect(); + manager.scheduleReconnect(); } - private void scheduleReconnect() { - webSocketRef.set(null); - manager.scheduleReconnect(() -> { - try { - manager.runWatch(); - } catch (Exception e) { - // An unexpected error occurred and we didn't even get an onFailure callback. - logger.error("Exception in reconnect", e); - webSocketRef.set(null); - manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); - manager.close(); - } - }, true); + protected void waitUntilReady() { + Utils.waitUntilReadyOrFail(startedFuture, 10, TimeUnit.SECONDS); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java deleted file mode 100644 index 228ef6b4f7e..00000000000 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright (C) 2015 Red Hat, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.fabric8.kubernetes.client.dsl.internal; - -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import io.fabric8.kubernetes.client.utils.Utils; -import okhttp3.OkHttpClient; -import okhttp3.Request; -import okhttp3.WebSocket; - -abstract class WebSocketClientRunner extends AbstractWatchManager.ClientRunner { - private final AtomicReference webSocketRef = new AtomicReference<>(); - private final BlockingQueue queue = new ArrayBlockingQueue<>(1); - - protected WebSocketClientRunner(OkHttpClient client) { - super(client); - } - - @Override - public void run(Request request) { - client().newWebSocket(request, newListener(queue, webSocketRef)); - } - - abstract WatcherWebSocketListener newListener(BlockingQueue queue, AtomicReference webSocketRef); - - @Override - public void close() { - AbstractWatchManager.closeWebSocket(webSocketRef.getAndSet(null)); - } - - @Override - public void waitUntilReady() { - Utils.waitUntilReady(queue, 10, TimeUnit.SECONDS); - } -} diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java index aba20819c39..8a5e657ba16 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java @@ -47,8 +47,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.Consumer; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -263,7 +262,7 @@ public ImageEditReplacePatchable withTimeout(long timeout, TimeUnit * Lets wait until there are enough Ready pods of the given Deployment */ private void waitUntilDeploymentIsScaled(final int count) { - final BlockingQueue queue = new ArrayBlockingQueue<>(1); + final CompletableFuture scaledFuture = new CompletableFuture<>(); final AtomicReference replicasRef = new AtomicReference<>(0); final String name = checkName(getItem()); @@ -275,10 +274,10 @@ private void waitUntilDeploymentIsScaled(final int count) { //If the deployment is gone, we shouldn't wait. if (deployment == null) { if (count == 0) { - queue.put(true); + scaledFuture.complete(null); return; } else { - queue.put(new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.")); + scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.")); return; } } @@ -288,7 +287,7 @@ private void waitUntilDeploymentIsScaled(final int count) { long generation = deployment.getMetadata().getGeneration() != null ? deployment.getMetadata().getGeneration() : 0; long observedGeneration = deployment.getStatus() != null && deployment.getStatus().getObservedGeneration() != null ? deployment.getStatus().getObservedGeneration() : -1; if (observedGeneration >= generation && Objects.equals(deployment.getSpec().getReplicas(), currentReplicas)) { - queue.put(true); + scaledFuture.complete(null); } else { LOG.debug("Only {}/{} pods scheduled for Deployment: {} in namespace: {} seconds so waiting...", deployment.getStatus().getReplicas(), deployment.getSpec().getReplicas(), deployment.getMetadata().getName(), namespace); @@ -301,7 +300,7 @@ private void waitUntilDeploymentIsScaled(final int count) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); try { - if (Utils.waitUntilReady(queue, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { + if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { LOG.debug("{}/{} pod(s) ready for Deployment: {} in namespace: {}.", replicasRef.get(), count, name, namespace); } else { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java index 3b47d0d0e33..3860b6b3042 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import java.util.Objects; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -93,7 +93,7 @@ public Scale scale(Scale scaleParam) { * Let's wait until there are enough Ready pods. */ private void waitUntilScaled(final int count) { - final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1); + final CompletableFuture scaledFuture = new CompletableFuture<>(); final AtomicReference replicasRef = new AtomicReference<>(0); final String name = checkName(getItem()); @@ -105,9 +105,9 @@ private void waitUntilScaled(final int count) { //If the resource is gone, we shouldn't wait. if (t == null) { if (count == 0) { - queue.put(true); + scaledFuture.complete(null); } else { - queue.put(new IllegalStateException("Can't wait for " + getType().getSimpleName() + ": " +name + " in namespace: " + namespace + " to scale. Resource is no longer available.")); + scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for " + getType().getSimpleName() + ": " +name + " in namespace: " + namespace + " to scale. Resource is no longer available.")); } return; } @@ -117,7 +117,7 @@ private void waitUntilScaled(final int count) { long generation = t.getMetadata().getGeneration() != null ? t.getMetadata().getGeneration() : -1; long observedGeneration = getObservedGeneration(t); if (observedGeneration >= generation && Objects.equals(desiredReplicas, currentReplicas)) { - queue.put(true); + scaledFuture.complete(null); } Log.debug("Only {}/{} replicas scheduled for {}: {} in namespace: {} seconds so waiting...", currentReplicas, desiredReplicas, t.getKind(), t.getMetadata().getName(), namespace); @@ -129,7 +129,7 @@ private void waitUntilScaled(final int count) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture poller = executor.scheduleWithFixedDelay(tPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); try { - if (Utils.waitUntilReady(queue, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { + if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { Log.debug("{}/{} pod(s) ready for {}: {} in namespace: {}.", replicasRef.get(), count, getType().getSimpleName(), name, namespace); } else { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java index 9e3e12e4363..5f393356194 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java @@ -53,8 +53,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.Consumer; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -269,7 +268,7 @@ public ImageEditReplacePatchable withTimeout(long timeout, TimeUnit * Lets wait until there are enough Ready pods of the given Deployment */ private void waitUntilDeploymentIsScaled(final int count) { - final BlockingQueue queue = new ArrayBlockingQueue<>(1); + final CompletableFuture scaledFuture = new CompletableFuture<>(); final AtomicReference replicasRef = new AtomicReference<>(0); final String name = checkName(getItem()); @@ -281,10 +280,10 @@ private void waitUntilDeploymentIsScaled(final int count) { //If the deployment is gone, we shouldn't wait. if (deployment == null) { if (count == 0) { - queue.put(true); + scaledFuture.complete(null); return; } else { - queue.put(new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.")); + scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.")); return; } } @@ -294,7 +293,7 @@ private void waitUntilDeploymentIsScaled(final int count) { long generation = deployment.getMetadata().getGeneration() != null ? deployment.getMetadata().getGeneration() : 0; long observedGeneration = deployment.getStatus() != null && deployment.getStatus().getObservedGeneration() != null ? deployment.getStatus().getObservedGeneration() : -1; if (observedGeneration >= generation && Objects.equals(deployment.getSpec().getReplicas(), currentReplicas)) { - queue.put(true); + scaledFuture.complete(null); } else { LOG.debug("Only {}/{} pods scheduled for Deployment: {} in namespace: {} seconds so waiting...", deployment.getStatus().getReplicas(), deployment.getSpec().getReplicas(), deployment.getMetadata().getName(), namespace); @@ -307,7 +306,7 @@ private void waitUntilDeploymentIsScaled(final int count) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); try { - if (Utils.waitUntilReady(queue, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { + if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { LOG.debug("{}/{} pod(s) ready for Deployment: {} in namespace: {}.", replicasRef.get(), count, name, namespace); } else { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java index 26fb4d556c7..7a4af2d5e7e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java @@ -40,13 +40,15 @@ import java.util.Objects; import java.util.Optional; import java.util.Random; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.stream.Stream; import org.slf4j.Logger; @@ -142,25 +144,36 @@ public static String join(final Object[] array, final char separator) { /** * Wait until an other thread signals the completion of a task. * If an exception is passed, it will be propagated to the caller. - * @param queue The communication channel. + * @param future The communication channel. * @param amount The amount of time to wait. * @param timeUnit The time unit. * * @return a boolean value indicating resource is ready or not. */ - public static boolean waitUntilReady(BlockingQueue queue, long amount, TimeUnit timeUnit) { + public static boolean waitUntilReady(Future future, long amount, TimeUnit timeUnit) { try { - Object obj = queue.poll(amount, timeUnit); - if (obj instanceof Boolean) { - return (Boolean) obj; - } else if (obj instanceof Throwable) { - Throwable t = (Throwable) obj; - t.addSuppressed(new Throwable("waiting here")); - throw t; - } + future.get(amount, timeUnit); + return true; + } catch (TimeoutException e) { return false; - } catch (Throwable t) { - throw KubernetesClientException.launderThrowable(t); + } catch (ExecutionException e) { + Throwable t = e; + if (e.getCause() != null) { + t = e.getCause(); + } + t.addSuppressed(new Throwable("waiting here")); + throw KubernetesClientException.launderThrowable(t); + } catch (Exception e) { + throw KubernetesClientException.launderThrowable(e); + } + } + + /** + * Similar to {@link #waitUntilReady(Future, long, TimeUnit)}, but will always throw an exception if not ready + */ + public static void waitUntilReadyOrFail(Future future, long amount, TimeUnit timeUnit) { + if (!waitUntilReady(future, amount, timeUnit)) { + throw new KubernetesClientException("not ready after " + amount + " " + timeUnit); } } diff --git a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java index bfce1d4357e..3e623025808 100644 --- a/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java +++ b/kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java @@ -15,10 +15,12 @@ */ package io.fabric8.kubernetes.client.dsl.internal; +import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.ListOptions; import io.fabric8.kubernetes.client.Watcher; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.utils.Utils; +import okhttp3.Request; import okhttp3.WebSocket; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; @@ -43,8 +45,8 @@ class AbstractWatchManagerTest { @DisplayName("closeEvent, is idempotent, multiple calls only close watcher once") void closeEventIsIdempotent() { // Given - final WatcherAdapter watcher = new WatcherAdapter<>(); - final WatchManager awm = withDefaultWatchManager(watcher); + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); // When for (int it = 0; it < 10; it++) { awm.closeEvent(); @@ -57,11 +59,11 @@ void closeEventIsIdempotent() { @DisplayName("closeEvent, with Exception, is idempotent, multiple calls only close watcher once") void closeEventWithExceptionIsIdempotent() { // Given - final WatcherAdapter watcher = new WatcherAdapter<>(); - final WatchManager awm = withDefaultWatchManager(watcher); + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); // When for (int it = 0; it < 10; it++) { - awm.closeEvent(new WatcherException("Mock")); + awm.close(new WatcherException("Mock")); } // Then assertThat(watcher.closeCount.get()).isEqualTo(1); @@ -73,7 +75,7 @@ void closeWebSocket() { // Given final WebSocket webSocket = mock(WebSocket.class); // When - AbstractWatchManager.closeWebSocket(webSocket); + WatchConnectionManager.closeWebSocket(webSocket); // Then verify(webSocket, times(1)).close(1000, null); } @@ -82,7 +84,7 @@ void closeWebSocket() { @DisplayName("nextReconnectInterval, returns exponential interval values up to the provided limit") void nextReconnectInterval() { // Given - final WatchManager awm = new WatchManager<>( + final WatchManager awm = new WatchManager<>( null, mock(ListOptions.class), 0, 10, 5); // When-Then assertThat(awm.nextReconnectInterval()).isEqualTo(10); @@ -99,8 +101,8 @@ void nextReconnectInterval() { void cancelReconnectNullAttempt() { // Given final ScheduledFuture sf = spy(ScheduledFuture.class); - final WatcherAdapter watcher = new WatcherAdapter<>(); - final WatchManager awm = withDefaultWatchManager(watcher); + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); // When awm.cancelReconnect(); // Then @@ -114,9 +116,9 @@ void cancelReconnectNonNullAttempt() { final ScheduledFuture sf = mock(ScheduledFuture.class); final MockedStatic utils = mockStatic(Utils.class); utils.when(() -> Utils.schedule(any(), any(), anyLong(), any())).thenReturn(sf); - final WatcherAdapter watcher = new WatcherAdapter<>(); - final WatchManager awm = withDefaultWatchManager(watcher); - awm.scheduleReconnect(null, false); + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); + awm.scheduleReconnect(); // When awm.cancelReconnect(); // Then @@ -127,9 +129,8 @@ void cancelReconnectNonNullAttempt() { @DisplayName("isClosed, after close invocation, should return true") void isForceClosedWhenClosed() { // Given - final WatcherAdapter watcher = new WatcherAdapter<>(); - final WatchManager awm = withDefaultWatchManager(watcher); - awm.initRunner(mock(AbstractWatchManager.ClientRunner.class)); + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); // When awm.close(); // Then @@ -140,19 +141,17 @@ void isForceClosedWhenClosed() { @DisplayName("close, after close invocation, should return true") void closeWithNonNullRunnerShouldCancelRunner() { // Given - final AbstractWatchManager.ClientRunner clientRunner = mock(AbstractWatchManager.ClientRunner.class); - final WatcherAdapter watcher = new WatcherAdapter<>(); - final WatchManager awm = withDefaultWatchManager(watcher); - awm.initRunner(clientRunner); + final WatcherAdapter watcher = new WatcherAdapter<>(); + final WatchManager awm = withDefaultWatchManager(watcher); // When awm.close(); // Then - verify(clientRunner, times(1)).close(); + assertThat(awm.closeCount.get()).isEqualTo(1); } - private static WatchManager withDefaultWatchManager(Watcher watcher) { + private static WatchManager withDefaultWatchManager(Watcher watcher) { return new WatchManager<>( - watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 0, 0, 0); + watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 1, 0, 0); } private static class WatcherAdapter implements Watcher { @@ -172,10 +171,26 @@ public void onClose() { } } - private static final class WatchManager extends AbstractWatchManager { + private static final class WatchManager extends AbstractWatchManager { + + private final AtomicInteger closeCount = new AtomicInteger(0); public WatchManager(Watcher watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) { - super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, resourceVersion -> null); + super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, resourceVersion -> null, ()->null); + } + + @Override + protected void run(Request request) { + + } + + @Override + protected void closeRequest() { + closeCount.addAndGet(1); + } + + @Override + protected void runWatch() { } } } diff --git a/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java b/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java index 2661905dfe8..02aca30541a 100644 --- a/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java +++ b/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java @@ -19,16 +19,23 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.autoscaling.v1.Scale; import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.dsl.base.OperationContext; import io.fabric8.kubernetes.client.dsl.internal.LogWatchCallback; -import io.fabric8.kubernetes.client.utils.PodOperationUtil; import io.fabric8.kubernetes.client.dsl.internal.RollingOperationContext; +import io.fabric8.kubernetes.client.utils.PodOperationUtil; import io.fabric8.kubernetes.client.utils.URLUtils; +import io.fabric8.kubernetes.client.utils.Utils; +import io.fabric8.openshift.api.model.DeploymentConfig; +import io.fabric8.openshift.api.model.DeploymentConfigBuilder; +import io.fabric8.openshift.api.model.DeploymentConfigList; +import io.fabric8.openshift.client.dsl.DeployableScalableResource; import io.fabric8.openshift.client.dsl.internal.OpenShiftOperation; import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import okhttp3.ResponseBody; @@ -43,24 +50,15 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.Consumer; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.UnaryOperator; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.utils.Utils; -import io.fabric8.openshift.api.model.DeploymentConfig; -import io.fabric8.openshift.api.model.DeploymentConfigBuilder; -import io.fabric8.openshift.api.model.DeploymentConfigList; -import io.fabric8.openshift.client.dsl.DeployableScalableResource; -import okhttp3.OkHttpClient; - import static io.fabric8.openshift.client.OpenShiftAPIGroups.APPS; public class DeploymentConfigOperationsImpl extends OpenShiftOperation queue = new ArrayBlockingQueue<>(1); + final CompletableFuture scaledFuture = new CompletableFuture<>(); final AtomicReference replicasRef = new AtomicReference<>(0); final String name = checkName(getItem()); @@ -185,17 +183,17 @@ private void waitUntilDeploymentConfigIsScaled(final int count) { //If the rs is gone, we shouldn't wait. if (deploymentConfig == null) { if (count == 0) { - queue.put(true); + scaledFuture.complete(null); return; } else { - queue.put(new IllegalStateException("Can't wait for DeploymentConfig: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.")); + scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for DeploymentConfig: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.")); return; } } replicasRef.set(deploymentConfig.getStatus().getReplicas()); int currentReplicas = deploymentConfig.getStatus().getReplicas() != null ? deploymentConfig.getStatus().getReplicas() : 0; if (deploymentConfig.getStatus().getObservedGeneration() >= deploymentConfig.getMetadata().getGeneration() && Objects.equals(deploymentConfig.getSpec().getReplicas(), currentReplicas)) { - queue.put(true); + scaledFuture.complete(null); } else { LOG.debug("Only {}/{} pods scheduled for DeploymentConfig: {} in namespace: {} seconds so waiting...", deploymentConfig.getStatus().getReplicas(), deploymentConfig.getSpec().getReplicas(), deploymentConfig.getMetadata().getName(), namespace); @@ -208,7 +206,7 @@ private void waitUntilDeploymentConfigIsScaled(final int count) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); try { - if (Utils.waitUntilReady(queue, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { + if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { LOG.debug("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {}.", replicasRef.get(), count, name, namespace); } else { @@ -221,10 +219,12 @@ private void waitUntilDeploymentConfigIsScaled(final int count) { } } + @Override public String getLog() { return getLog(false); } + @Override public String getLog(Boolean isPretty) { try(ResponseBody body = doGetLog(isPretty)) { return doGetLog(isPretty).string();