From a064b7787fb442a15173f328bf0559a3ed48522b Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Thu, 28 Sep 2023 10:12:50 -0600 Subject: [PATCH 1/4] Fix flaky timeout tests # Conflicts: # driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java --- config/spotbugs/exclude.xml | 5 +++++ .../connection/BaseClusterSpecification.groovy | 4 ++-- .../connection/LoadBalancedClusterTest.java | 15 +++++++-------- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/config/spotbugs/exclude.xml b/config/spotbugs/exclude.xml index c24643ea982..3ab12a9b1a7 100644 --- a/config/spotbugs/exclude.xml +++ b/config/spotbugs/exclude.xml @@ -283,6 +283,11 @@ + + + + + diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy index 5364087d635..99587518581 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy @@ -188,11 +188,11 @@ class BaseClusterSpecification extends Specification { .exception(new MongoInternalException('oops')) .build()) - cluster.selectServer(new WritableServerSelector(), new OperationContext()) + cluster.selectServer(new WritableServerSelector(), OPERATION_CONTEXT) then: def e = thrown(MongoTimeoutException) - e.getMessage().startsWith("Timed out after ${serverSelectionTimeoutMS} ms while waiting for a server " + + e.getMessage().contains("ms while waiting for a server " + 'that matches WritableServerSelector. Client view of cluster state is {type=UNKNOWN') e.getMessage().contains('{address=localhost:27017, type=UNKNOWN, state=CONNECTING, ' + 'exception={com.mongodb.MongoInternalException: oops}}') diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java index 39bbd1167f4..03ca8f92cb5 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java @@ -234,7 +234,7 @@ public void shouldTimeoutSelectServerWhenThereIsSRVLookup() { MongoTimeoutException exception = assertThrows(MongoTimeoutException.class, () -> cluster.selectServer(mock(ServerSelector.class), OPERATION_CONTEXT)); - assertEquals("Timed out after 5 ms while waiting to resolve SRV records for foo.bar.com.", exception.getMessage()); + assertTrue(exception.getMessage().contains("ms while waiting to resolve SRV records for foo.bar.com")); } @Test @@ -261,9 +261,9 @@ public void shouldTimeoutSelectServerWhenThereIsSRVLookupException() { MongoTimeoutException exception = assertThrows(MongoTimeoutException.class, () -> cluster.selectServer(mock(ServerSelector.class), OPERATION_CONTEXT)); - assertEquals("Timed out after 10 ms while waiting to resolve SRV records for foo.bar.com. " - + "Resolution exception was 'com.mongodb.MongoConfigurationException: Unable to resolve SRV record'", - exception.getMessage()); + + assertTrue(exception.getMessage().contains("ms while waiting to resolve SRV records for foo.bar.com")); + assertTrue(exception.getMessage().contains("Resolution exception was 'com.mongodb.MongoConfigurationException: Unable to resolve SRV record'")); } @Test @@ -292,7 +292,7 @@ public void shouldTimeoutSelectServerAsynchronouslyWhenThereIsSRVLookup() { cluster.selectServerAsync(mock(ServerSelector.class), OPERATION_CONTEXT, callback); MongoTimeoutException exception = assertThrows(MongoTimeoutException.class, callback::get); - assertEquals("Timed out after 5 ms while waiting to resolve SRV records for foo.bar.com.", exception.getMessage()); + assertTrue(exception.getMessage().contains("ms while waiting to resolve SRV records for foo.bar.com")); } @Test @@ -321,9 +321,8 @@ public void shouldTimeoutSelectServerAsynchronouslyWhenThereIsSRVLookupException cluster.selectServerAsync(mock(ServerSelector.class), OPERATION_CONTEXT, callback); MongoTimeoutException exception = assertThrows(MongoTimeoutException.class, callback::get); - assertEquals("Timed out after 10 ms while waiting to resolve SRV records for foo.bar.com. " - + "Resolution exception was 'com.mongodb.MongoConfigurationException: Unable to resolve SRV record'", - exception.getMessage()); + assertTrue(exception.getMessage().contains("ms while waiting to resolve SRV records for foo.bar.com")); + assertTrue(exception.getMessage().contains("Resolution exception was 'com.mongodb.MongoConfigurationException: Unable to resolve SRV record'")); } @Test From eaa1e0ea553385e767540773468386ce453fbe9d Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Thu, 28 Sep 2023 13:24:57 -0600 Subject: [PATCH 2/4] Pass operationContext through to startServerSelectionTimeout --- .../AbstractMultiServerCluster.java | 2 +- .../internal/connection/BaseCluster.java | 34 +++++++++++-------- .../mongodb/internal/connection/Cluster.java | 2 +- .../connection/LoadBalancedCluster.java | 16 ++++----- .../connection/SingleServerCluster.java | 2 +- .../connection/SingleServerClusterTest.java | 4 +-- ...tractServerDiscoveryAndMonitoringTest.java | 5 +-- .../BaseClusterSpecification.groovy | 2 +- .../DefaultServerSpecification.groovy | 2 +- .../DnsMultiServerClusterSpecification.groovy | 1 + .../MultiServerClusterSpecification.groovy | 2 +- .../ServerDiscoveryAndMonitoringTest.java | 3 +- .../SingleServerClusterSpecification.groovy | 4 +-- 13 files changed, 44 insertions(+), 35 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java b/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java index 5b6fa8f56fe..c3c76c62e32 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java @@ -122,7 +122,7 @@ public void close() { } @Override - public ClusterableServer getServer(final ServerAddress serverAddress) { + public ClusterableServer getServer(final ServerAddress serverAddress, final OperationContext operationContext) { isTrue("is open", !isClosed()); ServerTuple serverTuple = addressToServerTupleMap.get(serverAddress); diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index 6f2b6e1c68f..f010d4ff004 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -107,12 +107,12 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector); boolean selectionFailureLogged = false; StartTime startTime = StartTime.now(); - Timeout timeout = startServerSelectionTimeout(startTime); + Timeout timeout = startServerSelectionTimeout(startTime, operationContext); while (true) { CountDownLatch currentPhaseLatch = phase.get(); ClusterDescription currentDescription = description; - ServerTuple serverTuple = selectServer(compositeServerSelector, currentDescription); + ServerTuple serverTuple = selectServer(compositeServerSelector, currentDescription, operationContext); throwIfIncompatible(currentDescription); if (serverTuple != null) { @@ -141,15 +141,15 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati LOGGER.trace(format("Asynchronously selecting server with selector %s", serverSelector)); } StartTime startTime = StartTime.now(); - Timeout timeout = startServerSelectionTimeout(startTime); + Timeout timeout = startServerSelectionTimeout(startTime, operationContext); ServerSelectionRequest request = new ServerSelectionRequest( serverSelector, getCompositeServerSelector(serverSelector), timeout, startTime, callback); CountDownLatch currentPhase = phase.get(); ClusterDescription currentDescription = description; - if (!handleServerSelectionRequest(request, currentPhase, currentDescription)) { - notifyWaitQueueHandler(request); + if (!handleServerSelectionRequest(request, currentPhase, currentDescription, operationContext)) { + notifyWaitQueueHandler(request, operationContext); } } @@ -218,8 +218,8 @@ private void updatePhase() { withLock(() -> phase.getAndSet(new CountDownLatch(1)).countDown()); } - private Timeout startServerSelectionTimeout(final StartTime startTime) { - long ms = settings.getServerSelectionTimeout(MILLISECONDS); + private Timeout startServerSelectionTimeout(final StartTime startTime, final OperationContext operationContext) { + long ms = operationContext.getTimeoutContext().getTimeoutSettings().getServerSelectionTimeoutMS(); return startTime.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS); } @@ -231,7 +231,7 @@ private Timeout startMinWaitHeartbeatTimeout() { private boolean handleServerSelectionRequest( final ServerSelectionRequest request, final CountDownLatch currentPhase, - final ClusterDescription description) { + final ClusterDescription description, final OperationContext operationContext) { try { if (currentPhase != request.phase) { CountDownLatch prevPhase = request.phase; @@ -244,7 +244,7 @@ private boolean handleServerSelectionRequest( return true; } - ServerTuple serverTuple = selectServer(request.compositeSelector, description); + ServerTuple serverTuple = selectServer(request.compositeSelector, description, operationContext); if (serverTuple != null) { if (LOGGER.isTraceEnabled()) { LOGGER.trace(format("Asynchronously selected server %s", @@ -289,8 +289,8 @@ private void logServerSelectionFailure(final ServerSelector serverSelector, @Nullable private ServerTuple selectServer(final ServerSelector serverSelector, - final ClusterDescription clusterDescription) { - return selectServer(serverSelector, clusterDescription, this::getServer); + final ClusterDescription clusterDescription, final OperationContext operationContext) { + return selectServer(serverSelector, clusterDescription, serverAddress -> getServer(serverAddress, operationContext)); } @Nullable @@ -411,7 +411,7 @@ StartTime getStartTime() { } } - private void notifyWaitQueueHandler(final ServerSelectionRequest request) { + private void notifyWaitQueueHandler(final ServerSelectionRequest request, final OperationContext operationContext) { withLock(() -> { if (isClosed) { return; @@ -420,7 +420,7 @@ private void notifyWaitQueueHandler(final ServerSelectionRequest request) { waitQueue.add(request); if (waitQueueHandler == null) { - waitQueueHandler = new Thread(new WaitQueueHandler(), "cluster-" + clusterId.getValue()); + waitQueueHandler = new Thread(new WaitQueueHandler(operationContext), "cluster-" + clusterId.getValue()); waitQueueHandler.setDaemon(true); waitQueueHandler.start(); } else { @@ -438,6 +438,12 @@ private void stopWaitQueueHandler() { } private final class WaitQueueHandler implements Runnable { + private final OperationContext operationContext; + + WaitQueueHandler(final OperationContext operationContext) { + this.operationContext = operationContext; + } + public void run() { while (!isClosed) { CountDownLatch currentPhase = phase.get(); @@ -447,7 +453,7 @@ public void run() { for (Iterator iter = waitQueue.iterator(); iter.hasNext();) { ServerSelectionRequest nextRequest = iter.next(); - if (handleServerSelectionRequest(nextRequest, currentPhase, curDescription)) { + if (handleServerSelectionRequest(nextRequest, currentPhase, curDescription, operationContext)) { iter.remove(); } else { timeout = timeout diff --git a/driver-core/src/main/com/mongodb/internal/connection/Cluster.java b/driver-core/src/main/com/mongodb/internal/connection/Cluster.java index a3a649b10a6..201470cc8ed 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/Cluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/Cluster.java @@ -45,7 +45,7 @@ public interface Cluster extends Closeable { @Nullable @VisibleForTesting(otherwise = PRIVATE) - ClusterableServer getServer(ServerAddress serverAddress); + ClusterableServer getServer(ServerAddress serverAddress, OperationContext operationContext); /** * Get the current description of this cluster. diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java index 57d6d5cb497..ae21b937ea3 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java @@ -181,9 +181,9 @@ public ClusterId getClusterId() { } @Override - public ClusterableServer getServer(final ServerAddress serverAddress) { + public ClusterableServer getServer(final ServerAddress serverAddress, final OperationContext operationContext) { isTrue("open", !isClosed()); - waitForSrv(); + waitForSrv(operationContext); return assertNotNull(server); } @@ -202,7 +202,7 @@ public ClusterClock getClock() { @Override public ServerTuple selectServer(final ServerSelector serverSelector, final OperationContext operationContext) { isTrue("open", !isClosed()); - waitForSrv(); + waitForSrv(operationContext); if (srvRecordResolvedToMultipleHosts) { throw createResolvedToMultipleHostsException(); } @@ -210,13 +210,13 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera } - private void waitForSrv() { + private void waitForSrv(final OperationContext operationContext) { if (initializationCompleted) { return; } Locks.withLock(lock, () -> { StartTime startTime = StartTime.now(); - Timeout timeout = startServerSelectionTimeout(startTime); + Timeout timeout = startServerSelectionTimeout(startTime, operationContext); while (!initializationCompleted) { if (isClosed()) { throw createShutdownException(); @@ -236,7 +236,7 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati callback.onResult(null, createShutdownException()); return; } - Timeout timeout = startServerSelectionTimeout(StartTime.now()); + Timeout timeout = startServerSelectionTimeout(StartTime.now(), operationContext); ServerSelectionRequest serverSelectionRequest = new ServerSelectionRequest(timeout, callback); if (initializationCompleted) { handleServerSelectionRequest(serverSelectionRequest); @@ -308,8 +308,8 @@ private MongoTimeoutException createTimeoutException(final StartTime startTime) } } - private Timeout startServerSelectionTimeout(final StartTime startTime) { - long ms = settings.getServerSelectionTimeout(MILLISECONDS); + private Timeout startServerSelectionTimeout(final StartTime startTime, final OperationContext operationContext) { + long ms = operationContext.getTimeoutContext().getTimeoutSettings().getServerSelectionTimeoutMS(); return startTime.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java b/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java index ce76522ac1d..ff77a9a9812 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java @@ -69,7 +69,7 @@ protected void connect() { } @Override - public ClusterableServer getServer(final ServerAddress serverAddress) { + public ClusterableServer getServer(final ServerAddress serverAddress, final OperationContext operationContext) { isTrue("open", !isClosed()); return assertNotNull(server.get()); } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java index 242bfb6bdc0..145a0b8fcc4 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java @@ -103,10 +103,10 @@ public void shouldGetServerWithOkDescription() { @Test public void shouldSuccessfullyQueryASecondaryWithPrimaryReadPreference() { // given - ServerAddress secondary = getSecondary(); + OperationContext operationContext = OPERATION_CONTEXT; + ServerAddress secondary = getSecondary(operationContext); setUpCluster(secondary); String collectionName = getClass().getName(); - OperationContext operationContext = OPERATION_CONTEXT; Connection connection = cluster.selectServer(new ServerAddressSelector(secondary), operationContext).getServer() .getConnection(operationContext); diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java index 5ac3c35d4ae..64d4fcc9468 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT; import static com.mongodb.connection.ServerConnectionState.CONNECTING; import static com.mongodb.internal.connection.DescriptionHelper.createServerDescription; import static com.mongodb.internal.connection.ProtocolHelper.getCommandFailureException; @@ -81,12 +82,12 @@ protected void applyResponse(final BsonArray response) { protected void applyApplicationError(final BsonDocument applicationError) { ServerAddress serverAddress = new ServerAddress(applicationError.getString("address").getValue()); int errorGeneration = applicationError.getNumber("generation", - new BsonInt32(((DefaultServer) getCluster().getServer(serverAddress)).getConnectionPool().getGeneration())).intValue(); + new BsonInt32(((DefaultServer) getCluster().getServer(serverAddress, OPERATION_CONTEXT)).getConnectionPool().getGeneration())).intValue(); int maxWireVersion = applicationError.getNumber("maxWireVersion").intValue(); String when = applicationError.getString("when").getValue(); String type = applicationError.getString("type").getValue(); - DefaultServer server = (DefaultServer) cluster.getServer(serverAddress); + DefaultServer server = (DefaultServer) cluster.getServer(serverAddress, OPERATION_CONTEXT); RuntimeException exception; switch (type) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy index 99587518581..8e132cb7194 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy @@ -68,7 +68,7 @@ class BaseClusterSpecification extends Specification { } @Override - ClusterableServer getServer(final ServerAddress serverAddress) { + ClusterableServer getServer(final ServerAddress serverAddress, OperationContext operationContext) { throw new UnsupportedOperationException() } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy index 7c7132e2440..df6e6f31a22 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy @@ -392,7 +392,7 @@ class DefaultServerSpecification extends Specification { } @Override - ClusterableServer getServer(final ServerAddress serverAddress) { + ClusterableServer getServer(final ServerAddress serverAddress, OperationContext operationContext) { throw new UnsupportedOperationException() } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DnsMultiServerClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DnsMultiServerClusterSpecification.groovy index 2c381165acd..c681eb2bf80 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DnsMultiServerClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DnsMultiServerClusterSpecification.groovy @@ -25,6 +25,7 @@ import spock.lang.Specification import java.util.concurrent.TimeUnit +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT import static com.mongodb.connection.ClusterConnectionMode.MULTIPLE import static com.mongodb.connection.ClusterType.SHARDED import static com.mongodb.connection.ServerType.SHARD_ROUTER diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/MultiServerClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/MultiServerClusterSpecification.groovy index 9422c1a2452..36e2d58caa0 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/MultiServerClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/MultiServerClusterSpecification.groovy @@ -95,7 +95,7 @@ class MultiServerClusterSpecification extends Specification { cluster.close() when: - cluster.getServer(firstServer) + cluster.getServer(firstServer, OPERATION_CONTEXT) then: thrown(IllegalStateException) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java index 4af47cb9557..05ae1e5bd17 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java @@ -30,6 +30,7 @@ import java.net.URISyntaxException; import java.util.Collection; +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT; import static com.mongodb.ClusterFixture.getClusterDescription; import static com.mongodb.internal.connection.ClusterDescriptionHelper.getPrimaries; import static com.mongodb.internal.event.EventListenerHelper.NO_OP_CLUSTER_LISTENER; @@ -120,7 +121,7 @@ private void assertServer(final String serverName, final BsonDocument expectedSe if (expectedServerDescriptionDocument.isDocument("pool")) { int expectedGeneration = expectedServerDescriptionDocument.getDocument("pool").getNumber("generation").intValue(); - DefaultServer server = (DefaultServer) getCluster().getServer(new ServerAddress(serverName)); + DefaultServer server = (DefaultServer) getCluster().getServer(new ServerAddress(serverName), OPERATION_CONTEXT); assertEquals(expectedGeneration, server.getConnectionPool().getGeneration()); } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/SingleServerClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/SingleServerClusterSpecification.groovy index 3d796976382..ff2acb3d0dc 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/SingleServerClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/SingleServerClusterSpecification.groovy @@ -77,7 +77,7 @@ class SingleServerClusterSpecification extends Specification { sendNotification(firstServer, STANDALONE) then: - cluster.getServer(firstServer) == factory.getServer(firstServer) + cluster.getServer(firstServer, OPERATION_CONTEXT) == factory.getServer(firstServer) cleanup: cluster?.close() @@ -91,7 +91,7 @@ class SingleServerClusterSpecification extends Specification { cluster.close() when: - cluster.getServer(firstServer) + cluster.getServer(firstServer, OPERATION_CONTEXT) then: thrown(IllegalStateException) From 72595714e38f13b017282ef2c6ffdfb5ff6b641f Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Thu, 28 Sep 2023 14:37:21 -0600 Subject: [PATCH 3/4] Move start timeout methods to TimeoutContext --- .../src/main/com/mongodb/internal/TimeoutContext.java | 6 ++++++ .../com/mongodb/internal/connection/BaseCluster.java | 9 ++------- .../internal/connection/LoadBalancedCluster.java | 10 ++-------- 3 files changed, 10 insertions(+), 15 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index 990f19aa31e..0d8ccf80023 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -15,6 +15,7 @@ */ package com.mongodb.internal; +import com.mongodb.internal.time.StartTime; import com.mongodb.internal.time.Timeout; import com.mongodb.lang.Nullable; @@ -152,4 +153,9 @@ private static Timeout calculateTimeout(@Nullable final Long timeoutMS) { } return null; } + + public Timeout startServerSelectionTimeout(final StartTime startTime) { + long ms = getTimeoutSettings().getServerSelectionTimeoutMS(); + return startTime.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS); + } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index f010d4ff004..15398b6e2f1 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -107,7 +107,7 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector); boolean selectionFailureLogged = false; StartTime startTime = StartTime.now(); - Timeout timeout = startServerSelectionTimeout(startTime, operationContext); + Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout(startTime); while (true) { CountDownLatch currentPhaseLatch = phase.get(); @@ -141,7 +141,7 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati LOGGER.trace(format("Asynchronously selecting server with selector %s", serverSelector)); } StartTime startTime = StartTime.now(); - Timeout timeout = startServerSelectionTimeout(startTime, operationContext); + Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout(startTime); ServerSelectionRequest request = new ServerSelectionRequest( serverSelector, getCompositeServerSelector(serverSelector), timeout, startTime, callback); @@ -218,11 +218,6 @@ private void updatePhase() { withLock(() -> phase.getAndSet(new CountDownLatch(1)).countDown()); } - private Timeout startServerSelectionTimeout(final StartTime startTime, final OperationContext operationContext) { - long ms = operationContext.getTimeoutContext().getTimeoutSettings().getServerSelectionTimeoutMS(); - return startTime.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS); - } - private Timeout startMinWaitHeartbeatTimeout() { long minHeartbeatFrequency = serverFactory.getSettings().getMinHeartbeatFrequency(NANOSECONDS); minHeartbeatFrequency = Math.max(0, minHeartbeatFrequency); diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java index ae21b937ea3..3ebe70afd97 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java @@ -65,7 +65,6 @@ import static java.lang.String.format; import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; -import static java.util.concurrent.TimeUnit.MILLISECONDS; @ThreadSafe final class LoadBalancedCluster implements Cluster { @@ -216,7 +215,7 @@ private void waitForSrv(final OperationContext operationContext) { } Locks.withLock(lock, () -> { StartTime startTime = StartTime.now(); - Timeout timeout = startServerSelectionTimeout(startTime, operationContext); + Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout(startTime); while (!initializationCompleted) { if (isClosed()) { throw createShutdownException(); @@ -236,7 +235,7 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati callback.onResult(null, createShutdownException()); return; } - Timeout timeout = startServerSelectionTimeout(StartTime.now(), operationContext); + Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout(StartTime.now()); ServerSelectionRequest serverSelectionRequest = new ServerSelectionRequest(timeout, callback); if (initializationCompleted) { handleServerSelectionRequest(serverSelectionRequest); @@ -308,11 +307,6 @@ private MongoTimeoutException createTimeoutException(final StartTime startTime) } } - private Timeout startServerSelectionTimeout(final StartTime startTime, final OperationContext operationContext) { - long ms = operationContext.getTimeoutContext().getTimeoutSettings().getServerSelectionTimeoutMS(); - return startTime.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS); - } - private void notifyWaitQueueHandler(final ServerSelectionRequest request) { Locks.withLock(lock, () -> { if (isClosed()) { From f8df2571ac5f89a3e1ed32985b8c9f1d3fba9ec1 Mon Sep 17 00:00:00 2001 From: Maxim Katcharov Date: Thu, 5 Oct 2023 15:06:46 -0600 Subject: [PATCH 4/4] Fix duplicated timeout, remove startTime from messages --- .../com/mongodb/internal/TimeoutContext.java | 4 +- .../AbstractMultiServerCluster.java | 3 +- .../internal/connection/BaseCluster.java | 58 ++++++++----------- .../mongodb/internal/connection/Cluster.java | 3 +- .../connection/LoadBalancedCluster.java | 33 +++++------ .../connection/SingleServerCluster.java | 3 +- .../connection/SingleServerClusterTest.java | 2 +- ...tractServerDiscoveryAndMonitoringTest.java | 6 +- .../BaseClusterSpecification.groovy | 3 +- .../DefaultServerSpecification.groovy | 3 +- .../DnsMultiServerClusterSpecification.groovy | 1 - .../connection/LoadBalancedClusterTest.java | 8 +-- .../MultiServerClusterSpecification.groovy | 2 +- .../ServerDiscoveryAndMonitoringTest.java | 4 +- .../SingleServerClusterSpecification.groovy | 5 +- 15 files changed, 67 insertions(+), 71 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index 0d8ccf80023..0336f698d8f 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -154,8 +154,8 @@ private static Timeout calculateTimeout(@Nullable final Long timeoutMS) { return null; } - public Timeout startServerSelectionTimeout(final StartTime startTime) { + public Timeout startServerSelectionTimeout() { long ms = getTimeoutSettings().getServerSelectionTimeoutMS(); - return startTime.timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS); + return StartTime.now().timeoutAfterOrInfiniteIfNegative(ms, MILLISECONDS); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java b/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java index c3c76c62e32..5b80906eacb 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/AbstractMultiServerCluster.java @@ -26,6 +26,7 @@ import com.mongodb.event.ServerDescriptionChangedEvent; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; +import com.mongodb.internal.time.Timeout; import com.mongodb.lang.Nullable; import org.bson.types.ObjectId; @@ -122,7 +123,7 @@ public void close() { } @Override - public ClusterableServer getServer(final ServerAddress serverAddress, final OperationContext operationContext) { + public ClusterableServer getServer(final ServerAddress serverAddress, final Timeout serverSelectionTimeout) { isTrue("is open", !isClosed()); ServerTuple serverTuple = addressToServerTupleMap.get(serverAddress); diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index 15398b6e2f1..899c964def8 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -36,7 +36,6 @@ import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; import com.mongodb.internal.selector.LatencyMinimizingServerSelector; -import com.mongodb.internal.time.StartTime; import com.mongodb.internal.time.Timeout; import com.mongodb.lang.Nullable; import com.mongodb.selector.CompositeServerSelector; @@ -106,20 +105,19 @@ public ServerTuple selectServer(final ServerSelector serverSelector, final Opera ServerSelector compositeServerSelector = getCompositeServerSelector(serverSelector); boolean selectionFailureLogged = false; - StartTime startTime = StartTime.now(); - Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout(startTime); + Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout(); while (true) { CountDownLatch currentPhaseLatch = phase.get(); ClusterDescription currentDescription = description; - ServerTuple serverTuple = selectServer(compositeServerSelector, currentDescription, operationContext); + ServerTuple serverTuple = selectServer(compositeServerSelector, currentDescription, timeout); throwIfIncompatible(currentDescription); if (serverTuple != null) { return serverTuple; } if (timeout.hasExpired()) { - throw createTimeoutException(serverSelector, currentDescription, startTime); + throw createTimeoutException(serverSelector, currentDescription); } if (!selectionFailureLogged) { logServerSelectionFailure(serverSelector, currentDescription, timeout); @@ -140,16 +138,15 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati if (LOGGER.isTraceEnabled()) { LOGGER.trace(format("Asynchronously selecting server with selector %s", serverSelector)); } - StartTime startTime = StartTime.now(); - Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout(startTime); + Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout(); ServerSelectionRequest request = new ServerSelectionRequest( - serverSelector, getCompositeServerSelector(serverSelector), timeout, startTime, callback); + serverSelector, getCompositeServerSelector(serverSelector), timeout, callback); CountDownLatch currentPhase = phase.get(); ClusterDescription currentDescription = description; - if (!handleServerSelectionRequest(request, currentPhase, currentDescription, operationContext)) { - notifyWaitQueueHandler(request, operationContext); + if (!handleServerSelectionRequest(request, currentPhase, currentDescription)) { + notifyWaitQueueHandler(request); } } @@ -226,7 +223,7 @@ private Timeout startMinWaitHeartbeatTimeout() { private boolean handleServerSelectionRequest( final ServerSelectionRequest request, final CountDownLatch currentPhase, - final ClusterDescription description, final OperationContext operationContext) { + final ClusterDescription description) { try { if (currentPhase != request.phase) { CountDownLatch prevPhase = request.phase; @@ -239,7 +236,7 @@ private boolean handleServerSelectionRequest( return true; } - ServerTuple serverTuple = selectServer(request.compositeSelector, description, operationContext); + ServerTuple serverTuple = selectServer(request.compositeSelector, description, request.getTimeout()); if (serverTuple != null) { if (LOGGER.isTraceEnabled()) { LOGGER.trace(format("Asynchronously selected server %s", @@ -257,8 +254,7 @@ private boolean handleServerSelectionRequest( if (LOGGER.isTraceEnabled()) { LOGGER.trace("Asynchronously failed server selection after timeout"); } - request.onResult(null, createTimeoutException( - request.originalSelector, description, request.getStartTime())); + request.onResult(null, createTimeoutException(request.originalSelector, description)); return true; } @@ -284,8 +280,11 @@ private void logServerSelectionFailure(final ServerSelector serverSelector, @Nullable private ServerTuple selectServer(final ServerSelector serverSelector, - final ClusterDescription clusterDescription, final OperationContext operationContext) { - return selectServer(serverSelector, clusterDescription, serverAddress -> getServer(serverAddress, operationContext)); + final ClusterDescription clusterDescription, final Timeout serverSelectionTimeout) { + return selectServer( + serverSelector, + clusterDescription, + serverAddress -> getServer(serverAddress, serverSelectionTimeout)); } @Nullable @@ -364,10 +363,9 @@ private MongoIncompatibleDriverException createIncompatibleException(final Clust } private MongoTimeoutException createTimeoutException(final ServerSelector serverSelector, - final ClusterDescription curDescription, final StartTime startTime) { + final ClusterDescription curDescription) { return new MongoTimeoutException(format( - "Timed out after %d ms while waiting for a server that matches %s. Client view of cluster state is %s", - startTime.elapsed().toMillis(), + "Timed out while waiting for a server that matches %s. Client view of cluster state is %s", serverSelector, curDescription.getShortDescription())); } @@ -377,15 +375,13 @@ private static final class ServerSelectionRequest { private final ServerSelector compositeSelector; private final SingleResultCallback callback; private final Timeout timeout; - private final StartTime startTime; private CountDownLatch phase; ServerSelectionRequest(final ServerSelector serverSelector, final ServerSelector compositeSelector, - final Timeout timeout, final StartTime startTime, final SingleResultCallback callback) { + final Timeout timeout, final SingleResultCallback callback) { this.originalSelector = serverSelector; this.compositeSelector = compositeSelector; this.timeout = timeout; - this.startTime = startTime; this.callback = callback; } @@ -400,13 +396,9 @@ void onResult(@Nullable final ServerTuple serverTuple, @Nullable final Throwable Timeout getTimeout() { return timeout; } - - StartTime getStartTime() { - return startTime; - } } - private void notifyWaitQueueHandler(final ServerSelectionRequest request, final OperationContext operationContext) { + private void notifyWaitQueueHandler(final ServerSelectionRequest request) { withLock(() -> { if (isClosed) { return; @@ -415,7 +407,7 @@ private void notifyWaitQueueHandler(final ServerSelectionRequest request, final waitQueue.add(request); if (waitQueueHandler == null) { - waitQueueHandler = new Thread(new WaitQueueHandler(operationContext), "cluster-" + clusterId.getValue()); + waitQueueHandler = new Thread(new WaitQueueHandler(), "cluster-" + clusterId.getValue()); waitQueueHandler.setDaemon(true); waitQueueHandler.start(); } else { @@ -433,10 +425,8 @@ private void stopWaitQueueHandler() { } private final class WaitQueueHandler implements Runnable { - private final OperationContext operationContext; - WaitQueueHandler(final OperationContext operationContext) { - this.operationContext = operationContext; + WaitQueueHandler() { } public void run() { @@ -447,12 +437,12 @@ public void run() { Timeout timeout = Timeout.infinite(); for (Iterator iter = waitQueue.iterator(); iter.hasNext();) { - ServerSelectionRequest nextRequest = iter.next(); - if (handleServerSelectionRequest(nextRequest, currentPhase, curDescription, operationContext)) { + ServerSelectionRequest currentRequest = iter.next(); + if (handleServerSelectionRequest(currentRequest, currentPhase, curDescription)) { iter.remove(); } else { timeout = timeout - .orEarlier(nextRequest.getTimeout()) + .orEarlier(currentRequest.getTimeout()) .orEarlier(startMinWaitHeartbeatTimeout()); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/Cluster.java b/driver-core/src/main/com/mongodb/internal/connection/Cluster.java index 201470cc8ed..acb251dcb56 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/Cluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/Cluster.java @@ -24,6 +24,7 @@ import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.connection.ClusterDescription; import com.mongodb.connection.ClusterSettings; +import com.mongodb.internal.time.Timeout; import com.mongodb.lang.Nullable; import com.mongodb.selector.ServerSelector; @@ -45,7 +46,7 @@ public interface Cluster extends Closeable { @Nullable @VisibleForTesting(otherwise = PRIVATE) - ClusterableServer getServer(ServerAddress serverAddress, OperationContext operationContext); + ClusterableServer getServer(ServerAddress serverAddress, Timeout serverSelectionTimeout); /** * Get the current description of this cluster. diff --git a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java index 3ebe70afd97..aa7d018e821 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/LoadBalancedCluster.java @@ -180,9 +180,9 @@ public ClusterId getClusterId() { } @Override - public ClusterableServer getServer(final ServerAddress serverAddress, final OperationContext operationContext) { + public ClusterableServer getServer(final ServerAddress serverAddress, final Timeout serverSelectionTimeout) { isTrue("open", !isClosed()); - waitForSrv(operationContext); + waitForSrv(serverSelectionTimeout); return assertNotNull(server); } @@ -201,29 +201,27 @@ public ClusterClock getClock() { @Override public ServerTuple selectServer(final ServerSelector serverSelector, final OperationContext operationContext) { isTrue("open", !isClosed()); - waitForSrv(operationContext); + Timeout serverSelectionTimeout = operationContext.getTimeoutContext().startServerSelectionTimeout(); + waitForSrv(serverSelectionTimeout); if (srvRecordResolvedToMultipleHosts) { throw createResolvedToMultipleHostsException(); } return new ServerTuple(assertNotNull(server), description.getServerDescriptions().get(0)); } - - private void waitForSrv(final OperationContext operationContext) { + private void waitForSrv(final Timeout serverSelectionTimeout) { if (initializationCompleted) { return; } Locks.withLock(lock, () -> { - StartTime startTime = StartTime.now(); - Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout(startTime); while (!initializationCompleted) { if (isClosed()) { throw createShutdownException(); } - if (timeout.hasExpired()) { - throw createTimeoutException(startTime); + if (serverSelectionTimeout.hasExpired()) { + throw createTimeoutException(); } - timeout.awaitOn(condition, () -> format("resolving SRV records for %s", settings.getSrvHost())); + serverSelectionTimeout.awaitOn(condition, () -> format("resolving SRV records for %s", settings.getSrvHost())); } }); } @@ -235,7 +233,7 @@ public void selectServerAsync(final ServerSelector serverSelector, final Operati callback.onResult(null, createShutdownException()); return; } - Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout(StartTime.now()); + Timeout timeout = operationContext.getTimeoutContext().startServerSelectionTimeout(); ServerSelectionRequest serverSelectionRequest = new ServerSelectionRequest(timeout, callback); if (initializationCompleted) { handleServerSelectionRequest(serverSelectionRequest); @@ -295,15 +293,15 @@ private MongoClientException createResolvedToMultipleHostsException() { + "to multiple hosts"); } - private MongoTimeoutException createTimeoutException(final StartTime startTime) { + private MongoTimeoutException createTimeoutException() { MongoException localSrvResolutionException = srvResolutionException; if (localSrvResolutionException == null) { - return new MongoTimeoutException(format("Timed out after %d ms while waiting to resolve SRV records for %s.", - startTime.elapsed().toMillis(), settings.getSrvHost())); + return new MongoTimeoutException(format("Timed out while waiting to resolve SRV records for %s.", + settings.getSrvHost())); } else { - return new MongoTimeoutException(format("Timed out after %d ms while waiting to resolve SRV records for %s. " + return new MongoTimeoutException(format("Timed out while waiting to resolve SRV records for %s. " + "Resolution exception was '%s'", - startTime.elapsed().toMillis(), settings.getSrvHost(), localSrvResolutionException)); + settings.getSrvHost(), localSrvResolutionException)); } } @@ -332,7 +330,6 @@ private void notifyWaitQueueHandler(final ServerSelectionRequest request) { private final class WaitQueueHandler implements Runnable { public void run() { - StartTime startTime = StartTime.now(); List timeoutList = new ArrayList<>(); while (!(isClosed() || initializationCompleted)) { lockInterruptibly(lock); @@ -363,7 +360,7 @@ public void run() { } finally { lock.unlock(); } - timeoutList.forEach(request -> request.onError(createTimeoutException(startTime))); + timeoutList.forEach(request -> request.onError(createTimeoutException())); timeoutList.clear(); } diff --git a/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java b/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java index ff77a9a9812..590caf3977a 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SingleServerCluster.java @@ -28,6 +28,7 @@ import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; import com.mongodb.event.ServerDescriptionChangedEvent; +import com.mongodb.internal.time.Timeout; import java.util.concurrent.atomic.AtomicReference; @@ -69,7 +70,7 @@ protected void connect() { } @Override - public ClusterableServer getServer(final ServerAddress serverAddress, final OperationContext operationContext) { + public ClusterableServer getServer(final ServerAddress serverAddress, final Timeout serverSelectionTimeout) { isTrue("open", !isClosed()); return assertNotNull(server.get()); } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java b/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java index 145a0b8fcc4..63b55fe0e2b 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/SingleServerClusterTest.java @@ -104,7 +104,7 @@ public void shouldGetServerWithOkDescription() { public void shouldSuccessfullyQueryASecondaryWithPrimaryReadPreference() { // given OperationContext operationContext = OPERATION_CONTEXT; - ServerAddress secondary = getSecondary(operationContext); + ServerAddress secondary = getSecondary(); setUpCluster(secondary); String collectionName = getClass().getName(); Connection connection = cluster.selectServer(new ServerAddressSelector(secondary), operationContext).getServer() diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java index 64d4fcc9468..16047ad9eab 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/AbstractServerDiscoveryAndMonitoringTest.java @@ -28,6 +28,7 @@ import com.mongodb.connection.ServerType; import com.mongodb.event.ClusterListener; import com.mongodb.internal.connection.SdamServerDescriptionManager.SdamIssue; +import com.mongodb.internal.time.Timeout; import org.bson.BsonArray; import org.bson.BsonDocument; import org.bson.BsonInt32; @@ -80,14 +81,15 @@ protected void applyResponse(final BsonArray response) { } protected void applyApplicationError(final BsonDocument applicationError) { + Timeout serverSelectionTimeout = OPERATION_CONTEXT.getTimeoutContext().startServerSelectionTimeout(); ServerAddress serverAddress = new ServerAddress(applicationError.getString("address").getValue()); int errorGeneration = applicationError.getNumber("generation", - new BsonInt32(((DefaultServer) getCluster().getServer(serverAddress, OPERATION_CONTEXT)).getConnectionPool().getGeneration())).intValue(); + new BsonInt32(((DefaultServer) getCluster().getServer(serverAddress, serverSelectionTimeout)).getConnectionPool().getGeneration())).intValue(); int maxWireVersion = applicationError.getNumber("maxWireVersion").intValue(); String when = applicationError.getString("when").getValue(); String type = applicationError.getString("type").getValue(); - DefaultServer server = (DefaultServer) cluster.getServer(serverAddress, OPERATION_CONTEXT); + DefaultServer server = (DefaultServer) cluster.getServer(serverAddress, serverSelectionTimeout); RuntimeException exception; switch (type) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy index 8e132cb7194..54ceba271f7 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/BaseClusterSpecification.groovy @@ -34,6 +34,7 @@ import com.mongodb.event.ServerDescriptionChangedEvent import com.mongodb.internal.selector.ReadPreferenceServerSelector import com.mongodb.internal.selector.ServerAddressSelector import com.mongodb.internal.selector.WritableServerSelector +import com.mongodb.internal.time.Timeout import spock.lang.Specification import util.spock.annotations.Slow @@ -68,7 +69,7 @@ class BaseClusterSpecification extends Specification { } @Override - ClusterableServer getServer(final ServerAddress serverAddress, OperationContext operationContext) { + ClusterableServer getServer(final ServerAddress serverAddress, Timeout serverSelectionTimeout) { throw new UnsupportedOperationException() } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy index df6e6f31a22..b7c8a8e39f1 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerSpecification.groovy @@ -39,6 +39,7 @@ import com.mongodb.event.ServerListener import com.mongodb.internal.async.SingleResultCallback import com.mongodb.internal.inject.SameObjectProvider import com.mongodb.internal.session.SessionContext +import com.mongodb.internal.time.Timeout import com.mongodb.internal.validator.NoOpFieldNameValidator import org.bson.BsonDocument import org.bson.BsonInt32 @@ -392,7 +393,7 @@ class DefaultServerSpecification extends Specification { } @Override - ClusterableServer getServer(final ServerAddress serverAddress, OperationContext operationContext) { + ClusterableServer getServer(final ServerAddress serverAddress, Timeout serverSelectionTimeout) { throw new UnsupportedOperationException() } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DnsMultiServerClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DnsMultiServerClusterSpecification.groovy index c681eb2bf80..2c381165acd 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DnsMultiServerClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DnsMultiServerClusterSpecification.groovy @@ -25,7 +25,6 @@ import spock.lang.Specification import java.util.concurrent.TimeUnit -import static com.mongodb.ClusterFixture.OPERATION_CONTEXT import static com.mongodb.connection.ClusterConnectionMode.MULTIPLE import static com.mongodb.connection.ClusterType.SHARDED import static com.mongodb.connection.ServerType.SHARD_ROUTER diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java index 03ca8f92cb5..f96abc0f2e8 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/LoadBalancedClusterTest.java @@ -234,7 +234,7 @@ public void shouldTimeoutSelectServerWhenThereIsSRVLookup() { MongoTimeoutException exception = assertThrows(MongoTimeoutException.class, () -> cluster.selectServer(mock(ServerSelector.class), OPERATION_CONTEXT)); - assertTrue(exception.getMessage().contains("ms while waiting to resolve SRV records for foo.bar.com")); + assertTrue(exception.getMessage().contains("while waiting to resolve SRV records for foo.bar.com")); } @Test @@ -262,7 +262,7 @@ public void shouldTimeoutSelectServerWhenThereIsSRVLookupException() { MongoTimeoutException exception = assertThrows(MongoTimeoutException.class, () -> cluster.selectServer(mock(ServerSelector.class), OPERATION_CONTEXT)); - assertTrue(exception.getMessage().contains("ms while waiting to resolve SRV records for foo.bar.com")); + assertTrue(exception.getMessage().contains("while waiting to resolve SRV records for foo.bar.com")); assertTrue(exception.getMessage().contains("Resolution exception was 'com.mongodb.MongoConfigurationException: Unable to resolve SRV record'")); } @@ -292,7 +292,7 @@ public void shouldTimeoutSelectServerAsynchronouslyWhenThereIsSRVLookup() { cluster.selectServerAsync(mock(ServerSelector.class), OPERATION_CONTEXT, callback); MongoTimeoutException exception = assertThrows(MongoTimeoutException.class, callback::get); - assertTrue(exception.getMessage().contains("ms while waiting to resolve SRV records for foo.bar.com")); + assertTrue(exception.getMessage().contains("while waiting to resolve SRV records for foo.bar.com")); } @Test @@ -321,7 +321,7 @@ public void shouldTimeoutSelectServerAsynchronouslyWhenThereIsSRVLookupException cluster.selectServerAsync(mock(ServerSelector.class), OPERATION_CONTEXT, callback); MongoTimeoutException exception = assertThrows(MongoTimeoutException.class, callback::get); - assertTrue(exception.getMessage().contains("ms while waiting to resolve SRV records for foo.bar.com")); + assertTrue(exception.getMessage().contains("while waiting to resolve SRV records for foo.bar.com")); assertTrue(exception.getMessage().contains("Resolution exception was 'com.mongodb.MongoConfigurationException: Unable to resolve SRV record'")); } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/MultiServerClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/MultiServerClusterSpecification.groovy index 36e2d58caa0..17b91599f59 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/MultiServerClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/MultiServerClusterSpecification.groovy @@ -95,7 +95,7 @@ class MultiServerClusterSpecification extends Specification { cluster.close() when: - cluster.getServer(firstServer, OPERATION_CONTEXT) + cluster.getServer(firstServer, OPERATION_CONTEXT.getTimeoutContext().startServerSelectionTimeout()) then: thrown(IllegalStateException) diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java index 05ae1e5bd17..bc76cb96be6 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/ServerDiscoveryAndMonitoringTest.java @@ -19,6 +19,7 @@ import com.mongodb.ServerAddress; import com.mongodb.connection.ClusterType; import com.mongodb.connection.ServerDescription; +import com.mongodb.internal.time.Timeout; import org.bson.BsonDocument; import org.bson.BsonNull; import org.bson.BsonValue; @@ -121,7 +122,8 @@ private void assertServer(final String serverName, final BsonDocument expectedSe if (expectedServerDescriptionDocument.isDocument("pool")) { int expectedGeneration = expectedServerDescriptionDocument.getDocument("pool").getNumber("generation").intValue(); - DefaultServer server = (DefaultServer) getCluster().getServer(new ServerAddress(serverName), OPERATION_CONTEXT); + Timeout serverSelectionTimeout = OPERATION_CONTEXT.getTimeoutContext().startServerSelectionTimeout(); + DefaultServer server = (DefaultServer) getCluster().getServer(new ServerAddress(serverName), serverSelectionTimeout); assertEquals(expectedGeneration, server.getConnectionPool().getGeneration()); } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/SingleServerClusterSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/SingleServerClusterSpecification.groovy index ff2acb3d0dc..0896bf6c6ef 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/SingleServerClusterSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/SingleServerClusterSpecification.groovy @@ -77,7 +77,8 @@ class SingleServerClusterSpecification extends Specification { sendNotification(firstServer, STANDALONE) then: - cluster.getServer(firstServer, OPERATION_CONTEXT) == factory.getServer(firstServer) + cluster.getServer(firstServer, + OPERATION_CONTEXT.getTimeoutContext().startServerSelectionTimeout()) == factory.getServer(firstServer) cleanup: cluster?.close() @@ -91,7 +92,7 @@ class SingleServerClusterSpecification extends Specification { cluster.close() when: - cluster.getServer(firstServer, OPERATION_CONTEXT) + cluster.getServer(firstServer, OPERATION_CONTEXT.getTimeoutContext().startServerSelectionTimeout()) then: thrown(IllegalStateException)