From 68d2716f67296d7ee2d27e9c2748d7431dbe6fba Mon Sep 17 00:00:00 2001 From: shawkins Date: Mon, 31 May 2021 12:31:17 -0400 Subject: [PATCH] fixing #3001 #3186 over logging of websocket exceptions and closure this localizes the state of a websocket to the listener, rather than the runner to have a clearer lifecycle. it also replaces the use of BlockingQueue with CompletableFuture. --- .../dsl/internal/AbstractWatchManager.java | 4 +- .../dsl/internal/ExecWebSocketListener.java | 17 ++---- .../client/dsl/internal/LogWatchCallback.java | 15 ++--- .../internal/RawWatchConnectionManager.java | 10 ++-- .../dsl/internal/WatchConnectionManager.java | 12 ++-- .../internal/WatcherWebSocketListener.java | 59 +++++++++---------- .../dsl/internal/WebSocketClientRunner.java | 45 +++++++------- .../apps/v1/DeploymentOperationsImpl.java | 13 ++-- .../v1/RollableScalableResourceOperation.java | 12 ++-- .../v1beta1/DeploymentOperationsImpl.java | 13 ++-- .../kubernetes/client/utils/Utils.java | 39 ++++++++---- .../apps/DeploymentConfigOperationsImpl.java | 38 ++++++------ 12 files changed, 138 insertions(+), 139 deletions(-) diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java index e54f54a9689..c39d82e98cb 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java @@ -169,10 +169,10 @@ static void closeWebSocket(WebSocket webSocket) { logger.debug("Closing websocket {}", webSocket); try { if (!webSocket.close(1000, null)) { - logger.warn("Failed to close websocket"); + logger.debug("Websocket already closed {}", webSocket); } } catch (IllegalStateException e) { - logger.error("Called close on already closed websocket: {} {}", e.getClass(), e.getMessage()); + logger.error("invalid code for websocket: {} {}", e.getClass(), e.getMessage()); } } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java index d0ee13a1088..f90ae37bd10 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/ExecWebSocketListener.java @@ -38,7 +38,7 @@ import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -81,8 +81,7 @@ public class ExecWebSocketListener extends WebSocketListener implements ExecWatc private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final InputStreamPumper pumper; - private final AtomicBoolean started = new AtomicBoolean(false); - private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1); + private final CompletableFuture startedFuture = new CompletableFuture<>(); private final ExecListener listener; private final AtomicBoolean explicitlyClosed = new AtomicBoolean(false); @@ -185,7 +184,7 @@ private void closeWebSocketOnce(int code, String reason) { } public void waitUntilReady() { - Utils.waitUntilReady(queue, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS); + Utils.waitUntilReadyOrFail(startedFuture, config.getWebsocketTimeout(), TimeUnit.MILLISECONDS); } @Override @@ -207,11 +206,10 @@ public void onOpen(WebSocket webSocket, Response response) { webSocketRef.set(webSocket); if (!executorService.isShutdown()) { executorService.submit(pumper); - started.set(true); - queue.add(true); + startedFuture.complete(null); } } catch (IOException e) { - queue.add(new KubernetesClientException(OperationSupport.createStatus(response))); + startedFuture.completeExceptionally(new KubernetesClientException(OperationSupport.createStatus(response))); } finally { if (listener != null) { listener.onOpen(response); @@ -234,10 +232,7 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { try { Status status = OperationSupport.createStatus(response); LOGGER.error("Exec Failure: HTTP:" + status.getCode() + ". Message:" + status.getMessage(), t); - //We only need to queue startup failures. - if (!started.get()) { - queue.add(new KubernetesClientException(status)); - } + startedFuture.completeExceptionally(new KubernetesClientException(status)); cleanUpOnce(); } finally { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java index c8cd4e4ea1b..79890b78471 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/LogWatchCallback.java @@ -35,7 +35,7 @@ import java.io.PipedOutputStream; import java.util.LinkedHashSet; import java.util.Set; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -53,8 +53,7 @@ public class LogWatchCallback implements LogWatch, Callback, AutoCloseable { private final PipedInputStream output; private final Set toClose = new LinkedHashSet<>(); - private final AtomicBoolean started = new AtomicBoolean(false); - private final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1); + private final CompletableFuture startedFuture = new CompletableFuture<>(); private final ExecutorService executorService = Executors.newSingleThreadExecutor(); private final AtomicBoolean closed = new AtomicBoolean(false); @@ -116,7 +115,7 @@ private void cleanUp() { } public void waitUntilReady() { - if (!Utils.waitUntilReady(queue, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) { + if (!Utils.waitUntilReady(startedFuture, config.getRequestTimeout(), TimeUnit.MILLISECONDS)) { if (LOGGER.isDebugEnabled()) { LOGGER.warn("Log watch request has not been opened within: " + config.getRequestTimeout() + " millis."); } @@ -136,10 +135,7 @@ public void onFailure(Call call, IOException ioe) { LOGGER.error("Log Callback Failure.", ioe); cleanUp(); - //We only need to queue startup failures. - if (!started.get()) { - queue.add(ioe); - } + startedFuture.completeExceptionally(ioe); } @Override @@ -157,8 +153,7 @@ public void onResponse(Call call, final Response response) throws IOException { if (!executorService.isShutdown()) { executorService.submit(pumper); - started.set(true); - queue.add(true); + startedFuture.complete(null); } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java index 1d5e5fa48d1..53c55dfd9d7 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/RawWatchConnectionManager.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.atomic.AtomicReference; /** @@ -38,8 +38,8 @@ public RawWatchConnectionManager(OkHttpClient okHttpClient, HttpUrl.Builder watc initRunner(new WebSocketClientRunner(okHttpClient) { @Override - WatcherWebSocketListener newListener(BlockingQueue queue, AtomicReference webSocketRef) { - return new RawWatcherWebSocketListener(RawWatchConnectionManager.this, queue, webSocketRef, objectMapper); + WatcherWebSocketListener newListener() { + return new RawWatcherWebSocketListener(RawWatchConnectionManager.this, objectMapper); } @Override @@ -53,8 +53,8 @@ OkHttpClient cloneAndCustomize(OkHttpClient client) { private static class RawWatcherWebSocketListener extends WatcherWebSocketListener { private final ObjectMapper objectMapper; - public RawWatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef, ObjectMapper objectMapper) { - super(manager, queue, webSocketRef); + public RawWatcherWebSocketListener(AbstractWatchManager manager, ObjectMapper objectMapper) { + super(manager); this.objectMapper = objectMapper; } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java index 97a450116cc..be8f3121b8d 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java @@ -17,9 +17,7 @@ import java.net.MalformedURLException; import java.util.List; -import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; import io.fabric8.kubernetes.api.model.HasMetadata; import io.fabric8.kubernetes.api.model.KubernetesResourceList; @@ -46,8 +44,8 @@ public WatchConnectionManager(final OkHttpClient client, final BaseOperation(client) { @Override - WatcherWebSocketListener newListener(BlockingQueue queue, AtomicReference webSocketRef) { - return new TypedWatcherWebSocketListener<>(WatchConnectionManager.this, queue, webSocketRef); + WatcherWebSocketListener newListener() { + return new TypedWatcherWebSocketListener<>(WatchConnectionManager.this); } @Override @@ -66,8 +64,8 @@ public WatchConnectionManager(final OkHttpClient client, final BaseOperation extends WatcherWebSocketListener { - public TypedWatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef) { - super(manager, queue, webSocketRef); + public TypedWatcherWebSocketListener(AbstractWatchManager manager) { + super(manager); } @Override @@ -97,7 +95,7 @@ public void onMessage(WebSocket webSocket, String message) { // The resource version no longer exists - this has to be handled by the caller. if (status.getCode() == HTTP_GONE) { - webSocketRef.set(null); // lose the ref: closing in close() would only generate a Broken pipe + close(); // exception // shut down executor, etc. manager.closeEvent(new WatcherException(status.getMessage(), new KubernetesClientException(status))); diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java index 617cdb1d9ea..b0b3e79680a 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java @@ -15,14 +15,11 @@ */ package io.fabric8.kubernetes.client.dsl.internal; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - import io.fabric8.kubernetes.api.model.Status; import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.WatcherException; import io.fabric8.kubernetes.client.dsl.base.OperationSupport; +import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.Response; import okhttp3.WebSocket; import okhttp3.WebSocketListener; @@ -30,28 +27,23 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; abstract class WatcherWebSocketListener extends WebSocketListener { protected static final Logger logger = LoggerFactory.getLogger(WatcherWebSocketListener.class); - protected final AtomicReference webSocketRef; - /** - * True if an onOpen callback was received on the first connect attempt, ie. the watch was successfully started. - */ - private final AtomicBoolean started = new AtomicBoolean(false); + protected final AtomicReference webSocketRef = new AtomicReference<>(); - /** - * Blocking queue for startup exceptions. - */ - private final BlockingQueue queue; + private final CompletableFuture startedFuture = new CompletableFuture<>(); protected final AbstractWatchManager manager; - protected WatcherWebSocketListener(AbstractWatchManager manager, BlockingQueue queue, AtomicReference webSocketRef) { + protected WatcherWebSocketListener(AbstractWatchManager manager) { this.manager = manager; - this.queue = queue; - this.webSocketRef = webSocketRef; } @Override @@ -62,9 +54,10 @@ public void onOpen(final WebSocket webSocket, Response response) { logger.debug("WebSocket successfully opened"); webSocketRef.set(webSocket); manager.resetReconnectAttempts(); - started.set(true); - queue.clear(); - queue.add(true); + startedFuture.complete(null); + if (manager.isForceClosed()) { + close(); + } } @Override @@ -88,19 +81,14 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { closeBody(response); return; } else { - // We only need to queue startup failures. Status status = OperationSupport.createStatus(response); closeBody(response); - logger.warn("Exec Failure: HTTP {}, Status: {} - {}", code, status.getCode(), status.getMessage(), t); - if (!started.get()) { - pushException(new KubernetesClientException(status)); - } + logger.warn("Exec Failure: HTTP {}, Status: {} - {}", code, status.getCode(), status.getMessage()); + pushException(new KubernetesClientException(status)); } } else { - logger.warn("Exec Failure", t); - if (!started.get()) { - pushException(new KubernetesClientException("Failed to start websocket", t)); - } + logger.warn("Exec Failure {} {}", t.getClass().getName(), t.getMessage()); + pushException(new KubernetesClientException("Failed to start websocket", t)); } if (manager.cannotReconnect()) { @@ -112,9 +100,8 @@ public void onFailure(WebSocket webSocket, Throwable t, Response response) { } private void pushException(KubernetesClientException exception) { - queue.clear(); - if (!queue.offer(exception)) { - logger.debug("Couldn't add exception {} to queue", exception.getLocalizedMessage()); + if (!startedFuture.completeExceptionally(exception)) { + logger.debug("Couldn't report exception", exception); } } @@ -157,10 +144,18 @@ private void scheduleReconnect() { } catch (Exception e) { // An unexpected error occurred and we didn't even get an onFailure callback. logger.error("Exception in reconnect", e); - webSocketRef.set(null); manager.closeEvent(new WatcherException("Unhandled exception in reconnect attempt", e)); manager.close(); } }, true); } + + protected void close() { + startedFuture.completeExceptionally(new IllegalStateException("already closed")); + AbstractWatchManager.closeWebSocket(webSocketRef.getAndSet(null)); + } + + protected void waitUntilReady() { + Utils.waitUntilReadyOrFail(startedFuture, 10, TimeUnit.SECONDS); + } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java index 228ef6b4f7e..205fb7d2e4e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WebSocketClientRunner.java @@ -15,38 +15,43 @@ */ package io.fabric8.kubernetes.client.dsl.internal; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - -import io.fabric8.kubernetes.client.utils.Utils; import okhttp3.OkHttpClient; import okhttp3.Request; -import okhttp3.WebSocket; abstract class WebSocketClientRunner extends AbstractWatchManager.ClientRunner { - private final AtomicReference webSocketRef = new AtomicReference<>(); - private final BlockingQueue queue = new ArrayBlockingQueue<>(1); - + + protected WatcherWebSocketListener listener; + protected WebSocketClientRunner(OkHttpClient client) { super(client); } - + @Override - public void run(Request request) { - client().newWebSocket(request, newListener(queue, webSocketRef)); + public synchronized void run(Request request) { + close(); + this.listener = newListener(); + client().newWebSocket(request, this.listener); } - - abstract WatcherWebSocketListener newListener(BlockingQueue queue, AtomicReference webSocketRef); - + + abstract WatcherWebSocketListener newListener(); + @Override - public void close() { - AbstractWatchManager.closeWebSocket(webSocketRef.getAndSet(null)); + public synchronized void close() { + if (this.listener != null) { + listener.close(); + } } - + @Override public void waitUntilReady() { - Utils.waitUntilReady(queue, 10, TimeUnit.SECONDS); + WatcherWebSocketListener current = getListener(); + if (current == null) { + throw new IllegalStateException(); + } + current.waitUntilReady(); + } + + synchronized WatcherWebSocketListener getListener() { + return listener; } } diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java index aba20819c39..8a5e657ba16 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/DeploymentOperationsImpl.java @@ -47,8 +47,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.Consumer; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -263,7 +262,7 @@ public ImageEditReplacePatchable withTimeout(long timeout, TimeUnit * Lets wait until there are enough Ready pods of the given Deployment */ private void waitUntilDeploymentIsScaled(final int count) { - final BlockingQueue queue = new ArrayBlockingQueue<>(1); + final CompletableFuture scaledFuture = new CompletableFuture<>(); final AtomicReference replicasRef = new AtomicReference<>(0); final String name = checkName(getItem()); @@ -275,10 +274,10 @@ private void waitUntilDeploymentIsScaled(final int count) { //If the deployment is gone, we shouldn't wait. if (deployment == null) { if (count == 0) { - queue.put(true); + scaledFuture.complete(null); return; } else { - queue.put(new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.")); + scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.")); return; } } @@ -288,7 +287,7 @@ private void waitUntilDeploymentIsScaled(final int count) { long generation = deployment.getMetadata().getGeneration() != null ? deployment.getMetadata().getGeneration() : 0; long observedGeneration = deployment.getStatus() != null && deployment.getStatus().getObservedGeneration() != null ? deployment.getStatus().getObservedGeneration() : -1; if (observedGeneration >= generation && Objects.equals(deployment.getSpec().getReplicas(), currentReplicas)) { - queue.put(true); + scaledFuture.complete(null); } else { LOG.debug("Only {}/{} pods scheduled for Deployment: {} in namespace: {} seconds so waiting...", deployment.getStatus().getReplicas(), deployment.getSpec().getReplicas(), deployment.getMetadata().getName(), namespace); @@ -301,7 +300,7 @@ private void waitUntilDeploymentIsScaled(final int count) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); try { - if (Utils.waitUntilReady(queue, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { + if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { LOG.debug("{}/{} pod(s) ready for Deployment: {} in namespace: {}.", replicasRef.get(), count, name, namespace); } else { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java index 3b47d0d0e33..3860b6b3042 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/apps/v1/RollableScalableResourceOperation.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import java.util.Objects; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; @@ -93,7 +93,7 @@ public Scale scale(Scale scaleParam) { * Let's wait until there are enough Ready pods. */ private void waitUntilScaled(final int count) { - final ArrayBlockingQueue queue = new ArrayBlockingQueue<>(1); + final CompletableFuture scaledFuture = new CompletableFuture<>(); final AtomicReference replicasRef = new AtomicReference<>(0); final String name = checkName(getItem()); @@ -105,9 +105,9 @@ private void waitUntilScaled(final int count) { //If the resource is gone, we shouldn't wait. if (t == null) { if (count == 0) { - queue.put(true); + scaledFuture.complete(null); } else { - queue.put(new IllegalStateException("Can't wait for " + getType().getSimpleName() + ": " +name + " in namespace: " + namespace + " to scale. Resource is no longer available.")); + scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for " + getType().getSimpleName() + ": " +name + " in namespace: " + namespace + " to scale. Resource is no longer available.")); } return; } @@ -117,7 +117,7 @@ private void waitUntilScaled(final int count) { long generation = t.getMetadata().getGeneration() != null ? t.getMetadata().getGeneration() : -1; long observedGeneration = getObservedGeneration(t); if (observedGeneration >= generation && Objects.equals(desiredReplicas, currentReplicas)) { - queue.put(true); + scaledFuture.complete(null); } Log.debug("Only {}/{} replicas scheduled for {}: {} in namespace: {} seconds so waiting...", currentReplicas, desiredReplicas, t.getKind(), t.getMetadata().getName(), namespace); @@ -129,7 +129,7 @@ private void waitUntilScaled(final int count) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture poller = executor.scheduleWithFixedDelay(tPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); try { - if (Utils.waitUntilReady(queue, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { + if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { Log.debug("{}/{} pod(s) ready for {}: {} in namespace: {}.", replicasRef.get(), count, getType().getSimpleName(), name, namespace); } else { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java index 9e3e12e4363..5f393356194 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/extensions/v1beta1/DeploymentOperationsImpl.java @@ -53,8 +53,7 @@ import java.util.Map; import java.util.Objects; import java.util.function.Consumer; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -269,7 +268,7 @@ public ImageEditReplacePatchable withTimeout(long timeout, TimeUnit * Lets wait until there are enough Ready pods of the given Deployment */ private void waitUntilDeploymentIsScaled(final int count) { - final BlockingQueue queue = new ArrayBlockingQueue<>(1); + final CompletableFuture scaledFuture = new CompletableFuture<>(); final AtomicReference replicasRef = new AtomicReference<>(0); final String name = checkName(getItem()); @@ -281,10 +280,10 @@ private void waitUntilDeploymentIsScaled(final int count) { //If the deployment is gone, we shouldn't wait. if (deployment == null) { if (count == 0) { - queue.put(true); + scaledFuture.complete(null); return; } else { - queue.put(new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.")); + scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for Deployment: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.")); return; } } @@ -294,7 +293,7 @@ private void waitUntilDeploymentIsScaled(final int count) { long generation = deployment.getMetadata().getGeneration() != null ? deployment.getMetadata().getGeneration() : 0; long observedGeneration = deployment.getStatus() != null && deployment.getStatus().getObservedGeneration() != null ? deployment.getStatus().getObservedGeneration() : -1; if (observedGeneration >= generation && Objects.equals(deployment.getSpec().getReplicas(), currentReplicas)) { - queue.put(true); + scaledFuture.complete(null); } else { LOG.debug("Only {}/{} pods scheduled for Deployment: {} in namespace: {} seconds so waiting...", deployment.getStatus().getReplicas(), deployment.getSpec().getReplicas(), deployment.getMetadata().getName(), namespace); @@ -307,7 +306,7 @@ private void waitUntilDeploymentIsScaled(final int count) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); try { - if (Utils.waitUntilReady(queue, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { + if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { LOG.debug("{}/{} pod(s) ready for Deployment: {} in namespace: {}.", replicasRef.get(), count, name, namespace); } else { diff --git a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java index 26fb4d556c7..7a4af2d5e7e 100644 --- a/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java +++ b/kubernetes-client/src/main/java/io/fabric8/kubernetes/client/utils/Utils.java @@ -40,13 +40,15 @@ import java.util.Objects; import java.util.Optional; import java.util.Random; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Function; import java.util.stream.Stream; import org.slf4j.Logger; @@ -142,25 +144,36 @@ public static String join(final Object[] array, final char separator) { /** * Wait until an other thread signals the completion of a task. * If an exception is passed, it will be propagated to the caller. - * @param queue The communication channel. + * @param future The communication channel. * @param amount The amount of time to wait. * @param timeUnit The time unit. * * @return a boolean value indicating resource is ready or not. */ - public static boolean waitUntilReady(BlockingQueue queue, long amount, TimeUnit timeUnit) { + public static boolean waitUntilReady(Future future, long amount, TimeUnit timeUnit) { try { - Object obj = queue.poll(amount, timeUnit); - if (obj instanceof Boolean) { - return (Boolean) obj; - } else if (obj instanceof Throwable) { - Throwable t = (Throwable) obj; - t.addSuppressed(new Throwable("waiting here")); - throw t; - } + future.get(amount, timeUnit); + return true; + } catch (TimeoutException e) { return false; - } catch (Throwable t) { - throw KubernetesClientException.launderThrowable(t); + } catch (ExecutionException e) { + Throwable t = e; + if (e.getCause() != null) { + t = e.getCause(); + } + t.addSuppressed(new Throwable("waiting here")); + throw KubernetesClientException.launderThrowable(t); + } catch (Exception e) { + throw KubernetesClientException.launderThrowable(e); + } + } + + /** + * Similar to {@link #waitUntilReady(Future, long, TimeUnit)}, but will always throw an exception if not ready + */ + public static void waitUntilReadyOrFail(Future future, long amount, TimeUnit timeUnit) { + if (!waitUntilReady(future, amount, timeUnit)) { + throw new KubernetesClientException("not ready after " + amount + " " + timeUnit); } } diff --git a/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java b/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java index 2661905dfe8..cfd71434203 100644 --- a/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java +++ b/openshift-client/src/main/java/io/fabric8/openshift/client/dsl/internal/apps/DeploymentConfigOperationsImpl.java @@ -19,16 +19,23 @@ import io.fabric8.kubernetes.api.model.Pod; import io.fabric8.kubernetes.api.model.autoscaling.v1.Scale; import io.fabric8.kubernetes.client.Config; +import io.fabric8.kubernetes.client.KubernetesClientException; import io.fabric8.kubernetes.client.dsl.LogWatch; import io.fabric8.kubernetes.client.dsl.Loggable; import io.fabric8.kubernetes.client.dsl.PodResource; import io.fabric8.kubernetes.client.dsl.base.OperationContext; import io.fabric8.kubernetes.client.dsl.internal.LogWatchCallback; -import io.fabric8.kubernetes.client.utils.PodOperationUtil; import io.fabric8.kubernetes.client.dsl.internal.RollingOperationContext; +import io.fabric8.kubernetes.client.utils.PodOperationUtil; import io.fabric8.kubernetes.client.utils.URLUtils; +import io.fabric8.kubernetes.client.utils.Utils; +import io.fabric8.openshift.api.model.DeploymentConfig; +import io.fabric8.openshift.api.model.DeploymentConfigBuilder; +import io.fabric8.openshift.api.model.DeploymentConfigList; +import io.fabric8.openshift.client.dsl.DeployableScalableResource; import io.fabric8.openshift.client.dsl.internal.OpenShiftOperation; import okhttp3.HttpUrl; +import okhttp3.OkHttpClient; import okhttp3.Request; import okhttp3.Response; import okhttp3.ResponseBody; @@ -43,24 +50,15 @@ import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.function.Consumer; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import java.util.function.UnaryOperator; -import io.fabric8.kubernetes.client.KubernetesClientException; -import io.fabric8.kubernetes.client.utils.Utils; -import io.fabric8.openshift.api.model.DeploymentConfig; -import io.fabric8.openshift.api.model.DeploymentConfigBuilder; -import io.fabric8.openshift.api.model.DeploymentConfigList; -import io.fabric8.openshift.client.dsl.DeployableScalableResource; -import okhttp3.OkHttpClient; - import static io.fabric8.openshift.client.OpenShiftAPIGroups.APPS; public class DeploymentConfigOperationsImpl extends OpenShiftOperation queue = new ArrayBlockingQueue<>(1); + final CompletableFuture scaledFuture = new CompletableFuture<>(); final AtomicReference replicasRef = new AtomicReference<>(0); final String name = checkName(getItem()); @@ -185,17 +183,17 @@ private void waitUntilDeploymentConfigIsScaled(final int count) { //If the rs is gone, we shouldn't wait. if (deploymentConfig == null) { if (count == 0) { - queue.put(true); + scaledFuture.complete(null); return; } else { - queue.put(new IllegalStateException("Can't wait for DeploymentConfig: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.")); + scaledFuture.completeExceptionally(new IllegalStateException("Can't wait for DeploymentConfig: " + checkName(getItem()) + " in namespace: " + checkName(getItem()) + " to scale. Resource is no longer available.")); return; } } replicasRef.set(deploymentConfig.getStatus().getReplicas()); int currentReplicas = deploymentConfig.getStatus().getReplicas() != null ? deploymentConfig.getStatus().getReplicas() : 0; if (deploymentConfig.getStatus().getObservedGeneration() >= deploymentConfig.getMetadata().getGeneration() && Objects.equals(deploymentConfig.getSpec().getReplicas(), currentReplicas)) { - queue.put(true); + scaledFuture.complete(null); } else { LOG.debug("Only {}/{} pods scheduled for DeploymentConfig: {} in namespace: {} seconds so waiting...", deploymentConfig.getStatus().getReplicas(), deploymentConfig.getSpec().getReplicas(), deploymentConfig.getMetadata().getName(), namespace); @@ -208,7 +206,7 @@ private void waitUntilDeploymentConfigIsScaled(final int count) { ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); ScheduledFuture poller = executor.scheduleWithFixedDelay(deploymentPoller, 0, POLL_INTERVAL_MS, TimeUnit.MILLISECONDS); try { - if (Utils.waitUntilReady(queue, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { + if (Utils.waitUntilReady(scaledFuture, getConfig().getScaleTimeout(), TimeUnit.MILLISECONDS)) { LOG.debug("{}/{} pod(s) ready for DeploymentConfig: {} in namespace: {}.", replicasRef.get(), count, name, namespace); } else { @@ -221,11 +219,13 @@ private void waitUntilDeploymentConfigIsScaled(final int count) { } } - public String getLog() { + @Override +public String getLog() { return getLog(false); } - public String getLog(Boolean isPretty) { + @Override +public String getLog(Boolean isPretty) { try(ResponseBody body = doGetLog(isPretty)) { return doGetLog(isPretty).string(); } catch (IOException e) {