Skip to content

Commit

Permalink
Introduce streaming ismaster monitoring protocol
Browse files Browse the repository at this point in the history
* DefaultServerMonitor implements the new streaming isMaster protocol,
  if available, to detect topology changes sooner. This capability was
  added in MongoDB release 4.4.
* This requires the ability to increase the read timeout on a per-read
  basis.  Since the Stream class is public, this has to be done carefully
  in order to avoid using the streaming protocol with Stream implementations
  that don't have this ability.  All the built-in Stream implementations have
  had this ability added, so in practice it should not happen unless an
  application has created its own Stream implementation (unlikely).
* Implement the new server discovery and monitoring specification integration
  tests
* Fix some remaining bugs in error handling

JAVA-3626
  • Loading branch information
jyemin committed May 28, 2020
1 parent 665cc9b commit c8b028a
Show file tree
Hide file tree
Showing 61 changed files with 3,355 additions and 366 deletions.
1 change: 1 addition & 0 deletions config/checkstyle-exclude.xml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
<suppress checks="Regexp" files="Tour"/>

<suppress checks="MethodLength" files="PojoRoundTripTest"/>
<suppress checks="MethodLength" files="AbstractUnifiedTest"/>

<suppress checks="JavadocPackage" files="com[\\/]mongodb[\\/][^\\/]*\.java"/>
<suppress checks="JavadocPackage" files="com[\\/]mongodb[\\/]client[\\/][^\\/]*\.java"/>
Expand Down
36 changes: 36 additions & 0 deletions driver-core/src/main/com/mongodb/connection/Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,42 @@ public interface Stream extends BufferProvider{
*/
ByteBuf read(int numBytes) throws IOException;

/**
* Gets whether this implementation supports specifying an additional timeout for read operations
* <p>
* The default is to not support specifying an additional timeout
* </p>
*
* @return true if this implementation supports specifying an additional timeouts for reads operations
* @see #read(int, int)
* @since 4.1
*/
default boolean supportsAdditionalTimeout() {
return false;
}

/**
* Read from the stream, blocking until the requested number of bytes have been read. If supported by the implementation,
* adds the given additional timeout to the configured timeout for the stream.
* <p>
* This method should not be called unless {@link #supportsAdditionalTimeout()} returns true.
* </p>
* <p>
* The default behavior is to throw an {@link UnsupportedOperationException}
* </p>
*
* @param numBytes The number of bytes to read into the returned byte buffer
* @param additionalTimeout additional timeout in milliseconds to add to the configured timeout
* @return a byte buffer filled with number of bytes requested
* @throws IOException if there are problems reading from the stream
* @throws UnsupportedOperationException if this implementation does not support additional timeouts
* @see #supportsAdditionalTimeout()
* @since 4.1
*/
default ByteBuf read(int numBytes, int additionalTimeout) throws IOException {
throw new UnsupportedOperationException();
}

/**
* Write each buffer in the list to the stream in order, asynchronously. This method should return immediately, and invoke the given
* callback on completion.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,11 @@ private static class TlsChannelStream extends AsynchronousChannelStream implemen
this.selectorMonitor = selectorMonitor;
}

@Override
public boolean supportsAdditionalTimeout() {
return true;
}

@Override
public void openAsync(final AsyncCompletionHandler<Void> handler) {
isTrue("unopened", getChannel() == null);
Expand Down
33 changes: 25 additions & 8 deletions driver-core/src/main/com/mongodb/connection/netty/NettyStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,18 @@ public void write(final List<ByteBuf> buffers) throws IOException {

@Override
public ByteBuf read(final int numBytes) throws IOException {
return read(numBytes, 0);
}

@Override
public boolean supportsAdditionalTimeout() {
return true;
}

@Override
public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOException {
FutureAsyncCompletionHandler<ByteBuf> future = new FutureAsyncCompletionHandler<ByteBuf>();
readAsync(numBytes, future);
readAsync(numBytes, future, additionalTimeout);
return future.get();
}

Expand All @@ -201,7 +211,11 @@ public void operationComplete(final ChannelFuture future) throws Exception {

@Override
public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler) {
scheduleReadTimeout();
readAsync(numBytes, handler, 0);
}

private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler, final int additionalTimeout) {
scheduleReadTimeout(additionalTimeout);
ByteBuf buffer = null;
Throwable exceptionResult = null;
synchronized (this) {
Expand Down Expand Up @@ -431,15 +445,18 @@ public void operationComplete(final ChannelFuture future) {
}
}

private void scheduleReadTimeout() {
adjustTimeout(false);
private void scheduleReadTimeout(final int additionalTimeout) {
adjustTimeout(false, additionalTimeout);
}

private void disableReadTimeout() {
adjustTimeout(true);
adjustTimeout(true, 0);
}

private void adjustTimeout(final boolean disable) {
private void adjustTimeout(final boolean disable, final int additionalTimeout) {
if (isClosed) {
return;
}
ChannelHandler timeoutHandler = channel.pipeline().get(READ_HANDLER_NAME);
if (timeoutHandler != null) {
final ReadTimeoutHandler readTimeoutHandler = (ReadTimeoutHandler) timeoutHandler;
Expand All @@ -459,12 +476,12 @@ public void run() {
}
} else {
if (executor.inEventLoop()) {
readTimeoutHandler.scheduleTimeout(handlerContext);
readTimeoutHandler.scheduleTimeout(handlerContext, additionalTimeout);
} else {
executor.submit(new Runnable() {
@Override
public void run() {
readTimeoutHandler.scheduleTimeout(handlerContext);
readTimeoutHandler.scheduleTimeout(handlerContext, additionalTimeout);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ final class ReadTimeoutHandler extends ChannelInboundHandlerAdapter {
this.readTimeout = readTimeout;
}

void scheduleTimeout(final ChannelHandlerContext ctx) {
void scheduleTimeout(final ChannelHandlerContext ctx, final int additionalTimeout) {
isTrue("Handler called from the eventLoop", ctx.channel().eventLoop().inEventLoop());
if (timeout == null) {
timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), readTimeout, TimeUnit.MILLISECONDS);
timeout = ctx.executor().schedule(new ReadTimeoutTask(ctx), readTimeout + additionalTimeout, TimeUnit.MILLISECONDS);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
public final class ServerHeartbeatFailedEvent {
private final ConnectionId connectionId;
private final long elapsedTimeNanos;
private final boolean awaited;
private final Throwable throwable;

/**
Expand All @@ -39,9 +40,26 @@ public final class ServerHeartbeatFailedEvent {
* @param connectionId the non-null connectionId
* @param elapsedTimeNanos the non-negative elapsed time in nanoseconds
* @param throwable the non-null exception that caused the failure
* @deprecated Prefer {@link #ServerHeartbeatFailedEvent(ConnectionId, long, boolean, Throwable)}
*/
@Deprecated
public ServerHeartbeatFailedEvent(final ConnectionId connectionId, final long elapsedTimeNanos, final Throwable throwable) {
this(connectionId, elapsedTimeNanos, false, throwable);
}

/**
* Construct an instance.
*
* @param connectionId the non-null connectionId
* @param elapsedTimeNanos the non-negative elapsed time in nanoseconds
* @param awaited true if the response was awaited
* @param throwable the non-null exception that caused the failure
* @since 4.1
*/
public ServerHeartbeatFailedEvent(final ConnectionId connectionId, final long elapsedTimeNanos, final boolean awaited,
final Throwable throwable) {
this.connectionId = notNull("connectionId", connectionId);
this.awaited = awaited;
isTrueArgument("elapsed time is not negative", elapsedTimeNanos >= 0);
this.elapsedTimeNanos = elapsedTimeNanos;
this.throwable = notNull("throwable", throwable);
Expand All @@ -67,6 +85,18 @@ public long getElapsedTime(final TimeUnit timeUnit) {
return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS);
}

/**
* Gets whether the heartbeat was awaited. If true, then {@link #getElapsedTime(TimeUnit)} reflects the sum of the round trip time
* to the server and the time that the server waited before sending a response.
*
* @return whether the response was awaited
* @since 4.1
* @mongodb.server.release 4.4
*/
public boolean isAwaited() {
return awaited;
}

/**
* Gets the exceptions that caused the failure
*
Expand All @@ -81,6 +111,7 @@ public String toString() {
return "ServerHeartbeatFailedEvent{"
+ "connectionId=" + connectionId
+ ", elapsedTimeNanos=" + elapsedTimeNanos
+ ", awaited=" + awaited
+ ", throwable=" + throwable
+ "} " + super.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,19 +33,37 @@ public final class ServerHeartbeatSucceededEvent {
private final ConnectionId connectionId;
private final BsonDocument reply;
private final long elapsedTimeNanos;
private final boolean awaited;

/**
* Construct an instance.
*
* @param connectionId the non-null connectionId
* @param reply the non-null reply to an isMaster command
* @param elapsedTimeNanos the non-negative elapsed time in nanoseconds
* @deprecated Prefer {@link #ServerHeartbeatSucceededEvent(ConnectionId, BsonDocument, long, boolean)}
*/
@Deprecated
public ServerHeartbeatSucceededEvent(final ConnectionId connectionId, final BsonDocument reply, final long elapsedTimeNanos) {
this(connectionId, reply, elapsedTimeNanos, false);
}

/**
* Construct an instance.
*
* @param connectionId the non-null connectionId
* @param reply the non-null reply to an isMaster command
* @param elapsedTimeNanos the non-negative elapsed time in nanoseconds
* @param awaited true if the response was awaited
* @since 4.1
*/
public ServerHeartbeatSucceededEvent(final ConnectionId connectionId, final BsonDocument reply, final long elapsedTimeNanos,
final boolean awaited) {
this.connectionId = notNull("connectionId", connectionId);
this.reply = notNull("reply", reply);
isTrueArgument("elapsed time is not negative", elapsedTimeNanos >= 0);
this.elapsedTimeNanos = elapsedTimeNanos;
this.awaited = awaited;
}

/**
Expand Down Expand Up @@ -77,12 +95,25 @@ public long getElapsedTime(final TimeUnit timeUnit) {
return timeUnit.convert(elapsedTimeNanos, TimeUnit.NANOSECONDS);
}

/**
* Gets whether the heartbeat was awaited. If true, then {@link #getElapsedTime(TimeUnit)} reflects the sum of the round trip time
* to the server and the time that the server waited before sending a response.
*
* @return whether the response was awaited
* @since 4.1
* @mongodb.server.release 4.4
*/
public boolean isAwaited() {
return awaited;
}

@Override
public String toString() {
return "ServerHeartbeatSucceededEvent{"
+ "connectionId=" + connectionId
+ ", reply=" + reply
+ ", elapsedTimeNanos=" + elapsedTimeNanos
+ ", awaited=" + awaited
+ "} ";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

import com.mongodb.ClientSessionOptions;
import com.mongodb.annotations.Immutable;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.connection.ClusterSettings;
import com.mongodb.event.ClusterListener;
import com.mongodb.internal.async.SingleResultCallback;
import org.bson.Document;
import org.bson.conversions.Bson;
Expand Down Expand Up @@ -233,4 +236,19 @@ public interface AsyncMongoClient extends Closeable {
<TResult> AsyncChangeStreamIterable<TResult> watch(AsyncClientSession clientSession, List<? extends Bson> pipeline,
Class<TResult> resultClass);

/**
* Gets the current cluster description.
*
* <p>
* This method will not block, meaning that it may return a {@link ClusterDescription} whose {@code clusterType} is unknown
* and whose {@link com.mongodb.connection.ServerDescription}s are all in the connecting state. If the application requires
* notifications after the driver has connected to a member of the cluster, it should register a {@link ClusterListener} via
* the {@link ClusterSettings} in {@link com.mongodb.MongoClientSettings}.
* </p>
*
* @return the current cluster description
* @see ClusterSettings.Builder#addClusterListener(ClusterListener)
* @see com.mongodb.MongoClientSettings.Builder#applyToClusterSettings(com.mongodb.Block)
*/
ClusterDescription getClusterDescription();
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.mongodb.MongoClientException;
import com.mongodb.MongoClientSettings;
import com.mongodb.ReadPreference;
import com.mongodb.connection.ClusterDescription;
import com.mongodb.diagnostics.logging.Logger;
import com.mongodb.diagnostics.logging.Loggers;
import com.mongodb.internal.async.SingleResultCallback;
Expand Down Expand Up @@ -211,6 +212,11 @@ public <TResult> AsyncChangeStreamIterable<TResult> watch(final AsyncClientSessi
return createChangeStreamIterable(clientSession, pipeline, resultClass);
}

@Override
public ClusterDescription getClusterDescription() {
return cluster.getCurrentDescription();
}

private <TResult> AsyncChangeStreamIterable<TResult> createChangeStreamIterable(@Nullable final AsyncClientSession clientSession,
final List<? extends Bson> pipeline,
final Class<TResult> resultClass) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.mongodb.internal.connection;

import com.mongodb.MongoException;
import com.mongodb.MongoNotPrimaryException;
import com.mongodb.ServerAddress;
import com.mongodb.connection.ClusterConnectionMode;
import com.mongodb.connection.ClusterDescription;
Expand All @@ -30,7 +29,6 @@
import com.mongodb.event.ClusterDescriptionChangedEvent;
import com.mongodb.event.ServerDescriptionChangedEvent;
import com.mongodb.event.ServerListener;
import org.bson.BsonDocument;
import org.bson.types.ObjectId;

import java.util.ArrayList;
Expand Down Expand Up @@ -283,7 +281,7 @@ private boolean handleReplicaSetMemberChanged(final ServerDescription newDescrip
setVersion, electionId,
maxSetVersion, maxElectionId));
}
addressToServerTupleMap.get(newDescription.getAddress()).server.invalidate();
addressToServerTupleMap.get(newDescription.getAddress()).server.resetToConnecting();
return false;
}

Expand Down Expand Up @@ -370,7 +368,7 @@ private void invalidateOldPrimaries(final ServerAddress newPrimary) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info(format("Rediscovering type of existing primary %s", serverTuple.description.getAddress()));
}
serverTuple.server.invalidate(new MongoNotPrimaryException(new BsonDocument(), serverTuple.description.getAddress()));
serverTuple.server.invalidate();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,9 +105,18 @@ public void failed(final Throwable t) {

@Override
public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler) {
readAsync(numBytes, 0, handler);
}

private void readAsync(final int numBytes, final int additionalTimeout, final AsyncCompletionHandler<ByteBuf> handler) {
ByteBuf buffer = bufferProvider.getBuffer(numBytes);
channel.read(buffer.asNIO(), settings.getReadTimeout(MILLISECONDS), MILLISECONDS, null,
new BasicCompletionHandler(buffer, handler));

int timeout = settings.getReadTimeout(MILLISECONDS);
if (timeout > 0 && additionalTimeout > 0) {
timeout += additionalTimeout;
}

channel.read(buffer.asNIO(), timeout, MILLISECONDS, null, new BasicCompletionHandler(buffer, handler));
}

@Override
Expand All @@ -131,6 +140,18 @@ public ByteBuf read(final int numBytes) throws IOException {
return handler.getRead();
}

@Override
public boolean supportsAdditionalTimeout() {
return true;
}

@Override
public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOException {
FutureAsyncCompletionHandler<ByteBuf> handler = new FutureAsyncCompletionHandler<ByteBuf>();
readAsync(numBytes, additionalTimeout, handler);
return handler.getRead();
}

@Override
public ServerAddress getAddress() {
return serverAddress;
Expand Down
Loading

0 comments on commit c8b028a

Please sign in to comment.