Skip to content

Commit

Permalink
Support the serverMonitoringMode connection string option
Browse files Browse the repository at this point in the history
This change is with accordance to source/uri-options/uri-options.rst.

JAVA-4936
  • Loading branch information
stIncMale committed Feb 21, 2024
1 parent 63562c6 commit b139199
Show file tree
Hide file tree
Showing 10 changed files with 420 additions and 25 deletions.
33 changes: 31 additions & 2 deletions driver-core/src/main/com/mongodb/ConnectionString.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.mongodb.connection.ClusterSettings;
import com.mongodb.connection.ConnectionPoolSettings;
import com.mongodb.connection.ServerMonitoringMode;
import com.mongodb.connection.ServerSettings;
import com.mongodb.connection.SocketSettings;
import com.mongodb.event.ConnectionCheckOutStartedEvent;
import com.mongodb.event.ConnectionCheckedInEvent;
Expand Down Expand Up @@ -111,6 +113,13 @@
* <ul>
* <li>{@code heartbeatFrequencyMS=ms}: The frequency that the driver will attempt to determine the current state of each server in the
* cluster.</li>
* <li>{@code serverMonitoringMode=enum}: The server monitoring mode, which defines the monitoring protocol to use. Enumerated values:
* <ul>
* <li>{@code stream};</li>
* <li>{@code poll};</li>
* <li>{@code auto} - the default.</li>
* </ul>
* </li>
* </ul>
* <p>Replica set configuration:</p>
* <ul>
Expand Down Expand Up @@ -307,6 +316,7 @@ public class ConnectionString {
private Integer serverSelectionTimeout;
private Integer localThreshold;
private Integer heartbeatFrequency;
private ServerMonitoringMode serverMonitoringMode;
private String applicationName;
private List<MongoCompressor> compressorList;
private UuidRepresentation uuidRepresentation;
Expand Down Expand Up @@ -529,6 +539,7 @@ public ConnectionString(final String connectionString, @Nullable final DnsClient
GENERAL_OPTIONS_KEYS.add("serverselectiontimeoutms");
GENERAL_OPTIONS_KEYS.add("localthresholdms");
GENERAL_OPTIONS_KEYS.add("heartbeatfrequencyms");
GENERAL_OPTIONS_KEYS.add("servermonitoringmode");
GENERAL_OPTIONS_KEYS.add("retrywrites");
GENERAL_OPTIONS_KEYS.add("retryreads");

Expand Down Expand Up @@ -665,6 +676,9 @@ private void translateOptions(final Map<String, List<String>> optionsMap) {
case "heartbeatfrequencyms":
heartbeatFrequency = parseInteger(value, "heartbeatfrequencyms");
break;
case "servermonitoringmode":
serverMonitoringMode = ServerMonitoringMode.fromString(value);
break;
case "appname":
applicationName = value;
break;
Expand Down Expand Up @@ -1623,6 +1637,20 @@ public Integer getHeartbeatFrequency() {
return heartbeatFrequency;
}

/**
* The server monitoring mode, which defines the monitoring protocol to use.
* <p>
* Default is {@link ServerMonitoringMode#AUTO}.</p>
*
* @return The {@link ServerMonitoringMode}, or {@code null} if unset and the default is to be used.
* @see ServerSettings#getServerMonitoringMode()
* @since 5.1
*/
@Nullable
public ServerMonitoringMode getServerMonitoringMode() {
return serverMonitoringMode;
}

/**
* Gets the logical name of the application. The application name may be used by the client to identify the application to the server,
* for use in server logs, slow query logs, and profile collection.
Expand Down Expand Up @@ -1704,6 +1732,7 @@ public boolean equals(final Object o) {
&& Objects.equals(serverSelectionTimeout, that.serverSelectionTimeout)
&& Objects.equals(localThreshold, that.localThreshold)
&& Objects.equals(heartbeatFrequency, that.heartbeatFrequency)
&& Objects.equals(serverMonitoringMode, that.serverMonitoringMode)
&& Objects.equals(applicationName, that.applicationName)
&& Objects.equals(compressorList, that.compressorList)
&& Objects.equals(uuidRepresentation, that.uuidRepresentation)
Expand All @@ -1717,7 +1746,7 @@ public int hashCode() {
writeConcern, retryWrites, retryReads, readConcern, minConnectionPoolSize, maxConnectionPoolSize, maxWaitTime,
maxConnectionIdleTime, maxConnectionLifeTime, maxConnecting, connectTimeout, socketTimeout, sslEnabled,
sslInvalidHostnameAllowed, requiredReplicaSetName, serverSelectionTimeout, localThreshold, heartbeatFrequency,
applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost, proxyPort,
proxyUsername, proxyPassword);
serverMonitoringMode, applicationName, compressorList, uuidRepresentation, srvServiceName, srvMaxHosts, proxyHost,
proxyPort, proxyUsername, proxyPassword);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2008-present MongoDB, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.mongodb.connection;

import static com.mongodb.assertions.Assertions.notNull;
import static java.lang.String.format;

/**
* The server monitoring mode, which defines the monitoring protocol to use.
*
* @since 5.1
*/
public enum ServerMonitoringMode {
/**
* Use the streaming protocol whe the server supports it or fall back to the polling protocol otherwise.
*/
STREAM("stream"),
/**
* Use the polling protocol.
*/
POLL("poll"),
/**
* Behave the same as {@link #POLL} if running in a FaaS environment, otherwise behave as {@link #STREAM}.
* This is the default.
*/
AUTO("auto");

private final String value;

ServerMonitoringMode(final String value) {
this.value = value;
}

/**
* Parses a string into {@link ServerMonitoringMode}.
*
* @param serverMonitoringMode A server monitoring mode string.
* @return The corresponding {@link ServerMonitoringMode} value.
* @see #getValue()
*/
public static ServerMonitoringMode fromString(final String serverMonitoringMode) {
notNull("serverMonitoringMode", serverMonitoringMode);
for (ServerMonitoringMode mode : ServerMonitoringMode.values()) {
if (serverMonitoringMode.equalsIgnoreCase(mode.value)) {
return mode;
}
}
throw new IllegalArgumentException(format("'%s' is not a valid %s",
serverMonitoringMode, ServerMonitoringMode.class.getSimpleName()));
}

/**
* The string value.
*
* @return The string value.
* @see #fromString(String)
*/
public String getValue() {
return value;
}
}
72 changes: 49 additions & 23 deletions driver-core/src/main/com/mongodb/connection/ServerSettings.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.TimeUnit;

import static com.mongodb.assertions.Assertions.notNull;
Expand All @@ -38,6 +39,7 @@
public class ServerSettings {
private final long heartbeatFrequencyMS;
private final long minHeartbeatFrequencyMS;
private final ServerMonitoringMode serverMonitoringMode;
private final List<ServerListener> serverListeners;
private final List<ServerMonitorListener> serverMonitorListeners;

Expand Down Expand Up @@ -68,6 +70,7 @@ public static Builder builder(final ServerSettings serverSettings) {
public static final class Builder {
private long heartbeatFrequencyMS = 10000;
private long minHeartbeatFrequencyMS = 500;
private ServerMonitoringMode serverMonitoringMode = ServerMonitoringMode.AUTO;
private List<ServerListener> serverListeners = new ArrayList<>();
private List<ServerMonitorListener> serverMonitorListeners = new ArrayList<>();

Expand All @@ -87,6 +90,7 @@ public Builder applySettings(final ServerSettings serverSettings) {
notNull("serverSettings", serverSettings);
heartbeatFrequencyMS = serverSettings.heartbeatFrequencyMS;
minHeartbeatFrequencyMS = serverSettings.minHeartbeatFrequencyMS;
serverMonitoringMode = serverSettings.serverMonitoringMode;
serverListeners = new ArrayList<>(serverSettings.serverListeners);
serverMonitorListeners = new ArrayList<>(serverSettings.serverMonitorListeners);
return this;
Expand Down Expand Up @@ -117,6 +121,20 @@ public Builder minHeartbeatFrequency(final long minHeartbeatFrequency, final Tim
return this;
}

/**
* Sets the server monitoring mode, which defines the monitoring protocol to use.
* The default value is {@link ServerMonitoringMode#AUTO}.
*
* @param serverMonitoringMode The {@link ServerMonitoringMode}.
* @return {@code this}.
* @see #getServerMonitoringMode()
* @since 5.1
*/
public Builder serverMonitoringMode(final ServerMonitoringMode serverMonitoringMode) {
this.serverMonitoringMode = notNull("serverMonitoringMode", serverMonitoringMode);
return this;
}

/**
* Add a server listener.
*
Expand Down Expand Up @@ -181,6 +199,10 @@ public Builder applyConnectionString(final ConnectionString connectionString) {
if (heartbeatFrequency != null) {
heartbeatFrequencyMS = heartbeatFrequency;
}
ServerMonitoringMode serverMonitoringMode = connectionString.getServerMonitoringMode();
if (serverMonitoringMode != null) {
this.serverMonitoringMode = serverMonitoringMode;
}
return this;
}

Expand Down Expand Up @@ -215,6 +237,19 @@ public long getMinHeartbeatFrequency(final TimeUnit timeUnit) {
return timeUnit.convert(minHeartbeatFrequencyMS, TimeUnit.MILLISECONDS);
}

/**
* Gets the server monitoring mode, which defines the monitoring protocol to use.
* The default value is {@link ServerMonitoringMode#AUTO}.
*
* @return The {@link ServerMonitoringMode}.
* @see Builder#serverMonitoringMode(ServerMonitoringMode)
* @see ConnectionString#getServerMonitoringMode()
* @since 5.1
*/
public ServerMonitoringMode getServerMonitoringMode() {
return serverMonitoringMode;
}

/**
* Gets the server listeners. The default value is an empty list.
*
Expand Down Expand Up @@ -243,40 +278,30 @@ public boolean equals(final Object o) {
if (o == null || getClass() != o.getClass()) {
return false;
}

ServerSettings that = (ServerSettings) o;

if (heartbeatFrequencyMS != that.heartbeatFrequencyMS) {
return false;
}
if (minHeartbeatFrequencyMS != that.minHeartbeatFrequencyMS) {
return false;
}

if (!serverListeners.equals(that.serverListeners)) {
return false;
}
if (!serverMonitorListeners.equals(that.serverMonitorListeners)) {
return false;
}

return true;
final ServerSettings that = (ServerSettings) o;
return heartbeatFrequencyMS == that.heartbeatFrequencyMS
&& minHeartbeatFrequencyMS == that.minHeartbeatFrequencyMS
&& serverMonitoringMode == that.serverMonitoringMode
&& Objects.equals(serverListeners, that.serverListeners)
&& Objects.equals(serverMonitorListeners, that.serverMonitorListeners);
}

@Override
public int hashCode() {
int result = (int) (heartbeatFrequencyMS ^ (heartbeatFrequencyMS >>> 32));
result = 31 * result + (int) (minHeartbeatFrequencyMS ^ (minHeartbeatFrequencyMS >>> 32));
result = 31 * result + serverListeners.hashCode();
result = 31 * result + serverMonitorListeners.hashCode();
return result;
return Objects.hash(
heartbeatFrequencyMS,
minHeartbeatFrequencyMS,
serverMonitoringMode,
serverListeners,
serverMonitorListeners);
}

@Override
public String toString() {
return "ServerSettings{"
+ "heartbeatFrequencyMS=" + heartbeatFrequencyMS
+ ", minHeartbeatFrequencyMS=" + minHeartbeatFrequencyMS
+ ", serverMonitoringMode=" + serverMonitoringMode
+ ", serverListeners='" + serverListeners + '\''
+ ", serverMonitorListeners='" + serverMonitorListeners + '\''
+ '}';
Expand All @@ -285,6 +310,7 @@ public String toString() {
ServerSettings(final Builder builder) {
heartbeatFrequencyMS = builder.heartbeatFrequencyMS;
minHeartbeatFrequencyMS = builder.minHeartbeatFrequencyMS;
serverMonitoringMode = builder.serverMonitoringMode;
serverListeners = unmodifiableList(builder.serverListeners);
serverMonitorListeners = unmodifiableList(builder.serverMonitorListeners);
}
Expand Down
46 changes: 46 additions & 0 deletions driver-core/src/test/resources/uri-options/sdam-options.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
{
"tests": [
{
"description": "serverMonitoringMode=auto",
"uri": "mongodb://example.com/?serverMonitoringMode=auto",
"valid": true,
"warning": false,
"hosts": null,
"auth": null,
"options": {
"serverMonitoringMode": "auto"
}
},
{
"description": "serverMonitoringMode=stream",
"uri": "mongodb://example.com/?serverMonitoringMode=stream",
"valid": true,
"warning": false,
"hosts": null,
"auth": null,
"options": {
"serverMonitoringMode": "stream"
}
},
{
"description": "serverMonitoringMode=poll",
"uri": "mongodb://example.com/?serverMonitoringMode=poll",
"valid": true,
"warning": false,
"hosts": null,
"auth": null,
"options": {
"serverMonitoringMode": "poll"
}
},
{
"description": "invalid serverMonitoringMode",
"uri": "mongodb://example.com/?serverMonitoringMode=invalid",
"valid": true,
"warning": true,
"hosts": null,
"auth": null,
"options": {}
}
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,9 @@ protected void testValidOptions() {
} else if (option.getKey().equalsIgnoreCase("heartbeatfrequencyms")) {
int expected = option.getValue().asInt32().getValue();
assertEquals(expected, connectionString.getHeartbeatFrequency().intValue());
} else if (option.getKey().equalsIgnoreCase("servermonitoringmode")) {
String expected = option.getValue().asString().getValue();
assertEquals(expected, connectionString.getServerMonitoringMode().getValue());
} else if (option.getKey().equalsIgnoreCase("localthresholdms")) {
int expected = option.getValue().asInt32().getValue();
assertEquals(expected, connectionString.getLocalThreshold().intValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ import static com.mongodb.ReadPreference.secondaryPreferred
import static java.util.Arrays.asList
import static java.util.concurrent.TimeUnit.MILLISECONDS

/**
* Update {@link ConnectionStringUnitTest} instead.
*/
class ConnectionStringSpecification extends Specification {
static final LONG_STRING = new String((1..256).collect { (byte) 1 } as byte[])

Expand Down
Loading

0 comments on commit b139199

Please sign in to comment.