From c8b028a509c825c0de30c67fa1eb6a5871d8bea9 Mon Sep 17 00:00:00 2001 From: Jeff Yemin Date: Sat, 15 Feb 2020 22:02:29 -0500 Subject: [PATCH] Introduce streaming ismaster monitoring protocol * DefaultServerMonitor implements the new streaming isMaster protocol, if available, to detect topology changes sooner. This capability was added in MongoDB release 4.4. * This requires the ability to increase the read timeout on a per-read basis. Since the Stream class is public, this has to be done carefully in order to avoid using the streaming protocol with Stream implementations that don't have this ability. All the built-in Stream implementations have had this ability added, so in practice it should not happen unless an application has created its own Stream implementation (unlikely). * Implement the new server discovery and monitoring specification integration tests * Fix some remaining bugs in error handling JAVA-3626 --- config/checkstyle-exclude.xml | 1 + .../main/com/mongodb/connection/Stream.java | 36 ++ .../TlsChannelStreamFactoryFactory.java | 5 + .../mongodb/connection/netty/NettyStream.java | 33 +- .../connection/netty/ReadTimeoutHandler.java | 4 +- .../event/ServerHeartbeatFailedEvent.java | 31 ++ .../event/ServerHeartbeatSucceededEvent.java | 31 ++ .../async/client/AsyncMongoClient.java | 18 + .../async/client/AsyncMongoClientImpl.java | 6 + .../AbstractMultiServerCluster.java | 6 +- .../connection/AsynchronousChannelStream.java | 25 +- .../connection/ClusterableServer.java | 24 +- .../internal/connection/CommandMessage.java | 34 +- .../connection/DefaultConnectionPool.java | 30 ++ .../internal/connection/DefaultServer.java | 82 ++-- .../connection/DefaultServerMonitor.java | 287 ++++++++++---- .../ExponentiallyWeightedMovingAverage.java | 9 +- .../connection/InternalConnection.java | 16 +- .../connection/InternalStreamConnection.java | 82 +++- .../internal/connection/ReplyHeader.java | 7 +- .../internal/connection/ServerMonitor.java | 1 + .../internal/connection/SocketStream.java | 18 + .../UsageTrackingInternalConnection.java | 30 ++ .../com/mongodb/ClusterFixture.java | 15 +- .../mongodb/client/test/CollectionHelper.java | 5 +- .../cancel-server-check.json | 130 +++++++ .../find-network-error.json | 144 +++++++ .../find-shutdown-error.json | 168 ++++++++ .../insert-network-error.json | 156 ++++++++ .../insert-shutdown-error.json | 167 ++++++++ .../isMaster-command-error.json | 245 ++++++++++++ .../isMaster-network-error.json | 225 +++++++++++ .../isMaster-timeout.json | 359 ++++++++++++++++++ .../rediscover-quickly-after-step-down.json | 152 ++++++++ .../README.rst | 265 ++++++++++++- .../rs/error_handling_handshake.json | 112 ------ .../ServerHeartbeatEventSpecification.groovy | 5 +- ...tractServerDiscoveryAndMonitoringTest.java | 7 +- .../CommandMessageSpecification.groovy | 12 +- .../DefaultServerMonitorSpecification.groovy | 12 +- .../DefaultServerSpecification.groovy | 8 +- ...ternalStreamConnectionSpecification.groovy | 38 +- .../connection/TestConnectionPool.java | 15 + .../TestConnectionPoolListener.java | 2 +- .../connection/TestInternalConnection.java | 15 + .../TestInternalConnectionFactory.java | 14 + .../internal/connection/TestServer.java | 16 +- .../connection/TestServerMonitor.java | 4 + .../reactivestreams/client/MongoClient.java | 19 + .../client/internal/MongoClientImpl.java | 6 + .../ServerDiscoveryAndMonitoringTest.java | 37 ++ .../client/syncadapter/SyncMongoClient.java | 2 +- .../scala/org/mongodb/scala/MongoClient.scala | 20 +- .../org/mongodb/scala/MongoClientSpec.scala | 5 +- .../client/AbstractRetryableReadsTest.java | 7 + .../client/AbstractRetryableWritesTest.java | 10 + ...tractServerDiscoveryAndMonitoringTest.java | 61 +++ .../mongodb/client/AbstractUnifiedTest.java | 192 +++++++++- ...erverDiscoveryAndMonitoringProseTests.java | 125 ++++++ .../ServerDiscoveryAndMonitoringTest.java | 34 ++ .../mongodb/client/TestServerListener.java | 96 +++++ 61 files changed, 3355 insertions(+), 366 deletions(-) create mode 100644 driver-core/src/test/resources/server-discovery-and-monitoring-integration/cancel-server-check.json create mode 100644 driver-core/src/test/resources/server-discovery-and-monitoring-integration/find-network-error.json create mode 100644 driver-core/src/test/resources/server-discovery-and-monitoring-integration/find-shutdown-error.json create mode 100644 driver-core/src/test/resources/server-discovery-and-monitoring-integration/insert-network-error.json create mode 100644 driver-core/src/test/resources/server-discovery-and-monitoring-integration/insert-shutdown-error.json create mode 100644 driver-core/src/test/resources/server-discovery-and-monitoring-integration/isMaster-command-error.json create mode 100644 driver-core/src/test/resources/server-discovery-and-monitoring-integration/isMaster-network-error.json create mode 100644 driver-core/src/test/resources/server-discovery-and-monitoring-integration/isMaster-timeout.json create mode 100644 driver-core/src/test/resources/server-discovery-and-monitoring-integration/rediscover-quickly-after-step-down.json delete mode 100644 driver-core/src/test/resources/server-discovery-and-monitoring/rs/error_handling_handshake.json create mode 100644 driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ServerDiscoveryAndMonitoringTest.java create mode 100644 driver-sync/src/test/functional/com/mongodb/client/AbstractServerDiscoveryAndMonitoringTest.java create mode 100644 driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java create mode 100644 driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringTest.java create mode 100644 driver-sync/src/test/functional/com/mongodb/client/TestServerListener.java diff --git a/config/checkstyle-exclude.xml b/config/checkstyle-exclude.xml index 65c23c517a6..39f4153a935 100644 --- a/config/checkstyle-exclude.xml +++ b/config/checkstyle-exclude.xml @@ -28,6 +28,7 @@ + diff --git a/driver-core/src/main/com/mongodb/connection/Stream.java b/driver-core/src/main/com/mongodb/connection/Stream.java index 10128b8a859..d5297bdd5fb 100644 --- a/driver-core/src/main/com/mongodb/connection/Stream.java +++ b/driver-core/src/main/com/mongodb/connection/Stream.java @@ -60,6 +60,42 @@ public interface Stream extends BufferProvider{ */ ByteBuf read(int numBytes) throws IOException; + /** + * Gets whether this implementation supports specifying an additional timeout for read operations + *

+ * The default is to not support specifying an additional timeout + *

+ * + * @return true if this implementation supports specifying an additional timeouts for reads operations + * @see #read(int, int) + * @since 4.1 + */ + default boolean supportsAdditionalTimeout() { + return false; + } + + /** + * Read from the stream, blocking until the requested number of bytes have been read. If supported by the implementation, + * adds the given additional timeout to the configured timeout for the stream. + *

+ * This method should not be called unless {@link #supportsAdditionalTimeout()} returns true. + *

+ *

+ * The default behavior is to throw an {@link UnsupportedOperationException} + *

+ * + * @param numBytes The number of bytes to read into the returned byte buffer + * @param additionalTimeout additional timeout in milliseconds to add to the configured timeout + * @return a byte buffer filled with number of bytes requested + * @throws IOException if there are problems reading from the stream + * @throws UnsupportedOperationException if this implementation does not support additional timeouts + * @see #supportsAdditionalTimeout() + * @since 4.1 + */ + default ByteBuf read(int numBytes, int additionalTimeout) throws IOException { + throw new UnsupportedOperationException(); + } + /** * Write each buffer in the list to the stream in order, asynchronously. This method should return immediately, and invoke the given * callback on completion. diff --git a/driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java b/driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java index 4bf1382a33d..5f3e6a2c4f5 100644 --- a/driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java +++ b/driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java @@ -196,6 +196,11 @@ private static class TlsChannelStream extends AsynchronousChannelStream implemen this.selectorMonitor = selectorMonitor; } + @Override + public boolean supportsAdditionalTimeout() { + return true; + } + @Override public void openAsync(final AsyncCompletionHandler handler) { isTrue("unopened", getChannel() == null); diff --git a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java index 1d5acd657e1..948da9162ec 100644 --- a/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java +++ b/driver-core/src/main/com/mongodb/connection/netty/NettyStream.java @@ -175,8 +175,18 @@ public void write(final List buffers) throws IOException { @Override public ByteBuf read(final int numBytes) throws IOException { + return read(numBytes, 0); + } + + @Override + public boolean supportsAdditionalTimeout() { + return true; + } + + @Override + public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOException { FutureAsyncCompletionHandler future = new FutureAsyncCompletionHandler(); - readAsync(numBytes, future); + readAsync(numBytes, future, additionalTimeout); return future.get(); } @@ -201,7 +211,11 @@ public void operationComplete(final ChannelFuture future) throws Exception { @Override public void readAsync(final int numBytes, final AsyncCompletionHandler handler) { - scheduleReadTimeout(); + readAsync(numBytes, handler, 0); + } + + private void readAsync(final int numBytes, final AsyncCompletionHandler handler, final int additionalTimeout) { + scheduleReadTimeout(additionalTimeout); ByteBuf buffer = null; Throwable exceptionResult = null; synchronized (this) { @@ -431,15 +445,18 @@ public void operationComplete(final ChannelFuture future) { } } - private void scheduleReadTimeout() { - adjustTimeout(false); + private void scheduleReadTimeout(final int additionalTimeout) { + adjustTimeout(false, additionalTimeout); } private void disableReadTimeout() { - adjustTimeout(true); + adjustTimeout(true, 0); } - private void adjustTimeout(final boolean disable) { + private void adjustTimeout(final boolean disable, final int additionalTimeout) { + if (isClosed) { + return; + } ChannelHandler timeoutHandler = channel.pipeline().get(READ_HANDLER_NAME); if (timeoutHandler != null) { final ReadTimeoutHandler readTimeoutHandler = (ReadTimeoutHandler) timeoutHandler; @@ -459,12 +476,12 @@ public void run() { } } else { if (executor.inEventLoop()) { - readTimeoutHandler.scheduleTimeout(handlerContext); + readTimeoutHandler.scheduleTimeout(handlerContext, additionalTimeout); } else { executor.submit(new Runnable() { @Override public void run() { - readTimeoutHandler.scheduleTimeout(handlerContext); + readTimeoutHandler.scheduleTimeout(handlerContext, additionalTimeout); } }); } diff --git a/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java b/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java index 4b4533f3cde..824c8f7d6a3 100644 --- a/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java +++ b/driver-core/src/main/com/mongodb/connection/netty/ReadTimeoutHandler.java @@ -40,10 +40,10 @@ final class ReadTimeoutHandler extends ChannelInboundHandlerAdapter { this.readTimeout = readTimeout; } - void scheduleTimeout(final ChannelHandlerContext ctx) { + void scheduleTimeout(final ChannelHandlerContext ctx, final int additionalTimeout) { isTrue("Handler called from the eventLoop", ctx.channel().eventLoop().inEventLoop()); if (timeout == null) { - timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), readTimeout, TimeUnit.MILLISECONDS); + timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), readTimeout + additionalTimeout, TimeUnit.MILLISECONDS); } } diff --git a/driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java b/driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java index 9f2d3e30ea7..1ce62ef0237 100644 --- a/driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java +++ b/driver-core/src/main/com/mongodb/event/ServerHeartbeatFailedEvent.java @@ -31,6 +31,7 @@ public final class ServerHeartbeatFailedEvent { private final ConnectionId connectionId; private final long elapsedTimeNanos; + private final boolean awaited; private final Throwable throwable; /** @@ -39,9 +40,26 @@ public final class ServerHeartbeatFailedEvent { * @param connectionId the non-null connectionId * @param elapsedTimeNanos the non-negative elapsed time in nanoseconds * @param throwable the non-null exception that caused the failure + * @deprecated Prefer {@link #ServerHeartbeatFailedEvent(ConnectionId, long, boolean, Throwable)} */ + @Deprecated public ServerHeartbeatFailedEvent(final ConnectionId connectionId, final long elapsedTimeNanos, final Throwable throwable) { + this(connectionId, elapsedTimeNanos, false, throwable); + } + + /** + * Construct an instance. + * + * @param connectionId the non-null connectionId + * @param elapsedTimeNanos the non-negative elapsed time in nanoseconds + * @param awaited true if the response was awaited + * @param throwable the non-null exception that caused the failure + * @since 4.1 + */ + public ServerHeartbeatFailedEvent(final ConnectionId connectionId, final long elapsedTimeNanos, final boolean awaited, + final Throwable throwable) { this.connectionId = notNull("connectionId", connectionId); + this.awaited = awaited; isTrueArgument("elapsed time is not negative", elapsedTimeNanos >= 0); this.elapsedTimeNanos = elapsedTimeNanos; this.throwable = notNull("throwable", throwable); @@ -67,6 +85,18 @@ public long getElapsedTime(final TimeUnit timeUnit) { return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS); } + /** + * Gets whether the heartbeat was awaited. If true, then {@link #getElapsedTime(TimeUnit)} reflects the sum of the round trip time + * to the server and the time that the server waited before sending a response. + * + * @return whether the response was awaited + * @since 4.1 + * @mongodb.server.release 4.4 + */ + public boolean isAwaited() { + return awaited; + } + /** * Gets the exceptions that caused the failure * @@ -81,6 +111,7 @@ public String toString() { return "ServerHeartbeatFailedEvent{" + "connectionId=" + connectionId + ", elapsedTimeNanos=" + elapsedTimeNanos + + ", awaited=" + awaited + ", throwable=" + throwable + "} " + super.toString(); } diff --git a/driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java b/driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java index f41dc2b68ea..0d55b8b2509 100644 --- a/driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java +++ b/driver-core/src/main/com/mongodb/event/ServerHeartbeatSucceededEvent.java @@ -33,6 +33,7 @@ public final class ServerHeartbeatSucceededEvent { private final ConnectionId connectionId; private final BsonDocument reply; private final long elapsedTimeNanos; + private final boolean awaited; /** * Construct an instance. @@ -40,12 +41,29 @@ public final class ServerHeartbeatSucceededEvent { * @param connectionId the non-null connectionId * @param reply the non-null reply to an isMaster command * @param elapsedTimeNanos the non-negative elapsed time in nanoseconds + * @deprecated Prefer {@link #ServerHeartbeatSucceededEvent(ConnectionId, BsonDocument, long, boolean)} */ + @Deprecated public ServerHeartbeatSucceededEvent(final ConnectionId connectionId, final BsonDocument reply, final long elapsedTimeNanos) { + this(connectionId, reply, elapsedTimeNanos, false); + } + + /** + * Construct an instance. + * + * @param connectionId the non-null connectionId + * @param reply the non-null reply to an isMaster command + * @param elapsedTimeNanos the non-negative elapsed time in nanoseconds + * @param awaited true if the response was awaited + * @since 4.1 + */ + public ServerHeartbeatSucceededEvent(final ConnectionId connectionId, final BsonDocument reply, final long elapsedTimeNanos, + final boolean awaited) { this.connectionId = notNull("connectionId", connectionId); this.reply = notNull("reply", reply); isTrueArgument("elapsed time is not negative", elapsedTimeNanos >= 0); this.elapsedTimeNanos = elapsedTimeNanos; + this.awaited = awaited; } /** @@ -77,12 +95,25 @@ public long getElapsedTime(final TimeUnit timeUnit) { return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS); } + /** + * Gets whether the heartbeat was awaited. If true, then {@link #getElapsedTime(TimeUnit)} reflects the sum of the round trip time + * to the server and the time that the server waited before sending a response. + * + * @return whether the response was awaited + * @since 4.1 + * @mongodb.server.release 4.4 + */ + public boolean isAwaited() { + return awaited; + } + @Override public String toString() { return "ServerHeartbeatSucceededEvent{" + "connectionId=" + connectionId + ", reply=" + reply + ", elapsedTimeNanos=" + elapsedTimeNanos + + ", awaited=" + awaited + "} "; } } diff --git a/driver-core/src/main/com/mongodb/internal/async/client/AsyncMongoClient.java b/driver-core/src/main/com/mongodb/internal/async/client/AsyncMongoClient.java index 33db746e632..c497e4b3630 100644 --- a/driver-core/src/main/com/mongodb/internal/async/client/AsyncMongoClient.java +++ b/driver-core/src/main/com/mongodb/internal/async/client/AsyncMongoClient.java @@ -18,6 +18,9 @@ import com.mongodb.ClientSessionOptions; import com.mongodb.annotations.Immutable; +import com.mongodb.connection.ClusterDescription; +import com.mongodb.connection.ClusterSettings; +import com.mongodb.event.ClusterListener; import com.mongodb.internal.async.SingleResultCallback; import org.bson.Document; import org.bson.conversions.Bson; @@ -233,4 +236,19 @@ public interface AsyncMongoClient extends Closeable { AsyncChangeStreamIterable watch(AsyncClientSession clientSession, List pipeline, Class resultClass); + /** + * Gets the current cluster description. + * + *

+ * This method will not block, meaning that it may return a {@link ClusterDescription} whose {@code clusterType} is unknown + * and whose {@link com.mongodb.connection.ServerDescription}s are all in the connecting state. If the application requires + * notifications after the driver has connected to a member of the cluster, it should register a {@link ClusterListener} via + * the {@link ClusterSettings} in {@link com.mongodb.MongoClientSettings}. + *

+ * + * @return the current cluster description + * @see ClusterSettings.Builder#addClusterListener(ClusterListener) + * @see com.mongodb.MongoClientSettings.Builder#applyToClusterSettings(com.mongodb.Block) + */ + ClusterDescription getClusterDescription(); } diff --git a/driver-core/src/main/com/mongodb/internal/async/client/AsyncMongoClientImpl.java b/driver-core/src/main/com/mongodb/internal/async/client/AsyncMongoClientImpl.java index 5a41cb8b25b..07a71171e52 100644 --- a/driver-core/src/main/com/mongodb/internal/async/client/AsyncMongoClientImpl.java +++ b/driver-core/src/main/com/mongodb/internal/async/client/AsyncMongoClientImpl.java @@ -22,6 +22,7 @@ import com.mongodb.MongoClientException; import com.mongodb.MongoClientSettings; import com.mongodb.ReadPreference; +import com.mongodb.connection.ClusterDescription; import com.mongodb.diagnostics.logging.Logger; import com.mongodb.diagnostics.logging.Loggers; import com.mongodb.internal.async.SingleResultCallback; @@ -211,6 +212,11 @@ public AsyncChangeStreamIterable watch(final AsyncClientSessi return createChangeStreamIterable(clientSession, pipeline, resultClass); } + @Override + public ClusterDescription getClusterDescription() { + return cluster.getCurrentDescription(); + } + private AsyncChangeStreamIterable createChangeStreamIterable(@Nullable final AsyncClientSession clientSession, final List pipeline, final Class resultClass) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java b/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java index f415cf46377..991fa6e616d 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java @@ -17,7 +17,6 @@ package com.mongodb.internal.connection; import com.mongodb.MongoException; -import com.mongodb.MongoNotPrimaryException; import com.mongodb.ServerAddress; import com.mongodb.connection.ClusterConnectionMode; import com.mongodb.connection.ClusterDescription; @@ -30,7 +29,6 @@ import com.mongodb.event.ClusterDescriptionChangedEvent; import com.mongodb.event.ServerDescriptionChangedEvent; import com.mongodb.event.ServerListener; -import org.bson.BsonDocument; import org.bson.types.ObjectId; import java.util.ArrayList; @@ -283,7 +281,7 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip setVersion, electionId, maxSetVersion, maxElectionId)); } - addressToServerTupleMap.get(newDescription.getAddress()).server.invalidate(); + addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting(); return false; } @@ -370,7 +368,7 @@ private void invalidateOldPrimaries(final ServerAddress newPrimary) { if (LOGGER.isInfoEnabled()) { LOGGER.info(format("Rediscovering type of existing primary %s", serverTuple.description.getAddress())); } - serverTuple.server.invalidate(new MongoNotPrimaryException(new BsonDocument(), serverTuple.description.getAddress())); + serverTuple.server.invalidate(); } } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java index b113acbed9e..2a5caf4dbcf 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java @@ -105,9 +105,18 @@ public void failed(final Throwable t) { @Override public void readAsync(final int numBytes, final AsyncCompletionHandler handler) { + readAsync(numBytes, 0, handler); + } + + private void readAsync(final int numBytes, final int additionalTimeout, final AsyncCompletionHandler handler) { ByteBuf buffer = bufferProvider.getBuffer(numBytes); - channel.read(buffer.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null, - new BasicCompletionHandler(buffer, handler)); + + int timeout = settings.getReadTimeout(MILLISECONDS); + if (timeout > 0 && additionalTimeout > 0) { + timeout += additionalTimeout; + } + + channel.read(buffer.asNIO(), timeout, MILLISECONDS, null, new BasicCompletionHandler(buffer, handler)); } @Override @@ -131,6 +140,18 @@ public ByteBuf read(final int numBytes) throws IOException { return handler.getRead(); } + @Override + public boolean supportsAdditionalTimeout() { + return true; + } + + @Override + public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOException { + FutureAsyncCompletionHandler handler = new FutureAsyncCompletionHandler(); + readAsync(numBytes, additionalTimeout, handler); + return handler.getRead(); + } + @Override public ServerAddress getAddress() { return serverAddress; diff --git a/driver-core/src/main/com/mongodb/internal/connection/ClusterableServer.java b/driver-core/src/main/com/mongodb/internal/connection/ClusterableServer.java index f5ce8544197..409c067cc9b 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ClusterableServer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ClusterableServer.java @@ -20,31 +20,31 @@ * A logical connection to a MongoDB server that supports clustering along with other servers. */ interface ClusterableServer extends Server { - /** - * Invalidate the description of this server. Implementation of this method should not block, but rather trigger an asynchronous - * attempt to connect with the server in order to determine its current status. - */ - void invalidate(); + + enum ConnectionState { + BEFORE_HANDSHAKE, + AFTER_HANDSHAKE + } /** - * Invalidate the description of this server due to the passed in reason. - * @param connectionGeneration the connection pool's generation of the connection from which the error arose + * Reset server description to connecting state */ - void invalidate(int connectionGeneration); + void resetToConnecting(); /** - * Invalidate the description of this server due to the passed in reason. - * @param reason the reason for invalidation. + * Invalidate the description of this server. Implementation of this method should not block, but rather trigger an asynchronous + * attempt to connect with the server in order to determine its current status. */ - void invalidate(Throwable reason); + void invalidate(); /** * Invalidate the description of this server due to the passed in reason. + * @param connectionState the connection state * @param reason the reason for invalidation. * @param connectionGeneration the connection pool's generation of the connection from which the error arose * @param maxWireVersion the maxWireVersion from the connection from which the error arose */ - void invalidate(Throwable reason, int connectionGeneration, int maxWireVersion); + void invalidate(ConnectionState connectionState, Throwable reason, int connectionGeneration, int maxWireVersion); /** *

Closes the server. Instances that have been closed will no longer be available for use.

diff --git a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java index 7e9895fee73..8c7a796906a 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java +++ b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java @@ -20,8 +20,8 @@ import com.mongodb.MongoNamespace; import com.mongodb.ReadPreference; import com.mongodb.connection.ClusterConnectionMode; -import com.mongodb.internal.validator.MappedFieldNameValidator; import com.mongodb.internal.session.SessionContext; +import com.mongodb.internal.validator.MappedFieldNameValidator; import org.bson.BsonArray; import org.bson.BsonBinaryWriter; import org.bson.BsonBoolean; @@ -34,7 +34,7 @@ import org.bson.codecs.EncoderContext; import org.bson.io.BsonOutput; -import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -61,6 +61,7 @@ public final class CommandMessage extends RequestMessage { private final BsonDocument command; private final FieldNameValidator commandFieldNameValidator; private final ReadPreference readPreference; + private final boolean exhaustAllowed; private final SplittablePayload payload; private final FieldNameValidator payloadFieldNameValidator; private final boolean responseExpected; @@ -72,16 +73,32 @@ public final class CommandMessage extends RequestMessage { MULTIPLE); } + CommandMessage(final MongoNamespace namespace, final BsonDocument command, final FieldNameValidator commandFieldNameValidator, + final ReadPreference readPreference, final MessageSettings settings, final boolean exhaustAllowed) { + this(namespace, command, commandFieldNameValidator, readPreference, settings, true, exhaustAllowed, null, null, + MULTIPLE); + } + CommandMessage(final MongoNamespace namespace, final BsonDocument command, final FieldNameValidator commandFieldNameValidator, final ReadPreference readPreference, final MessageSettings settings, final boolean responseExpected, final SplittablePayload payload, final FieldNameValidator payloadFieldNameValidator, final ClusterConnectionMode clusterConnectionMode) { + this(namespace, command, commandFieldNameValidator, readPreference, settings, responseExpected, false, payload, + payloadFieldNameValidator, clusterConnectionMode); + } + + CommandMessage(final MongoNamespace namespace, final BsonDocument command, final FieldNameValidator commandFieldNameValidator, + final ReadPreference readPreference, final MessageSettings settings, + final boolean responseExpected, final boolean exhaustAllowed, + final SplittablePayload payload, final FieldNameValidator payloadFieldNameValidator, + final ClusterConnectionMode clusterConnectionMode) { super(namespace.getFullName(), getOpCode(settings), settings); this.namespace = namespace; this.command = command; this.commandFieldNameValidator = commandFieldNameValidator; this.readPreference = readPreference; this.responseExpected = responseExpected; + this.exhaustAllowed = exhaustAllowed; this.payload = payload; this.payloadFieldNameValidator = payloadFieldNameValidator; this.clusterConnectionMode = clusterConnectionMode; @@ -99,7 +116,7 @@ BsonDocument getCommandDocument(final ByteBufferBsonOutput bsonOutput) { + byteBufBsonDocument.getSizeInBytes() + 1 // payload type + 4 // payload size - + payload.getPayloadName().getBytes(Charset.forName("UTF-8")).length + 1; // null-terminated UTF-8 payload name + + payload.getPayloadName().getBytes(StandardCharsets.UTF_8).length + 1; // null-terminated UTF-8 payload name commandBsonDocument.append(payload.getPayloadName(), new BsonArray(ByteBufBsonDocument.createList(bsonOutput, payloadStartPosition))); } else { @@ -186,11 +203,14 @@ private int getOpMsgFlagBits() { } private int getOpMsgResponseExpectedFlagBit() { - if (requireOpMsgResponse()) { - return 0; - } else { - return 1 << 1; + int flagBits = 0; + if (!requireOpMsgResponse()) { + flagBits = 1 << 1; + } + if (exhaustAllowed) { + flagBits |= 1 << 16; } + return flagBits; } private boolean requireOpMsgResponse() { diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java index 42d33f44b78..cc26278b5b7 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java @@ -485,6 +485,36 @@ public T sendAndReceive(final CommandMessage message, final Decoder decod return wrapped.sendAndReceive(message, decoder, sessionContext); } + @Override + public void send(final CommandMessage message, final Decoder decoder, final SessionContext sessionContext) { + isTrue("open", !isClosed.get()); + wrapped.send(message, decoder, sessionContext); + } + + @Override + public T receive(final Decoder decoder, final SessionContext sessionContext) { + isTrue("open", !isClosed.get()); + return wrapped.receive(decoder, sessionContext); + } + + @Override + public boolean supportsAdditionalTimeout() { + isTrue("open", !isClosed.get()); + return wrapped.supportsAdditionalTimeout(); + } + + @Override + public T receive(final Decoder decoder, final SessionContext sessionContext, final int additionalTimeout) { + isTrue("open", !isClosed.get()); + return wrapped.receive(decoder, sessionContext, additionalTimeout); + } + + @Override + public boolean hasMoreToCome() { + isTrue("open", !isClosed.get()); + return wrapped.hasMoreToCome(); + } + @Override public void sendAndReceiveAsync(final CommandMessage message, final Decoder decoder, final SessionContext sessionContext, final SingleResultCallback callback) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java index 57e50f6bd9b..9ca776c8e01 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java @@ -26,7 +26,6 @@ import com.mongodb.connection.ClusterConnectionMode; import com.mongodb.connection.ServerDescription; import com.mongodb.connection.ServerId; -import com.mongodb.connection.ServerType; import com.mongodb.connection.TopologyVersion; import com.mongodb.diagnostics.logging.Logger; import com.mongodb.diagnostics.logging.Loggers; @@ -44,6 +43,8 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.connection.ServerConnectionState.CONNECTING; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.connection.ClusterableServer.ConnectionState.AFTER_HANDSHAKE; +import static com.mongodb.internal.connection.ClusterableServer.ConnectionState.BEFORE_HANDSHAKE; import static com.mongodb.internal.operation.ServerVersionHelper.FOUR_DOT_TWO_WIRE_VERSION; import static java.util.Arrays.asList; @@ -93,7 +94,7 @@ public Connection getConnection() { connectionPool.invalidate(); throw e; } catch (MongoSocketException e) { - invalidate(connectionPool.getGeneration()); + invalidate(ConnectionState.BEFORE_HANDSHAKE, e, connectionPool.getGeneration(), description.getMaxWireVersion()); throw e; } } @@ -107,7 +108,7 @@ public void onResult(final InternalConnection result, final Throwable t) { if (t instanceof MongoSecurityException) { connectionPool.invalidate(); } else if (t instanceof MongoSocketException) { - invalidate(connectionPool.getGeneration()); + invalidate(ConnectionState.BEFORE_HANDSHAKE, t, connectionPool.getGeneration(), description.getMaxWireVersion()); } if (t != null) { callback.onResult(null, t); @@ -126,52 +127,46 @@ public ServerDescription getDescription() { } @Override - public void invalidate() { - invalidate(connectionPool.getGeneration()); + public void resetToConnecting() { + serverStateListener.stateChanged(new ChangeEvent<>(description, ServerDescription.builder() + .state(CONNECTING).address(serverId.getAddress()).build())); } @Override - public void invalidate(final int connectionGeneration) { + public synchronized void invalidate() { if (!isClosed()) { - if (connectionGeneration < connectionPool.getGeneration()) { - return; - } - serverStateListener.stateChanged(new ChangeEvent(description, ServerDescription.builder() - .state(CONNECTING) - .address(serverId.getAddress()) - .build())); - connectionPool.invalidate(); - // TODO: in streaming protocol, we want to close the current connection and start over + serverStateListener.stateChanged(new ChangeEvent<>(description, ServerDescription.builder() + .state(CONNECTING).address(serverId.getAddress()).build())); connect(); + if (description.getMaxWireVersion() < FOUR_DOT_TWO_WIRE_VERSION) { + connectionPool.invalidate(); + } } } @Override - public void invalidate(final Throwable reason) { - invalidate(reason, connectionPool.getGeneration(), description.getMaxWireVersion()); - } - - @Override - public void invalidate(final Throwable t, final int connectionGeneration, final int maxWireVersion) { + public synchronized void invalidate(final ConnectionState connectionState, final Throwable t, final int connectionGeneration, + final int maxWireVersion) { if (!isClosed()) { if (connectionGeneration < connectionPool.getGeneration()) { return; } - if ((t instanceof MongoSocketException && !(t instanceof MongoSocketReadTimeoutException))) { - invalidate(); + if (t instanceof MongoSocketException + && (!(t instanceof MongoSocketReadTimeoutException) || connectionState == BEFORE_HANDSHAKE)) { + serverStateListener.stateChanged(new ChangeEvent<>(description, ServerDescription.builder() + .state(CONNECTING).address(serverId.getAddress()).exception(t).build())); + connectionPool.invalidate(); + serverMonitor.cancelCurrentCheck(); } else if (t instanceof MongoNotPrimaryException || t instanceof MongoNodeIsRecoveringException) { if (isStale(((MongoCommandException) t))) { return; } - if (maxWireVersion < FOUR_DOT_TWO_WIRE_VERSION) { - invalidate(); - } else if (SHUTDOWN_CODES.contains(((MongoCommandException) t).getErrorCode())) { - invalidate(); - } else { - ChangeEvent event = new ChangeEvent(description, ServerDescription.builder() - .state(CONNECTING).type(ServerType.UNKNOWN).address(serverId.getAddress()).exception(t).build()); - serverStateListener.stateChanged(event); - connect(); + serverStateListener.stateChanged(new ChangeEvent(description, ServerDescription.builder() + .state(CONNECTING).address(serverId.getAddress()).exception(t).build())); + connect(); + + if (maxWireVersion < FOUR_DOT_TWO_WIRE_VERSION || SHUTDOWN_CODES.contains(((MongoCommandException) t).getErrorCode())) { + connectionPool.invalidate(); } } } @@ -200,15 +195,6 @@ private boolean isStale(final TopologyVersion currentTopologyVersion, final Topo return false; } - public void invalidate(final Throwable t, final int connectionGeneration, final int maxWireVersion, - final SessionContext sessionContext) { - notNull("sessionContext", sessionContext); - invalidate(t, connectionGeneration, maxWireVersion); - if (t instanceof MongoSocketException && sessionContext.hasSession()) { - sessionContext.markSessionDirty(); - } - } - @Override public void close() { if (!isClosed()) { @@ -240,7 +226,7 @@ public T execute(final LegacyProtocol protocol, final InternalConnection protocol.setCommandListener(commandListener); return protocol.execute(connection); } catch (MongoException e) { - invalidate(e, connection.getGeneration(), connection.getDescription().getMaxWireVersion()); + invalidate(AFTER_HANDSHAKE, e, connection.getGeneration(), connection.getDescription().getMaxWireVersion()); throw e; } } @@ -253,7 +239,7 @@ public void executeAsync(final LegacyProtocol protocol, final InternalCon @Override public void onResult(final T result, final Throwable t) { if (t != null) { - invalidate(t, connection.getGeneration(), connection.getDescription().getMaxWireVersion()); + invalidate(AFTER_HANDSHAKE, t, connection.getGeneration(), connection.getDescription().getMaxWireVersion()); } callback.onResult(result, t); } @@ -271,7 +257,10 @@ public T execute(final CommandProtocol protocol, final InternalConnection invalidate(); return (T) e.getResponse(); } catch (MongoException e) { - invalidate(e, connection.getGeneration(), connection.getDescription().getMaxWireVersion(), sessionContext); + invalidate(AFTER_HANDSHAKE, e, connection.getGeneration(), connection.getDescription().getMaxWireVersion()); + if (e instanceof MongoSocketException && sessionContext.hasSession()) { + sessionContext.markSessionDirty(); + } throw e; } } @@ -289,7 +278,10 @@ public void onResult(final T result, final Throwable t) { invalidate(); callback.onResult((T) ((MongoWriteConcernWithResponseException) t).getResponse(), null); } else { - invalidate(t, connection.getGeneration(), connection.getDescription().getMaxWireVersion(), sessionContext); + invalidate(AFTER_HANDSHAKE, t, connection.getGeneration(), connection.getDescription().getMaxWireVersion()); + if (t instanceof MongoSocketException && sessionContext.hasSession()) { + sessionContext.markSessionDirty(); + } callback.onResult(null, t); } } else { diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index 5b54d272770..d97fbc6df02 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -16,6 +16,7 @@ package com.mongodb.internal.connection; +import com.mongodb.MongoNamespace; import com.mongodb.MongoSocketException; import com.mongodb.annotations.ThreadSafe; import com.mongodb.connection.ServerDescription; @@ -27,14 +28,21 @@ import com.mongodb.event.ServerHeartbeatStartedEvent; import com.mongodb.event.ServerHeartbeatSucceededEvent; import com.mongodb.event.ServerMonitorListener; +import com.mongodb.internal.session.SessionContext; +import com.mongodb.internal.validator.NoOpFieldNameValidator; import org.bson.BsonDocument; import org.bson.BsonInt32; +import org.bson.BsonInt64; +import org.bson.codecs.BsonDocumentCodec; import org.bson.types.ObjectId; +import java.util.Objects; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import static com.mongodb.MongoNamespace.COMMAND_COLLECTION_NAME; +import static com.mongodb.ReadPreference.primary; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.connection.ServerConnectionState.CONNECTING; import static com.mongodb.connection.ServerType.UNKNOWN; @@ -59,6 +67,9 @@ class DefaultServerMonitor implements ServerMonitor { private final ServerSettings serverSettings; private final ServerMonitorRunnable monitor; private final Thread monitorThread; + private final RoundTripTimeRunnable roundTripTimeMonitor; + private final ExponentiallyWeightedMovingAverage averageRoundTripTime = new ExponentiallyWeightedMovingAverage(0.2); + private final Thread roundTripTimeMonitorThread; private final Lock lock = new ReentrantLock(); private final Condition condition = lock.newCondition(); private volatile boolean isClosed; @@ -76,12 +87,17 @@ class DefaultServerMonitor implements ServerMonitor { monitor = new ServerMonitorRunnable(); monitorThread = new Thread(monitor, "cluster-" + this.serverId.getClusterId() + "-" + this.serverId.getAddress()); monitorThread.setDaemon(true); + roundTripTimeMonitor = new RoundTripTimeRunnable(); + roundTripTimeMonitorThread = new Thread(roundTripTimeMonitor, + "cluster-rtt-" + this.serverId.getClusterId() + "-" + this.serverId.getAddress()); + roundTripTimeMonitorThread.setDaemon(true); isClosed = false; } @Override public void start() { monitorThread.start(); + roundTripTimeMonitorThread.start(); } @Override @@ -97,67 +113,60 @@ public void connect() { @Override public void close() { isClosed = true; + monitor.close(); monitorThread.interrupt(); + roundTripTimeMonitor.close(); + roundTripTimeMonitorThread.interrupt(); + } + + @Override + public void cancelCurrentCheck() { + monitor.cancelCurrentCheck(); } class ServerMonitorRunnable implements Runnable { - private final ExponentiallyWeightedMovingAverage averageRoundTripTime = new ExponentiallyWeightedMovingAverage(0.2); + private volatile InternalConnection connection = null; + private volatile boolean currentCheckCancelled; + + void close() { + InternalConnection connection = this.connection; + if (connection != null) { + connection.close(); + } + } @Override - @SuppressWarnings("unchecked") - public synchronized void run() { - InternalConnection connection = null; - try { - ServerDescription currentServerDescription = getConnectingServerDescription(null); - while (!isClosed) { - ServerDescription previousServerDescription = currentServerDescription; - try { - if (connection == null) { - connection = internalConnectionFactory.create(serverId); - try { - connection.open(); - currentServerDescription = connection.getInitialServerDescription(); - } catch (Throwable t) { - connection = null; - throw t; - } - } else { - try { - currentServerDescription = lookupServerDescription(connection); - } catch (MongoSocketException e) { - connectionPool.invalidate(); - connection.close(); - connection = null; - connection = internalConnectionFactory.create(serverId); - try { - connection.open(); - currentServerDescription = connection.getInitialServerDescription(); - } catch (Throwable t) { - connection = null; - throw t; - } - } - } - } catch (Throwable t) { - averageRoundTripTime.reset(); - currentServerDescription = getConnectingServerDescription(t); - } + public void run() { + ServerDescription currentServerDescription = getConnectingServerDescription(null); + while (!isClosed) { + ServerDescription previousServerDescription = currentServerDescription; + currentServerDescription = lookupServerDescription(currentServerDescription); - if (!isClosed) { - try { - logStateChange(previousServerDescription, currentServerDescription); - serverStateListener.stateChanged(new ChangeEvent(previousServerDescription, - currentServerDescription)); - } catch (Throwable t) { - LOGGER.warn("Exception in monitor thread during notification of server description state change", t); - } - waitForNext(); - } + if (isClosed) { + continue; } - } finally { - if (connection != null) { - connection.close(); + + if (currentCheckCancelled) { + waitForNext(); + currentCheckCancelled = false; + continue; + } + + logStateChange(previousServerDescription, currentServerDescription); + serverStateListener.stateChanged(new ChangeEvent<>(previousServerDescription, currentServerDescription)); + + if (currentServerDescription.getException() != null) { + connectionPool.invalidate(); } + + if (((connection == null || shouldStreamResponses(currentServerDescription)) + && currentServerDescription.getTopologyVersion() != null) + || (connection != null && connection.hasMoreToCome()) + || (currentServerDescription.getException() instanceof MongoSocketException + && previousServerDescription.getType() != UNKNOWN)) { + continue; + } + waitForNext(); } } @@ -165,36 +174,89 @@ private ServerDescription getConnectingServerDescription(final Throwable excepti return ServerDescription.builder().type(UNKNOWN).state(CONNECTING).address(serverId.getAddress()).exception(exception).build(); } - private ServerDescription lookupServerDescription(final InternalConnection connection) { - if (LOGGER.isDebugEnabled()) { - LOGGER.debug(format("Checking status of %s", serverId.getAddress())); - } - serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent(connection.getDescription().getConnectionId())); - - long start = System.nanoTime(); + private ServerDescription lookupServerDescription(final ServerDescription currentServerDescription) { try { - BsonDocument isMasterResult = - executeCommand("admin", new BsonDocument("ismaster", new BsonInt32(1)), clusterClock, connection); - long elapsedTimeNanos = System.nanoTime() - start; - averageRoundTripTime.addSample(elapsedTimeNanos); - - serverMonitorListener.serverHeartbeatSucceeded( - new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), isMasterResult, elapsedTimeNanos)); - - return createServerDescription(serverId.getAddress(), isMasterResult, averageRoundTripTime.getAverage()); - } catch (RuntimeException e) { - serverMonitorListener.serverHeartbeatFailed( - new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), System.nanoTime() - start, e)); - throw e; + if (connection == null || connection.isClosed()) { + currentCheckCancelled = false; + connection = internalConnectionFactory.create(serverId); + connection.open(); + averageRoundTripTime.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos()); + return connection.getInitialServerDescription(); + } + + if (LOGGER.isDebugEnabled()) { + LOGGER.debug(format("Checking status of %s", serverId.getAddress())); + } + serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent(connection.getDescription().getConnectionId())); + + long start = System.nanoTime(); + try { + SessionContext sessionContext = new ClusterClockAdvancingSessionContext(NoOpSessionContext.INSTANCE, clusterClock); + if (!connection.hasMoreToCome()) { + BsonDocument ismaster = new BsonDocument("ismaster", new BsonInt32(1)); + if (shouldStreamResponses(currentServerDescription)) { + ismaster.append("topologyVersion", currentServerDescription.getTopologyVersion().asDocument()); + ismaster.append("maxAwaitTimeMS", new BsonInt64(serverSettings.getHeartbeatFrequency(MILLISECONDS))); + } + + connection.send(createCommandMessage(ismaster, connection, currentServerDescription), new BsonDocumentCodec(), + sessionContext); + } + + BsonDocument isMasterResult; + if (shouldStreamResponses(currentServerDescription)) { + isMasterResult = connection.receive(new BsonDocumentCodec(), sessionContext, + Math.toIntExact(serverSettings.getHeartbeatFrequency(MILLISECONDS))); + } else { + isMasterResult = connection.receive(new BsonDocumentCodec(), sessionContext); + } + + long elapsedTimeNanos = System.nanoTime() - start; + serverMonitorListener.serverHeartbeatSucceeded( + new ServerHeartbeatSucceededEvent(connection.getDescription().getConnectionId(), isMasterResult, + elapsedTimeNanos, currentServerDescription.getTopologyVersion() != null)); + + return createServerDescription(serverId.getAddress(), isMasterResult, averageRoundTripTime.getAverage()); + } catch (RuntimeException e) { + serverMonitorListener.serverHeartbeatFailed( + new ServerHeartbeatFailedEvent(connection.getDescription().getConnectionId(), System.nanoTime() - start, + currentServerDescription.getTopologyVersion() != null, e)); + throw e; + } + } catch (Throwable t) { + averageRoundTripTime.reset(); + InternalConnection localConnection; + synchronized (this) { + localConnection = connection; + connection = null; + } + if (localConnection != null) { + localConnection.close(); + } + return getConnectingServerDescription(t); } } + private boolean shouldStreamResponses(final ServerDescription currentServerDescription) { + return currentServerDescription.getTopologyVersion() != null && connection.supportsAdditionalTimeout(); + } + + private CommandMessage createCommandMessage(final BsonDocument ismaster, final InternalConnection connection, + final ServerDescription currentServerDescription) { + return new CommandMessage(new MongoNamespace("admin", COMMAND_COLLECTION_NAME), ismaster, + new NoOpFieldNameValidator(), primary(), + MessageSettings.builder() + .maxWireVersion(connection.getDescription().getMaxWireVersion()) + .build(), + shouldStreamResponses(currentServerDescription)); + } + private void logStateChange(final ServerDescription previousServerDescription, final ServerDescription currentServerDescription) { if (shouldLogStageChange(previousServerDescription, currentServerDescription)) { if (currentServerDescription.getException() != null) { LOGGER.info(format("Exception in monitor thread while connecting to server %s", serverId.getAddress()), - currentServerDescription.getException()); + currentServerDescription.getException()); } else { LOGGER.info(format("Monitor thread successfully connected to server with description %s", currentServerDescription)); } @@ -227,6 +289,19 @@ private long waitForSignalOrTimeout() throws InterruptedException { lock.unlock(); } } + + public void cancelCurrentCheck() { + InternalConnection localConnection = null; + synchronized (this) { + if (connection != null && !currentCheckCancelled) { + localConnection = connection; + currentCheckCancelled = true; + } + } + if (localConnection != null) { + localConnection.close(); + } + } } static boolean shouldLogStageChange(final ServerDescription previous, final ServerDescription current) { @@ -238,7 +313,7 @@ static boolean shouldLogStageChange(final ServerDescription previous, final Serv return true; } if (previous.getCanonicalAddress() != null - ? !previous.getCanonicalAddress().equals(current.getCanonicalAddress()) : current.getCanonicalAddress() != null) { + ? !previous.getCanonicalAddress().equals(current.getCanonicalAddress()) : current.getCanonicalAddress() != null) { return true; } if (!previous.getHosts().equals(current.getHosts())) { @@ -284,16 +359,76 @@ static boolean shouldLogStageChange(final ServerDescription previous, final Serv Throwable currentException = current.getException(); Class thisExceptionClass = previousException != null ? previousException.getClass() : null; Class thatExceptionClass = currentException != null ? currentException.getClass() : null; - if (thisExceptionClass != null ? !thisExceptionClass.equals(thatExceptionClass) : thatExceptionClass != null) { + if (!Objects.equals(thisExceptionClass, thatExceptionClass)) { return true; } String thisExceptionMessage = previousException != null ? previousException.getMessage() : null; String thatExceptionMessage = currentException != null ? currentException.getMessage() : null; - if (thisExceptionMessage != null ? !thisExceptionMessage.equals(thatExceptionMessage) : thatExceptionMessage != null) { + if (!Objects.equals(thisExceptionMessage, thatExceptionMessage)) { return true; } return false; } + + + private class RoundTripTimeRunnable implements Runnable { + private volatile InternalConnection connection = null; + + void close() { + InternalConnection connection = this.connection; + if (connection != null) { + connection.close(); + } + } + + @Override + public void run() { + try { + while (!isClosed) { + try { + if (connection == null) { + initialize(); + } else { + pingServer(connection); + } + } catch (Throwable t) { + if (connection != null) { + connection.close(); + connection = null; + } + averageRoundTripTime.reset(); + } + waitForNext(); + } + } finally { + if (connection != null) { + connection.close(); + } + } + } + + private void initialize() { + connection = null; + connection = internalConnectionFactory.create(serverId); + connection.open(); + averageRoundTripTime.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos()); + } + + private void pingServer(final InternalConnection connection) { + long start = System.nanoTime(); + executeCommand("admin", new BsonDocument("ismaster", new BsonInt32(1)), clusterClock, connection); + long elapsedTimeNanos = System.nanoTime() - start; + averageRoundTripTime.addSample(elapsedTimeNanos); + } + } + + private void waitForNext() { + try { + Thread.sleep(serverSettings.getHeartbeatFrequency(MILLISECONDS)); + } catch (InterruptedException e) { + // fall through + } + } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/ExponentiallyWeightedMovingAverage.java b/driver-core/src/main/com/mongodb/internal/connection/ExponentiallyWeightedMovingAverage.java index 03e65b10705..3d1986d8eed 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ExponentiallyWeightedMovingAverage.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ExponentiallyWeightedMovingAverage.java @@ -16,11 +16,8 @@ package com.mongodb.internal.connection; -import com.mongodb.annotations.NotThreadSafe; - import static com.mongodb.assertions.Assertions.isTrueArgument; -@NotThreadSafe class ExponentiallyWeightedMovingAverage { private final double alpha; private long average = -1; @@ -30,11 +27,11 @@ class ExponentiallyWeightedMovingAverage { this.alpha = alpha; } - void reset() { + synchronized void reset() { average = -1; } - long addSample(final long sample) { + synchronized long addSample(final long sample) { if (average == -1) { average = sample; } else { @@ -44,7 +41,7 @@ long addSample(final long sample) { return average; } - long getAverage() { + synchronized long getAverage() { return average == -1 ? 0 : average; } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalConnection.java index 33a6df11889..8e6fd0203d5 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalConnection.java @@ -90,6 +90,21 @@ default int getGeneration() { */ T sendAndReceive(CommandMessage message, Decoder decoder, SessionContext sessionContext); + void send(CommandMessage message, Decoder decoder, SessionContext sessionContext); + + T receive(Decoder decoder, SessionContext sessionContext); + + + default boolean supportsAdditionalTimeout() { + return false; + } + + default T receive(Decoder decoder, SessionContext sessionContext, int additionalTimeout) { + throw new UnsupportedOperationException(); + } + + boolean hasMoreToCome(); + /** * Send a command message to the server. * @@ -132,5 +147,4 @@ void sendAndReceiveAsync(CommandMessage message, Decoder decoder, Session * @param callback the callback to invoke on completion */ void receiveMessageAsync(int responseTo, SingleResultCallback callback); - } diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java index 25e34754023..f43e298a9c4 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java @@ -105,6 +105,8 @@ public class InternalStreamConnection implements InternalConnection { private final CommandListener commandListener; private volatile Compressor sendCompressor; private volatile Map compressorMap; + private volatile boolean hasMoreToCome; + private volatile int responseTo; public InternalStreamConnection(final ServerId serverId, final StreamFactory streamFactory, final List compressorList, final CommandListener commandListener, @@ -139,13 +141,14 @@ public void open() { stream = streamFactory.create(serverId.getAddress()); try { stream.open(); - LOGGER.debug("Done opening stream to " + serverId.toString()); InternalConnectionInitializationDescription initializationDescription = connectionInitializer.initialize(this); description = initializationDescription.getConnectionDescription(); initialServerDescription = initializationDescription.getServerDescription(); opened.set(true); sendCompressor = findSendCompressor(description); - LOGGER.info(format("Opened connection [%s] to %s", getId(), serverId.getAddress())); + if (LOGGER.isInfoEnabled()) { + LOGGER.info(format("Opened connection [%s] to %s", getId(), serverId.getAddress())); + } } catch (Throwable t) { close(); if (t instanceof MongoException) { @@ -274,7 +277,7 @@ public T sendAndReceive(final CommandMessage message, final Decoder decod try { sendCommandMessage(message, bsonOutput, sessionContext); if (message.isResponseExpected()) { - return receiveCommandMessageResponse(message, decoder, commandEventSender, sessionContext); + return receiveCommandMessageResponse(decoder, commandEventSender, sessionContext, 0); } else { commandEventSender.sendSucceededEventForOneWayCommand(); return null; @@ -284,6 +287,40 @@ public T sendAndReceive(final CommandMessage message, final Decoder decod throw e; } } + + @Override + public void send(final CommandMessage message, final Decoder decoder, final SessionContext sessionContext) { + try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this)) { + message.encode(bsonOutput, sessionContext); + sendCommandMessage(message, bsonOutput, sessionContext); + if (message.isResponseExpected()) { + hasMoreToCome = true; + } + } + } + + @Override + public T receive(final Decoder decoder, final SessionContext sessionContext) { + isTrue("Response is expected", hasMoreToCome); + return receiveCommandMessageResponse(decoder, null, sessionContext, 0); + } + + @Override + public boolean supportsAdditionalTimeout() { + return stream.supportsAdditionalTimeout(); + } + + @Override + public T receive(final Decoder decoder, final SessionContext sessionContext, final int additionalTimeout) { + isTrue("Response is expected", hasMoreToCome); + return receiveCommandMessageResponse(decoder, null, sessionContext, additionalTimeout); + } + + @Override + public boolean hasMoreToCome() { + return hasMoreToCome; + } + private void sendCommandMessage(final CommandMessage message, final ByteBufferBsonOutput bsonOutput, final SessionContext sessionContext) { if (sendCompressor == null || SECURITY_SENSITIVE_COMMANDS.contains(message.getCommandDocument(bsonOutput).getFirstKey())) { @@ -310,23 +347,32 @@ private void sendCommandMessage(final CommandMessage message, compressedBsonOutput.close(); } } + responseTo = message.getId(); } - private T receiveCommandMessageResponse(final CommandMessage message, final Decoder decoder, - final CommandEventSender commandEventSender, final SessionContext sessionContext) { - ResponseBuffers responseBuffers = receiveMessage(message.getId()); - try { + private T receiveCommandMessageResponse(final Decoder decoder, + final CommandEventSender commandEventSender, final SessionContext sessionContext, + final int additionalTimeout) { + try (ResponseBuffers responseBuffers = receiveMessageWithAdditionalTimeout(additionalTimeout)) { updateSessionContext(sessionContext, responseBuffers); if (!isCommandOk(responseBuffers)) { - throw getCommandFailureException(responseBuffers.getResponseDocument(message.getId(), new BsonDocumentCodec()), + throw getCommandFailureException(responseBuffers.getResponseDocument(responseTo, new BsonDocumentCodec()), description.getServerAddress()); } - commandEventSender.sendSucceededEvent(responseBuffers); + if (commandEventSender != null) { + commandEventSender.sendSucceededEvent(responseBuffers); + } - return getCommandResult(decoder, responseBuffers, message.getId()); - } finally { - responseBuffers.close(); + T commandResult = getCommandResult(decoder, responseBuffers, responseTo); + + hasMoreToCome = responseBuffers.getReplyHeader().hasMoreToCome(); + if (hasMoreToCome) { + responseTo = responseBuffers.getReplyHeader().getRequestId(); + } else { + responseTo = 0; + } + return commandResult; } } @@ -459,8 +505,12 @@ public ResponseBuffers receiveMessage(final int responseTo) { throw new MongoSocketClosedException("Cannot read from a closed stream", getServerAddress()); } + return receiveMessageWithAdditionalTimeout(0); + } + + private ResponseBuffers receiveMessageWithAdditionalTimeout(final int additionalTimeout) { try { - return receiveResponseBuffers(); + return receiveResponseBuffers(additionalTimeout); } catch (Throwable t) { close(); throw translateReadException(t); @@ -594,8 +644,8 @@ private MongoException translateReadException(final Throwable e) { } } - private ResponseBuffers receiveResponseBuffers() throws IOException { - ByteBuf messageHeaderBuffer = stream.read(MESSAGE_HEADER_LENGTH); + private ResponseBuffers receiveResponseBuffers(final int additionalTimeout) throws IOException { + ByteBuf messageHeaderBuffer = stream.read(MESSAGE_HEADER_LENGTH, additionalTimeout); MessageHeader messageHeader; try { messageHeader = new MessageHeader(messageHeaderBuffer, description.getMaxMessageSize()); @@ -603,7 +653,7 @@ private ResponseBuffers receiveResponseBuffers() throws IOException { messageHeaderBuffer.release(); } - ByteBuf messageBuffer = stream.read(messageHeader.getMessageLength() - MESSAGE_HEADER_LENGTH); + ByteBuf messageBuffer = stream.read(messageHeader.getMessageLength() - MESSAGE_HEADER_LENGTH, additionalTimeout); if (messageHeader.getOpCode() == OP_COMPRESSED.getValue()) { CompressedHeader compressedHeader = new CompressedHeader(messageBuffer, messageHeader); diff --git a/driver-core/src/main/com/mongodb/internal/connection/ReplyHeader.java b/driver-core/src/main/com/mongodb/internal/connection/ReplyHeader.java index 5b2053f9385..7a4def09d2f 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ReplyHeader.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ReplyHeader.java @@ -190,8 +190,11 @@ public boolean isQueryFailure() { return (responseFlags & QUERY_FAILURE_RESPONSE_FLAG) == QUERY_FAILURE_RESPONSE_FLAG; } - // for unit testing - int getOpMsgFlagBits() { + public int getOpMsgFlagBits() { return opMsgFlagBits; } + + public boolean hasMoreToCome() { + return (opMsgFlagBits & (1 << 1)) != 0; + } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/ServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/ServerMonitor.java index 97c5d01f5cd..5ccb9c81ebe 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ServerMonitor.java @@ -24,4 +24,5 @@ interface ServerMonitor { void close(); + void cancelCurrentCheck(); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/SocketStream.java b/driver-core/src/main/com/mongodb/internal/connection/SocketStream.java index 6b9a7af60ab..9e704dd157f 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SocketStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SocketStream.java @@ -116,6 +116,24 @@ public ByteBuf read(final int numBytes) throws IOException { return buffer; } + @Override + public boolean supportsAdditionalTimeout() { + return true; + } + + @Override + public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOException { + int curTimeout = socket.getSoTimeout(); + if (curTimeout > 0 && additionalTimeout > 0) { + socket.setSoTimeout(curTimeout + additionalTimeout); + } + try { + return read(numBytes); + } finally { + socket.setSoTimeout(curTimeout); + } + } + @Override public void openAsync(final AsyncCompletionHandler handler) { throw new UnsupportedOperationException(getClass() + " does not support asynchronous operations."); diff --git a/driver-core/src/main/com/mongodb/internal/connection/UsageTrackingInternalConnection.java b/driver-core/src/main/com/mongodb/internal/connection/UsageTrackingInternalConnection.java index 29677a73e79..0e5293169f2 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/UsageTrackingInternalConnection.java +++ b/driver-core/src/main/com/mongodb/internal/connection/UsageTrackingInternalConnection.java @@ -102,6 +102,36 @@ public T sendAndReceive(final CommandMessage message, final Decoder decod return result; } + @Override + public void send(final CommandMessage message, final Decoder decoder, final SessionContext sessionContext) { + wrapped.send(message, decoder, sessionContext); + lastUsedAt = System.currentTimeMillis(); + } + + @Override + public T receive(final Decoder decoder, final SessionContext sessionContext) { + T result = wrapped.receive(decoder, sessionContext); + lastUsedAt = System.currentTimeMillis(); + return result; + } + + @Override + public boolean supportsAdditionalTimeout() { + return wrapped.supportsAdditionalTimeout(); + } + + @Override + public T receive(final Decoder decoder, final SessionContext sessionContext, final int additionalTimeout) { + T result = wrapped.receive(decoder, sessionContext, additionalTimeout); + lastUsedAt = System.currentTimeMillis(); + return result; + } + + @Override + public boolean hasMoreToCome() { + return wrapped.hasMoreToCome(); + } + @Override public void sendAndReceiveAsync(final CommandMessage message, final Decoder decoder, final SessionContext sessionContext, final SingleResultCallback callback) { diff --git a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java index 641764fec99..916b1c6ef96 100644 --- a/driver-core/src/test/functional/com/mongodb/ClusterFixture.java +++ b/driver-core/src/test/functional/com/mongodb/ClusterFixture.java @@ -343,7 +343,7 @@ private static Cluster createCluster(final ConnectionString connectionString, fi ServerSettings.builder().build(), ConnectionPoolSettings.builder().applyConnectionString(connectionString).build(), streamFactory, - new SocketStreamFactory(SocketSettings.builder().build(), getSslSettings(connectionString)), + new SocketStreamFactory(SocketSettings.builder().readTimeout(5, SECONDS).build(), getSslSettings(connectionString)), connectionString.getCredential(), null, null, null, connectionString.getCompressorList()); @@ -369,6 +369,19 @@ public static StreamFactory getAsyncStreamFactory() { } } + @Nullable + public static StreamFactoryFactory getOverriddenStreamFactoryFactory() { + String streamType = System.getProperty("org.mongodb.test.async.type", "nio2"); + + if (streamType.equals("netty")) { + if (nettyStreamFactoryFactory == null) { + nettyStreamFactoryFactory = NettyStreamFactoryFactory.builder().build(); + } + return nettyStreamFactoryFactory; + } + return null; + } + private static SocketSettings getSocketSettings() { return SocketSettings.builder().applyConnectionString(getConnectionString()).build(); } diff --git a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java index 3665b63fbc3..1656abd889f 100644 --- a/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java +++ b/driver-core/src/test/functional/com/mongodb/client/test/CollectionHelper.java @@ -48,6 +48,7 @@ import org.bson.BsonArray; import org.bson.BsonDocument; import org.bson.BsonDocumentWrapper; +import org.bson.BsonInt32; import org.bson.BsonInt64; import org.bson.BsonString; import org.bson.Document; @@ -229,7 +230,9 @@ public List find() { } public List find(final Codec codec) { - BatchCursor cursor = new FindOperation(namespace, codec).execute(getBinding()); + BatchCursor cursor = new FindOperation(namespace, codec) + .sort(new BsonDocument("_id", new BsonInt32(1))) + .execute(getBinding()); List results = new ArrayList(); while (cursor.hasNext()) { results.addAll(cursor.next()); diff --git a/driver-core/src/test/resources/server-discovery-and-monitoring-integration/cancel-server-check.json b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/cancel-server-check.json new file mode 100644 index 00000000000..95863509595 --- /dev/null +++ b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/cancel-server-check.json @@ -0,0 +1,130 @@ +{ + "runOn": [ + { + "minServerVersion": "4.0", + "topology": [ + "replicaset" + ] + }, + { + "minServerVersion": "4.2", + "topology": [ + "sharded" + ] + } + ], + "database_name": "sdam-tests", + "collection_name": "cancel-server-check", + "data": [], + "tests": [ + { + "description": "Cancel server check", + "clientOptions": { + "retryWrites": true, + "heartbeatFrequencyMS": 10000, + "serverSelectionTimeoutMS": 5000, + "appname": "cancelServerCheckTest" + }, + "operations": [ + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 1 + } + } + }, + { + "name": "configureFailPoint", + "object": "testRunner", + "arguments": { + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "insert" + ], + "closeConnection": true + } + } + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 2 + } + }, + "result": { + "insertedId": 2 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 3 + } + }, + "result": { + "insertedId": 3 + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + } + ] + } + } + } + ] +} diff --git a/driver-core/src/test/resources/server-discovery-and-monitoring-integration/find-network-error.json b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/find-network-error.json new file mode 100644 index 00000000000..4db2634cd62 --- /dev/null +++ b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/find-network-error.json @@ -0,0 +1,144 @@ +{ + "runOn": [ + { + "minServerVersion": "4.4" + } + ], + "database_name": "sdam-tests", + "collection_name": "find-network-error", + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ], + "tests": [ + { + "description": "Reset server and pool after network error on find", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "find" + ], + "closeConnection": true, + "appName": "findNetworkErrorTest" + } + }, + "clientOptions": { + "retryWrites": false, + "retryReads": false, + "appname": "findNetworkErrorTest" + }, + "operations": [ + { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "_id": 1 + } + }, + "error": true + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 5 + }, + { + "_id": 6 + } + ] + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "find": "find-network-error" + }, + "command_name": "find", + "database_name": "sdam-tests" + } + }, + { + "command_started_event": { + "command": { + "insert": "find-network-error", + "documents": [ + { + "_id": 5 + }, + { + "_id": 6 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 5 + }, + { + "_id": 6 + } + ] + } + } + } + ] +} diff --git a/driver-core/src/test/resources/server-discovery-and-monitoring-integration/find-shutdown-error.json b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/find-shutdown-error.json new file mode 100644 index 00000000000..65de8398b13 --- /dev/null +++ b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/find-shutdown-error.json @@ -0,0 +1,168 @@ +{ + "runOn": [ + { + "minServerVersion": "4.4" + } + ], + "database_name": "sdam-tests", + "collection_name": "find-shutdown-error", + "data": [], + "tests": [ + { + "description": "Concurrent shutdown error on find", + "clientOptions": { + "retryWrites": false, + "retryReads": false, + "heartbeatFrequencyMS": 500, + "appname": "shutdownErrorFindTest" + }, + "operations": [ + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 1 + } + } + }, + { + "name": "configureFailPoint", + "object": "testRunner", + "arguments": { + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "find" + ], + "appName": "shutdownErrorFindTest", + "errorCode": 91, + "blockConnection": true, + "blockTimeMS": 500 + } + } + } + }, + { + "name": "startThread", + "object": "testRunner", + "arguments": { + "name": "thread1" + } + }, + { + "name": "startThread", + "object": "testRunner", + "arguments": { + "name": "thread2" + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "name": "thread1", + "operation": { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "_id": 1 + } + }, + "error": true + } + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "name": "thread2", + "operation": { + "name": "find", + "object": "collection", + "arguments": { + "filter": { + "_id": 1 + } + }, + "error": true + } + } + }, + { + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "name": "thread1" + } + }, + { + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "name": "thread2" + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 4 + } + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 4 + } + ] + } + } + } + ] +} diff --git a/driver-core/src/test/resources/server-discovery-and-monitoring-integration/insert-network-error.json b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/insert-network-error.json new file mode 100644 index 00000000000..fa8bb253e12 --- /dev/null +++ b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/insert-network-error.json @@ -0,0 +1,156 @@ +{ + "runOn": [ + { + "minServerVersion": "4.4" + } + ], + "database_name": "sdam-tests", + "collection_name": "insert-network-error", + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ], + "tests": [ + { + "description": "Reset server and pool after network error on insert", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 1 + }, + "data": { + "failCommands": [ + "insert" + ], + "closeConnection": true, + "appName": "insertNetworkErrorTest" + } + }, + "clientOptions": { + "retryWrites": false, + "appname": "insertNetworkErrorTest" + }, + "operations": [ + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "error": true + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 5 + }, + { + "_id": 6 + } + ] + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "insert-network-error", + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + }, + { + "command_started_event": { + "command": { + "insert": "insert-network-error", + "documents": [ + { + "_id": 5 + }, + { + "_id": 6 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 5 + }, + { + "_id": 6 + } + ] + } + } + } + ] +} diff --git a/driver-core/src/test/resources/server-discovery-and-monitoring-integration/insert-shutdown-error.json b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/insert-shutdown-error.json new file mode 100644 index 00000000000..edde149a91b --- /dev/null +++ b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/insert-shutdown-error.json @@ -0,0 +1,167 @@ +{ + "runOn": [ + { + "minServerVersion": "4.4" + } + ], + "database_name": "sdam-tests", + "collection_name": "insert-shutdown-error", + "data": [], + "tests": [ + { + "description": "Concurrent shutdown error on insert", + "clientOptions": { + "retryWrites": false, + "heartbeatFrequencyMS": 500, + "appname": "shutdownErrorInsertTest" + }, + "operations": [ + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 1 + } + } + }, + { + "name": "configureFailPoint", + "object": "testRunner", + "arguments": { + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "insert" + ], + "appName": "shutdownErrorInsertTest", + "errorCode": 91, + "blockConnection": true, + "blockTimeMS": 500 + } + } + } + }, + { + "name": "startThread", + "object": "testRunner", + "arguments": { + "name": "thread1" + } + }, + { + "name": "startThread", + "object": "testRunner", + "arguments": { + "name": "thread2" + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "name": "thread1", + "operation": { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 2 + } + }, + "error": true + } + } + }, + { + "name": "runOnThread", + "object": "testRunner", + "arguments": { + "name": "thread2", + "operation": { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 3 + } + }, + "error": true + } + } + }, + { + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "name": "thread1" + } + }, + { + "name": "waitForThread", + "object": "testRunner", + "arguments": { + "name": "thread2" + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + }, + { + "name": "insertOne", + "object": "collection", + "arguments": { + "document": { + "_id": 4 + } + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 4 + } + ] + } + } + } + ] +} diff --git a/driver-core/src/test/resources/server-discovery-and-monitoring-integration/isMaster-command-error.json b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/isMaster-command-error.json new file mode 100644 index 00000000000..4bdfd9adffc --- /dev/null +++ b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/isMaster-command-error.json @@ -0,0 +1,245 @@ +{ + "runOn": [ + { + "minServerVersion": "4.4" + } + ], + "database_name": "sdam-tests", + "collection_name": "isMaster-command-error", + "data": [], + "tests": [ + { + "description": "Command error on Monitor handshake", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "isMaster" + ], + "appName": "commandErrorHandshakeTest", + "closeConnection": false, + "errorCode": 91 + } + }, + "clientOptions": { + "retryWrites": false, + "connectTimeoutMS": 250, + "heartbeatFrequencyMS": 500, + "appname": "commandErrorHandshakeTest" + }, + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "isMaster-command-error", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + } + }, + { + "description": "Command error on Monitor check", + "clientOptions": { + "retryWrites": false, + "connectTimeoutMS": 1000, + "heartbeatFrequencyMS": 500, + "appname": "commandErrorCheckTest" + }, + "operations": [ + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + }, + { + "name": "configureFailPoint", + "object": "testRunner", + "arguments": { + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "isMaster" + ], + "appName": "commandErrorCheckTest", + "closeConnection": false, + "blockConnection": true, + "blockTimeMS": 750, + "errorCode": 91 + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "isMaster-command-error", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + }, + { + "command_started_event": { + "command": { + "insert": "isMaster-command-error", + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + } + } + ] +} diff --git a/driver-core/src/test/resources/server-discovery-and-monitoring-integration/isMaster-network-error.json b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/isMaster-network-error.json new file mode 100644 index 00000000000..eb1f3eac197 --- /dev/null +++ b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/isMaster-network-error.json @@ -0,0 +1,225 @@ +{ + "runOn": [ + { + "minServerVersion": "4.4" + } + ], + "database_name": "sdam-tests", + "collection_name": "isMaster-network-error", + "data": [], + "tests": [ + { + "description": "Network error on Monitor handshake", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "isMaster" + ], + "appName": "networkErrorHandshakeTest", + "closeConnection": true + } + }, + "clientOptions": { + "retryWrites": false, + "connectTimeoutMS": 250, + "heartbeatFrequencyMS": 500, + "appname": "networkErrorHandshakeTest" + }, + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "isMaster-network-error", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + } + }, + { + "description": "Network error on Monitor check", + "clientOptions": { + "retryWrites": false, + "connectTimeoutMS": 250, + "heartbeatFrequencyMS": 500, + "appname": "networkErrorCheckTest" + }, + "operations": [ + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + }, + { + "name": "configureFailPoint", + "object": "testRunner", + "arguments": { + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "isMaster" + ], + "appName": "networkErrorCheckTest", + "closeConnection": true + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "isMaster-network-error", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + }, + { + "command_started_event": { + "command": { + "insert": "isMaster-network-error", + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + } + } + ] +} diff --git a/driver-core/src/test/resources/server-discovery-and-monitoring-integration/isMaster-timeout.json b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/isMaster-timeout.json new file mode 100644 index 00000000000..eeee612be87 --- /dev/null +++ b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/isMaster-timeout.json @@ -0,0 +1,359 @@ +{ + "runOn": [ + { + "minServerVersion": "4.4" + } + ], + "database_name": "sdam-tests", + "collection_name": "isMaster-timeout", + "data": [], + "tests": [ + { + "description": "Network timeout on Monitor handshake", + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "isMaster" + ], + "appName": "timeoutMonitorHandshakeTest", + "blockConnection": true, + "blockTimeMS": 1000 + } + }, + "clientOptions": { + "retryWrites": false, + "connectTimeoutMS": 250, + "heartbeatFrequencyMS": 500, + "appname": "timeoutMonitorHandshakeTest" + }, + "operations": [ + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "isMaster-timeout", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + } + }, + { + "description": "Network timeout on Monitor check", + "clientOptions": { + "retryWrites": false, + "connectTimeoutMS": 750, + "heartbeatFrequencyMS": 500, + "appname": "timeoutMonitorCheckTest" + }, + "operations": [ + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + }, + { + "name": "configureFailPoint", + "object": "testRunner", + "arguments": { + "failPoint": { + "configureFailPoint": "failCommand", + "mode": { + "times": 2 + }, + "data": { + "failCommands": [ + "isMaster" + ], + "appName": "timeoutMonitorCheckTest", + "blockConnection": true, + "blockTimeMS": 1000 + } + } + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "waitForEvent", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 1 + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 1 + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "isMaster-timeout", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + }, + { + "command_started_event": { + "command": { + "insert": "isMaster-timeout", + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + } + }, + { + "description": "Driver extends timeout while streaming", + "clientOptions": { + "retryWrites": false, + "connectTimeoutMS": 250, + "heartbeatFrequencyMS": 500, + "appname": "extendsTimeoutTest" + }, + "operations": [ + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + } + }, + { + "name": "wait", + "object": "testRunner", + "arguments": { + "ms": 2000 + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "ServerMarkedUnknownEvent", + "count": 0 + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 0 + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "isMaster-timeout", + "documents": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + }, + { + "command_started_event": { + "command": { + "insert": "isMaster-timeout", + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + } + } + ] +} diff --git a/driver-core/src/test/resources/server-discovery-and-monitoring-integration/rediscover-quickly-after-step-down.json b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/rediscover-quickly-after-step-down.json new file mode 100644 index 00000000000..d17146cc1d4 --- /dev/null +++ b/driver-core/src/test/resources/server-discovery-and-monitoring-integration/rediscover-quickly-after-step-down.json @@ -0,0 +1,152 @@ +{ + "runOn": [ + { + "minServerVersion": "4.4", + "topology": [ + "replicaset" + ] + } + ], + "database_name": "sdam-tests", + "collection_name": "test-replSetStepDown", + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + } + ], + "tests": [ + { + "description": "Rediscover quickly after replSetStepDown", + "clientOptions": { + "appname": "replSetStepDownTest", + "heartbeatFrequencyMS": 60000, + "serverSelectionTimeoutMS": 5000, + "w": "majority" + }, + "operations": [ + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + } + }, + { + "name": "recordPrimary", + "object": "testRunner" + }, + { + "name": "runAdminCommand", + "object": "testRunner", + "command_name": "replSetStepDown", + "arguments": { + "command": { + "replSetStepDown": 1, + "secondaryCatchUpPeriodSecs": 1, + "force": false + } + } + }, + { + "name": "waitForPrimaryChange", + "object": "testRunner", + "arguments": { + "timeoutMS": 5000 + } + }, + { + "name": "insertMany", + "object": "collection", + "arguments": { + "documents": [ + { + "_id": 5 + }, + { + "_id": 6 + } + ] + } + }, + { + "name": "assertEventCount", + "object": "testRunner", + "arguments": { + "event": "PoolClearedEvent", + "count": 0 + } + } + ], + "expectations": [ + { + "command_started_event": { + "command": { + "insert": "test-replSetStepDown", + "documents": [ + { + "_id": 3 + }, + { + "_id": 4 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + }, + { + "command_started_event": { + "command": { + "insert": "test-replSetStepDown", + "documents": [ + { + "_id": 5 + }, + { + "_id": 6 + } + ] + }, + "command_name": "insert", + "database_name": "sdam-tests" + } + } + ], + "outcome": { + "collection": { + "data": [ + { + "_id": 1 + }, + { + "_id": 2 + }, + { + "_id": 3 + }, + { + "_id": 4 + }, + { + "_id": 5 + }, + { + "_id": 6 + } + ] + } + } + } + ] +} diff --git a/driver-core/src/test/resources/server-discovery-and-monitoring/README.rst b/driver-core/src/test/resources/server-discovery-and-monitoring/README.rst index 2ad4c183ecc..2d90d592c29 100644 --- a/driver-core/src/test/resources/server-discovery-and-monitoring/README.rst +++ b/driver-core/src/test/resources/server-discovery-and-monitoring/README.rst @@ -2,10 +2,17 @@ Server Discovery And Monitoring Tests ===================================== +.. contents:: + +---- + The YAML and JSON files in this directory tree are platform-independent tests that drivers can use to prove their conformance to the Server Discovery And Monitoring Spec. +Additional prose tests, that cannot be represented as spec tests, are +described and MUST be implemented. + Version ------- @@ -99,7 +106,6 @@ A "pool" object represents a correct connection pool for a given server. It has the following keys: - generation: This server's expected pool generation, like ``0``. -- topologyVersion: absent, null, or a topologyVersion document. In monitoring tests, an "outcome" contains a list of SDAM events that should have been published by the client as a result of processing ismaster responses @@ -118,11 +124,6 @@ Mocking ~~~~~~~ Drivers should be able to test their server discovery and monitoring logic -without any network I/O, by parsing ismaster responses from the test file -and passing them into the driver code. Parts of the client and monitoring -code may need to be mocked or subclassed to achieve this. `A reference -implementation for PyMongo 3.x is available here -`_. without any network I/O, by parsing ismaster and application error from the test file and passing them into the driver code. Parts of the client and monitoring code may need to be mocked or subclassed to achieve this. @@ -182,3 +183,255 @@ the driver's current TopologyDescription or ServerDescription. For monitoring tests, clear the list of events collected so far. Continue until all phases have been executed. + +Integration Tests +----------------- + +Integration tests are provided in the "integration" directory. + +Test Format +~~~~~~~~~~~ + +The same as the `Transactions Spec Test format +`_. + +Special Test Operations +~~~~~~~~~~~~~~~~~~~~~~~ + +Certain operations that appear in the "operations" array do not correspond to +API methods but instead represent special test operations. Such operations are +defined on the "testRunner" object and are documented in the +`Transactions Spec Test +`_. + +Additional, SDAM test specific operations are documented here: + +configureFailPoint +'''''''''''''''''' + +The "configureFailPoint" operation instructs the test runner to configure +the given server failpoint on the "admin" database. The runner MUST disable +this failpoint at the end of the test. For example:: + + - name: configureFailPoint + object: testRunner + arguments: + failPoint: + configureFailPoint: failCommand + mode: { times: 1 } + data: + failCommands: ["insert"] + closeConnection: true + +Tests that use the "configureFailPoint" operation do not include +``configureFailPoint`` commands in their command expectations. Drivers MUST +ensure that ``configureFailPoint`` commands do not appear in the list of logged +commands, either by manually filtering it from the list of observed commands or +by using a different MongoClient to execute ``configureFailPoint``. + +wait +'''' + +The "wait" operation instructs the test runner to sleep for "ms" +milliseconds. For example:: + + - name: wait + object: testRunner + arguments: + ms: 1000 + +waitForEvent +'''''''''''' + +The "waitForEvent" operation instructs the test runner to wait until the test's +MongoClient has published a specific event a given number of times. For +example, the following instructs the test runner to wait for at least one +PoolClearedEvent to be published:: + + - name: waitForEvent + object: testRunner + arguments: + event: PoolClearedEvent + count: 1 + +Note that "count" includes events that were published while running previous +operations. + +ServerMarkedUnknownEvent +```````````````````````` + +The ServerMarkedUnknownEvent may appear as an event in `waitForEvent`_ and +`assertEventCount`_. This event is defined as ServerDescriptionChangedEvent +where newDescription.type is ``Unknown``. + +assertEventCount +'''''''''''''''' + +The "assertEventCount" operation instructs the test runner to assert the test's +MongoClient has published a specific event a given number of times. For +example, the following instructs the test runner to assert that a single +PoolClearedEvent was published:: + + - name: waitForEvent + object: testRunner + arguments: + event: PoolClearedEvent + count: 1 + +recordPrimary +''''''''''''' + +The "recordPrimary" operation instructs the test runner to record the current +primary of the test's MongoClient. For example:: + + - name: recordPrimary + object: testRunner + +runAdminCommand +''''''''''''''' + +The "runAdminCommand" operation instructs the test runner to run the given +command on the admin database. Drivers MUST run this command on a different +MongoClient from the one used for test operations. For example:: + + - name: runAdminCommand + object: testRunner + command_name: replSetStepDown + arguments: + command: + replSetStepDown: 20 + force: false + +waitForPrimaryChange +'''''''''''''''''''' + +The "waitForPrimaryChange" operation instructs the test runner to wait up to +"timeoutMS" milliseconds for the MongoClient to discover a new primary server. +The new primary should be different from the one recorded by "recordPrimary". +For example:: + + - name: waitForPrimaryChange + object: testRunner + arguments: + timeoutMS: 15000 + +To implement, Drivers can subscribe to ServerDescriptionChangedEvents and wait +for an event where newDescription.type is ``RSPrimary`` and the address is +different from the one previously recorded by "recordPrimary". + +startThread +''''''''''' + +The "startThread" operation instructs the test runner to start a new thread +with the provided "name". The `runOnThread`_ and `waitForThread`_ operations +reference a thread by its "name". For example:: + + - name: startThread + object: testRunner + arguments: + name: thread1 + +runOnThread +''''''''''' + +The "runOnThread" operation instructs the test runner to schedule an operation +to be run on the given thread. runOnThread MUST NOT wait for the scheduled +operation to complete. For example:: + + - name: runOnThread + object: testRunner + arguments: + name: thread1 + operation: + name: insertOne + object: collection + arguments: + document: + _id: 2 + error: true + +waitForThread +''''''''''''' + +The "waitForThread" operation instructs the test runner to stop the given +thread, wait for it to complete, and assert that the thread exited without +any errors. For example:: + + - name: waitForThread + object: testRunner + arguments: + name: thread1 + +Prose Tests +----------- + +The following prose tests cannot be represented as spec tests and MUST be +implemented. + +Streaming protocol Tests +~~~~~~~~~~~~~~~~~~~~~~~~ + +Drivers that implement the streaming protocol (multi-threaded or +asynchronous drivers) must implement the following tests. Each test should be +run against a standalone, replica set, and sharded cluster unless otherwise +noted. + +Some of these cases should already be tested with the old protocol; in +that case just verify the test cases succeed with the new protocol. + +1. Configure the client with heartbeatFrequencyMS set to 500, + overriding the default of 10000. Assert the client processes + isMaster replies more frequently (approximately every 500ms). + +RTT Tests +~~~~~~~~~ + +Run the following test(s) on MongoDB 4.4+. + +1. Test that RTT is continuously updated. + + #. Create a client with ``heartbeatFrequencyMS=500``, + ``appName=streamingRttTest``, and subscribe to server events. + + #. Run a find command to wait for the server to be discovered. + + #. Sleep for 2 seconds. This must be long enough for multiple heartbeats + to succeed. + + #. Assert that each ``ServerDescriptionChangedEvent`` includes a non-zero + RTT. + + #. Configure the following failpoint to block isMaster commands for 250ms + which should add extra latency to each RTT check:: + + db.adminCommand({ + configureFailPoint: "failCommand", + mode: {times: 1000}, + data: { + failCommands: ["isMaster"], + blockConnection: true, + blockTimeMS: 500, + appName: "streamingRttTest", + }, + }); + + #. Wait for the server's RTT to exceed 250ms. Eventually the average RTT + should also exceed 500ms but we use 250ms to speed up the test. Note + that the `Server Description Equality`_ rule means that + ServerDescriptionChangedEvents will not be published. This test may + need to use a driver specific helper to obtain the latest RTT instead. + + #. Disable the failpoint:: + + db.adminCommand({ + configureFailPoint: "failCommand", + mode: "off", + }); + +.. Section for links. + +.. _Server Description Equality: /source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#server-description-equality + +.. Section for links. + +.. _Server Description Equality: /source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#server-description-equality diff --git a/driver-core/src/test/resources/server-discovery-and-monitoring/rs/error_handling_handshake.json b/driver-core/src/test/resources/server-discovery-and-monitoring/rs/error_handling_handshake.json deleted file mode 100644 index cdd6df62474..00000000000 --- a/driver-core/src/test/resources/server-discovery-and-monitoring/rs/error_handling_handshake.json +++ /dev/null @@ -1,112 +0,0 @@ -{ - "description": "Network timeouts before and after the handshake completes", - "uri": "mongodb://a/?replicaSet=rs", - "phases": [ - { - "description": "Primary A is discovered", - "responses": [ - [ - "a:27017", - { - "ok": 1, - "ismaster": true, - "hosts": [ - "a:27017" - ], - "setName": "rs", - "minWireVersion": 0, - "maxWireVersion": 9, - "topologyVersion": { - "processId": { - "$oid": "000000000000000000000001" - }, - "counter": { - "$numberLong": "1" - } - } - } - ] - ], - "outcome": { - "servers": { - "a:27017": { - "type": "RSPrimary", - "setName": "rs", - "topologyVersion": { - "processId": { - "$oid": "000000000000000000000001" - }, - "counter": { - "$numberLong": "1" - } - }, - "pool": { - "generation": 0 - } - } - }, - "topologyType": "ReplicaSetWithPrimary", - "logicalSessionTimeoutMinutes": null, - "setName": "rs" - } - }, - { - "description": "Ignore network timeout application error (afterHandshakeCompletes)", - "applicationErrors": [ - { - "address": "a:27017", - "when": "afterHandshakeCompletes", - "maxWireVersion": 9, - "type": "timeout" - } - ], - "outcome": { - "servers": { - "a:27017": { - "type": "RSPrimary", - "setName": "rs", - "topologyVersion": { - "processId": { - "$oid": "000000000000000000000001" - }, - "counter": { - "$numberLong": "1" - } - }, - "pool": { - "generation": 0 - } - } - }, - "topologyType": "ReplicaSetWithPrimary", - "logicalSessionTimeoutMinutes": null, - "setName": "rs" - } - }, - { - "description": "Mark server unknown on network timeout application error (beforeHandshakeCompletes)", - "applicationErrors": [ - { - "address": "a:27017", - "when": "beforeHandshakeCompletes", - "maxWireVersion": 9, - "type": "timeout" - } - ], - "outcome": { - "servers": { - "a:27017": { - "type": "Unknown", - "topologyVersion": null, - "pool": { - "generation": 1 - } - } - }, - "topologyType": "ReplicaSetNoPrimary", - "logicalSessionTimeoutMinutes": null, - "setName": "rs" - } - } - ] -} diff --git a/driver-core/src/test/unit/com/mongodb/event/ServerHeartbeatEventSpecification.groovy b/driver-core/src/test/unit/com/mongodb/event/ServerHeartbeatEventSpecification.groovy index 8ea7145b0e2..d86910f5dea 100644 --- a/driver-core/src/test/unit/com/mongodb/event/ServerHeartbeatEventSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/event/ServerHeartbeatEventSpecification.groovy @@ -28,14 +28,15 @@ class ServerHeartbeatEventSpecification extends Specification { def 'should fail if elapsed time is negative'() { when: new ServerHeartbeatSucceededEvent(new ConnectionId(new ServerId(new ClusterId(), new ServerAddress())), - new BsonDocument('ok', new BsonInt32(1)), -1) + new BsonDocument('ok', new BsonInt32(1)), -1, false) then: def e = thrown(IllegalArgumentException) e.getMessage() == 'state should be: elapsed time is not negative' when: - new ServerHeartbeatFailedEvent(new ConnectionId(new ServerId(new ClusterId(), new ServerAddress())), -1, new Throwable()) + new ServerHeartbeatFailedEvent(new ConnectionId(new ServerId(new ClusterId(), new ServerAddress())), -1, false, + new Throwable()) then: e = thrown(IllegalArgumentException) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java index 45583e641a6..71ab59c0a99 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java @@ -42,6 +42,8 @@ import java.util.concurrent.TimeUnit; import static com.mongodb.connection.ServerConnectionState.CONNECTING; +import static com.mongodb.internal.connection.ClusterableServer.ConnectionState.AFTER_HANDSHAKE; +import static com.mongodb.internal.connection.ClusterableServer.ConnectionState.BEFORE_HANDSHAKE; import static com.mongodb.internal.connection.DescriptionHelper.createServerDescription; import static com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException; import static org.junit.Assert.assertEquals; @@ -50,7 +52,6 @@ public class AbstractServerDiscoveryAndMonitoringTest { private final BsonDocument definition; private DefaultTestClusterableServerFactory factory; private BaseCluster cluster; - private TestInternalConnectionFactory internalConnectionFactory; public AbstractServerDiscoveryAndMonitoringTest(final BsonDocument definition) { this.definition = definition; @@ -105,10 +106,10 @@ protected void applyApplicationError(final BsonDocument applicationError) { switch (when) { case "beforeHandshakeCompletes": - server.invalidate(errorGeneration); + server.invalidate(BEFORE_HANDSHAKE, exception, errorGeneration, maxWireVersion); break; case "afterHandshakeCompletes": - server.invalidate(exception, errorGeneration, maxWireVersion); + server.invalidate(AFTER_HANDSHAKE, exception, errorGeneration, maxWireVersion); break; default: throw new UnsupportedOperationException("Unsupported `when` value: " + when); diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy index af18fba58f0..4128eba4453 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageSpecification.groovy @@ -24,8 +24,8 @@ import com.mongodb.connection.ClusterConnectionMode import com.mongodb.connection.ServerType import com.mongodb.internal.bulk.InsertRequest import com.mongodb.internal.bulk.WriteRequestWithIndex -import com.mongodb.internal.validator.NoOpFieldNameValidator import com.mongodb.internal.session.SessionContext +import com.mongodb.internal.validator.NoOpFieldNameValidator import org.bson.BsonArray import org.bson.BsonBinary import org.bson.BsonBinaryReader @@ -61,9 +61,9 @@ class CommandMessageSpecification extends Specification { def message = new CommandMessage(namespace, command, fieldNameValidator, readPreference, MessageSettings.builder() .maxWireVersion(THREE_DOT_SIX_WIRE_VERSION) - .serverType(serverType) + .serverType(serverType as ServerType) .build(), - responseExpected, null, null, clusterConnectionMode) + responseExpected, exhaustAllowed, null, null, clusterConnectionMode) def output = new BasicOutputBuffer() when: @@ -76,7 +76,8 @@ class CommandMessageSpecification extends Specification { messageHeader.opCode == OpCode.OP_MSG.value replyHeader.requestId < RequestMessage.currentGlobalId replyHeader.responseTo == 0 - replyHeader.opMsgFlagBits == (responseExpected ? 0 : 2) + ((replyHeader.opMsgFlagBits & (1 << 16)) != 0) == exhaustAllowed + ((replyHeader.opMsgFlagBits & (1 << 1)) == 0) == responseExpected def expectedCommandDocument = command.clone() .append('$db', new BsonString(namespace.databaseName)) @@ -96,7 +97,7 @@ class CommandMessageSpecification extends Specification { getCommandDocument(byteBuf, replyHeader) == expectedCommandDocument where: - [readPreference, serverType, clusterConnectionMode, sessionContext, responseExpected] << [ + [readPreference, serverType, clusterConnectionMode, sessionContext, responseExpected, exhaustAllowed] << [ [ReadPreference.primary(), ReadPreference.secondary()], [ServerType.REPLICA_SET_PRIMARY, ServerType.SHARD_ROUTER], [ClusterConnectionMode.SINGLE, ClusterConnectionMode.MULTIPLE], @@ -125,6 +126,7 @@ class CommandMessageSpecification extends Specification { getReadConcern() >> ReadConcern.DEFAULT } ], + [true, false], [true, false] ].combinations() } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy index 9619ee4310a..45a2acfa034 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy @@ -138,7 +138,11 @@ class DefaultServerMonitorSpecification extends Specification { initialServerDescription } - sendAndReceive(_, _, _) >> { + supportsAdditionalTimeout() >> true + + send(_, _, _) >> { } + + receive(_, _) >> { BsonDocument.parse(isMasterResponse) } } @@ -221,7 +225,11 @@ class DefaultServerMonitorSpecification extends Specification { initialServerDescription } - sendAndReceive(_, _, _) >> { + supportsAdditionalTimeout() >> true + + send(_, _, _) >> { } + + receive(_, _) >> { throw exception } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy index c629558e1f1..2fb788f3e69 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy @@ -178,7 +178,7 @@ class DefaultServerSpecification extends Specification { def e = thrown(MongoException) e.is(exceptionToThrow) 1 * connectionPool.invalidate() - 1 * serverMonitor.connect() + 1 * serverMonitor.cancelCurrentCheck() where: exceptionToThrow << [ @@ -240,7 +240,7 @@ class DefaultServerSpecification extends Specification { !receivedConnection receivedThrowable.is(exceptionToThrow) 1 * connectionPool.invalidate() - 1 * serverMonitor.connect() + 1 * serverMonitor.cancelCurrentCheck() where: @@ -397,7 +397,7 @@ class DefaultServerSpecification extends Specification { then: thrown(MongoSocketException) 1 * connectionPool.invalidate() - 1 * serverMonitor.connect() + 1 * serverMonitor.cancelCurrentCheck() when: def futureResultCallback = new FutureResultCallback() @@ -408,7 +408,7 @@ class DefaultServerSpecification extends Specification { then: thrown(MongoSocketException) 1 * connectionPool.invalidate() - 1 * serverMonitor.connect() + 1 * serverMonitor.cancelCurrentCheck() } def 'should not invalidate on MongoSocketReadTimeoutException'() { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy index b8664fa624f..10cdbf1bfd7 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionSpecification.groovy @@ -247,7 +247,7 @@ class InternalStreamConnectionSpecification extends Specification { def 'should close the stream when reading the message header throws an exception'() { given: - stream.read(16) >> { throw new IOException('Something went wrong') } + stream.read(16, 0) >> { throw new IOException('Something went wrong') } def connection = getOpenedConnection() def (buffers1, messageId1) = helper.isMaster() @@ -271,7 +271,7 @@ class InternalStreamConnectionSpecification extends Specification { def 'should throw MongoInternalException when reply header message length > max message length'() { given: - stream.read(36) >> { helper.headerWithMessageSizeGreaterThanMax(1) } + stream.read(36, 0) >> { helper.headerWithMessageSizeGreaterThanMax(1) } def connection = getOpenedConnection() @@ -344,8 +344,8 @@ class InternalStreamConnectionSpecification extends Specification { def 'should close the stream when reading the message body throws an exception'() { given: - stream.read(16) >> helper.defaultMessageHeader(1) - stream.read(90) >> { throw new IOException('Something went wrong') } + stream.read(16, 0) >> helper.defaultMessageHeader(1) + stream.read(90, 0) >> { throw new IOException('Something went wrong') } def connection = getOpenedConnection() @@ -406,8 +406,8 @@ class InternalStreamConnectionSpecification extends Specification { def commandMessage = new CommandMessage(cmdNamespace, pingCommandDocument, fieldNameValidator, primary(), messageSettings) def response = '{ok : 0, errmsg : "failed"}' stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } - stream.read(16) >> helper.messageHeader(commandMessage.getId(), response) - stream.read(_) >> helper.reply(response) + stream.read(16, 0) >> helper.messageHeader(commandMessage.getId(), response) + stream.read(_, 0) >> helper.reply(response) when: connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), NoOpSessionContext.INSTANCE) @@ -485,8 +485,8 @@ class InternalStreamConnectionSpecification extends Specification { def pingCommandDocument = new BsonDocument('ping', new BsonInt32(1)) def commandMessage = new CommandMessage(cmdNamespace, pingCommandDocument, fieldNameValidator, primary(), messageSettings) stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } - stream.read(16) >> helper.defaultMessageHeader(commandMessage.getId()) - stream.read(90) >> helper.defaultReply() + stream.read(16, 0) >> helper.defaultMessageHeader(commandMessage.getId()) + stream.read(90, 0) >> helper.defaultReply() when: connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), NoOpSessionContext.INSTANCE) @@ -510,8 +510,8 @@ class InternalStreamConnectionSpecification extends Specification { $clusterTime : { clusterTime : { $timestamp : { "t" : 42, "i" : 21 } } } }''' stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } - stream.read(16) >> helper.defaultMessageHeader(commandMessage.getId()) - stream.read(_) >> helper.reply(response) + stream.read(16, 0) >> helper.defaultMessageHeader(commandMessage.getId()) + stream.read(_, 0) >> helper.reply(response) def sessionContext = Mock(SessionContext) { 1 * advanceOperationTime(BsonDocument.parse(response).getTimestamp('operationTime')) 1 * advanceClusterTime(BsonDocument.parse(response).getDocument('$clusterTime')) @@ -585,7 +585,7 @@ class InternalStreamConnectionSpecification extends Specification { def pingCommandDocument = new BsonDocument('ping', new BsonInt32(1)) def commandMessage = new CommandMessage(cmdNamespace, pingCommandDocument, fieldNameValidator, primary(), messageSettings) stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } - stream.read(16) >> { throw new MongoSocketReadException('Failed to read', serverAddress) } + stream.read(16, 0) >> { throw new MongoSocketReadException('Failed to read', serverAddress) } when: connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), NoOpSessionContext.INSTANCE) @@ -604,8 +604,8 @@ class InternalStreamConnectionSpecification extends Specification { def pingCommandDocument = new BsonDocument('ping', new BsonInt32(1)) def commandMessage = new CommandMessage(cmdNamespace, pingCommandDocument, fieldNameValidator, primary(), messageSettings) stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } - stream.read(16) >> helper.defaultMessageHeader(commandMessage.getId()) - stream.read(90) >> { throw new MongoSocketReadException('Failed to read', serverAddress) } + stream.read(16, 0) >> helper.defaultMessageHeader(commandMessage.getId()) + stream.read(90, 0) >> { throw new MongoSocketReadException('Failed to read', serverAddress) } when: connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), NoOpSessionContext.INSTANCE) @@ -625,8 +625,8 @@ class InternalStreamConnectionSpecification extends Specification { def commandMessage = new CommandMessage(cmdNamespace, pingCommandDocument, fieldNameValidator, primary(), messageSettings) def response = '{ok : 0, errmsg : "failed"}' stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } - stream.read(16) >> helper.messageHeader(commandMessage.getId(), response) - stream.read(_) >> helper.reply(response) + stream.read(16, 0) >> helper.messageHeader(commandMessage.getId(), response) + stream.read(_, 0) >> helper.reply(response) when: connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), NoOpSessionContext.INSTANCE) @@ -645,8 +645,8 @@ class InternalStreamConnectionSpecification extends Specification { def connection = getOpenedConnection() def commandMessage = new CommandMessage(cmdNamespace, securitySensitiveCommand, fieldNameValidator, primary(), messageSettings) stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } - stream.read(16) >> helper.defaultMessageHeader(commandMessage.getId()) - stream.read(90) >> helper.defaultReply() + stream.read(16, 0) >> helper.defaultMessageHeader(commandMessage.getId()) + stream.read(90, 0) >> helper.defaultReply() when: connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), NoOpSessionContext.INSTANCE) @@ -677,8 +677,8 @@ class InternalStreamConnectionSpecification extends Specification { def connection = getOpenedConnection() def commandMessage = new CommandMessage(cmdNamespace, securitySensitiveCommand, fieldNameValidator, primary(), messageSettings) stream.getBuffer(1024) >> { new ByteBufNIO(ByteBuffer.wrap(new byte[1024])) } - stream.read(16) >> helper.defaultMessageHeader(commandMessage.getId()) - stream.read(_) >> helper.reply('{ok : 0, errmsg : "failed"}') + stream.read(16, 0) >> helper.defaultMessageHeader(commandMessage.getId()) + stream.read(_, 0) >> helper.reply('{ok : 0, errmsg : "failed"}') when: connection.sendAndReceive(commandMessage, new BsonDocumentCodec(), NoOpSessionContext.INSTANCE) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPool.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPool.java index f413236b7fa..ffa91c76b62 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPool.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPool.java @@ -62,6 +62,21 @@ public T sendAndReceive(final CommandMessage message, final Decoder decod throw new UnsupportedOperationException("Not implemented yet!"); } + @Override + public void send(final CommandMessage message, final Decoder decoder, final SessionContext sessionContext) { + throw new UnsupportedOperationException(); + } + + @Override + public T receive(final Decoder decoder, final SessionContext sessionContext) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasMoreToCome() { + throw new UnsupportedOperationException(); + } + @Override public void sendAndReceiveAsync(final CommandMessage message, final Decoder decoder, final SessionContext sessionContext, final SingleResultCallback callback) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPoolListener.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPoolListener.java index d7eef87896e..62fb5989881 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPoolListener.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestConnectionPoolListener.java @@ -89,7 +89,7 @@ public void waitForEvent(final Class eventClass, final int count, final l } private boolean containsEvent(final Class eventClass, final int expectedEventCount) { - return countEvents(eventClass) == expectedEventCount; + return countEvents(eventClass) >= expectedEventCount; } private void addEvent(final Object event) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnection.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnection.java index 2a839af395e..792857277f0 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnection.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnection.java @@ -176,6 +176,21 @@ public T sendAndReceive(final CommandMessage message, final Decoder decod } } + @Override + public void send(final CommandMessage message, final Decoder decoder, final SessionContext sessionContext) { + throw new UnsupportedOperationException(); + } + + @Override + public T receive(final Decoder decoder, final SessionContext sessionContext) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean hasMoreToCome() { + throw new UnsupportedOperationException(); + } + private T getResponseDocument(final ResponseBuffers responseBuffers, final CommandMessage commandMessage, final Decoder decoder) { ReplyMessage replyMessage = new ReplyMessage(responseBuffers, decoder, commandMessage.getId()); diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnectionFactory.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnectionFactory.java index dd0eb91c48e..8cb33005762 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnectionFactory.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestInternalConnectionFactory.java @@ -100,6 +100,20 @@ public T sendAndReceive(final CommandMessage message, final Decoder decod return null; } + @Override + public void send(final CommandMessage message, final Decoder decoder, final SessionContext sessionContext) { + } + + @Override + public T receive(final Decoder decoder, final SessionContext sessionContext) { + return null; + } + + @Override + public boolean hasMoreToCome() { + return false; + } + @Override public void sendAndReceiveAsync(final CommandMessage message, final Decoder decoder, final SessionContext sessionContext, final SingleResultCallback callback) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestServer.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestServer.java index d64ce42f6af..12f9da1988e 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestServer.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestServer.java @@ -49,22 +49,18 @@ public void sendNotification(final ServerDescription newDescription) { } @Override - public void invalidate() { - sendNotification(ServerDescription.builder().state(CONNECTING).address(serverId.getAddress()).build()); - } - - @Override - public void invalidate(final int connectionGeneration) { - invalidate(); + public void resetToConnecting() { + this.description = ServerDescription.builder().state(CONNECTING).address(serverId.getAddress()).build(); } @Override - public void invalidate(final Throwable reason) { - invalidate(); + public void invalidate() { + sendNotification(ServerDescription.builder().state(CONNECTING).address(serverId.getAddress()).build()); } @Override - public void invalidate(final Throwable reason, final int connectionGeneration, final int maxWireVersion) { + public void invalidate(final ConnectionState connectionState, final Throwable reason, final int connectionGeneration, + final int maxWireVersion) { invalidate(); } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerMonitor.java b/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerMonitor.java index de2364941c5..bda5be7bc7d 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerMonitor.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/TestServerMonitor.java @@ -42,6 +42,10 @@ public void connect() { public void close() { } + @Override + public void cancelCurrentCheck() { + } + public void setServerStateListener(final ChangeListener serverStateListener) { this.serverStateListener = serverStateListener; } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClient.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClient.java index 6c72ed59b45..ed29939fbdc 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClient.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/MongoClient.java @@ -18,6 +18,9 @@ import com.mongodb.ClientSessionOptions; import com.mongodb.annotations.Immutable; +import com.mongodb.connection.ClusterDescription; +import com.mongodb.connection.ClusterSettings; +import com.mongodb.event.ClusterListener; import org.bson.Document; import org.bson.conversions.Bson; import org.reactivestreams.Publisher; @@ -224,4 +227,20 @@ public interface MongoClient extends Closeable { */ Publisher startSession(ClientSessionOptions options); + /** + * Gets the current cluster description. + * + *

+ * This method will not block, meaning that it may return a {@link ClusterDescription} whose {@code clusterType} is unknown + * and whose {@link com.mongodb.connection.ServerDescription}s are all in the connecting state. If the application requires + * notifications after the driver has connected to a member of the cluster, it should register a {@link ClusterListener} via + * the {@link ClusterSettings} in {@link com.mongodb.MongoClientSettings}. + *

+ * + * @return the current cluster description + * @see ClusterSettings.Builder#addClusterListener(ClusterListener) + * @see com.mongodb.MongoClientSettings.Builder#applyToClusterSettings(com.mongodb.Block) + * @since 4.1 + */ + ClusterDescription getClusterDescription(); } diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java index 60986d5b06b..2a485a88726 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/MongoClientImpl.java @@ -17,6 +17,7 @@ package com.mongodb.reactivestreams.client.internal; import com.mongodb.ClientSessionOptions; +import com.mongodb.connection.ClusterDescription; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.async.client.AsyncClientSession; import com.mongodb.internal.async.client.AsyncMongoClient; @@ -155,4 +156,9 @@ public void onResult(final AsyncClientSession result, final Throwable t) { } })); } + + @Override + public ClusterDescription getClusterDescription() { + return wrapped.getClusterDescription(); + } } diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ServerDiscoveryAndMonitoringTest.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ServerDiscoveryAndMonitoringTest.java new file mode 100644 index 00000000000..9dee7a373e4 --- /dev/null +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/ServerDiscoveryAndMonitoringTest.java @@ -0,0 +1,37 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.reactivestreams.client; + +import com.mongodb.MongoClientSettings; +import com.mongodb.client.AbstractServerDiscoveryAndMonitoringTest; +import com.mongodb.client.MongoClient; +import com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient; +import org.bson.BsonArray; +import org.bson.BsonDocument; + +public class ServerDiscoveryAndMonitoringTest extends AbstractServerDiscoveryAndMonitoringTest { + public ServerDiscoveryAndMonitoringTest(final String filename, final String description, final String databaseName, + final String collectionName, final BsonArray data, final BsonDocument definition, + final boolean skipTest) { + super(filename, description, databaseName, collectionName, data, definition, skipTest); + } + + @Override + protected MongoClient createMongoClient(final MongoClientSettings settings) { + return new SyncMongoClient(MongoClients.create(settings)); + } +} diff --git a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java index 1fd0ebee663..d5ff798bb8f 100644 --- a/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java +++ b/driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java @@ -139,7 +139,7 @@ public ChangeStreamIterable watch(final ClientSession clientS @Override public ClusterDescription getClusterDescription() { - throw new UnsupportedOperationException(); + return wrapped.getClusterDescription(); } private com.mongodb.reactivestreams.client.ClientSession unwrap(final ClientSession clientSession) { diff --git a/driver-scala/src/main/scala/org/mongodb/scala/MongoClient.scala b/driver-scala/src/main/scala/org/mongodb/scala/MongoClient.scala index d94570712ba..bda49873a6a 100644 --- a/driver-scala/src/main/scala/org/mongodb/scala/MongoClient.scala +++ b/driver-scala/src/main/scala/org/mongodb/scala/MongoClient.scala @@ -18,9 +18,10 @@ package org.mongodb.scala import java.io.Closeable +import com.mongodb.connection.ClusterDescription import com.mongodb.reactivestreams.client.{ MongoClients, MongoClient => JMongoClient } -import org.bson.codecs.configuration.CodecRegistry import org.bson.codecs.configuration.CodecRegistries.{ fromProviders, fromRegistries } +import org.bson.codecs.configuration.CodecRegistry import org.mongodb.scala.bson.DefaultHelper.DefaultsTo import org.mongodb.scala.bson.codecs.{ DocumentCodecProvider, IterableCodecProvider } import org.mongodb.scala.bson.conversions.Bson @@ -251,4 +252,21 @@ case class MongoClient(private val wrapped: JMongoClient) extends Closeable { )(implicit e: C DefaultsTo Document, ct: ClassTag[C]): ChangeStreamObservable[C] = ChangeStreamObservable(wrapped.watch(clientSession, pipeline.asJava, ct)) + /** + * Gets the current cluster description. + * + *

+ * This method will not block, meaning that it may return a { @link ClusterDescription} whose { @code clusterType} is unknown + * and whose { @link com.mongodb.connection.ServerDescription}s are all in the connecting state. If the application requires + * notifications after the driver has connected to a member of the cluster, it should register a { @link ClusterListener} via + * the { @link ClusterSettings} in { @link com.mongodb.MongoClientSettings}. + *

+ * + * @return the current cluster description + * @see ClusterSettings.Builder#addClusterListener(ClusterListener) + * @see com.mongodb.MongoClientSettings.Builder#applyToClusterSettings(com.mongodb.Block) + * @since 4.1 + */ + def getClusterDescription: ClusterDescription = + wrapped.getClusterDescription } diff --git a/driver-scala/src/test/scala/org/mongodb/scala/MongoClientSpec.scala b/driver-scala/src/test/scala/org/mongodb/scala/MongoClientSpec.scala index ba7fd3b7550..806ce090074 100644 --- a/driver-scala/src/test/scala/org/mongodb/scala/MongoClientSpec.scala +++ b/driver-scala/src/test/scala/org/mongodb/scala/MongoClientSpec.scala @@ -19,7 +19,6 @@ package org.mongodb.scala import com.mongodb.reactivestreams.client.{ MongoClient => JMongoClient } import org.bson.BsonDocument import org.scalamock.scalatest.proxy.MockFactory -import org.scalatest.{ FlatSpec, Matchers } import scala.collection.JavaConverters._ @@ -99,4 +98,8 @@ class MongoClientSpec extends BaseSpec with MockFactory { mongoClient.watch[BsonDocument](clientSession, pipeline) shouldBe a[ChangeStreamObservable[_]] } + it should "call the underlying getClusterDescription" in { + wrapped.expects(Symbol("getClusterDescription"))().once() + mongoClient.getClusterDescription + } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableReadsTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableReadsTest.java index 2aa7102b5df..200027d016c 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableReadsTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableReadsTest.java @@ -28,6 +28,7 @@ import com.mongodb.client.gridfs.GridFSBucket; import com.mongodb.client.gridfs.GridFSBuckets; import com.mongodb.client.test.CollectionHelper; +import com.mongodb.connection.ServerSettings; import com.mongodb.connection.SocketSettings; import com.mongodb.event.CommandEvent; import com.mongodb.internal.connection.TestCommandListener; @@ -137,6 +138,12 @@ public void apply(final SocketSettings.Builder builder) { builder.readTimeout(5, TimeUnit.SECONDS); } }) + .applyToServerSettings(new Block() { + @Override + public void apply(final ServerSettings.Builder builder) { + builder.heartbeatFrequency(5, TimeUnit.MILLISECONDS); + } + }) .writeConcern(getWriteConcern(clientOptions)) .readConcern(getReadConcern(clientOptions)) .readPreference(getReadPreference(clientOptions)) diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableWritesTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableWritesTest.java index 80b7813eb30..c1bfc342d64 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableWritesTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractRetryableWritesTest.java @@ -16,12 +16,14 @@ package com.mongodb.client; +import com.mongodb.Block; import com.mongodb.MongoClientSettings; import com.mongodb.MongoCommandException; import com.mongodb.MongoException; import com.mongodb.MongoNamespace; import com.mongodb.MongoWriteConcernException; import com.mongodb.client.test.CollectionHelper; +import com.mongodb.connection.ServerSettings; import org.bson.BsonArray; import org.bson.BsonBoolean; import org.bson.BsonDocument; @@ -43,6 +45,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.concurrent.TimeUnit; import static com.mongodb.ClusterFixture.isDiscoverableReplicaSet; import static com.mongodb.JsonTestServerVersionChecker.skipTest; @@ -91,6 +94,13 @@ public void setUp() { if (clientOptions.containsKey("retryWrites")) { builder.retryWrites(clientOptions.getBoolean("retryWrites").getValue()); } + builder.applyToServerSettings(new Block() { + @Override + public void apply(final ServerSettings.Builder builder) { + builder.heartbeatFrequency(5, TimeUnit.MILLISECONDS); + } + }); + mongoClient = createMongoClient(builder.build()); List documents = new ArrayList<>(); diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractServerDiscoveryAndMonitoringTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractServerDiscoveryAndMonitoringTest.java new file mode 100644 index 00000000000..78198b3e8af --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractServerDiscoveryAndMonitoringTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package com.mongodb.client; + +import org.bson.BsonArray; +import org.bson.BsonDocument; +import org.bson.BsonString; +import org.bson.BsonValue; +import org.junit.runners.Parameterized; +import util.JsonPoweredTestHelper; + +import java.io.File; +import java.io.IOException; +import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + +import static com.mongodb.JsonTestServerVersionChecker.skipTest; +import static com.mongodb.client.Fixture.getDefaultDatabaseName; + +public abstract class AbstractServerDiscoveryAndMonitoringTest extends AbstractUnifiedTest { + public AbstractServerDiscoveryAndMonitoringTest(final String filename, final String description, final String databaseName, + final String collectionName, final BsonArray data, final BsonDocument definition, + final boolean skipTest) { + super(filename, description, databaseName, collectionName, data, definition, skipTest); + } + + @Parameterized.Parameters(name = "{0}: {1}") + public static Collection data() throws URISyntaxException, IOException { + List data = new ArrayList(); + for (File file : JsonPoweredTestHelper.getTestFiles("/server-discovery-and-monitoring-integration")) { + BsonDocument testDocument = JsonPoweredTestHelper.getTestDocument(file); + + for (BsonValue test : testDocument.getArray("tests")) { + data.add(new Object[]{file.getName(), test.asDocument().getString("description").getValue(), + testDocument.getString("database_name", new BsonString(getDefaultDatabaseName())).getValue(), + testDocument.getString("collection_name", + new BsonString(file.getName().substring(0, file.getName().lastIndexOf(".")))).getValue(), + testDocument.getArray("data"), test.asDocument(), skipTest(testDocument, test.asDocument())}); + } + } + return data; + } + +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractUnifiedTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractUnifiedTest.java index 1b197f0e139..b9a39bd2341 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractUnifiedTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractUnifiedTest.java @@ -33,11 +33,16 @@ import com.mongodb.client.model.CreateCollectionOptions; import com.mongodb.client.test.CollectionHelper; import com.mongodb.connection.ClusterSettings; +import com.mongodb.connection.ConnectionPoolSettings; +import com.mongodb.connection.ServerDescription; import com.mongodb.connection.ServerSettings; +import com.mongodb.connection.ServerType; import com.mongodb.connection.SocketSettings; import com.mongodb.event.CommandEvent; import com.mongodb.event.CommandStartedEvent; +import com.mongodb.event.ConnectionPoolClearedEvent; import com.mongodb.internal.connection.TestCommandListener; +import com.mongodb.internal.connection.TestConnectionPoolListener; import com.mongodb.lang.Nullable; import org.bson.BsonArray; import org.bson.BsonBoolean; @@ -57,7 +62,10 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.stream.Collectors; import static com.mongodb.ClusterFixture.getConnectionString; @@ -71,6 +79,7 @@ import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; import static java.util.Collections.singletonList; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -91,6 +100,8 @@ public abstract class AbstractUnifiedTest { private final boolean skipTest; private JsonPoweredCrudTestHelper helper; private final TestCommandListener commandListener; + private final TestConnectionPoolListener connectionPoolListener; + private final TestServerListener serverListener; private MongoClient mongoClient; private CollectionHelper collectionHelper; private Map sessionsMap; @@ -99,6 +110,8 @@ public abstract class AbstractUnifiedTest { private ConnectionString connectionString = null; private final String collectionName; private MongoDatabase database; + private final Map executorServiceMap = new HashMap<>(); + private final Map> futureMap = new HashMap<>(); private static final long MIN_HEARTBEAT_FREQUENCY_MS = 50L; @@ -111,6 +124,8 @@ public AbstractUnifiedTest(final String filename, final String description, fina this.data = data; this.definition = definition; this.commandListener = new TestCommandListener(); + this.connectionPoolListener = new TestConnectionPoolListener(); + this.serverListener = new TestServerListener(); this.skipTest = skipTest; } @@ -154,10 +169,22 @@ public void setUp() { MongoClientSettings.Builder builder = getMongoClientSettingsBuilder() .applyConnectionString(connectionString) .addCommandListener(commandListener) + .applyToClusterSettings(new Block() { + @Override + public void apply(final ClusterSettings.Builder builder) { + if (clientOptions.containsKey("serverSelectionTimeoutMS")) { + builder.serverSelectionTimeout(clientOptions.getNumber("serverSelectionTimeoutMS").longValue(), + MILLISECONDS); + } + } + }) .applyToSocketSettings(new Block() { @Override public void apply(final SocketSettings.Builder builder) { - builder.readTimeout(5, TimeUnit.SECONDS); + builder.readTimeout(5, SECONDS); + if (clientOptions.containsKey("connectTimeoutMS")) { + builder.connectTimeout(clientOptions.getNumber("connectTimeoutMS").intValue(), MILLISECONDS); + } } }) .writeConcern(getWriteConcern(clientOptions)) @@ -165,10 +192,18 @@ public void apply(final SocketSettings.Builder builder) { .readPreference(getReadPreference(clientOptions)) .retryWrites(clientOptions.getBoolean("retryWrites", BsonBoolean.FALSE).getValue()) .retryReads(false) + .applyToConnectionPoolSettings(new Block() { + @Override + public void apply(final ConnectionPoolSettings.Builder builder) { + builder.addConnectionPoolListener(connectionPoolListener); + } + }) .applyToServerSettings(new Block() { @Override public void apply(final ServerSettings.Builder builder) { +// builder.heartbeatFrequency(5, MILLISECONDS); builder.minHeartbeatFrequency(MIN_HEARTBEAT_FREQUENCY_MS, MILLISECONDS); + builder.addServerListener(serverListener); } }); if (clientOptions.containsKey("heartbeatFrequencyMS")) { @@ -179,6 +214,16 @@ public void apply(final ServerSettings.Builder builder) { } }); } + if (clientOptions.containsKey("appname")) { + builder.applicationName(clientOptions.getString("appname").getValue()); + } + if (clientOptions.containsKey("w")) { + if (clientOptions.isString("w")) { + builder.writeConcern(new WriteConcern(clientOptions.getString("w").getValue())); + } else if (clientOptions.isNumber("w")) { + builder.writeConcern(new WriteConcern(clientOptions.getNumber("w").intValue())); + } + } mongoClient = createMongoClient(builder.build()); database = mongoClient.getDatabase(databaseName); @@ -302,12 +347,19 @@ private void closeAllSessions() { } } + private void shutdownAllExecutors() { + for (ExecutorService cur : executorServiceMap.values()) { + cur.shutdownNow(); + } + } + @Test public void shouldPassAllOutcomes() { try { executeOperations(definition.getArray("operations"), false); } finally { closeAllSessions(); + shutdownAllExecutors(); } if (definition.containsKey("expectations")) { @@ -333,7 +385,8 @@ public void shouldPassAllOutcomes() { } private void executeOperations(final BsonArray operations, final boolean throwExceptions) { - TargetedFailPoint failPoint = null; + FailPoint failPoint = null; + ServerAddress currentPrimary = null; try { for (BsonValue cur : operations) { @@ -386,9 +439,73 @@ public Void execute() { }, transactionOptions); } } else if (operationName.equals("targetedFailPoint")) { - assertTrue(failPoint == null); + assertNull(failPoint); failPoint = new TargetedFailPoint(operation); failPoint.executeFailPoint(); + } else if (operationName.equals("configureFailPoint")) { + assertNull(failPoint); + failPoint = new FailPoint(operation); + failPoint.executeFailPoint(); + } else if (operationName.equals("startThread")) { + String target = operation.getDocument("arguments").getString("name").getValue(); + executorServiceMap.put(target, Executors.newSingleThreadExecutor()); + } else if (operationName.equals("runOnThread")) { + String target = operation.getDocument("arguments").getString("name").getValue(); + ExecutorService executorService = executorServiceMap.get(target); + Callable callable = createCallable(operation.getDocument("arguments").getDocument("operation")); + futureMap.put(target, executorService.submit(callable)); + } else if (operationName.equals("wait")) { + Thread.sleep(operation.getDocument("arguments").getNumber("ms").longValue()); + } else if (operationName.equals("waitForThread")) { + String target = operation.getDocument("arguments").getString("name").getValue(); + Exception exceptionFromFuture = futureMap.remove(target).get(5, SECONDS); + if (exceptionFromFuture != null) { + throw exceptionFromFuture; + } + } else if (operationName.equals("waitForEvent")) { + String event = operation.getDocument("arguments").getString("event").getValue(); + int count = operation.getDocument("arguments").getNumber("count").intValue(); + switch (event) { + case "PoolClearedEvent": + connectionPoolListener.waitForEvent(ConnectionPoolClearedEvent.class, count, 5, SECONDS); + break; + case "ServerMarkedUnknownEvent": + serverListener.waitForEvent(ServerType.UNKNOWN, count, 5, SECONDS); + break; + default: + throw new UnsupportedOperationException("Unsupported event type: " + event); + } + } else if (operationName.equals("assertEventCount")) { + String event = operation.getDocument("arguments").getString("event").getValue(); + int expectedCount = operation.getDocument("arguments").getNumber("count").intValue(); + int actualCount = -1; + switch (event) { + case "PoolClearedEvent": + actualCount = connectionPoolListener.countEvents(ConnectionPoolClearedEvent.class); + break; + case "ServerMarkedUnknownEvent": + actualCount = serverListener.countEvents(ServerType.UNKNOWN); + break; + default: + throw new UnsupportedOperationException("Unsupported event type: " + event); + } + assertEquals(event + " counts not equal", expectedCount, actualCount); + } else if (operationName.equals("recordPrimary")) { + currentPrimary = getCurrentPrimary(); + } else if (operationName.equals("waitForPrimaryChange")) { + long startTimeMillis = System.currentTimeMillis(); + int timeoutMillis = operation.getDocument("arguments").getNumber("timeoutMS").intValue(); + ServerAddress newPrimary = getCurrentPrimary(); + while (newPrimary == null || newPrimary.equals(currentPrimary)) { + if (startTimeMillis + timeoutMillis <= System.currentTimeMillis()) { + fail("Timed out waiting for primary change"); + } + //noinspection BusyWait + Thread.sleep(50); + newPrimary = getCurrentPrimary(); + } + } else if (operationName.equals("runAdminCommand")) { + collectionHelper.runAdminCommand(operation.getDocument("arguments").getDocument("command")); } else if (operationName.equals("assertSessionPinned")) { final BsonDocument arguments = operation.getDocument("arguments", new BsonDocument()); assertNotNull(sessionsMap.get(arguments.getString("session").getValue()).getPinnedServerAddress()); @@ -465,6 +582,8 @@ public Void execute() { if (!assertExceptionState(e, expectedResult, operationName) || throwExceptions) { throw e; } + } catch (Exception e) { + throw new RuntimeException(e); } } } finally { @@ -474,6 +593,31 @@ public Void execute() { } } + private @Nullable ServerAddress getCurrentPrimary() { + for (ServerDescription serverDescription: mongoClient.getClusterDescription().getServerDescriptions()) { + if (serverDescription.getType() == ServerType.REPLICA_SET_PRIMARY) { + return serverDescription.getAddress(); + } + } + return null; + } + + private Callable createCallable(final BsonDocument operation) { + return () -> { + try { + executeOperations(new BsonArray(singletonList(operation)), true); + return null; + } catch (Exception e) { + if (operation.getBoolean("error", BsonBoolean.FALSE).getValue()) { + return null; + } + return e; + } catch (Error e) { + return new RuntimeException("Wrapping unexpected Error", e); + } + }; + } + private void assertCollectionExists(final BsonDocument operation, final boolean shouldExist) { BsonDocument arguments = operation.getDocument("arguments", new BsonDocument()); String databaseName = arguments.getString("database").getValue(); @@ -625,12 +769,34 @@ private ClientSession nonNullClientSession(@Nullable final ClientSession clientS return clientSession; } - private class TargetedFailPoint { + private class FailPoint { private final BsonDocument failPointDocument; + + protected FailPoint(final BsonDocument operation) { + this.failPointDocument = operation.getDocument("arguments").getDocument("failPoint"); + } + + public void executeFailPoint() { + executeCommand(failPointDocument); + } + + public void disableFailPoint() { + executeCommand(new BsonDocument("configureFailPoint", + failPointDocument.getString("configureFailPoint")) + .append("mode", new BsonString("off"))); + } + + protected void executeCommand(final BsonDocument doc) { + collectionHelper.runAdminCommand(doc); + } + } + + private class TargetedFailPoint extends FailPoint { private final MongoDatabase adminDB; - private MongoClient mongoClient; + private final MongoClient mongoClient; TargetedFailPoint(final BsonDocument operation) { + super(operation); final BsonDocument arguments = operation.getDocument("arguments", new BsonDocument()); final ClientSession clientSession = sessionsMap.get(arguments.getString("session").getValue()); @@ -646,29 +812,23 @@ public void apply(final ClusterSettings.Builder builder) { adminDB = mongoClient.getDatabase("admin"); } else { + mongoClient = null; adminDB = null; } - failPointDocument = arguments.getDocument("failPoint"); - } - - public void executeFailPoint() { - executeCommand(failPointDocument); } public void disableFailPoint() { - executeCommand(new BsonDocument("configureFailPoint", - failPointDocument.getString("configureFailPoint")) - .append("mode", new BsonString("off"))); + super.disableFailPoint(); if (mongoClient != null) { mongoClient.close(); } } - private void executeCommand(final BsonDocument doc) { + protected void executeCommand(final BsonDocument doc) { if (adminDB != null) { adminDB.runCommand(doc); } else { - collectionHelper.runAdminCommand(doc); + super.executeCommand(doc); } } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java new file mode 100644 index 00000000000..07cdfa111b0 --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java @@ -0,0 +1,125 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.Block; +import com.mongodb.MongoClientSettings; +import com.mongodb.connection.ServerSettings; +import com.mongodb.event.ServerDescriptionChangedEvent; +import com.mongodb.event.ServerHeartbeatSucceededEvent; +import com.mongodb.event.ServerListener; +import com.mongodb.event.ServerMonitorListener; +import org.bson.Document; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import static com.mongodb.ClusterFixture.configureFailPoint; +import static com.mongodb.ClusterFixture.disableFailPoint; +import static com.mongodb.ClusterFixture.isStandalone; +import static com.mongodb.ClusterFixture.serverVersionAtLeast; +import static com.mongodb.client.Fixture.getMongoClientSettingsBuilder; +import static java.util.concurrent.TimeUnit.NANOSECONDS; +import static org.bson.BsonDocument.parse; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assume.assumeTrue; + +public class ServerDiscoveryAndMonitoringProseTests { + + @Test + @SuppressWarnings("try") + public void testHeartbeatFrequency() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(5); + MongoClientSettings settings = getMongoClientSettingsBuilder() + .applyToServerSettings(new Block() { + @Override + public void apply(final ServerSettings.Builder builder) { + builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS); + builder.addServerMonitorListener(new ServerMonitorListener() { + @Override + public void serverHeartbeatSucceeded(final ServerHeartbeatSucceededEvent event) { + latch.countDown(); + } + }); + } + }).build(); + + try (MongoClient ignored = MongoClients.create(settings)) { + assertTrue("Took longer than expected to reach expected number of hearbeats", + latch.await(500, TimeUnit.MILLISECONDS)); + } + } + + @Test + public void testRTTUpdates() throws InterruptedException { + assumeTrue(isStandalone()); + assumeTrue(serverVersionAtLeast(4, 4)); + + List events = new ArrayList<>(); + MongoClientSettings settings = getMongoClientSettingsBuilder() + .applicationName("streamingRttTest") + .applyToServerSettings(new Block() { + @Override + public void apply(final ServerSettings.Builder builder) { + builder.heartbeatFrequency(50, TimeUnit.MILLISECONDS); + builder.addServerListener(new ServerListener() { + @Override + public void serverDescriptionChanged(final ServerDescriptionChangedEvent event) { + events.add(event); + } + }); + } + }).build(); + try (MongoClient client = MongoClients.create(settings)) { + client.getDatabase("admin").runCommand(new Document("ping", 1)); + Thread.sleep(250); + assertTrue(events.size() > 1); + events.forEach(event -> + assertTrue(event.getNewDescription().getRoundTripTimeNanos() > 0)); + + configureFailPoint(parse("{" + + "configureFailPoint: \"failCommand\"," + + "mode: {times: 1000}," + + " data: {" + + " failCommands: [\"isMaster\"]," + + " blockConnection: true," + + " blockTimeMS: 100," + + " appName: \"streamingRttTest\"" + + " }" + + "}")); + + long startTime = System.currentTimeMillis(); + while (true) { + long rttMillis = NANOSECONDS.toMillis(client.getClusterDescription().getServerDescriptions().get(0) + .getRoundTripTimeNanos()); + if (rttMillis > 50) { + break; + } + assertFalse(System.currentTimeMillis() - startTime > 1000); + //noinspection BusyWait + Thread.sleep(50); + } + + } finally { + disableFailPoint("failCommand"); + } + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringTest.java b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringTest.java new file mode 100644 index 00000000000..d0d48b0c255 --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringTest.java @@ -0,0 +1,34 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.MongoClientSettings; +import org.bson.BsonArray; +import org.bson.BsonDocument; + +public class ServerDiscoveryAndMonitoringTest extends AbstractServerDiscoveryAndMonitoringTest { + public ServerDiscoveryAndMonitoringTest(final String filename, final String description, final String databaseName, + final String collectionName, final BsonArray data, final BsonDocument definition, + final boolean skipTest) { + super(filename, description, databaseName, collectionName, data, definition, skipTest); + } + + @Override + protected MongoClient createMongoClient(final MongoClientSettings settings) { + return MongoClients.create(settings); + } +} diff --git a/driver-sync/src/test/functional/com/mongodb/client/TestServerListener.java b/driver-sync/src/test/functional/com/mongodb/client/TestServerListener.java new file mode 100644 index 00000000000..7ef8d6fa276 --- /dev/null +++ b/driver-sync/src/test/functional/com/mongodb/client/TestServerListener.java @@ -0,0 +1,96 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.client; + +import com.mongodb.connection.ServerType; +import com.mongodb.event.ServerDescriptionChangedEvent; +import com.mongodb.event.ServerListener; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +public class TestServerListener implements ServerListener { + private final List events = new ArrayList<>(); + private final Lock lock = new ReentrantLock(); + private final Condition condition = lock.newCondition(); + private volatile int waitingForEventCount; + private volatile ServerType waitingForServerType; + + @Override + public void serverDescriptionChanged(final ServerDescriptionChangedEvent event) { + addEvent(event); + } + + public List getEvents() { + lock.lock(); + try { + return new ArrayList<>(events); + } finally { + lock.unlock(); + } + } + + public void waitForEvent(final ServerType serverType, final int count, final long time, final TimeUnit unit) + throws InterruptedException, TimeoutException { + lock.lock(); + try { + waitingForServerType = serverType; + waitingForEventCount = count; + if (containsEvent(serverType, count)) { + return; + } + if (!condition.await(time, unit)) { + throw new TimeoutException("Timed out waiting for " + count + " server description changed events with serverType " + + serverType); + } + } finally { + waitingForServerType = null; + lock.unlock(); + } + } + + public int countEvents(final ServerType serverType) { + int eventCount = 0; + for (ServerDescriptionChangedEvent event : getEvents()) { + if (event.getNewDescription().getType() == serverType) { + eventCount++; + } + } + return eventCount; + } + + private void addEvent(final ServerDescriptionChangedEvent event) { + lock.lock(); + try { + events.add(event); + if (waitingForServerType != null && containsEvent(waitingForServerType, waitingForEventCount)) { + condition.signalAll(); + } + } finally { + lock.unlock(); + } + } + + private boolean containsEvent(final ServerType serverType, final int expectedEventCount) { + return countEvents(serverType) >= expectedEventCount; + } +}