Skip to content

Commit

Permalink
Proposal to implement a more efficient webserver shutdown strategy (#…
Browse files Browse the repository at this point in the history
…5876)

* Proposal to implement a more efficient webserver shutdown strategy. For more information see issue #5717. This approach is based on that described in PR #5748. It includes a simple executor SPI for tasks that are immediately interruptable, such as those waiting to read connection preambles. Some new tests that verify faster webserver shutdown.

* Updated copyright year.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Removed print statements and fixed logging.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed checkstyle.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Avoid using @servertest when stopping server manually.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Dropped SPI for tasks, moved interfaces into webserver package.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Fixed checkstyle.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

* Make interface package private.

Signed-off-by: Santiago Pericasgeertsen <[email protected]>

---------

Signed-off-by: Santiago Pericasgeertsen <[email protected]>
  • Loading branch information
spericas authored Jan 27, 2023
1 parent 80abd33 commit 64d5f73
Show file tree
Hide file tree
Showing 10 changed files with 626 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2022, 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -67,4 +67,16 @@ public static <T> T assertThatWithRetry(String reason, Supplier<T> actualSupplie

throw new AssertionError(description.toString());
}

/**
* Checks the matcher, possibly multiple times after configured delays, invoking the supplier of the matched value each time,
* until either the matcher passes or the maximum retry expires.
* @param actualSupplier {@code Supplier} that furnishes the value to submit to the matcher
* @param matcher Hamcrest matcher which evaluates the supplied value
* @return the supplied value
* @param <T> type of the supplied value
*/
public static <T> T assertThatWithRetry(Supplier<T> actualSupplier, Matcher<? super T> matcher) {
return assertThatWithRetry("", actualSupplier, matcher);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.nima.tests.integration.server;

import io.helidon.common.http.Http;
import io.helidon.nima.webclient.http1.Http1Client;
import io.helidon.nima.webserver.WebServer;
import org.junit.jupiter.api.Test;

import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThan;

class WebServerStopIdleTest {

@Test
void stopWhenIdleExpectTimelyStop() {
WebServer webServer = WebServer.builder()
.routing(router -> router.get("ok", (req, res) -> res.send("ok")))
.build();
webServer.start();

Http1Client client = Http1Client.builder()
.baseUri("http://localhost:" + webServer.port())
.build();
try (var response = client.get("/ok").request()) {
assertThat(response.status(), is(Http.Status.OK_200));
assertThat(response.entity().as(String.class), is("ok"));
}

long startMillis = System.currentTimeMillis();
webServer.stop();
int stopExecutionTimeInMillis = (int) (System.currentTimeMillis() - startMillis);
assertThat(stopExecutionTimeInMillis, is(lessThan(500)));
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2022 Oracle and/or its affiliates.
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,25 +13,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.helidon.nima.webserver;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
package io.helidon.nima.tests.integration.server;

import io.helidon.nima.webserver.WebServer;
import org.junit.jupiter.api.Test;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThan;

class WebServerStopOnlyTest {

class ExecutorShutdownTest {
@Test
void testShutdown() {
try (ExecutorService executor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
.allowSetThreadLocals(true)
.inheritInheritableThreadLocals(false)
.factory())) {
ServerListener.shutdownExecutor(executor);
assertThat(executor.isShutdown(), is(true));
}
void stoWhenNoRequestsExpectTimelyStop() {
WebServer webServer = WebServer.builder()
.routing(router -> router.get("ok", (req, res) -> res.send("ok")))
.build();
webServer.start();

long startMillis = System.currentTimeMillis();
webServer.stop();
int stopExecutionTimeInMillis = (int) (System.currentTimeMillis() - startMillis);
assertThat(stopExecutionTimeInMillis, is(lessThan(500)));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
* Representation of a single channel between client and server.
* Everything in this class runs in the channel reader virtual thread
*/
class ConnectionHandler implements Runnable {
class ConnectionHandler implements InterruptableTask<Void> {
private static final System.Logger LOGGER = System.getLogger(ConnectionHandler.class.getName());

private final ConnectionProviders connectionProviders;
Expand Down Expand Up @@ -82,6 +82,11 @@ class ConnectionHandler implements Runnable {
maxPayloadSize);
}

@Override
public boolean canInterrupt() {
return connection instanceof InterruptableTask<?> task && task.canInterrupt();
}

@Override
public final void run() {
Thread.currentThread().setName("[" + socket.socketId() + " " + socket.childSocketId() + "] Nima socket");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.nima.webserver;

import java.io.Closeable;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
* A simplified {@link java.util.concurrent.ExecutorService} that can execute
* {@link InterruptableTask}s and can be efficiently terminated. A thread that is
* waiting to read on an open connection cannot be efficiently stopped. This
* executor will query the thread and interrupt it if possible. It is important
* to efficiently shut down the Nima webserver in certain environments.
*/
interface HelidonTaskExecutor extends Closeable {

/**
* Executes a task.
*
* @param task an interruptable task
* @param <T> type ov value returned by task
* @return a future for a value returned by the task
*/
<T> Future<T> execute(InterruptableTask<T> task);

/**
* Verifies if the executor is terminated.
*
* @return outcome of test
*/
boolean isTerminated();

/**
* Terminate executor waiting for any running task to complete for a specified
* timeout period. It will only wait for those {@link InterruptableTask}s that
* are not interruptable.
*
* @param timeout timeout period
* @param unit timeout period unit
* @return outcome of shutdown process
*/
boolean terminate(long timeout, TimeUnit unit);

/**
* Force termination by forcefully interrupting all tasks. Shall only be called
* if {@link #terminate} returns {@code false}.
*/
void forceTerminate();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright (c) 2023 Oracle and/or its affiliates.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.helidon.nima.webserver;

import java.util.concurrent.Callable;

/**
* An interruptable task that can implements both {@link Runnable} and
* {@link Callable}.
*
* @param <T> type of value returned by task
*/
public interface InterruptableTask<T> extends Runnable, Callable<T> {

/**
* Signals if a task can be interrupted at the time this method is called.
*
* @return outcome of interruptable test
*/
boolean canInterrupt();

@Override
default void run() {
try {
call();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
default T call() throws Exception {
run();
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import io.helidon.nima.webserver.http.DirectHandlers;
import io.helidon.nima.webserver.spi.ServerConnectionSelector;

import static java.lang.System.Logger.Level.DEBUG;
import static java.lang.System.Logger.Level.ERROR;
import static java.lang.System.Logger.Level.INFO;
import static java.lang.System.Logger.Level.TRACE;
Expand All @@ -57,7 +58,7 @@ class ServerListener {
private final String socketName;
private final ListenerConfiguration listenerConfig;
private final Router router;
private final ExecutorService readerExecutor;
private final HelidonTaskExecutor readerExecutor;
private final ExecutorService sharedExecutor;
private final Thread serverThread;
private final DirectHandlers simpleHandlers;
Expand Down Expand Up @@ -93,7 +94,7 @@ class ServerListener {
.name("server-" + socketName + "-listener")
.unstarted(this::listen);
this.simpleHandlers = simpleHandlers;
this.readerExecutor = Executors.newThreadPerTaskExecutor(Thread.ofVirtual()
this.readerExecutor = ThreadPerTaskExecutor.create(Thread.ofVirtual()
.allowSetThreadLocals(true)
.inheritInheritableThreadLocals(inheritThreadLocals)
.factory());
Expand Down Expand Up @@ -131,11 +132,30 @@ void stop() {
}
running = false;
try {
// Attempt to wait until existing channels finish execution
shutdownExecutor(readerExecutor);
shutdownExecutor(sharedExecutor);

// Stop listening for connections
serverSocket.close();

// Shutdown reader executor
readerExecutor.terminate(EXECUTOR_SHUTDOWN_MILLIS, TimeUnit.MILLISECONDS);
if (!readerExecutor.isTerminated()) {
LOGGER.log(DEBUG, "Some tasks in reader executor did not terminate gracefully");
readerExecutor.forceTerminate();
}

// Shutdown shared executor
try {
sharedExecutor.shutdown();
boolean done = sharedExecutor.awaitTermination(EXECUTOR_SHUTDOWN_MILLIS, TimeUnit.MILLISECONDS);
if (!done) {
List<Runnable> running = sharedExecutor.shutdownNow();
if (!running.isEmpty()) {
LOGGER.log(DEBUG, running.size() + " tasks in shared executor did not terminate gracefully");
}
}
} catch (InterruptedException e) {
// falls through
}

} catch (IOException e) {
LOGGER.log(INFO, "Exception thrown on socket close", e);
}
Expand Down Expand Up @@ -252,7 +272,7 @@ private void listen() {
listenerConfig.maxPayloadSize(),
simpleHandlers);

readerExecutor.submit(handler);
readerExecutor.execute(handler);
} catch (RejectedExecutionException e) {
LOGGER.log(ERROR, "Executor rejected handler for new connection");
} catch (Exception e) {
Expand All @@ -277,23 +297,4 @@ private void listen() {
LOGGER.log(INFO, String.format("[%s] %s socket closed.", serverChannelId, socketName));
closeFuture.complete(null);
}

/**
* Shutdown an executor by waiting for a period of time.
*
* @param executor executor to shut down
*/
static void shutdownExecutor(ExecutorService executor) {
try {
boolean terminate = executor.awaitTermination(EXECUTOR_SHUTDOWN_MILLIS, TimeUnit.MILLISECONDS);
if (!terminate) {
List<Runnable> running = executor.shutdownNow();
if (!running.isEmpty()) {
LOGGER.log(INFO, running.size() + " channel tasks did not terminate gracefully");
}
}
} catch (InterruptedException e) {
LOGGER.log(INFO, "InterruptedException caught while shutting down channel tasks");
}
}
}
Loading

0 comments on commit 64d5f73

Please sign in to comment.