Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add operation id and elapsed time to connection check out/in events #1105

Merged
merged 4 commits into from
Apr 25, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,36 @@ public enum Reason {
}

private final ServerId serverId;
private final long operationId;

private final Reason reason;

/**
* Construct an instance
*
* @param serverId the server id
* @param operationId the operation id
* @param reason the reason the connection check out failed
* @since 4.10
*/
public ConnectionCheckOutFailedEvent(final ServerId serverId, final Reason reason) {
public ConnectionCheckOutFailedEvent(final ServerId serverId, final long operationId, final Reason reason) {
stIncMale marked this conversation as resolved.
Show resolved Hide resolved
this.serverId = notNull("serverId", serverId);
this.operationId = operationId;
this.reason = notNull("reason", reason);
}

/**
* Construct an instance
*
* @param serverId the server id
* @param reason the reason the connection check out failed
* @deprecated Prefer {@link #ConnectionCheckOutFailedEvent(ServerId, long, Reason)}
*/
@Deprecated
public ConnectionCheckOutFailedEvent(final ServerId serverId, final Reason reason) {
this(serverId, -1, reason);
}

/**
* Gets the server id
*
Expand All @@ -75,6 +92,16 @@ public ServerId getServerId() {
return serverId;
}

/**
* Gets the operation identifier
*
* @return the operation identifier
* @since 4.10
*/
public long getOperationId() {
return operationId;
}
stIncMale marked this conversation as resolved.
Show resolved Hide resolved

/**
* Gets the reason for the check out failure.
*
Expand All @@ -90,6 +117,7 @@ public String toString() {
return "ConnectionCheckOutFailedEvent{"
+ "server=" + serverId.getAddress()
+ ", clusterId=" + serverId.getClusterId()
+ ", operationId=" + operationId
+ ", reason=" + reason
+ '}';
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,29 @@
*/
public final class ConnectionCheckOutStartedEvent {
private final ServerId serverId;
private final long operationId;

/**
* Construct an instance
*
* @param serverId the server id
* @param operationId the operation id
* @since 4.10
*/
public ConnectionCheckOutStartedEvent(final ServerId serverId) {
public ConnectionCheckOutStartedEvent(final ServerId serverId, final long operationId) {
stIncMale marked this conversation as resolved.
Show resolved Hide resolved
this.serverId = notNull("serverId", serverId);
this.operationId = operationId;
}

/**
* Construct an instance
*
* @param serverId the server id
* @deprecated Prefer {@link ConnectionCheckOutStartedEvent#ConnectionCheckOutStartedEvent(ServerId, long)}
*/
@Deprecated
public ConnectionCheckOutStartedEvent(final ServerId serverId) {
this(serverId, -1);
}

/**
Expand All @@ -46,11 +61,22 @@ public ServerId getServerId() {
return serverId;
}

/**
* Gets the operation identifier
*
* @return the operation identifier
* @since 4.10
*/
public long getOperationId() {
return operationId;
}

@Override
public String toString() {
return "ConnectionCheckOutStartedEvent{"
+ "server=" + serverId.getAddress()
+ ", clusterId=" + serverId.getClusterId()
+ ", operationId=" + operationId
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,30 @@
*/
public final class ConnectionCheckedInEvent {
private final ConnectionId connectionId;
private final long operationId;


/**
* Construct an instance
*
* @param connectionId the connectionId
* @param operationId the operation id
* @since 4.10
*/
public ConnectionCheckedInEvent(final ConnectionId connectionId) {
public ConnectionCheckedInEvent(final ConnectionId connectionId, final long operationId) {
this.connectionId = notNull("connectionId", connectionId);
this.operationId = operationId;
}

/**
* Construct an instance
*
* @param connectionId the connectionId
* @deprecated Prefer {@link #ConnectionCheckedInEvent(ConnectionId, long)}
*/
@Deprecated
public ConnectionCheckedInEvent(final ConnectionId connectionId) {
this(connectionId, -1);
}

/**
Expand All @@ -46,12 +62,23 @@ public ConnectionId getConnectionId() {
return connectionId;
}

/**
* Gets the operation identifier
*
* @return the operation identifier
* @since 4.10
*/
public long getOperationId() {
return operationId;
}

@Override
public String toString() {
return "ConnectionCheckedInEvent{"
+ "connectionId=" + connectionId
+ ", server=" + connectionId.getServerId().getAddress()
+ ", clusterId=" + connectionId.getServerId().getClusterId()
+ ", operationId=" + operationId
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,29 @@
*/
public final class ConnectionCheckedOutEvent {
private final ConnectionId connectionId;
private final long operationId;

/**
* Construct an instance
*
* @param connectionId the connectionId
* @param operationId the operation id
* @since 4.10
*/
public ConnectionCheckedOutEvent(final ConnectionId connectionId) {
public ConnectionCheckedOutEvent(final ConnectionId connectionId, final long operationId) {
this.connectionId = notNull("connectionId", connectionId);
this.operationId = operationId;
}

/**
* Construct an instance
*
* @param connectionId the connectionId
* @deprecated Prefer {@link #ConnectionCheckedOutEvent(ConnectionId, long)}
*/
@Deprecated
public ConnectionCheckedOutEvent(final ConnectionId connectionId) {
this(connectionId, -1);
}

/**
Expand All @@ -46,12 +61,23 @@ public ConnectionId getConnectionId() {
return connectionId;
}

/**
* Gets the operation identifier
*
* @return the operation identifier
* @since 4.10
*/
public long getOperationId() {
return operationId;
}

@Override
public String toString() {
return "ConnectionCheckedOutEvent{"
+ "connectionId=" + connectionId
+ ", server=" + connectionId.getServerId().getAddress()
+ ", clusterId=" + connectionId.getServerId().getClusterId()
+ ", operationId=" + operationId
+ '}';
}
}
1 change: 1 addition & 0 deletions driver-core/src/main/com/mongodb/internal/Timeout.java
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ public static Timeout immediate() {
return IMMEDIATE;
}


stIncMale marked this conversation as resolved.
Show resolved Hide resolved
/**
* Must not be called on {@linkplain #isInfinite() infinite} or {@linkplain #isImmediate() immediate} timeouts.
* <p>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,19 @@ public InternalConnection get(final OperationContext operationContext) {

@Override
public InternalConnection get(final OperationContext operationContext, final long timeoutValue, final TimeUnit timeUnit) {
connectionPoolListener.connectionCheckOutStarted(new ConnectionCheckOutStartedEvent(serverId));
connectionPoolListener.connectionCheckOutStarted(new ConnectionCheckOutStartedEvent(serverId, operationContext.getId()));
Timeout timeout = Timeout.startNow(timeoutValue, timeUnit);
try {
stateAndGeneration.throwIfClosedOrPaused();
PooledConnection connection = getPooledConnection(timeout);
if (!connection.opened()) {
connection = openConcurrencyLimiter.openOrGetAvailable(connection, timeout);
}
connectionPoolListener.connectionCheckedOut(new ConnectionCheckedOutEvent(getId(connection)));
connection.checkedOutForOperation(operationContext);
connectionPoolListener.connectionCheckedOut(new ConnectionCheckedOutEvent(getId(connection), operationContext.getId()));
return connection;
} catch (Exception e) {
throw (RuntimeException) checkOutFailed(e);
throw (RuntimeException) checkOutFailed(e, operationContext);
}
}

Expand All @@ -188,15 +189,16 @@ public void getAsync(final OperationContext operationContext, final SingleResult
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(format("Asynchronously getting a connection from the pool for server %s", serverId));
}
connectionPoolListener.connectionCheckOutStarted(new ConnectionCheckOutStartedEvent(serverId));
connectionPoolListener.connectionCheckOutStarted(new ConnectionCheckOutStartedEvent(serverId, operationContext.getId()));
Timeout timeout = Timeout.startNow(settings.getMaxWaitTime(NANOSECONDS));
SingleResultCallback<InternalConnection> eventSendingCallback = (result, failure) -> {
SingleResultCallback<PooledConnection> eventSendingCallback = (connection, failure) -> {
SingleResultCallback<InternalConnection> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
if (failure == null) {
connectionPoolListener.connectionCheckedOut(new ConnectionCheckedOutEvent(getId(result)));
errHandlingCallback.onResult(result, null);
connection.checkedOutForOperation(operationContext);
connectionPoolListener.connectionCheckedOut(new ConnectionCheckedOutEvent(getId(connection), operationContext.getId()));
errHandlingCallback.onResult(connection, null);
} else {
errHandlingCallback.onResult(null, checkOutFailed(failure));
errHandlingCallback.onResult(null, checkOutFailed(failure, operationContext));
}
};
try {
Expand Down Expand Up @@ -238,20 +240,22 @@ public void getAsync(final OperationContext operationContext, final SingleResult
* and returns {@code t} if it is not {@link MongoOpenConnectionInternalException},
* or returns {@code t.}{@linkplain MongoOpenConnectionInternalException#getCause() getCause()} otherwise.
*/
private Throwable checkOutFailed(final Throwable t) {
private Throwable checkOutFailed(final Throwable t, final OperationContext operationContext) {
Throwable result = t;
Reason reason;
if (t instanceof MongoTimeoutException) {
connectionPoolListener.connectionCheckOutFailed(new ConnectionCheckOutFailedEvent(serverId, Reason.TIMEOUT));
reason = Reason.TIMEOUT;
} else if (t instanceof MongoOpenConnectionInternalException) {
connectionPoolListener.connectionCheckOutFailed(new ConnectionCheckOutFailedEvent(serverId, Reason.CONNECTION_ERROR));
reason = Reason.CONNECTION_ERROR;
result = t.getCause();
} else if (t instanceof MongoConnectionPoolClearedException) {
connectionPoolListener.connectionCheckOutFailed(new ConnectionCheckOutFailedEvent(serverId, Reason.CONNECTION_ERROR));
reason = Reason.CONNECTION_ERROR;
} else if (ConcurrentPool.isPoolClosedException(t)) {
connectionPoolListener.connectionCheckOutFailed(new ConnectionCheckOutFailedEvent(serverId, Reason.POOL_CLOSED));
reason = Reason.POOL_CLOSED;
} else {
connectionPoolListener.connectionCheckOutFailed(new ConnectionCheckOutFailedEvent(serverId, Reason.UNKNOWN));
reason = Reason.UNKNOWN;
}
connectionPoolListener.connectionCheckOutFailed(new ConnectionCheckOutFailedEvent(serverId, operationContext.getId(), reason));
return result;
}

Expand Down Expand Up @@ -516,6 +520,7 @@ private class PooledConnection implements InternalConnection {
private final UsageTrackingInternalConnection wrapped;
private final AtomicBoolean isClosed = new AtomicBoolean();
private Connection.PinningMode pinningMode;
private volatile long operationId;

PooledConnection(final UsageTrackingInternalConnection wrapped) {
this.wrapped = notNull("wrapped", wrapped);
Expand All @@ -526,6 +531,13 @@ public int getGeneration() {
return wrapped.getGeneration();
}

/**
* Associates this with the operation context and establishes the checked out start time
*/
public void checkedOutForOperation(final OperationContext operationContext) {
this.operationId = operationContext.getId();
}

@Override
public void open() {
assertFalse(isClosed.get());
Expand Down Expand Up @@ -559,7 +571,7 @@ public void close() {
// All but the first call is a no-op
if (!isClosed.getAndSet(true)) {
unmarkAsPinned();
connectionPoolListener.connectionCheckedIn(new ConnectionCheckedInEvent(getId(wrapped)));
connectionPoolListener.connectionCheckedIn(new ConnectionCheckedInEvent(getId(wrapped), operationId));
if (LOGGER.isTraceEnabled()) {
LOGGER.trace(format("Checked in connection [%s] to server %s", getId(wrapped), serverId.getAddress()));
}
Expand Down Expand Up @@ -731,7 +743,7 @@ public ServerDescription getInitialServerDescription() {
/**
* This internal exception is used to express an exceptional situation encountered when opening a connection.
* It exists because it allows consolidating the code that sends events for exceptional situations in a
* {@linkplain #checkOutFailed(Throwable) single place}, it must not be observable by an external code.
* {@linkplain #checkOutFailed(Throwable, OperationContext) single place}, it must not be observable by an external code.
*/
private static final class MongoOpenConnectionInternalException extends RuntimeException {
private static final long serialVersionUID = 1;
Expand Down Expand Up @@ -919,7 +931,7 @@ private PooledConnection openWithConcurrencyLimit(final PooledConnection connect
* </ul>
*/
void openAsyncWithConcurrencyLimit(
final PooledConnection connection, final Timeout timeout, final SingleResultCallback<InternalConnection> callback) {
final PooledConnection connection, final Timeout timeout, final SingleResultCallback<PooledConnection> callback) {
PooledConnection availableConnection;
try {//phase one
availableConnection = acquirePermitOrGetAvailableOpenedConnection(true, timeout);
Expand Down