diff --git a/driver-core/src/main/com/mongodb/MongoInterruptedException.java b/driver-core/src/main/com/mongodb/MongoInterruptedException.java index e1aa7d79447..e0adce7978c 100644 --- a/driver-core/src/main/com/mongodb/MongoInterruptedException.java +++ b/driver-core/src/main/com/mongodb/MongoInterruptedException.java @@ -18,11 +18,32 @@ import com.mongodb.lang.Nullable; +import java.io.InputStream; +import java.io.InterruptedIOException; +import java.io.OutputStream; +import java.net.Socket; +import java.net.SocketAddress; +import java.net.SocketException; +import java.nio.channels.ClosedByInterruptException; +import java.nio.channels.InterruptibleChannel; + /** - * A non-checked exception indicating that the driver has been interrupted by a call to Thread.interrupt. + * A driver-specific non-checked counterpart to {@link InterruptedException}. + * Before this exception is thrown, the {@linkplain Thread#isInterrupted() interrupt status} of the thread will have been set + * unless the {@linkplain #getCause() cause} is {@link InterruptedIOException}, in which case the driver leaves the status as is. + *

+ * The Java SE API uses exceptions different from {@link InterruptedException} to communicate the same information:

+ * + * The driver strives to wrap those in {@link MongoInterruptedException} where relevant. * * @see Thread#interrupt() - * @see InterruptedException */ public class MongoInterruptedException extends MongoException { private static final long serialVersionUID = -4110417867718417860L; diff --git a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java index 4ec395fbf7b..9a21e85a41b 100644 --- a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java +++ b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java @@ -19,7 +19,6 @@ import com.mongodb.MongoClientException; import com.mongodb.MongoException; import com.mongodb.MongoInternalException; -import com.mongodb.MongoInterruptedException; import com.mongodb.MongoSocketException; import com.mongodb.MongoSocketOpenException; import com.mongodb.MongoSocketReadTimeoutException; @@ -71,6 +70,7 @@ import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.internal.connection.SslHelper.enableHostNameVerification; import static com.mongodb.internal.connection.SslHelper.enableSni; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.util.Optional.ofNullable; import static java.util.concurrent.TimeUnit.MILLISECONDS; @@ -436,7 +436,7 @@ private void addSslHandler(final SocketChannel channel) { private class InboundBufferHandler extends SimpleChannelInboundHandler { @Override - protected void channelRead0(final ChannelHandlerContext ctx, final io.netty.buffer.ByteBuf buffer) throws Exception { + protected void channelRead0(final ChannelHandlerContext ctx, final io.netty.buffer.ByteBuf buffer) { handleReadResponse(buffer, null); } @@ -499,7 +499,7 @@ public T get() throws IOException { } return t; } catch (InterruptedException e) { - throw new MongoInterruptedException("Interrupted", e); + throw interruptAndCreateMongoInterruptedException("Interrupted", e); } } } diff --git a/driver-core/src/main/com/mongodb/internal/Locks.java b/driver-core/src/main/com/mongodb/internal/Locks.java index 25fbe199966..8e5b9fa997e 100644 --- a/driver-core/src/main/com/mongodb/internal/Locks.java +++ b/driver-core/src/main/com/mongodb/internal/Locks.java @@ -16,11 +16,11 @@ package com.mongodb.internal; -import com.mongodb.MongoInterruptedException; - import java.util.concurrent.locks.Lock; import java.util.function.Supplier; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; + /** *

This class is not part of the public API and may be removed or changed at any time

*/ @@ -45,8 +45,7 @@ public static V checkedWithLock(final Lock lock, final lock.unlock(); } } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MongoInterruptedException("Interrupted waiting for lock", e); + throw interruptAndCreateMongoInterruptedException("Interrupted waiting for lock", e); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java index 5a847659e1f..561979f5ad7 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java @@ -18,7 +18,6 @@ import com.mongodb.MongoException; import com.mongodb.MongoInternalException; -import com.mongodb.MongoInterruptedException; import com.mongodb.MongoSocketReadException; import com.mongodb.MongoSocketReadTimeoutException; import com.mongodb.ServerAddress; @@ -38,6 +37,7 @@ import java.util.concurrent.atomic.AtomicReference; import static com.mongodb.assertions.Assertions.assertTrue; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.util.concurrent.TimeUnit.MILLISECONDS; /** @@ -317,7 +317,7 @@ private T get(final String prefix) throws IOException { try { latch.await(); } catch (InterruptedException e) { - throw new MongoInterruptedException(prefix + " the AsynchronousSocketChannelStream failed", e); + throw interruptAndCreateMongoInterruptedException(prefix + " the AsynchronousSocketChannelStream failed", e); } if (error != null) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index 1510cf50359..317b83b8b8f 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -18,7 +18,6 @@ import com.mongodb.MongoClientException; import com.mongodb.MongoIncompatibleDriverException; -import com.mongodb.MongoInterruptedException; import com.mongodb.MongoTimeoutException; import com.mongodb.ServerAddress; import com.mongodb.connection.ClusterDescription; @@ -60,6 +59,7 @@ import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; import static com.mongodb.internal.connection.EventHelper.wouldDescriptionsGenerateEquivalentEvents; import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.lang.String.format; import static java.util.Arrays.asList; import static java.util.Comparator.comparingInt; @@ -142,7 +142,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera } } catch (InterruptedException e) { - throw new MongoInterruptedException(format("Interrupted while waiting for a server that matches %s", serverSelector), e); + throw interruptAndCreateMongoInterruptedException(format("Interrupted while waiting for a server that matches %s", serverSelector), e); } } @@ -211,7 +211,7 @@ public ClusterDescription getDescription() { } return curDescription; } catch (InterruptedException e) { - throw new MongoInterruptedException("Interrupted while waiting to connect", e); + throw interruptAndCreateMongoInterruptedException("Interrupted while waiting to connect", e); } } @@ -516,7 +516,7 @@ public void run() { try { currentPhase.await(waitTimeNanos, NANOSECONDS); - } catch (InterruptedException e) { + } catch (InterruptedException closed) { // The cluster has been closed and the while loop will exit. } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java b/driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java index 35661561704..a7ff1b070b6 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java @@ -43,6 +43,7 @@ import static com.mongodb.assertions.Assertions.assertTrue; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; /** * A concurrent pool implementation. @@ -411,7 +412,7 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter return false; } } catch (InterruptedException e) { - throw new MongoInterruptedException(null, e); + throw interruptAndCreateMongoInterruptedException(null, e); } finally { waitersEstimate.decrementAndGet(); } @@ -518,19 +519,20 @@ static void lockInterruptibly(final Lock lock) throws MongoInterruptedException try { lock.lockInterruptibly(); } catch (InterruptedException e) { - throw new MongoInterruptedException(null, e); + throw interruptAndCreateMongoInterruptedException(null, e); } } private static void lockInterruptiblyUnfair(final ReentrantLock lock) throws MongoInterruptedException { - throwIfInterrupted(); + if (Thread.currentThread().isInterrupted()) { + throw interruptAndCreateMongoInterruptedException(null, null); + } // `ReentrantLock.tryLock` is unfair if (!lock.tryLock()) { try { lock.lockInterruptibly(); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MongoInterruptedException(null, e); + throw interruptAndCreateMongoInterruptedException(null, e); } } } @@ -541,10 +543,4 @@ static void lockUnfair(final ReentrantLock lock) { lock.lock(); } } - - private static void throwIfInterrupted() throws MongoInterruptedException { - if (Thread.currentThread().isInterrupted()) { - throw new MongoInterruptedException(null, null); - } - } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java index 3f76ccf307c..20a0b61324a 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java @@ -119,6 +119,7 @@ import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVICE_ID; import static com.mongodb.internal.logging.LogMessage.Entry.Name.WAIT_QUEUE_TIMEOUT_MS; import static com.mongodb.internal.logging.LogMessage.Level.DEBUG; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -1178,7 +1179,7 @@ private long awaitNanos(final Condition condition, final long timeoutNanos) thro return Math.max(0, condition.awaitNanos(timeoutNanos)); } } catch (InterruptedException e) { - throw new MongoInterruptedException(null, e); + throw interruptAndCreateMongoInterruptedException(null, e); } } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultDnsSrvRecordMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultDnsSrvRecordMonitor.java index fddb24ad02b..d535cb0aeca 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultDnsSrvRecordMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultDnsSrvRecordMonitor.java @@ -106,7 +106,7 @@ public void run() { try { Thread.sleep(getRescanFrequencyMillis()); - } catch (InterruptedException e) { + } catch (InterruptedException closed) { // fall through } clusterType = dnsSrvRecordInitializer.getClusterType(); @@ -130,4 +130,3 @@ private Set createServerAddressSet(final List resolvedHos } } } - diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index cc0bb8a35c8..0f953201365 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -180,8 +180,8 @@ public void run() { } waitForNext(); } - } catch (MongoInterruptedException e) { - // ignore + } catch (InterruptedException | MongoInterruptedException closed) { + // stop the monitor } catch (RuntimeException e) { LOGGER.error(format("Server monitor for %s exiting with exception", serverId), e); } finally { @@ -285,21 +285,17 @@ private void logStateChange(final ServerDescription previousServerDescription, } } - private void waitForNext() { - try { - long timeRemaining = waitForSignalOrTimeout(); - if (timeRemaining > 0) { - long timeWaiting = serverSettings.getHeartbeatFrequency(NANOSECONDS) - timeRemaining; - long minimumNanosToWait = serverSettings.getMinHeartbeatFrequency(NANOSECONDS); - if (timeWaiting < minimumNanosToWait) { - long millisToSleep = MILLISECONDS.convert(minimumNanosToWait - timeWaiting, NANOSECONDS); - if (millisToSleep > 0) { - Thread.sleep(millisToSleep); - } + private void waitForNext() throws InterruptedException { + long timeRemaining = waitForSignalOrTimeout(); + if (timeRemaining > 0) { + long timeWaiting = serverSettings.getHeartbeatFrequency(NANOSECONDS) - timeRemaining; + long minimumNanosToWait = serverSettings.getMinHeartbeatFrequency(NANOSECONDS); + if (timeWaiting < minimumNanosToWait) { + long millisToSleep = MILLISECONDS.convert(minimumNanosToWait - timeWaiting, NANOSECONDS); + if (millisToSleep > 0) { + Thread.sleep(millisToSleep); } } - } catch (InterruptedException e) { - // fall through } } @@ -429,6 +425,8 @@ public void run() { } waitForNext(); } + } catch (InterruptedException closed) { + // stop the monitor } finally { if (connection != null) { connection.close(); @@ -453,12 +451,8 @@ private void pingServer(final InternalConnection connection) { } } - private void waitForNext() { - try { - Thread.sleep(serverSettings.getHeartbeatFrequency(MILLISECONDS)); - } catch (InterruptedException e) { - // fall through - } + private void waitForNext() throws InterruptedException { + Thread.sleep(serverSettings.getHeartbeatFrequency(MILLISECONDS)); } private String getHandshakeCommandName(final ServerDescription serverDescription) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/FutureAsyncCompletionHandler.java b/driver-core/src/main/com/mongodb/internal/connection/FutureAsyncCompletionHandler.java index 4e8d3f751f4..2a9cc5af9c3 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/FutureAsyncCompletionHandler.java +++ b/driver-core/src/main/com/mongodb/internal/connection/FutureAsyncCompletionHandler.java @@ -18,13 +18,14 @@ import com.mongodb.MongoException; import com.mongodb.MongoInternalException; -import com.mongodb.MongoInterruptedException; import com.mongodb.connection.AsyncCompletionHandler; import com.mongodb.lang.Nullable; import java.io.IOException; import java.util.concurrent.CountDownLatch; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; + class FutureAsyncCompletionHandler implements AsyncCompletionHandler { private final CountDownLatch latch = new CountDownLatch(1); private volatile T result; @@ -58,7 +59,7 @@ private T get(final String prefix) throws IOException { try { latch.await(); } catch (InterruptedException e) { - throw new MongoInterruptedException(prefix + " the AsynchronousSocketChannelStream failed", e); + throw interruptAndCreateMongoInterruptedException(prefix + " the AsynchronousSocketChannelStream failed", e); } if (error != null) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 64e85a6d337..1f9cefcb125 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -59,15 +59,13 @@ import org.bson.io.ByteBufferBsonInput; import java.io.IOException; -import java.io.InterruptedIOException; -import java.net.SocketException; import java.net.SocketTimeoutException; -import java.nio.channels.ClosedByInterruptException; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; @@ -89,6 +87,7 @@ import static com.mongodb.internal.connection.ProtocolHelper.getSnapshotTimestamp; import static com.mongodb.internal.connection.ProtocolHelper.isCommandOk; import static com.mongodb.internal.logging.LogMessage.Level.DEBUG; +import static com.mongodb.internal.thread.InterruptionUtil.translateInterruptedException; import static java.util.Arrays.asList; /** @@ -707,9 +706,9 @@ private MongoException translateWriteException(final Throwable e) { if (e instanceof MongoException) { return (MongoException) e; } - MongoInterruptedException interruptedException = translateInterruptedExceptions(e, "Interrupted while sending message"); - if (interruptedException != null) { - return interruptedException; + Optional interruptedException = translateInterruptedException(e, "Interrupted while sending message"); + if (interruptedException.isPresent()) { + return interruptedException.get(); } else if (e instanceof IOException) { return new MongoSocketWriteException("Exception sending message", getServerAddress(), e); } else { @@ -721,9 +720,9 @@ private MongoException translateReadException(final Throwable e) { if (e instanceof MongoException) { return (MongoException) e; } - MongoInterruptedException interruptedException = translateInterruptedExceptions(e, "Interrupted while receiving message"); - if (interruptedException != null) { - return interruptedException; + Optional interruptedException = translateInterruptedException(e, "Interrupted while receiving message"); + if (interruptedException.isPresent()) { + return interruptedException.get(); } else if (e instanceof SocketTimeoutException) { return new MongoSocketReadTimeoutException("Timeout while receiving message", getServerAddress(), e); } else if (e instanceof IOException) { @@ -735,37 +734,6 @@ private MongoException translateReadException(final Throwable e) { } } - /** - * @return {@code null} iff {@code e} does not communicate an interrupt. - */ - @Nullable - private static MongoInterruptedException translateInterruptedExceptions(final Throwable e, final String message) { - if (e instanceof InterruptedException) { - // The interrupted status is cleared before throwing `InterruptedException`, - // we are not propagating `InterruptedException`, and we do not own the current thread, - // which means we must reinstate the interrupted status. - Thread.currentThread().interrupt(); - return new MongoInterruptedException(message, (InterruptedException) e); - } else if ( - // `InterruptedIOException` is weirdly documented, and almost seems to be a relic abandoned by the Java SE APIs: - // - `SocketTimeoutException` is `InterruptedIOException`, - // but it is not related to the Java SE interrupt mechanism. As a side note, it does not happen when writing. - // - Java SE methods, where IO may indeed be interrupted via the Java SE interrupt mechanism, - // use different exceptions, like `ClosedByInterruptException` or even `SocketException`. - (e instanceof InterruptedIOException && !(e instanceof SocketTimeoutException)) - // see `java.nio.channels.InterruptibleChannel` and `java.net.Socket.getOutputStream`/`getInputStream` - || e instanceof ClosedByInterruptException - // see `java.net.Socket.getOutputStream`/`getInputStream` - || (e instanceof SocketException && Thread.currentThread().isInterrupted())) { - // The interrupted status is not cleared before throwing `ClosedByInterruptException`/`SocketException`, - // so we do not need to reinstate it. - // `InterruptedIOException` does not specify how it behaves with regard to the interrupted status, so we do nothing. - return new MongoInterruptedException(message, (Exception) e); - } else { - return null; - } - } - private ResponseBuffers receiveResponseBuffers(final int additionalTimeout) throws IOException { ByteBuf messageHeaderBuffer = stream.read(MESSAGE_HEADER_LENGTH, additionalTimeout); MessageHeader messageHeader; diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java index 5bb7270ad9f..11266793e95 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java @@ -18,7 +18,6 @@ import com.mongodb.MongoClientException; import com.mongodb.MongoException; -import com.mongodb.MongoInterruptedException; import com.mongodb.MongoTimeoutException; import com.mongodb.ServerAddress; import com.mongodb.annotations.ThreadSafe; @@ -58,6 +57,7 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.connection.ServerConnectionState.CONNECTING; import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.lang.String.format; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; @@ -231,8 +231,7 @@ private void waitForSrv() { remainingTimeNanos = condition.awaitNanos(remainingTimeNanos); } } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MongoInterruptedException(format("Interrupted while resolving SRV records for %s", settings.getSrvHost()), e); + throw interruptAndCreateMongoInterruptedException(format("Interrupted while resolving SRV records for %s", settings.getSrvHost()), e); } finally { lock.unlock(); } @@ -380,7 +379,7 @@ public void run() { try { //noinspection ResultOfMethodCallIgnored condition.await(waitTimeNanos, NANOSECONDS); - } catch (InterruptedException e) { + } catch (InterruptedException unexpected) { fail(); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java index ccc82f97eaf..6ba89b5157a 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java +++ b/driver-core/src/main/com/mongodb/internal/connection/tlschannel/async/AsynchronousTlsChannelGroup.java @@ -56,6 +56,7 @@ import java.util.function.Consumer; import java.util.function.LongConsumer; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.lang.String.format; /** @@ -383,7 +384,7 @@ private void waitForSocketRegistration(RegisteredSocket socket) { try { socket.registered.await(); } catch (InterruptedException e) { - throw new RuntimeException(e); + throw interruptAndCreateMongoInterruptedException(null, e); } } diff --git a/driver-core/src/main/com/mongodb/internal/thread/InterruptionUtil.java b/driver-core/src/main/com/mongodb/internal/thread/InterruptionUtil.java new file mode 100644 index 00000000000..54a3ba31f24 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/thread/InterruptionUtil.java @@ -0,0 +1,77 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.internal.thread; + +import com.mongodb.MongoInterruptedException; +import com.mongodb.lang.Nullable; + +import java.io.InterruptedIOException; +import java.net.SocketException; +import java.net.SocketTimeoutException; +import java.nio.channels.ClosedByInterruptException; +import java.util.Optional; + +/** + *

This class is not part of the public API and may be removed or changed at any time

+ */ +public final class InterruptionUtil { + /** + * {@linkplain Thread#interrupt() Interrupts} the {@linkplain Thread#currentThread() current thread} + * before creating {@linkplain MongoInterruptedException}. + * We do this because the interrupt status is cleared before throwing {@link InterruptedException}, + * we are not propagating {@link InterruptedException}, which means we must reinstate the interrupt status. + * This matches the behavior documented by {@link MongoInterruptedException}. + */ + public static MongoInterruptedException interruptAndCreateMongoInterruptedException( + @Nullable final String msg, @Nullable final InterruptedException cause) { + Thread.currentThread().interrupt(); + return new MongoInterruptedException(msg, cause); + } + + /** + * If {@code e} is {@link InterruptedException}, then {@link #interruptAndCreateMongoInterruptedException(String, InterruptedException)} + * is used. + * + * @return {@link Optional#empty()} iff {@code e} does not communicate an interrupt. + */ + public static Optional translateInterruptedException( + @Nullable final Throwable e, @Nullable final String message) { + if (e instanceof InterruptedException) { + return Optional.of(interruptAndCreateMongoInterruptedException(message, (InterruptedException) e)); + } else if ( + // `InterruptedIOException` is weirdly documented, and almost seems to be a relic abandoned by the Java SE APIs: + // - `SocketTimeoutException` is `InterruptedIOException`, + // but it is not related to the Java SE interrupt mechanism. As a side note, it does not happen when writing. + // - Java SE methods, where IO may indeed be interrupted via the Java SE interrupt mechanism, + // use different exceptions, like `ClosedByInterruptException` or even `SocketException`. + (e instanceof InterruptedIOException && !(e instanceof SocketTimeoutException)) + // see `java.nio.channels.InterruptibleChannel` + // and `java.net.Socket.connect`, `java.net.Socket.getOutputStream`/`getInputStream` + || e instanceof ClosedByInterruptException + // see `java.net.Socket.connect`, `java.net.Socket.getOutputStream`/`getInputStream` + || (e instanceof SocketException && Thread.currentThread().isInterrupted())) { + // The interrupted status is not cleared before throwing `ClosedByInterruptException`/`SocketException`, + // so we do not need to reinstate it. + // `InterruptedIOException` does not specify how it behaves with regard to the interrupted status, so we do nothing. + return Optional.of(new MongoInterruptedException(message, (Exception) e)); + } else { + return Optional.empty(); + } + } + + private InterruptionUtil() { + } +} diff --git a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java index 85b4a9cfeac..65b3c8c57de 100644 --- a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java +++ b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java @@ -87,6 +87,7 @@ import static com.mongodb.connection.ClusterType.STANDALONE; import static com.mongodb.internal.connection.ClusterDescriptionHelper.getPrimaries; import static com.mongodb.internal.connection.ClusterDescriptionHelper.getSecondaries; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.lang.String.format; import static java.lang.Thread.sleep; import static java.util.Arrays.asList; @@ -718,7 +719,7 @@ public static int getReferenceCountAfterTimeout(final ReferenceCounted reference sleep(10); count = referenceCounted.getCount(); } catch (InterruptedException e) { - throw new MongoInterruptedException("Interrupted", e); + throw interruptAndCreateMongoInterruptedException("Interrupted", e); } } return count; diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/ServerHelper.java b/driver-core/src/test/functional/com/mongodb/internal/connection/ServerHelper.java index 1e07bdb5b44..ecbf4befb73 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/ServerHelper.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/ServerHelper.java @@ -17,7 +17,6 @@ package com.mongodb.internal.connection; import com.mongodb.ClusterFixture; -import com.mongodb.MongoInterruptedException; import com.mongodb.MongoTimeoutException; import com.mongodb.ServerAddress; import com.mongodb.connection.ServerDescription; @@ -27,6 +26,7 @@ import static com.mongodb.ClusterFixture.getAsyncCluster; import static com.mongodb.ClusterFixture.getCluster; import static com.mongodb.assertions.Assertions.fail; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.lang.Thread.sleep; public final class ServerHelper { @@ -56,7 +56,7 @@ public static void waitForLastRelease(final ServerAddress address, final Cluster + pool.getInUseCount()); } } catch (InterruptedException e) { - throw new MongoInterruptedException("Interrupted", e); + throw interruptAndCreateMongoInterruptedException("Interrupted", e); } } } @@ -90,7 +90,7 @@ public static void waitForRelease(final AsyncConnectionSource connectionSource, throw new MongoTimeoutException("Timed out waiting for ConnectionSource count to drop to " + expectedCount); } } catch (InterruptedException e) { - throw new MongoInterruptedException("Interrupted", e); + throw interruptAndCreateMongoInterruptedException("Interrupted", e); } } } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java index fcdbeccc420..67c51686706 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java @@ -16,7 +16,6 @@ package com.mongodb.internal.connection; -import com.mongodb.MongoInterruptedException; import com.mongodb.MongoTimeoutException; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandFailedEvent; @@ -46,6 +45,7 @@ import static com.mongodb.ClusterFixture.TIMEOUT; import static com.mongodb.internal.connection.InternalStreamConnection.getSecuritySensitiveCommands; import static com.mongodb.internal.connection.InternalStreamConnection.getSecuritySensitiveHelloCommands; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.util.Collections.emptyList; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; @@ -174,7 +174,7 @@ public List waitForStartedEvents(final int numEvents) { throw new MongoTimeoutException("Timeout waiting for event"); } } catch (InterruptedException e) { - throw new MongoInterruptedException("Interrupted waiting for event", e); + throw interruptAndCreateMongoInterruptedException("Interrupted waiting for event", e); } } return getCommandStartedEvents(numEvents); @@ -192,7 +192,7 @@ public void waitForFirstCommandCompletion() { throw new MongoTimeoutException("Timeout waiting for event"); } } catch (InterruptedException e) { - throw new MongoInterruptedException("Interrupted waiting for event", e); + throw interruptAndCreateMongoInterruptedException("Interrupted waiting for event", e); } } } finally { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractConnectionPoolTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractConnectionPoolTest.java index a596b637735..6a3a54b91fd 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractConnectionPoolTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractConnectionPoolTest.java @@ -20,7 +20,6 @@ import com.mongodb.JsonTestServerVersionChecker; import com.mongodb.LoggerSettings; import com.mongodb.MongoDriverInformation; -import com.mongodb.MongoInterruptedException; import com.mongodb.MongoTimeoutException; import com.mongodb.ServerAddress; import com.mongodb.connection.ClusterConnectionMode; @@ -82,6 +81,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.mongodb.assertions.Assertions.assertFalse; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.lang.String.format; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -522,8 +522,7 @@ public static void waitForPoolAsyncWorkManagerStart() { try { Thread.sleep(500); } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new MongoInterruptedException(null, e); + throw interruptAndCreateMongoInterruptedException(null, e); } } diff --git a/driver-legacy/src/main/com/mongodb/client/jndi/MongoClientFactory.java b/driver-legacy/src/main/com/mongodb/client/jndi/MongoClientFactory.java index 049bb08b422..d287865b8c0 100644 --- a/driver-legacy/src/main/com/mongodb/client/jndi/MongoClientFactory.java +++ b/driver-legacy/src/main/com/mongodb/client/jndi/MongoClientFactory.java @@ -63,8 +63,7 @@ public class MongoClientFactory implements ObjectFactory { * Note: Not all options that can be specified via {@link com.mongodb.MongoClientOptions} can be specified via the connection string. */ @Override - public Object getObjectInstance(final Object obj, final Name name, final Context nameCtx, final Hashtable environment) - throws Exception { + public Object getObjectInstance(final Object obj, final Name name, final Context nameCtx, final Hashtable environment) { // Some app servers, e.g. Wildfly, use the environment to pass location information to an ObjectFactory String connectionString = null; diff --git a/driver-reactive-streams/src/examples/reactivestreams/helpers/SubscriberHelpers.java b/driver-reactive-streams/src/examples/reactivestreams/helpers/SubscriberHelpers.java index 0013b35fc21..29c6e9f3735 100644 --- a/driver-reactive-streams/src/examples/reactivestreams/helpers/SubscriberHelpers.java +++ b/driver-reactive-streams/src/examples/reactivestreams/helpers/SubscriberHelpers.java @@ -33,7 +33,6 @@ package reactivestreams.helpers; -import com.mongodb.MongoInterruptedException; import com.mongodb.MongoTimeoutException; import org.bson.Document; import org.reactivestreams.Subscriber; @@ -45,6 +44,8 @@ import java.util.concurrent.TimeUnit; import java.util.function.Consumer; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; + /** * Subscriber helper implementations for the Quick Tour. */ @@ -181,7 +182,7 @@ public ObservableSubscriber await(final long timeout, final TimeUnit unit) { throw new MongoTimeoutException("Publisher onComplete timed out"); } } catch (InterruptedException e) { - throw new MongoInterruptedException("Interrupted waiting for observeration", e); + throw interruptAndCreateMongoInterruptedException("Interrupted waiting for observeration", e); } if (!errors.isEmpty()) { throw errors.get(0); diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/Fixture.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/Fixture.java index a184dfa80c1..5291ad0cc05 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/Fixture.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/Fixture.java @@ -20,7 +20,6 @@ import com.mongodb.ConnectionString; import com.mongodb.MongoClientSettings; import com.mongodb.MongoCommandException; -import com.mongodb.MongoInterruptedException; import com.mongodb.MongoNamespace; import com.mongodb.MongoTimeoutException; import com.mongodb.connection.AsynchronousSocketChannelStreamFactoryFactory; @@ -39,6 +38,7 @@ import static com.mongodb.ClusterFixture.TIMEOUT_DURATION; import static com.mongodb.ClusterFixture.getServerApi; import static com.mongodb.ClusterFixture.getSslSettings; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.lang.Thread.sleep; /** @@ -141,7 +141,7 @@ public static synchronized void waitForLastServerSessionPoolRelease() { sleep(10); sessionInUseCount = getSessionInUseCount(); } catch (InterruptedException e) { - throw new MongoInterruptedException("Interrupted", e); + throw interruptAndCreateMongoInterruptedException("Interrupted", e); } } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java index 9fff5db302f..36aff9506ed 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncClientSession.java @@ -17,7 +17,6 @@ package com.mongodb.reactivestreams.client.syncadapter; import com.mongodb.ClientSessionOptions; -import com.mongodb.MongoInterruptedException; import com.mongodb.ServerAddress; import com.mongodb.TransactionOptions; import com.mongodb.client.ClientSession; @@ -29,6 +28,7 @@ import reactor.core.publisher.Mono; import static com.mongodb.ClusterFixture.TIMEOUT_DURATION; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT; import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterSessionClose; @@ -186,7 +186,7 @@ private static void sleep(final long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { - throw new MongoInterruptedException(null, e); + throw interruptAndCreateMongoInterruptedException(null, e); } } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java index 6c3bb44b240..c21cbc0e9f0 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java @@ -16,7 +16,6 @@ package com.mongodb.reactivestreams.client.syncadapter; -import com.mongodb.MongoInterruptedException; import com.mongodb.MongoTimeoutException; import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; @@ -34,6 +33,7 @@ import java.util.concurrent.TimeUnit; import static com.mongodb.ClusterFixture.TIMEOUT; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT; import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterCursorClose; import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterCursorOpen; @@ -85,7 +85,7 @@ public void onComplete() { } sleep(getSleepAfterCursorOpen()); } catch (InterruptedException e) { - throw new MongoInterruptedException("Interrupted waiting for asynchronous cursor establishment", e); + throw interruptAndCreateMongoInterruptedException("Interrupted waiting for asynchronous cursor establishment", e); } } @@ -99,7 +99,7 @@ private static void sleep(final long millis) { try { Thread.sleep(millis); } catch (InterruptedException e) { - throw new MongoInterruptedException("Interrupted from nap", e); + throw interruptAndCreateMongoInterruptedException("Interrupted from nap", e); } } @@ -136,7 +136,7 @@ public boolean hasNext() { return true; } } catch (InterruptedException e) { - throw new MongoInterruptedException("Interrupted waiting for next result", e); + throw interruptAndCreateMongoInterruptedException("Interrupted waiting for next result", e); } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java index 1dba06b38df..4b7dc8d9310 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java @@ -18,7 +18,6 @@ import com.mongodb.ClusterFixture; import com.mongodb.MongoClientSettings; -import com.mongodb.MongoInterruptedException; import com.mongodb.event.ConnectionPoolClearedEvent; import com.mongodb.event.ConnectionPoolListener; import com.mongodb.event.ConnectionPoolReadyEvent; @@ -56,6 +55,7 @@ import static com.mongodb.ClusterFixture.serverVersionAtLeast; import static com.mongodb.client.Fixture.getDefaultDatabaseName; import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; +import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException; import static java.lang.String.format; import static java.util.Arrays.asList; import static java.util.Collections.singleton; @@ -313,7 +313,7 @@ private static void put(final BlockingQueue q, final E e) { try { q.put(e); } catch (InterruptedException t) { - throw new MongoInterruptedException(null, t); + throw interruptAndCreateMongoInterruptedException(null, t); } } }