Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix how InterruptedExceptions are handled #1192

Merged
merged 4 commits into from
Sep 12, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions driver-core/src/main/com/mongodb/MongoInterruptedException.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,32 @@

import com.mongodb.lang.Nullable;

import java.io.InputStream;
import java.io.InterruptedIOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.SocketAddress;
import java.net.SocketException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.InterruptibleChannel;

/**
* A non-checked exception indicating that the driver has been interrupted by a call to Thread.interrupt.
* A driver-specific non-checked counterpart to {@link InterruptedException}.
* Before this exception is thrown, the {@linkplain Thread#isInterrupted() interrupt status} of the thread will have been set
* unless the {@linkplain #getCause() cause} is {@link InterruptedIOException}, in which case the driver leaves the status as is.
* <p>
* The Java SE API uses exceptions different from {@link InterruptedException} to communicate the same information:</p>
* <ul>
* <li>{@link InterruptibleChannel} uses {@link ClosedByInterruptException}.</li>
* <li>{@link Socket#connect(SocketAddress)},
* {@linkplain InputStream}/{@link OutputStream} obtained via {@link Socket#getInputStream()}/{@link Socket#getOutputStream()}
* use either {@link ClosedByInterruptException} or {@link SocketException}.</li>
* <li>There is also {@link InterruptedIOException}, which is documented to an extent as an IO-specific counterpart to
* {@link InterruptedException}.</li>
* </ul>
* The driver strives to wrap those in {@link MongoInterruptedException} where relevant.
*
* @see Thread#interrupt()
* @see InterruptedException
*/
public class MongoInterruptedException extends MongoException {
private static final long serialVersionUID = -4110417867718417860L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import com.mongodb.MongoClientException;
import com.mongodb.MongoException;
import com.mongodb.MongoInternalException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.MongoSocketException;
import com.mongodb.MongoSocketOpenException;
import com.mongodb.MongoSocketReadTimeoutException;
Expand Down Expand Up @@ -71,6 +70,7 @@
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.internal.connection.SslHelper.enableHostNameVerification;
import static com.mongodb.internal.connection.SslHelper.enableSni;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
import static java.util.Optional.ofNullable;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

Expand Down Expand Up @@ -436,7 +436,7 @@ private void addSslHandler(final SocketChannel channel) {

private class InboundBufferHandler extends SimpleChannelInboundHandler<io.netty.buffer.ByteBuf> {
@Override
protected void channelRead0(final ChannelHandlerContext ctx, final io.netty.buffer.ByteBuf buffer) throws Exception {
protected void channelRead0(final ChannelHandlerContext ctx, final io.netty.buffer.ByteBuf buffer) {
handleReadResponse(buffer, null);
}

Expand Down Expand Up @@ -499,7 +499,7 @@ public T get() throws IOException {
}
return t;
} catch (InterruptedException e) {
throw new MongoInterruptedException("Interrupted", e);
throw interruptAndCreateMongoInterruptedException("Interrupted", e);
}
}
}
Expand Down
7 changes: 3 additions & 4 deletions driver-core/src/main/com/mongodb/internal/Locks.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@

package com.mongodb.internal;

import com.mongodb.MongoInterruptedException;

import java.util.concurrent.locks.Lock;
import java.util.function.Supplier;

import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;

/**
* <p>This class is not part of the public API and may be removed or changed at any time</p>
*/
Expand All @@ -45,8 +45,7 @@ public static <V, E extends Exception> V checkedWithLock(final Lock lock, final
lock.unlock();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MongoInterruptedException("Interrupted waiting for lock", e);
throw interruptAndCreateMongoInterruptedException("Interrupted waiting for lock", e);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.mongodb.MongoException;
import com.mongodb.MongoInternalException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.MongoSocketReadException;
import com.mongodb.MongoSocketReadTimeoutException;
import com.mongodb.ServerAddress;
Expand All @@ -38,6 +37,7 @@
import java.util.concurrent.atomic.AtomicReference;

import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
import static java.util.concurrent.TimeUnit.MILLISECONDS;

/**
Expand Down Expand Up @@ -317,7 +317,7 @@ private T get(final String prefix) throws IOException {
try {
latch.await();
} catch (InterruptedException e) {
throw new MongoInterruptedException(prefix + " the AsynchronousSocketChannelStream failed", e);
throw interruptAndCreateMongoInterruptedException(prefix + " the AsynchronousSocketChannelStream failed", e);

}
if (error != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.mongodb.MongoClientException;
import com.mongodb.MongoIncompatibleDriverException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.MongoTimeoutException;
import com.mongodb.ServerAddress;
import com.mongodb.connection.ClusterDescription;
Expand Down Expand Up @@ -60,6 +59,7 @@
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.connection.EventHelper.wouldDescriptionsGenerateEquivalentEvents;
import static com.mongodb.internal.event.EventListenerHelper.singleClusterListener;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
import static java.lang.String.format;
import static java.util.Arrays.asList;
import static java.util.Comparator.comparingInt;
Expand Down Expand Up @@ -142,7 +142,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera
}

} catch (InterruptedException e) {
throw new MongoInterruptedException(format("Interrupted while waiting for a server that matches %s", serverSelector), e);
throw interruptAndCreateMongoInterruptedException(format("Interrupted while waiting for a server that matches %s", serverSelector), e);
}
}

Expand Down Expand Up @@ -211,7 +211,7 @@ public ClusterDescription getDescription() {
}
return curDescription;
} catch (InterruptedException e) {
throw new MongoInterruptedException("Interrupted while waiting to connect", e);
throw interruptAndCreateMongoInterruptedException("Interrupted while waiting to connect", e);
}
}

Expand Down Expand Up @@ -516,7 +516,7 @@ public void run() {

try {
currentPhase.await(waitTimeNanos, NANOSECONDS);
} catch (InterruptedException e) {
} catch (InterruptedException closed) {
// The cluster has been closed and the while loop will exit.
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import static com.mongodb.assertions.Assertions.assertTrue;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;

/**
* A concurrent pool implementation.
Expand Down Expand Up @@ -411,7 +412,7 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter
return false;
}
} catch (InterruptedException e) {
throw new MongoInterruptedException(null, e);
throw interruptAndCreateMongoInterruptedException(null, e);
} finally {
waitersEstimate.decrementAndGet();
}
Expand Down Expand Up @@ -518,19 +519,20 @@ static void lockInterruptibly(final Lock lock) throws MongoInterruptedException
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
throw new MongoInterruptedException(null, e);
throw interruptAndCreateMongoInterruptedException(null, e);
}
}

private static void lockInterruptiblyUnfair(final ReentrantLock lock) throws MongoInterruptedException {
throwIfInterrupted();
if (Thread.currentThread().isInterrupted()) {
throw interruptAndCreateMongoInterruptedException(null, null);
}
// `ReentrantLock.tryLock` is unfair
if (!lock.tryLock()) {
try {
lock.lockInterruptibly();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new MongoInterruptedException(null, e);
throw interruptAndCreateMongoInterruptedException(null, e);
}
}
}
Expand All @@ -541,10 +543,4 @@ static void lockUnfair(final ReentrantLock lock) {
lock.lock();
}
}

private static void throwIfInterrupted() throws MongoInterruptedException {
if (Thread.currentThread().isInterrupted()) {
throw new MongoInterruptedException(null, null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@
import static com.mongodb.internal.logging.LogMessage.Entry.Name.SERVICE_ID;
import static com.mongodb.internal.logging.LogMessage.Entry.Name.WAIT_QUEUE_TIMEOUT_MS;
import static com.mongodb.internal.logging.LogMessage.Level.DEBUG;
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
import static java.lang.String.format;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand Down Expand Up @@ -1178,7 +1179,7 @@ private long awaitNanos(final Condition condition, final long timeoutNanos) thro
return Math.max(0, condition.awaitNanos(timeoutNanos));
}
} catch (InterruptedException e) {
throw new MongoInterruptedException(null, e);
throw interruptAndCreateMongoInterruptedException(null, e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ public void run() {

try {
Thread.sleep(getRescanFrequencyMillis());
} catch (InterruptedException e) {
} catch (InterruptedException closed) {
// fall through
}
clusterType = dnsSrvRecordInitializer.getClusterType();
Expand All @@ -130,4 +130,3 @@ private Set<ServerAddress> createServerAddressSet(final List<String> resolvedHos
}
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -180,8 +180,8 @@ public void run() {
}
waitForNext();
}
} catch (MongoInterruptedException e) {
jyemin marked this conversation as resolved.
Show resolved Hide resolved
// ignore
} catch (InterruptedException | MongoInterruptedException closed) {
// stop the monitor
} catch (RuntimeException e) {
LOGGER.error(format("Server monitor for %s exiting with exception", serverId), e);
} finally {
Expand Down Expand Up @@ -285,21 +285,17 @@ private void logStateChange(final ServerDescription previousServerDescription,
}
}

private void waitForNext() {
try {
long timeRemaining = waitForSignalOrTimeout();
if (timeRemaining > 0) {
long timeWaiting = serverSettings.getHeartbeatFrequency(NANOSECONDS) - timeRemaining;
long minimumNanosToWait = serverSettings.getMinHeartbeatFrequency(NANOSECONDS);
if (timeWaiting < minimumNanosToWait) {
long millisToSleep = MILLISECONDS.convert(minimumNanosToWait - timeWaiting, NANOSECONDS);
if (millisToSleep > 0) {
Thread.sleep(millisToSleep);
}
private void waitForNext() throws InterruptedException {
long timeRemaining = waitForSignalOrTimeout();
if (timeRemaining > 0) {
long timeWaiting = serverSettings.getHeartbeatFrequency(NANOSECONDS) - timeRemaining;
long minimumNanosToWait = serverSettings.getMinHeartbeatFrequency(NANOSECONDS);
if (timeWaiting < minimumNanosToWait) {
long millisToSleep = MILLISECONDS.convert(minimumNanosToWait - timeWaiting, NANOSECONDS);
if (millisToSleep > 0) {
Thread.sleep(millisToSleep);
}
}
} catch (InterruptedException e) {
// fall through
}
}

Expand Down Expand Up @@ -429,6 +425,8 @@ public void run() {
}
waitForNext();
}
} catch (InterruptedException closed) {
// stop the monitor
} finally {
if (connection != null) {
connection.close();
Expand All @@ -453,12 +451,8 @@ private void pingServer(final InternalConnection connection) {
}
}

private void waitForNext() {
try {
Thread.sleep(serverSettings.getHeartbeatFrequency(MILLISECONDS));
} catch (InterruptedException e) {
// fall through
}
private void waitForNext() throws InterruptedException {
Thread.sleep(serverSettings.getHeartbeatFrequency(MILLISECONDS));
}

private String getHandshakeCommandName(final ServerDescription serverDescription) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@

import com.mongodb.MongoException;
import com.mongodb.MongoInternalException;
import com.mongodb.MongoInterruptedException;
import com.mongodb.connection.AsyncCompletionHandler;
import com.mongodb.lang.Nullable;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;

class FutureAsyncCompletionHandler<T> implements AsyncCompletionHandler<T> {
private final CountDownLatch latch = new CountDownLatch(1);
private volatile T result;
Expand Down Expand Up @@ -58,7 +59,7 @@ private T get(final String prefix) throws IOException {
try {
latch.await();
} catch (InterruptedException e) {
throw new MongoInterruptedException(prefix + " the AsynchronousSocketChannelStream failed", e);
throw interruptAndCreateMongoInterruptedException(prefix + " the AsynchronousSocketChannelStream failed", e);

}
if (error != null) {
Expand Down
Loading