diff --git a/driver-core/src/main/com/mongodb/event/ConnectionCheckOutFailedEvent.java b/driver-core/src/main/com/mongodb/event/ConnectionCheckOutFailedEvent.java index 33aff9ae414..3d4ad307908 100644 --- a/driver-core/src/main/com/mongodb/event/ConnectionCheckOutFailedEvent.java +++ b/driver-core/src/main/com/mongodb/event/ConnectionCheckOutFailedEvent.java @@ -53,19 +53,36 @@ public enum Reason { } private final ServerId serverId; + private final long operationId; + private final Reason reason; /** * Construct an instance * * @param serverId the server id + * @param operationId the operation id * @param reason the reason the connection check out failed + * @since 4.10 */ - public ConnectionCheckOutFailedEvent(final ServerId serverId, final Reason reason) { + public ConnectionCheckOutFailedEvent(final ServerId serverId, final long operationId, final Reason reason) { this.serverId = notNull("serverId", serverId); + this.operationId = operationId; this.reason = notNull("reason", reason); } + /** + * Construct an instance + * + * @param serverId the server id + * @param reason the reason the connection check out failed + * @deprecated Prefer {@link #ConnectionCheckOutFailedEvent(ServerId, long, Reason)} + */ + @Deprecated + public ConnectionCheckOutFailedEvent(final ServerId serverId, final Reason reason) { + this(serverId, -1, reason); + } + /** * Gets the server id * @@ -75,6 +92,16 @@ public ServerId getServerId() { return serverId; } + /** + * Gets the operation identifier + * + * @return the operation identifier + * @since 4.10 + */ + public long getOperationId() { + return operationId; + } + /** * Gets the reason for the check out failure. * @@ -90,6 +117,7 @@ public String toString() { return "ConnectionCheckOutFailedEvent{" + "server=" + serverId.getAddress() + ", clusterId=" + serverId.getClusterId() + + ", operationId=" + operationId + ", reason=" + reason + '}'; } diff --git a/driver-core/src/main/com/mongodb/event/ConnectionCheckOutStartedEvent.java b/driver-core/src/main/com/mongodb/event/ConnectionCheckOutStartedEvent.java index f7f1bfdf8e9..3c8cef9e0ca 100644 --- a/driver-core/src/main/com/mongodb/event/ConnectionCheckOutStartedEvent.java +++ b/driver-core/src/main/com/mongodb/event/ConnectionCheckOutStartedEvent.java @@ -27,14 +27,29 @@ */ public final class ConnectionCheckOutStartedEvent { private final ServerId serverId; + private final long operationId; /** * Construct an instance * * @param serverId the server id + * @param operationId the operation id + * @since 4.10 */ - public ConnectionCheckOutStartedEvent(final ServerId serverId) { + public ConnectionCheckOutStartedEvent(final ServerId serverId, final long operationId) { this.serverId = notNull("serverId", serverId); + this.operationId = operationId; + } + + /** + * Construct an instance + * + * @param serverId the server id + * @deprecated Prefer {@link ConnectionCheckOutStartedEvent#ConnectionCheckOutStartedEvent(ServerId, long)} + */ + @Deprecated + public ConnectionCheckOutStartedEvent(final ServerId serverId) { + this(serverId, -1); } /** @@ -46,11 +61,22 @@ public ServerId getServerId() { return serverId; } + /** + * Gets the operation identifier + * + * @return the operation identifier + * @since 4.10 + */ + public long getOperationId() { + return operationId; + } + @Override public String toString() { return "ConnectionCheckOutStartedEvent{" + "server=" + serverId.getAddress() + ", clusterId=" + serverId.getClusterId() + + ", operationId=" + operationId + '}'; } } diff --git a/driver-core/src/main/com/mongodb/event/ConnectionCheckedInEvent.java b/driver-core/src/main/com/mongodb/event/ConnectionCheckedInEvent.java index 37e1b2a4aa8..aa72f0a1c2b 100644 --- a/driver-core/src/main/com/mongodb/event/ConnectionCheckedInEvent.java +++ b/driver-core/src/main/com/mongodb/event/ConnectionCheckedInEvent.java @@ -27,14 +27,30 @@ */ public final class ConnectionCheckedInEvent { private final ConnectionId connectionId; + private final long operationId; + /** * Construct an instance * * @param connectionId the connectionId + * @param operationId the operation id + * @since 4.10 */ - public ConnectionCheckedInEvent(final ConnectionId connectionId) { + public ConnectionCheckedInEvent(final ConnectionId connectionId, final long operationId) { this.connectionId = notNull("connectionId", connectionId); + this.operationId = operationId; + } + + /** + * Construct an instance + * + * @param connectionId the connectionId + * @deprecated Prefer {@link #ConnectionCheckedInEvent(ConnectionId, long)} + */ + @Deprecated + public ConnectionCheckedInEvent(final ConnectionId connectionId) { + this(connectionId, -1); } /** @@ -46,12 +62,23 @@ public ConnectionId getConnectionId() { return connectionId; } + /** + * Gets the operation identifier + * + * @return the operation identifier + * @since 4.10 + */ + public long getOperationId() { + return operationId; + } + @Override public String toString() { return "ConnectionCheckedInEvent{" + "connectionId=" + connectionId + ", server=" + connectionId.getServerId().getAddress() + ", clusterId=" + connectionId.getServerId().getClusterId() + + ", operationId=" + operationId + '}'; } } diff --git a/driver-core/src/main/com/mongodb/event/ConnectionCheckedOutEvent.java b/driver-core/src/main/com/mongodb/event/ConnectionCheckedOutEvent.java index 5ba6ff3f3f4..a9ddbc59d35 100644 --- a/driver-core/src/main/com/mongodb/event/ConnectionCheckedOutEvent.java +++ b/driver-core/src/main/com/mongodb/event/ConnectionCheckedOutEvent.java @@ -27,14 +27,29 @@ */ public final class ConnectionCheckedOutEvent { private final ConnectionId connectionId; + private final long operationId; /** * Construct an instance * * @param connectionId the connectionId + * @param operationId the operation id + * @since 4.10 */ - public ConnectionCheckedOutEvent(final ConnectionId connectionId) { + public ConnectionCheckedOutEvent(final ConnectionId connectionId, final long operationId) { this.connectionId = notNull("connectionId", connectionId); + this.operationId = operationId; + } + + /** + * Construct an instance + * + * @param connectionId the connectionId + * @deprecated Prefer {@link #ConnectionCheckedOutEvent(ConnectionId, long)} + */ + @Deprecated + public ConnectionCheckedOutEvent(final ConnectionId connectionId) { + this(connectionId, -1); } /** @@ -46,12 +61,23 @@ public ConnectionId getConnectionId() { return connectionId; } + /** + * Gets the operation identifier + * + * @return the operation identifier + * @since 4.10 + */ + public long getOperationId() { + return operationId; + } + @Override public String toString() { return "ConnectionCheckedOutEvent{" + "connectionId=" + connectionId + ", server=" + connectionId.getServerId().getAddress() + ", clusterId=" + connectionId.getServerId().getClusterId() + + ", operationId=" + operationId + '}'; } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java index a5d55f8e930..5264f2bd548 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultConnectionPool.java @@ -168,7 +168,7 @@ public InternalConnection get(final OperationContext operationContext) { @Override public InternalConnection get(final OperationContext operationContext, final long timeoutValue, final TimeUnit timeUnit) { - connectionPoolListener.connectionCheckOutStarted(new ConnectionCheckOutStartedEvent(serverId)); + connectionPoolListener.connectionCheckOutStarted(new ConnectionCheckOutStartedEvent(serverId, operationContext.getId())); Timeout timeout = Timeout.startNow(timeoutValue, timeUnit); try { stateAndGeneration.throwIfClosedOrPaused(); @@ -176,10 +176,11 @@ public InternalConnection get(final OperationContext operationContext, final lon if (!connection.opened()) { connection = openConcurrencyLimiter.openOrGetAvailable(connection, timeout); } - connectionPoolListener.connectionCheckedOut(new ConnectionCheckedOutEvent(getId(connection))); + connection.checkedOutForOperation(operationContext); + connectionPoolListener.connectionCheckedOut(new ConnectionCheckedOutEvent(getId(connection), operationContext.getId())); return connection; } catch (Exception e) { - throw (RuntimeException) checkOutFailed(e); + throw (RuntimeException) checkOutFailed(e, operationContext); } } @@ -188,15 +189,16 @@ public void getAsync(final OperationContext operationContext, final SingleResult if (LOGGER.isTraceEnabled()) { LOGGER.trace(format("Asynchronously getting a connection from the pool for server %s", serverId)); } - connectionPoolListener.connectionCheckOutStarted(new ConnectionCheckOutStartedEvent(serverId)); + connectionPoolListener.connectionCheckOutStarted(new ConnectionCheckOutStartedEvent(serverId, operationContext.getId())); Timeout timeout = Timeout.startNow(settings.getMaxWaitTime(NANOSECONDS)); - SingleResultCallback eventSendingCallback = (result, failure) -> { + SingleResultCallback eventSendingCallback = (connection, failure) -> { SingleResultCallback errHandlingCallback = errorHandlingCallback(callback, LOGGER); if (failure == null) { - connectionPoolListener.connectionCheckedOut(new ConnectionCheckedOutEvent(getId(result))); - errHandlingCallback.onResult(result, null); + connection.checkedOutForOperation(operationContext); + connectionPoolListener.connectionCheckedOut(new ConnectionCheckedOutEvent(getId(connection), operationContext.getId())); + errHandlingCallback.onResult(connection, null); } else { - errHandlingCallback.onResult(null, checkOutFailed(failure)); + errHandlingCallback.onResult(null, checkOutFailed(failure, operationContext)); } }; try { @@ -238,20 +240,22 @@ public void getAsync(final OperationContext operationContext, final SingleResult * and returns {@code t} if it is not {@link MongoOpenConnectionInternalException}, * or returns {@code t.}{@linkplain MongoOpenConnectionInternalException#getCause() getCause()} otherwise. */ - private Throwable checkOutFailed(final Throwable t) { + private Throwable checkOutFailed(final Throwable t, final OperationContext operationContext) { Throwable result = t; + Reason reason; if (t instanceof MongoTimeoutException) { - connectionPoolListener.connectionCheckOutFailed(new ConnectionCheckOutFailedEvent(serverId, Reason.TIMEOUT)); + reason = Reason.TIMEOUT; } else if (t instanceof MongoOpenConnectionInternalException) { - connectionPoolListener.connectionCheckOutFailed(new ConnectionCheckOutFailedEvent(serverId, Reason.CONNECTION_ERROR)); + reason = Reason.CONNECTION_ERROR; result = t.getCause(); } else if (t instanceof MongoConnectionPoolClearedException) { - connectionPoolListener.connectionCheckOutFailed(new ConnectionCheckOutFailedEvent(serverId, Reason.CONNECTION_ERROR)); + reason = Reason.CONNECTION_ERROR; } else if (ConcurrentPool.isPoolClosedException(t)) { - connectionPoolListener.connectionCheckOutFailed(new ConnectionCheckOutFailedEvent(serverId, Reason.POOL_CLOSED)); + reason = Reason.POOL_CLOSED; } else { - connectionPoolListener.connectionCheckOutFailed(new ConnectionCheckOutFailedEvent(serverId, Reason.UNKNOWN)); + reason = Reason.UNKNOWN; } + connectionPoolListener.connectionCheckOutFailed(new ConnectionCheckOutFailedEvent(serverId, operationContext.getId(), reason)); return result; } @@ -516,6 +520,7 @@ private class PooledConnection implements InternalConnection { private final UsageTrackingInternalConnection wrapped; private final AtomicBoolean isClosed = new AtomicBoolean(); private Connection.PinningMode pinningMode; + private OperationContext operationContext; PooledConnection(final UsageTrackingInternalConnection wrapped) { this.wrapped = notNull("wrapped", wrapped); @@ -526,6 +531,13 @@ public int getGeneration() { return wrapped.getGeneration(); } + /** + * Associates this with the operation context and establishes the checked out start time + */ + public void checkedOutForOperation(final OperationContext operationContext) { + this.operationContext = operationContext; + } + @Override public void open() { assertFalse(isClosed.get()); @@ -559,7 +571,7 @@ public void close() { // All but the first call is a no-op if (!isClosed.getAndSet(true)) { unmarkAsPinned(); - connectionPoolListener.connectionCheckedIn(new ConnectionCheckedInEvent(getId(wrapped))); + connectionPoolListener.connectionCheckedIn(new ConnectionCheckedInEvent(getId(wrapped), operationContext.getId())); if (LOGGER.isTraceEnabled()) { LOGGER.trace(format("Checked in connection [%s] to server %s", getId(wrapped), serverId.getAddress())); } @@ -731,7 +743,7 @@ public ServerDescription getInitialServerDescription() { /** * This internal exception is used to express an exceptional situation encountered when opening a connection. * It exists because it allows consolidating the code that sends events for exceptional situations in a - * {@linkplain #checkOutFailed(Throwable) single place}, it must not be observable by an external code. + * {@linkplain #checkOutFailed(Throwable, OperationContext) single place}, it must not be observable by an external code. */ private static final class MongoOpenConnectionInternalException extends RuntimeException { private static final long serialVersionUID = 1; @@ -919,7 +931,7 @@ private PooledConnection openWithConcurrencyLimit(final PooledConnection connect * */ void openAsyncWithConcurrencyLimit( - final PooledConnection connection, final Timeout timeout, final SingleResultCallback callback) { + final PooledConnection connection, final Timeout timeout, final SingleResultCallback callback) { PooledConnection availableConnection; try {//phase one availableConnection = acquirePermitOrGetAvailableOpenedConnection(true, timeout);