diff --git a/pom.xml b/pom.xml index b84b2de49..bacff7c19 100644 --- a/pom.xml +++ b/pom.xml @@ -72,6 +72,11 @@ THE SOFTWARE. + + io.github.resilience4j + resilience4j-retry + 1.7.1 + args4j args4j diff --git a/src/main/java/hudson/remoting/Engine.java b/src/main/java/hudson/remoting/Engine.java index af6fe2132..38aa232eb 100644 --- a/src/main/java/hudson/remoting/Engine.java +++ b/src/main/java/hudson/remoting/Engine.java @@ -1,18 +1,18 @@ /* * The MIT License - * + * * Copyright (c) 2004-2009, Sun Microsystems, Inc., Kohsuke Kawaguchi - * + * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal * in the Software without restriction, including without limitation the rights * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell * copies of the Software, and to permit persons to whom the Software is * furnished to do so, subject to the following conditions: - * + * * The above copyright notice and this permission notice shall be included in * all copies or substantial portions of the Software. - * + * * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE @@ -31,12 +31,9 @@ import java.io.FileInputStream; import java.io.FileNotFoundException; import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.Socket; -import java.net.URI; -import java.net.URL; +import java.net.*; import java.nio.ByteBuffer; +import java.nio.channels.UnresolvedAddressException; import java.nio.file.Path; import java.security.AccessController; import java.security.KeyManagementException; @@ -62,6 +59,7 @@ import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; import java.util.logging.Level; import java.util.logging.Logger; import java.util.stream.Collectors; @@ -70,13 +68,11 @@ import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManager; import javax.net.ssl.TrustManagerFactory; -import javax.websocket.ClientEndpointConfig; -import javax.websocket.CloseReason; -import javax.websocket.ContainerProvider; -import javax.websocket.Endpoint; -import javax.websocket.EndpointConfig; -import javax.websocket.HandshakeResponse; -import javax.websocket.Session; +import javax.websocket.*; + +import io.github.resilience4j.core.IntervalFunction; +import io.github.resilience4j.retry.Retry; +import io.github.resilience4j.retry.RetryConfig; import net.jcip.annotations.NotThreadSafe; import org.jenkinsci.remoting.engine.Jnlp4ConnectionState; import org.jenkinsci.remoting.engine.JnlpAgentEndpoint; @@ -180,7 +176,7 @@ public Thread newThread(@NonNull final Runnable r) { * @since 2.62.1 */ private boolean keepAlive = true; - + @CheckForNull private JarCache jarCache = null; @@ -192,14 +188,14 @@ public Thread newThread(@NonNull final Runnable r) { */ @CheckForNull private Path agentLog; - + /** * Specified location of the property file with JUL settings. * @since 3.8 */ @CheckForNull private Path loggingConfigFilePath = null; - + /** * Specifies a default working directory of the remoting instance. * If specified, this directory will be used to store logs, JAR cache, etc. @@ -236,6 +232,8 @@ public Thread newThread(@NonNull final Runnable r) { private final String instanceIdentity; private final Set protocols; + private Integer retryAttempts; + public Engine(EngineListener listener, List hudsonUrls, String secretKey, String agentName) { this(listener, hudsonUrls, secretKey, agentName, null, null, null); } @@ -280,7 +278,7 @@ private static URL ensureTrailingSlash(URL u) { public synchronized void startEngine() throws IOException { startEngine(false); } - + /** * Starts engine. * @param dryRun If {@code true}, do not actually start the engine. @@ -289,7 +287,7 @@ public synchronized void startEngine() throws IOException { /*package*/ void startEngine(boolean dryRun) throws IOException { LOGGER.log(Level.INFO, "Using Remoting version: {0}", Launcher.VERSION); @CheckForNull File jarCacheDirectory = null; - + // Prepare the working directory if required if (workDir != null) { final WorkDirManager workDirManager = WorkDirManager.getInstance(); @@ -297,11 +295,11 @@ public synchronized void startEngine() throws IOException { // Somebody has already specificed Jar Cache, hence we do not need it in the workspace. workDirManager.disable(WorkDirManager.DirType.JAR_CACHE_DIR); } - + if (loggingConfigFilePath != null) { workDirManager.setLoggingConfig(loggingConfigFilePath.toFile()); } - + final Path path = workDirManager.initializeWorkDir(workDir.toFile(), internalDir, failIfWorkDirIsMissing); jarCacheDirectory = workDirManager.getLocation(WorkDirManager.DirType.JAR_CACHE_DIR); workDirManager.setupLogging(path, agentLog); @@ -309,7 +307,7 @@ public synchronized void startEngine() throws IOException { LOGGER.log(Level.WARNING, "No Working Directory. Using the legacy JAR Cache location: {0}", JarCache.DEFAULT_NOWS_JAR_CACHE_LOCATION); jarCacheDirectory = JarCache.DEFAULT_NOWS_JAR_CACHE_LOCATION; } - + if (jarCache == null){ if (jarCacheDirectory == null) { // Should never happen in the current code @@ -324,7 +322,7 @@ public synchronized void startEngine() throws IOException { } else { LOGGER.log(Level.INFO, "Using custom JAR Cache: {0}", jarCache); } - + // Start the engine thread if (!dryRun) { this.start(); @@ -340,7 +338,7 @@ public synchronized void startEngine() throws IOException { public void setJarCache(@NonNull JarCache jarCache) { this.jarCache = jarCache; } - + /** * Sets path to the property file with JUL settings. * @param filePath JAR Cache to be used @@ -660,8 +658,32 @@ public void closeRead() throws IOException { } hudsonUrl = candidateUrls.get(0); String wsUrl = hudsonUrl.toString().replaceFirst("^http", "ws"); - ContainerProvider.getWebSocketContainer().connectToServer(new AgentEndpoint(), - ClientEndpointConfig.Builder.create().configurator(headerHandler).build(), URI.create(wsUrl + "wsagents/")); + ClientEndpointConfig endpointConfig = ClientEndpointConfig.Builder.create() + .configurator(headerHandler).build(); + URI wsAgentsUri = URI.create(wsUrl + "wsagents/"); + + Supplier wsConnectSupplier = () -> { + AgentEndpoint endpoint = new AgentEndpoint(); + + try { + ContainerProvider.getWebSocketContainer() + .connectToServer(endpoint, endpointConfig, wsAgentsUri); + } catch (UnresolvedAddressException x) { + LOGGER.log(Level.WARNING, "Error connect to WS server", x); + return Boolean.TRUE; + } catch (DeploymentException | IOException x) { + throw new RuntimeException(x); + } + + return Boolean.FALSE; + }; + + Boolean retryResult = exponentialRetry(retryAttempts, wsConnectSupplier); + + if (retryResult) { + throw new IllegalStateException("Can't connect to WebSocket instance"); + } + while (ch.get() == null) { Thread.sleep(100); } @@ -697,6 +719,26 @@ public void closeRead() throws IOException { } } + /** + * Tries to perform supplier function with exponential retry + * + * @param attempts Retry attempts + * @param supplier Supplier function + * @return + */ + public static Boolean exponentialRetry(Integer attempts, Supplier supplier) { + IntervalFunction fn = IntervalFunction.ofExponentialBackoff(); + RetryConfig rc = RetryConfig.custom() + .maxAttempts(attempts) + .intervalFunction(fn) + .retryOnResult(result -> (Boolean) result) + .build(); + + Retry retry = Retry.of("retry", rc); + + return retry.executeSupplier(supplier); + } + private void reconnect() { try { events.status("Performing onReconnect operation."); @@ -735,33 +777,42 @@ private void innerRun(IOHub hub, SSLContext context, ExecutorService service) { } events.status("Locating server among " + candidateUrls); - final JnlpAgentEndpoint endpoint; - try { - endpoint = resolver.resolve(); - } catch (Exception e) { - if (Boolean.getBoolean(Engine.class.getName() + ".nonFatalJnlpAgentEndpointResolutionExceptions")) { - events.status("Could not resolve JNLP agent endpoint", e); - } else { - events.error(e); + AtomicReference endpointRef = new AtomicReference<>(); + + Supplier endpointSupplier = () -> { + try { + JnlpAgentEndpoint endpoint = resolver.resolve(); + + endpointRef.set(endpoint); + } catch (Exception x) { + LOGGER.log(Level.WARNING, "Can't resolve JNLP endpoint", x); + return Boolean.TRUE; } - return; - } - if (endpoint == null) { + + return Boolean.FALSE; + }; + + Boolean retryResult = exponentialRetry(retryAttempts, endpointSupplier); + + JnlpAgentEndpoint jnlpAgentEndpoint = endpointRef.get(); + + if (retryResult || jnlpAgentEndpoint == null) { events.status("Could not resolve server among " + candidateUrls); return; } - hudsonUrl = endpoint.getServiceUrl(); + + hudsonUrl = jnlpAgentEndpoint.getServiceUrl(); events.status(String.format("Agent discovery successful%n" + " Agent address: %s%n" + " Agent port: %d%n" + " Identity: %s", - endpoint.getHost(), - endpoint.getPort(), - KeyUtils.fingerprint(endpoint.getPublicKey())) + jnlpAgentEndpoint.getHost(), + jnlpAgentEndpoint.getPort(), + KeyUtils.fingerprint(jnlpAgentEndpoint.getPublicKey())) ); PublicKeyMatchingX509ExtendedTrustManager delegate = new PublicKeyMatchingX509ExtendedTrustManager(); - RSAPublicKey publicKey = endpoint.getPublicKey(); + RSAPublicKey publicKey = jnlpAgentEndpoint.getPublicKey(); if (publicKey != null) { // This is so that JNLP4-connect will only connect if the public key matches // if the public key is not published then JNLP4-connect will refuse to connect @@ -770,7 +821,7 @@ private void innerRun(IOHub hub, SSLContext context, ExecutorService service) { agentTrustManager.setDelegate(delegate); events.status("Handshaking"); - Socket jnlpSocket = connectTcp(endpoint); + Socket jnlpSocket = connectTcp(jnlpAgentEndpoint); Channel channel = null; try { @@ -782,16 +833,16 @@ private void innerRun(IOHub hub, SSLContext context, ExecutorService service) { continue; } if (jnlpSocket == null) { - jnlpSocket = connectTcp(endpoint); + jnlpSocket = connectTcp(jnlpAgentEndpoint); } - if (!endpoint.isProtocolSupported(protocol.getName())) { + if (!jnlpAgentEndpoint.isProtocolSupported(protocol.getName())) { events.status("Server reports protocol " + protocol.getName() + " not supported, skipping"); continue; } triedAtLeastOneProtocol = true; events.status("Trying protocol: " + protocol.getName()); try { - channel = protocol.connect(jnlpSocket, headers, new EngineJnlpConnectionStateListener(endpoint.getPublicKey(), headers)).get(); + channel = protocol.connect(jnlpSocket, headers, new EngineJnlpConnectionStateListener(jnlpAgentEndpoint.getPublicKey(), headers)).get(); } catch (IOException ioe) { events.status("Protocol " + protocol.getName() + " failed to establish channel", ioe); } catch (RuntimeException e) { @@ -879,24 +930,37 @@ private void onConnectionRejected(String greeting) throws InterruptedException { * @param endpoint Connection endpoint * @throws IOException Connection failure or invalid parameter specification */ - private Socket connectTcp(@NonNull JnlpAgentEndpoint endpoint) throws IOException, InterruptedException { - + private Socket connectTcp(@NonNull JnlpAgentEndpoint endpoint) { String msg = "Connecting to " + endpoint.getHost() + ':' + endpoint.getPort(); events.status(msg); - int retry = 1; - while(true) { + + AtomicReference socketRef = new AtomicReference<>(); + + Supplier tcpSupplier = () -> { try { - final Socket s = endpoint.open(SOCKET_TIMEOUT); // default is 30 mins. See PingThread for the ping interval - s.setKeepAlive(keepAlive); - return s; - } catch (IOException e) { - if(retry++>10) { - throw e; - } - TimeUnit.SECONDS.sleep(10); - events.status(msg+" (retrying:"+retry+")",e); + Socket socket = endpoint.open(SOCKET_TIMEOUT); + socket.setKeepAlive(keepAlive); + + socketRef.set(socket); + } catch (IOException x) { + LOGGER.log(Level.WARNING, "Can't open TCP connection", x); + return Boolean.TRUE; } + + return Boolean.FALSE; + }; + + Boolean retryResult = exponentialRetry(retryAttempts, tcpSupplier); + + if (retryResult) { + throw new IllegalStateException("TCP socket is not initialized"); + } + + if (socketRef.get() != null) { + return socketRef.get(); } + + throw new IllegalStateException("TCP socket is not initialized"); } /** @@ -1028,7 +1092,7 @@ private SSLSocketFactory getSSLSocketFactory() } return sslSocketFactory; } - + /** * Socket read timeout. * A {@link SocketInputStream#read()} call associated with underlying Socket will block for only this amount of time @@ -1059,6 +1123,10 @@ public String getProtocolName() { return this.protocolName; } + public void setRetryAttempts(Integer retryAttempts) { + this.retryAttempts = retryAttempts; + } + private class EngineJnlpConnectionStateListener extends JnlpConnectionStateListener { private final RSAPublicKey publicKey; diff --git a/src/main/java/hudson/remoting/jnlp/Main.java b/src/main/java/hudson/remoting/jnlp/Main.java index e7a0e3a40..c49b94d37 100644 --- a/src/main/java/hudson/remoting/jnlp/Main.java +++ b/src/main/java/hudson/remoting/jnlp/Main.java @@ -77,6 +77,10 @@ public class Main { usage="(deprecated; now always headless)") public boolean headlessMode; + @Option(name = "-retry-attempts", + usage = "Agent connect backoff retry attempts") + public int retryAttempts = 20; + @Option(name="-url", usage="Specify the Jenkins root URLs to connect to.") public List urls = new ArrayList<>(); @@ -305,6 +309,7 @@ public Engine createEngine() { engine.setJarCache(new FileSystemJarCache(jarCache,true)); engine.setNoReconnect(noReconnect); engine.setKeepAlive(!noKeepAlive); + engine.setRetryAttempts(retryAttempts); if (disableHttpsCertValidation) { LOGGER.log(WARNING, "Certificate validation for HTTPs endpoints is disabled"); diff --git a/src/test/java/hudson/remoting/EngineTest.java b/src/test/java/hudson/remoting/EngineTest.java index f285291be..8596d85f7 100644 --- a/src/test/java/hudson/remoting/EngineTest.java +++ b/src/test/java/hudson/remoting/EngineTest.java @@ -35,60 +35,71 @@ import java.net.URL; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.nullValue; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; /** * Tests of {@link Engine} * @author Oleg Nenashev */ public class EngineTest { - + private static final String SECRET_KEY = "Hello, world!"; private static final String AGENT_NAME = "testAgent"; private List jenkinsUrls; - + @Rule public TemporaryFolder tmpDir = new TemporaryFolder(); - + @Rule public WorkDirManagerRule mgr = new WorkDirManagerRule(); - + @Before public void init() throws Exception { jenkinsUrls = Collections.singletonList(new URL("http://my.jenkins.not.existent")); } - + @Test @Issue("JENKINS-44290") public void shouldInitializeCorrectlyWithDefaults() throws Exception { EngineListener l = new TestEngineListener(); Engine engine = new Engine(l, jenkinsUrls, SECRET_KEY, AGENT_NAME); engine.startEngine(true); - + // Cache will go to ~/.jenkins , we do not want to worry anbout this repo - assertTrue("Default JarCache should be touched: " + JarCache.DEFAULT_NOWS_JAR_CACHE_LOCATION.getAbsolutePath(), + assertTrue("Default JarCache should be touched: " + JarCache.DEFAULT_NOWS_JAR_CACHE_LOCATION.getAbsolutePath(), JarCache.DEFAULT_NOWS_JAR_CACHE_LOCATION.exists()); } - + @Test public void shouldInitializeCorrectlyWithCustomCache() throws Exception { File jarCache = new File(tmpDir.getRoot(), "jarCache"); - + EngineListener l = new TestEngineListener(); Engine engine = new Engine(l, jenkinsUrls, SECRET_KEY, AGENT_NAME); engine.setJarCache(new FileSystemJarCache(jarCache, true)); engine.startEngine(true); - + // Cache will go to ~/.jenkins , should be touched by default - assertTrue("The specified JarCache should be touched: " + jarCache.getAbsolutePath(), + assertTrue("The specified JarCache should be touched: " + jarCache.getAbsolutePath(), jarCache.exists()); } - + + @Test + public void shouldPerformRetryCount() { + AtomicInteger count = new AtomicInteger(0); + + Engine.exponentialRetry(15, + () -> count.getAndIncrement() < 3); + + assertEquals(4, count.get()); + } + @Test public void shouldInitializeCorrectlyWithWorkDir() throws Exception { File workDir = new File(tmpDir.getRoot(), "workDir"); @@ -96,14 +107,14 @@ public void shouldInitializeCorrectlyWithWorkDir() throws Exception { Engine engine = new Engine(l, jenkinsUrls, SECRET_KEY, AGENT_NAME); engine.setWorkDir(workDir.toPath()); engine.startEngine(true); - + WorkDirManager mgr = WorkDirManager.getInstance(); File workDirLoc = mgr.getLocation(WorkDirManager.DirType.WORK_DIR); - assertThat("The initialized work directory should equal to the one passed in parameters", + assertThat("The initialized work directory should equal to the one passed in parameters", workDirLoc, equalTo(workDir)); assertTrue("The work directory should exist", workDir.exists()); } - + @Test public void shouldUseCustomCacheDirIfRequired() throws Exception { File workDir = new File(tmpDir.getRoot(), "workDir"); @@ -113,7 +124,7 @@ public void shouldUseCustomCacheDirIfRequired() throws Exception { engine.setWorkDir(workDir.toPath()); engine.setJarCache(new FileSystemJarCache(jarCache, true)); engine.startEngine(true); - + WorkDirManager mgr = WorkDirManager.getInstance(); File location = mgr.getLocation(WorkDirManager.DirType.JAR_CACHE_DIR); assertThat("WorkDir manager should not be aware about external JAR cache location", location, nullValue()); @@ -153,6 +164,6 @@ public void onDisconnect() { public void onReconnect() { // Do nothing } - + } }