diff --git a/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java index 66b9d1a97f2..13166eb53ab 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java @@ -21,6 +21,8 @@ import com.mongodb.lang.Nullable; import org.bson.BsonDocument; +import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; + /** * An operation that aborts a transaction. * @@ -44,8 +46,8 @@ protected String getCommandName() { } @Override - CommandOperationHelper.CommandCreator getCommandCreator() { - CommandOperationHelper.CommandCreator creator = super.getCommandCreator(); + CommandCreator getCommandCreator() { + CommandCreator creator = super.getCommandCreator(); if (recoveryToken != null) { return (serverDescription, connectionDescription) -> creator.create(serverDescription, connectionDescription).append("recoveryToken", recoveryToken); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java index bccd3716986..4379845bdd1 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java @@ -25,9 +25,6 @@ import com.mongodb.internal.binding.ReadBinding; import com.mongodb.internal.client.model.AggregationLevel; import com.mongodb.internal.connection.QueryResult; -import com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformer; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformerAsync; import com.mongodb.internal.session.SessionContext; import com.mongodb.lang.Nullable; import org.bson.BsonArray; @@ -43,14 +40,18 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableReadAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync; +import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; import static com.mongodb.internal.operation.OperationHelper.LOGGER; import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult; import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer; +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; class AggregateOperationImpl implements AsyncReadOperation>, ReadOperation> { private static final String RESULT = "result"; @@ -196,8 +197,9 @@ public BatchCursor execute(final ReadBinding binding) { @Override public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) { SingleResultCallback> errHandlingCallback = errorHandlingCallback(callback, LOGGER); - executeRetryableReadAsync(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()), - CommandResultDocumentCodec.create(decoder, FIELD_NAMES_WITH_RESULT), asyncTransformer(), retryReads, errHandlingCallback); + executeRetryableReadAsync(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()), + CommandResultDocumentCodec.create(this.decoder, FIELD_NAMES_WITH_RESULT), asyncTransformer(), retryReads, + errHandlingCallback); } private CommandCreator getCommandCreator(final SessionContext sessionContext) { @@ -238,10 +240,11 @@ BsonDocument getCommand(final SessionContext sessionContext, final int maxWireVe } private QueryResult createQueryResult(final BsonDocument result, final ConnectionDescription description) { + assertNotNull(result); return cursorDocumentToQueryResult(result.getDocument(CURSOR), description.getServerAddress()); } - private CommandReadTransformer> transformer() { + private CommandReadTransformer> transformer() { return (result, source, connection) -> { QueryResult queryResult = createQueryResult(result, connection.getDescription()); return new QueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder, comment, diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java index 568952f8c49..f41d0e4a462 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java @@ -40,9 +40,9 @@ import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableReadAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync; import static com.mongodb.internal.operation.ServerVersionHelper.FIVE_DOT_ZERO_WIRE_VERSION; +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; import static com.mongodb.internal.operation.WriteConcernHelper.appendWriteConcernToCommand; import static com.mongodb.internal.operation.WriteConcernHelper.throwOnWriteConcernError; diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java index 0eadbc1805a..9ccd2f17b0a 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncChangeStreamBatchCursor.java @@ -33,10 +33,10 @@ import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.assertions.Assertions.assertNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncReadConnectionSource; import static com.mongodb.internal.operation.ChangeStreamBatchCursor.convertAndProduceLastId; import static com.mongodb.internal.operation.ChangeStreamBatchCursorHelper.isResumableError; import static com.mongodb.internal.operation.OperationHelper.LOGGER; -import static com.mongodb.internal.operation.OperationHelper.withAsyncReadConnection; import static java.lang.String.format; final class AsyncChangeStreamBatchCursor implements AsyncAggregateResponseBatchCursor { @@ -211,7 +211,7 @@ private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResult private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallback> callback, final boolean tryNext) { - withAsyncReadConnection(binding, (source, t) -> { + withAsyncReadConnectionSource(binding, (source, t) -> { if (t != null) { callback.onResult(null, t); } else { diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java new file mode 100644 index 00000000000..21b10cdff08 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java @@ -0,0 +1,449 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.Function; +import com.mongodb.MongoException; +import com.mongodb.MongoNamespace; +import com.mongodb.ReadPreference; +import com.mongodb.ServerAddress; +import com.mongodb.assertions.Assertions; +import com.mongodb.internal.async.AsyncBatchCursor; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.async.function.AsyncCallbackBiFunction; +import com.mongodb.internal.async.function.AsyncCallbackFunction; +import com.mongodb.internal.async.function.AsyncCallbackSupplier; +import com.mongodb.internal.async.function.RetryState; +import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier; +import com.mongodb.internal.binding.AsyncConnectionSource; +import com.mongodb.internal.binding.AsyncReadBinding; +import com.mongodb.internal.binding.AsyncWriteBinding; +import com.mongodb.internal.binding.ReferenceCounted; +import com.mongodb.internal.connection.AsyncConnection; +import com.mongodb.internal.connection.OperationContext; +import com.mongodb.internal.connection.QueryResult; +import com.mongodb.internal.operation.retry.AttachmentKeys; +import com.mongodb.internal.validator.NoOpFieldNameValidator; +import com.mongodb.lang.Nullable; +import org.bson.BsonDocument; +import org.bson.BsonValue; +import org.bson.FieldNameValidator; +import org.bson.codecs.BsonDocumentCodec; +import org.bson.codecs.Decoder; + +import java.util.Collections; +import java.util.List; + +import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; +import static com.mongodb.internal.operation.CommandOperationHelper.addRetryableWriteErrorLabel; +import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState; +import static com.mongodb.internal.operation.CommandOperationHelper.isRetryWritesEnabled; +import static com.mongodb.internal.operation.CommandOperationHelper.logRetryExecute; +import static com.mongodb.internal.operation.CommandOperationHelper.transformWriteException; +import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult; +import static com.mongodb.internal.operation.WriteConcernHelper.throwOnWriteConcernError; + +final class AsyncOperationHelper { + + interface AsyncCallableWithConnection { + void call(@Nullable AsyncConnection connection, @Nullable Throwable t); + } + + interface AsyncCallableWithSource { + void call(@Nullable AsyncConnectionSource source, @Nullable Throwable t); + } + + interface CommandWriteTransformerAsync { + + /** + * Yield an appropriate result object for the input object. + * + * @param t the input object + * @return the function result + */ + @Nullable + R apply(T t, AsyncConnection connection); + } + + interface CommandReadTransformerAsync { + + /** + * Yield an appropriate result object for the input object. + * + * @param t the input object + * @return the function result + */ + @Nullable + R apply(T t, AsyncConnectionSource source, AsyncConnection connection); + } + + + static void withAsyncReadConnectionSource(final AsyncReadBinding binding, final AsyncCallableWithSource callable) { + binding.getReadConnectionSource(errorHandlingCallback(new AsyncCallableWithSourceCallback(callable), OperationHelper.LOGGER)); + } + + static void withAsyncConnection(final AsyncWriteBinding binding, final AsyncCallableWithConnection callable) { + binding.getWriteConnectionSource(errorHandlingCallback(new AsyncCallableWithConnectionCallback(callable), OperationHelper.LOGGER)); + } + + /** + * @see #withAsyncSuppliedResource(AsyncCallbackSupplier, boolean, SingleResultCallback, AsyncCallbackFunction) + */ + static void withAsyncSourceAndConnection(final AsyncCallbackSupplier sourceSupplier, + final boolean wrapConnectionSourceException, final SingleResultCallback callback, + final AsyncCallbackBiFunction asyncFunction) + throws OperationHelper.ResourceSupplierInternalException { + SingleResultCallback errorHandlingCallback = errorHandlingCallback(callback, OperationHelper.LOGGER); + withAsyncSuppliedResource(sourceSupplier, wrapConnectionSourceException, errorHandlingCallback, + (source, sourceReleasingCallback) -> + withAsyncSuppliedResource(source::getConnection, wrapConnectionSourceException, sourceReleasingCallback, + (connection, connectionAndSourceReleasingCallback) -> + asyncFunction.apply(source, connection, connectionAndSourceReleasingCallback))); + } + + + static void withAsyncSuppliedResource(final AsyncCallbackSupplier resourceSupplier, + final boolean wrapSourceConnectionException, final SingleResultCallback callback, + final AsyncCallbackFunction function) throws OperationHelper.ResourceSupplierInternalException { + SingleResultCallback errorHandlingCallback = errorHandlingCallback(callback, OperationHelper.LOGGER); + resourceSupplier.get((resource, supplierException) -> { + if (supplierException != null) { + if (wrapSourceConnectionException) { + supplierException = new OperationHelper.ResourceSupplierInternalException(supplierException); + } + errorHandlingCallback.onResult(null, supplierException); + } else { + Assertions.assertNotNull(resource); + AsyncCallbackSupplier curriedFunction = c -> function.apply(resource, c); + curriedFunction.whenComplete(resource::release).get(errorHandlingCallback); + } + }); + } + + static void withAsyncConnectionSourceCallableConnection(final AsyncConnectionSource source, + final AsyncCallableWithConnection callable) { + source.getConnection((connection, t) -> { + source.release(); + if (t != null) { + callable.call(null, t); + } else { + callable.call(connection, null); + } + }); + } + + static void withAsyncConnectionSource(final AsyncConnectionSource source, final AsyncCallableWithSource callable) { + callable.call(source, null); + } + + static void executeRetryableReadAsync( + final AsyncReadBinding binding, + final String database, + final CommandCreator commandCreator, + final Decoder decoder, + final CommandReadTransformerAsync transformer, + final boolean retryReads, + final SingleResultCallback callback) { + executeRetryableReadAsync(binding, binding::getReadConnectionSource, database, commandCreator, decoder, transformer, retryReads, + callback); + } + + static void executeRetryableReadAsync( + final AsyncReadBinding binding, + final AsyncCallbackSupplier sourceAsyncSupplier, + final String database, + final CommandCreator commandCreator, + final Decoder decoder, + final CommandReadTransformerAsync transformer, + final boolean retryReads, + final SingleResultCallback callback) { + RetryState retryState = initialRetryState(retryReads); + binding.retain(); + AsyncCallbackSupplier asyncRead = decorateReadWithRetriesAsync(retryState, binding.getOperationContext(), + (AsyncCallbackSupplier) funcCallback -> + withAsyncSourceAndConnection(sourceAsyncSupplier, false, funcCallback, + (source, connection, releasingCallback) -> { + if (retryState.breakAndCompleteIfRetryAnd( + () -> !OperationHelper.canRetryRead(source.getServerDescription(), + binding.getSessionContext()), + releasingCallback)) { + return; + } + createReadCommandAndExecuteAsync(retryState, binding, source, + database, commandCreator, + decoder, transformer, + connection, + releasingCallback); + }) + ).whenComplete(binding::release); + asyncRead.get(errorHandlingCallback(callback, OperationHelper.LOGGER)); + } + + static void executeCommandAsync(final AsyncWriteBinding binding, + final String database, + final BsonDocument command, + final AsyncConnection connection, + final CommandWriteTransformerAsync transformer, + final SingleResultCallback callback) { + Assertions.notNull("binding", binding); + SingleResultCallback addingRetryableLabelCallback = addingRetryableLabelCallback(callback, + connection.getDescription().getMaxWireVersion()); + connection.commandAsync(database, command, new NoOpFieldNameValidator(), ReadPreference.primary(), new BsonDocumentCodec(), + binding, transformingWriteCallback(transformer, connection, addingRetryableLabelCallback)); + } + + static void executeRetryableWriteAsync( + final AsyncWriteBinding binding, + final String database, + @Nullable final ReadPreference readPreference, + final FieldNameValidator fieldNameValidator, + final Decoder commandResultDecoder, + final CommandCreator commandCreator, + final CommandWriteTransformerAsync transformer, + final Function retryCommandModifier, + final SingleResultCallback callback) { + RetryState retryState = initialRetryState(true); + binding.retain(); + + AsyncCallbackSupplier asyncWrite = decorateWriteWithRetriesAsync(retryState, binding.getOperationContext(), + (AsyncCallbackSupplier) funcCallback -> { + boolean firstAttempt = retryState.isFirstAttempt(); + if (!firstAttempt && binding.getSessionContext().hasActiveTransaction()) { + binding.getSessionContext().clearTransactionContext(); + } + withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, funcCallback, + (source, connection, releasingCallback) -> { + int maxWireVersion = connection.getDescription().getMaxWireVersion(); + SingleResultCallback addingRetryableLabelCallback = firstAttempt + ? releasingCallback + : addingRetryableLabelCallback(releasingCallback, maxWireVersion); + if (retryState.breakAndCompleteIfRetryAnd(() -> !OperationHelper.canRetryWrite(connection.getDescription(), binding.getSessionContext()), + addingRetryableLabelCallback)) { + return; + } + BsonDocument command; + try { + command = retryState.attachment(AttachmentKeys.command()) + .map(previousAttemptCommand -> { + Assertions.assertFalse(firstAttempt); + return retryCommandModifier.apply(previousAttemptCommand); + }).orElseGet(() -> commandCreator.create(source.getServerDescription(), connection.getDescription())); + // attach `maxWireVersion`, `retryableCommandFlag` ASAP because they are used to check whether we should retry + retryState.attach(AttachmentKeys.maxWireVersion(), maxWireVersion, true) + .attach(AttachmentKeys.retryableCommandFlag(), isRetryWritesEnabled(command), true) + .attach(AttachmentKeys.commandDescriptionSupplier(), command::getFirstKey, false) + .attach(AttachmentKeys.command(), command, false); + } catch (Throwable t) { + addingRetryableLabelCallback.onResult(null, t); + return; + } + connection.commandAsync(database, command, fieldNameValidator, readPreference, commandResultDecoder, binding, + transformingWriteCallback(transformer, connection, addingRetryableLabelCallback)); + }); + }).whenComplete(binding::release); + + asyncWrite.get(exceptionTransformingCallback(errorHandlingCallback(callback, OperationHelper.LOGGER))); + } + + static void createReadCommandAndExecuteAsync( + final RetryState retryState, + final AsyncReadBinding binding, + final AsyncConnectionSource source, + final String database, + final CommandCreator commandCreator, + final Decoder decoder, + final CommandReadTransformerAsync transformer, + final AsyncConnection connection, + final SingleResultCallback callback) { + BsonDocument command; + try { + command = commandCreator.create(source.getServerDescription(), connection.getDescription()); + retryState.attach(AttachmentKeys.commandDescriptionSupplier(), command::getFirstKey, false); + } catch (IllegalArgumentException e) { + callback.onResult(null, e); + return; + } + connection.commandAsync(database, command, new NoOpFieldNameValidator(), source.getReadPreference(), decoder, + binding, transformingReadCallback(transformer, source, connection, callback)); + } + + static AsyncCallbackSupplier decorateReadWithRetriesAsync(final RetryState retryState, final OperationContext operationContext, + final AsyncCallbackSupplier asyncReadFunction) { + return new RetryingAsyncCallbackSupplier<>(retryState, CommandOperationHelper::chooseRetryableReadException, + CommandOperationHelper::shouldAttemptToRetryRead, callback -> { + logRetryExecute(retryState, operationContext); + asyncReadFunction.get(callback); + }); + } + + static AsyncCallbackSupplier decorateWriteWithRetriesAsync(final RetryState retryState, final OperationContext operationContext, + final AsyncCallbackSupplier asyncWriteFunction) { + return new RetryingAsyncCallbackSupplier<>(retryState, CommandOperationHelper::chooseRetryableWriteException, + CommandOperationHelper::shouldAttemptToRetryWrite, callback -> { + logRetryExecute(retryState, operationContext); + asyncWriteFunction.get(callback); + }); + } + + static CommandWriteTransformerAsync writeConcernErrorTransformerAsync() { + return (result, connection) -> { + assertNotNull(result); + throwOnWriteConcernError(result, connection.getDescription().getServerAddress(), connection.getDescription().getMaxWireVersion()); + return null; + }; + } + + static AsyncBatchCursor createEmptyAsyncBatchCursor(final MongoNamespace namespace, final ServerAddress serverAddress) { + return new AsyncSingleBatchQueryCursor<>(new QueryResult<>(namespace, Collections.emptyList(), 0L, serverAddress)); + } + + static AsyncBatchCursor cursorDocumentToAsyncBatchCursor(final BsonDocument cursorDocument, final Decoder decoder, + final BsonValue comment, final AsyncConnectionSource source, final AsyncConnection connection, final int batchSize) { + return new AsyncQueryBatchCursor<>(cursorDocumentToQueryResult(cursorDocument, + source.getServerDescription().getAddress()), + 0, batchSize, 0, decoder, comment, source, connection, cursorDocument); + } + + static SingleResultCallback releasingCallback(final SingleResultCallback wrapped, final AsyncConnection connection) { + return new ReferenceCountedReleasingWrappedCallback<>(wrapped, Collections.singletonList(connection)); + } + + static SingleResultCallback exceptionTransformingCallback(final SingleResultCallback callback) { + return (result, t) -> { + if (t != null) { + if (t instanceof MongoException) { + callback.onResult(null, transformWriteException((MongoException) t)); + } else { + callback.onResult(null, t); + } + } else { + callback.onResult(result, null); + } + }; + } + + private static SingleResultCallback transformingWriteCallback(final CommandWriteTransformerAsync transformer, + final AsyncConnection connection, final SingleResultCallback callback) { + return (result, t) -> { + if (t != null) { + callback.onResult(null, t); + } else { + R transformedResult; + try { + transformedResult = transformer.apply(assertNotNull(result), connection); + } catch (Throwable e) { + callback.onResult(null, e); + return; + } + callback.onResult(transformedResult, null); + } + }; + } + + + private static class AsyncCallableWithConnectionCallback implements SingleResultCallback { + private final AsyncCallableWithConnection callable; + + AsyncCallableWithConnectionCallback(final AsyncCallableWithConnection callable) { + this.callable = callable; + } + + @Override + public void onResult(@Nullable final AsyncConnectionSource source, @Nullable final Throwable t) { + if (t != null) { + callable.call(null, t); + } else { + withAsyncConnectionSourceCallableConnection(Assertions.assertNotNull(source), callable); + } + } + } + + private static class AsyncCallableWithSourceCallback implements SingleResultCallback { + private final AsyncCallableWithSource callable; + + AsyncCallableWithSourceCallback(final AsyncCallableWithSource callable) { + this.callable = callable; + } + + @Override + public void onResult(@Nullable final AsyncConnectionSource source, @Nullable final Throwable t) { + if (t != null) { + callable.call(null, t); + } else { + withAsyncConnectionSource(Assertions.assertNotNull(source), callable); + } + } + } + + private static class ReferenceCountedReleasingWrappedCallback implements SingleResultCallback { + private final SingleResultCallback wrapped; + private final List referenceCounted; + + ReferenceCountedReleasingWrappedCallback(final SingleResultCallback wrapped, + final List referenceCounted) { + this.wrapped = wrapped; + this.referenceCounted = Assertions.notNull("referenceCounted", referenceCounted); + } + + @Override + public void onResult(@Nullable final T result, @Nullable final Throwable t) { + for (ReferenceCounted cur : referenceCounted) { + if (cur != null) { + cur.release(); + } + } + wrapped.onResult(result, t); + } + } + + private static SingleResultCallback addingRetryableLabelCallback(final SingleResultCallback callback, + final int maxWireVersion) { + return (result, t) -> { + if (t != null) { + if (t instanceof MongoException) { + addRetryableWriteErrorLabel((MongoException) t, maxWireVersion); + } + callback.onResult(null, t); + } else { + callback.onResult(result, null); + } + }; + } + + private static SingleResultCallback transformingReadCallback(final CommandReadTransformerAsync transformer, + final AsyncConnectionSource source, final AsyncConnection connection, final SingleResultCallback callback) { + return (result, t) -> { + if (t != null) { + callback.onResult(null, t); + } else { + R transformedResult; + try { + transformedResult = transformer.apply(assertNotNull(result), source, connection); + } catch (Throwable e) { + callback.onResult(null, e); + return; + } + callback.onResult(transformedResult, null); + } + }; + } + + private AsyncOperationHelper() { + } +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java index 4242fa2831c..b7f721b5fc4 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java @@ -55,7 +55,7 @@ import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; import static com.mongodb.internal.operation.CursorHelper.getNumberToReturn; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; -import static com.mongodb.internal.operation.OperationHelper.getMoreCursorDocumentToQueryResult; +import static com.mongodb.internal.operation.SyncOperationHelper.getMoreCursorDocumentToQueryResult; import static com.mongodb.internal.operation.QueryHelper.translateCommandException; import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionFourDotFour; import static java.lang.String.format; diff --git a/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java b/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java index 820da388d22..e3ae79fa589 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java @@ -36,13 +36,13 @@ import java.util.concurrent.TimeUnit; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableWriteAsync; import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableWrite; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableWriteAsync; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero; import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite; import static com.mongodb.internal.operation.OperationHelper.validateHintForFindAndModify; +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableWrite; import static java.util.concurrent.TimeUnit.MILLISECONDS; /** diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java index 945d677a758..acf70090457 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamBatchCursor.java @@ -21,7 +21,6 @@ import com.mongodb.ServerAddress; import com.mongodb.ServerCursor; import com.mongodb.internal.binding.ReadBinding; -import com.mongodb.internal.operation.OperationHelper.CallableWithSource; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; import org.bson.BsonTimestamp; @@ -35,7 +34,7 @@ import java.util.function.Function; import static com.mongodb.internal.operation.ChangeStreamBatchCursorHelper.isResumableError; -import static com.mongodb.internal.operation.OperationHelper.withReadConnectionSource; +import static com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource; final class ChangeStreamBatchCursor implements AggregateResponseBatchCursor { private final ReadBinding binding; @@ -195,7 +194,7 @@ R resumeableOperation(final Function) source -> { + withReadConnectionSource(binding, source -> { changeStreamOperation.setChangeStreamOptionsForResume(resumeToken, source.getServerDescription().getMaxWireVersion()); return null; }); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java index 0457cecf561..a2ba029eb56 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java @@ -26,7 +26,6 @@ import com.mongodb.internal.binding.AsyncReadBinding; import com.mongodb.internal.binding.ReadBinding; import com.mongodb.internal.client.model.changestream.ChangeStreamLevel; -import com.mongodb.internal.operation.OperationHelper.CallableWithSource; import com.mongodb.lang.Nullable; import org.bson.BsonArray; import org.bson.BsonBoolean; @@ -44,8 +43,8 @@ import java.util.concurrent.TimeUnit; import static com.mongodb.assertions.Assertions.notNull; -import static com.mongodb.internal.operation.OperationHelper.withAsyncReadConnection; -import static com.mongodb.internal.operation.OperationHelper.withReadConnectionSource; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncReadConnectionSource; +import static com.mongodb.internal.operation.SyncOperationHelper.withReadConnectionSource; /** * An operation that executes an {@code $changeStream} aggregation. @@ -183,7 +182,7 @@ public ChangeStreamOperation showExpandedEvents(final boolean showExpandedEve @Override public BatchCursor execute(final ReadBinding binding) { - return withReadConnectionSource(binding, (CallableWithSource>) source -> { + return withReadConnectionSource(binding, source -> { AggregateResponseBatchCursor cursor = (AggregateResponseBatchCursor) wrapped.execute(binding); return new ChangeStreamBatchCursor<>(ChangeStreamOperation.this, cursor, binding, @@ -200,7 +199,7 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb } else { AsyncAggregateResponseBatchCursor cursor = (AsyncAggregateResponseBatchCursor) result; - withAsyncReadConnection(binding, (source, t1) -> { + withAsyncReadConnectionSource(binding, (source, t1) -> { if (t1 != null) { callback.onResult(null, t1); } else { diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java index c96f9d5ef38..fb1cc3c2da2 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java @@ -16,7 +16,6 @@ package com.mongodb.internal.operation; -import com.mongodb.Function; import com.mongodb.MongoClientException; import com.mongodb.MongoCommandException; import com.mongodb.MongoConnectionPoolClearedException; @@ -26,131 +25,34 @@ import com.mongodb.MongoSecurityException; import com.mongodb.MongoServerException; import com.mongodb.MongoSocketException; -import com.mongodb.ReadPreference; import com.mongodb.assertions.Assertions; import com.mongodb.connection.ConnectionDescription; import com.mongodb.connection.ServerDescription; -import com.mongodb.internal.VisibleForTesting; -import com.mongodb.internal.async.SingleResultCallback; -import com.mongodb.internal.async.function.AsyncCallbackSupplier; import com.mongodb.internal.async.function.RetryState; -import com.mongodb.internal.async.function.RetryingAsyncCallbackSupplier; -import com.mongodb.internal.async.function.RetryingSyncSupplier; -import com.mongodb.internal.binding.AsyncConnectionSource; -import com.mongodb.internal.binding.AsyncReadBinding; -import com.mongodb.internal.binding.AsyncWriteBinding; -import com.mongodb.internal.binding.ConnectionSource; -import com.mongodb.internal.binding.ReadBinding; -import com.mongodb.internal.binding.WriteBinding; -import com.mongodb.internal.connection.AsyncConnection; -import com.mongodb.internal.connection.Connection; import com.mongodb.internal.connection.OperationContext; import com.mongodb.internal.operation.OperationHelper.ResourceSupplierInternalException; import com.mongodb.internal.operation.retry.AttachmentKeys; -import com.mongodb.internal.validator.NoOpFieldNameValidator; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; -import org.bson.FieldNameValidator; -import org.bson.codecs.BsonDocumentCodec; -import org.bson.codecs.Decoder; import java.util.List; import java.util.function.Supplier; -import static com.mongodb.ReadPreference.primary; import static com.mongodb.assertions.Assertions.assertFalse; -import static com.mongodb.assertions.Assertions.assertNotNull; -import static com.mongodb.assertions.Assertions.notNull; -import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; -import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; import static com.mongodb.internal.operation.OperationHelper.LOGGER; -import static com.mongodb.internal.operation.OperationHelper.canRetryRead; -import static com.mongodb.internal.operation.OperationHelper.canRetryWrite; -import static com.mongodb.internal.operation.OperationHelper.withAsyncSourceAndConnection; -import static com.mongodb.internal.operation.OperationHelper.withSourceAndConnection; import static java.lang.String.format; import static java.util.Arrays.asList; @SuppressWarnings("overloads") final class CommandOperationHelper { - interface CommandReadTransformer { - - /** - * Yield an appropriate result object for the input object. - * - * @param t the input object - * @return the function result - */ - @Nullable - R apply(T t, ConnectionSource source, Connection connection); - } - - interface CommandWriteTransformer { - - /** - * Yield an appropriate result object for the input object. - * - * @param t the input object - * @return the function result - */ - @Nullable - R apply(T t, Connection connection); - } - - interface CommandWriteTransformerAsync { - - /** - * Yield an appropriate result object for the input object. - * - * @param t the input object - * @return the function result - */ - @Nullable - R apply(T t, AsyncConnection connection); - } - - interface CommandReadTransformerAsync { - - /** - * Yield an appropriate result object for the input object. - * - * @param t the input object - * @return the function result - */ - @Nullable - R apply(T t, AsyncConnectionSource source, AsyncConnection connection); - } - - static CommandWriteTransformer writeConcernErrorTransformer() { - return (result, connection) -> { - WriteConcernHelper.throwOnWriteConcernError(result, connection.getDescription().getServerAddress(), - connection.getDescription().getMaxWireVersion()); - return null; - }; - } - - static CommandWriteTransformerAsync writeConcernErrorWriteTransformer() { - return (result, connection) -> { - WriteConcernHelper.throwOnWriteConcernError(result, connection.getDescription().getServerAddress(), - connection.getDescription().getMaxWireVersion()); - return null; - }; - } - - static CommandWriteTransformerAsync writeConcernErrorTransformerAsync() { - return (result, connection) -> { - WriteConcernHelper.throwOnWriteConcernError(result, connection.getDescription().getServerAddress(), - connection.getDescription().getMaxWireVersion()); - return null; - }; - } interface CommandCreator { BsonDocument create(ServerDescription serverDescription, ConnectionDescription connectionDescription); } - private static Throwable chooseRetryableReadException( + + static Throwable chooseRetryableReadException( @Nullable final Throwable previouslyChosenException, final Throwable mostRecentAttemptException) { assertFalse(mostRecentAttemptException instanceof ResourceSupplierInternalException); if (previouslyChosenException == null @@ -184,330 +86,6 @@ static RetryState initialRetryState(final boolean retry) { return new RetryState(retry ? RetryState.RETRIES : 0); } - static Supplier decorateReadWithRetries(final RetryState retryState, final OperationContext operationContext, - final Supplier readFunction) { - return new RetryingSyncSupplier<>(retryState, CommandOperationHelper::chooseRetryableReadException, - CommandOperationHelper::shouldAttemptToRetryRead, () -> { - logRetryExecute(retryState, operationContext); - return readFunction.get(); - }); - } - - static AsyncCallbackSupplier decorateReadWithRetries(final RetryState retryState, final OperationContext operationContext, - final AsyncCallbackSupplier asyncReadFunction) { - return new RetryingAsyncCallbackSupplier<>(retryState, CommandOperationHelper::chooseRetryableReadException, - CommandOperationHelper::shouldAttemptToRetryRead, callback -> { - logRetryExecute(retryState, operationContext); - asyncReadFunction.get(callback); - }); - } - - static T executeRetryableRead( - final ReadBinding binding, - final String database, - final CommandCreator commandCreator, - final Decoder decoder, - final CommandReadTransformer transformer, - final boolean retryReads) { - return executeRetryableRead(binding, binding::getReadConnectionSource, database, commandCreator, decoder, transformer, retryReads); - } - - static T executeRetryableRead( - final ReadBinding binding, - final Supplier readConnectionSourceSupplier, - final String database, - final CommandCreator commandCreator, - final Decoder decoder, - final CommandReadTransformer transformer, - final boolean retryReads) { - RetryState retryState = initialRetryState(retryReads); - Supplier read = decorateReadWithRetries(retryState, binding.getOperationContext(), () -> - withSourceAndConnection(readConnectionSourceSupplier, false, (source, connection) -> { - retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getSessionContext())); - return createReadCommandAndExecute(retryState, binding, source, database, commandCreator, decoder, transformer, connection); - }) - ); - return read.get(); - } - - @Nullable - static T createReadCommandAndExecute( - final RetryState retryState, - final ReadBinding binding, - final ConnectionSource source, - final String database, - final CommandCreator commandCreator, - final Decoder decoder, - final CommandReadTransformer transformer, - final Connection connection) { - BsonDocument command = commandCreator.create(source.getServerDescription(), connection.getDescription()); - retryState.attach(AttachmentKeys.commandDescriptionSupplier(), command::getFirstKey, false); - return transformer.apply(assertNotNull(connection.command(database, command, new NoOpFieldNameValidator(), - source.getReadPreference(), decoder, binding)), source, connection); - } - - /* Write Binding Helpers */ - - @VisibleForTesting(otherwise = PRIVATE) - static T executeCommand(final WriteBinding binding, final String database, final BsonDocument command, - final Decoder decoder, final CommandWriteTransformer transformer) { - return withSourceAndConnection(binding::getWriteConnectionSource, false, (source, connection) -> - transformer.apply(assertNotNull( - connection.command(database, command, new NoOpFieldNameValidator(), primary(), decoder, binding)), connection)); - } - - @Nullable - static T executeCommand(final WriteBinding binding, final String database, final BsonDocument command, - final Connection connection, final CommandWriteTransformer transformer) { - notNull("binding", binding); - return transformer.apply(assertNotNull( - connection.command(database, command, new NoOpFieldNameValidator(), primary(), new BsonDocumentCodec(), binding)), - connection); - } - - /* Async Read Binding Helpers */ - - static void executeRetryableReadAsync( - final AsyncReadBinding binding, - final String database, - final CommandCreator commandCreator, - final Decoder decoder, - final CommandReadTransformerAsync transformer, - final boolean retryReads, - final SingleResultCallback callback) { - executeRetryableReadAsync(binding, binding::getReadConnectionSource, database, commandCreator, decoder, transformer, retryReads, - callback); - } - - static void executeRetryableReadAsync( - final AsyncReadBinding binding, - final AsyncCallbackSupplier sourceAsyncSupplier, - final String database, - final CommandCreator commandCreator, - final Decoder decoder, - final CommandReadTransformerAsync transformer, - final boolean retryReads, - final SingleResultCallback callback) { - RetryState retryState = initialRetryState(retryReads); - binding.retain(); - AsyncCallbackSupplier asyncRead = CommandOperationHelper.decorateReadWithRetries(retryState, binding.getOperationContext(), - funcCallback -> - withAsyncSourceAndConnection(sourceAsyncSupplier, false, funcCallback, - (source, connection, releasingCallback) -> { - if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), - binding.getSessionContext()), releasingCallback)) { - return; - } - createReadCommandAndExecuteAsync(retryState, binding, source, database, commandCreator, decoder, transformer, - connection, releasingCallback); - }) - ).whenComplete(binding::release); - asyncRead.get(errorHandlingCallback(callback, LOGGER)); - } - - static void createReadCommandAndExecuteAsync( - final RetryState retryState, - final AsyncReadBinding binding, - final AsyncConnectionSource source, - final String database, - final CommandCreator commandCreator, - final Decoder decoder, - final CommandReadTransformerAsync transformer, - final AsyncConnection connection, - final SingleResultCallback callback) { - BsonDocument command; - try { - command = commandCreator.create(source.getServerDescription(), connection.getDescription()); - retryState.attach(AttachmentKeys.commandDescriptionSupplier(), command::getFirstKey, false); - } catch (IllegalArgumentException e) { - callback.onResult(null, e); - return; - } - connection.commandAsync(database, command, new NoOpFieldNameValidator(), source.getReadPreference(), decoder, - binding, transformingReadCallback(transformer, source, connection, callback)); - } - - private static SingleResultCallback transformingReadCallback(final CommandReadTransformerAsync transformer, - final AsyncConnectionSource source, final AsyncConnection connection, final SingleResultCallback callback) { - return (result, t) -> { - if (t != null) { - callback.onResult(null, t); - } else { - R transformedResult; - try { - transformedResult = transformer.apply(result, source, connection); - } catch (Throwable e) { - callback.onResult(null, e); - return; - } - callback.onResult(transformedResult, null); - } - }; - } - - private static SingleResultCallback transformingWriteCallback(final CommandWriteTransformerAsync transformer, - final AsyncConnection connection, final SingleResultCallback callback) { - return (result, t) -> { - if (t != null) { - callback.onResult(null, t); - } else { - R transformedResult; - try { - transformedResult = transformer.apply(result, connection); - } catch (Throwable e) { - callback.onResult(null, e); - return; - } - callback.onResult(transformedResult, null); - } - }; - } - - /* Async Write Binding Helpers */ - - static void executeCommandAsync(final AsyncWriteBinding binding, - final String database, - final BsonDocument command, - final AsyncConnection connection, - final CommandWriteTransformerAsync transformer, - final SingleResultCallback callback) { - notNull("binding", binding); - SingleResultCallback addingRetryableLabelCallback = addingRetryableLabelCallback(callback, - connection.getDescription().getMaxWireVersion()); - connection.commandAsync(database, command, new NoOpFieldNameValidator(), primary(), new BsonDocumentCodec(), - binding, transformingWriteCallback(transformer, connection, addingRetryableLabelCallback)); - } - - static Supplier decorateWriteWithRetries(final RetryState retryState, - final OperationContext operationContext, final Supplier writeFunction) { - return new RetryingSyncSupplier<>(retryState, CommandOperationHelper::chooseRetryableWriteException, - CommandOperationHelper::shouldAttemptToRetryWrite, () -> { - logRetryExecute(retryState, operationContext); - return writeFunction.get(); - }); - } - - static AsyncCallbackSupplier decorateWriteWithRetries(final RetryState retryState, final OperationContext operationContext, - final AsyncCallbackSupplier asyncWriteFunction) { - return new RetryingAsyncCallbackSupplier<>(retryState, CommandOperationHelper::chooseRetryableWriteException, - CommandOperationHelper::shouldAttemptToRetryWrite, callback -> { - logRetryExecute(retryState, operationContext); - asyncWriteFunction.get(callback); - }); - } - - static R executeRetryableWrite( - final WriteBinding binding, - final String database, - @Nullable final ReadPreference readPreference, - final FieldNameValidator fieldNameValidator, - final Decoder commandResultDecoder, - final CommandCreator commandCreator, - final CommandWriteTransformer transformer, - final Function retryCommandModifier) { - RetryState retryState = initialRetryState(true); - Supplier retryingWrite = decorateWriteWithRetries(retryState, binding.getOperationContext(), () -> { - boolean firstAttempt = retryState.isFirstAttempt(); - if (!firstAttempt && binding.getSessionContext().hasActiveTransaction()) { - binding.getSessionContext().clearTransactionContext(); - } - return withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection) -> { - int maxWireVersion = connection.getDescription().getMaxWireVersion(); - try { - retryState.breakAndThrowIfRetryAnd(() -> !canRetryWrite(connection.getDescription(), binding.getSessionContext())); - BsonDocument command = retryState.attachment(AttachmentKeys.command()) - .map(previousAttemptCommand -> { - assertFalse(firstAttempt); - return retryCommandModifier.apply(previousAttemptCommand); - }).orElseGet(() -> commandCreator.create(source.getServerDescription(), connection.getDescription())); - // attach `maxWireVersion`, `retryableCommandFlag` ASAP because they are used to check whether we should retry - retryState.attach(AttachmentKeys.maxWireVersion(), maxWireVersion, true) - .attach(AttachmentKeys.retryableCommandFlag(), isRetryWritesEnabled(command), true) - .attach(AttachmentKeys.commandDescriptionSupplier(), command::getFirstKey, false) - .attach(AttachmentKeys.command(), command, false); - return transformer.apply(connection.command(database, command, fieldNameValidator, readPreference, - commandResultDecoder, binding), - connection); - } catch (MongoException e) { - if (!firstAttempt) { - addRetryableWriteErrorLabel(e, maxWireVersion); - } - throw e; - } - }); - }); - try { - return retryingWrite.get(); - } catch (MongoException e) { - throw transformWriteException(e); - } - } - - static void executeRetryableWriteAsync( - final AsyncWriteBinding binding, - final String database, - @Nullable final ReadPreference readPreference, - final FieldNameValidator fieldNameValidator, - final Decoder commandResultDecoder, - final CommandCreator commandCreator, - final CommandWriteTransformerAsync transformer, - final Function retryCommandModifier, - final SingleResultCallback callback) { - RetryState retryState = initialRetryState(true); - binding.retain(); - AsyncCallbackSupplier asyncWrite = CommandOperationHelper.decorateWriteWithRetries(retryState, - binding.getOperationContext(), funcCallback -> { - boolean firstAttempt = retryState.isFirstAttempt(); - if (!firstAttempt && binding.getSessionContext().hasActiveTransaction()) { - binding.getSessionContext().clearTransactionContext(); - } - withAsyncSourceAndConnection(binding::getWriteConnectionSource, true, funcCallback, - (source, connection, releasingCallback) -> { - int maxWireVersion = connection.getDescription().getMaxWireVersion(); - SingleResultCallback addingRetryableLabelCallback = firstAttempt - ? releasingCallback - : addingRetryableLabelCallback(releasingCallback, maxWireVersion); - if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryWrite(connection.getDescription(), binding.getSessionContext()), - addingRetryableLabelCallback)) { - return; - } - BsonDocument command; - try { - command = retryState.attachment(AttachmentKeys.command()) - .map(previousAttemptCommand -> { - assertFalse(firstAttempt); - return retryCommandModifier.apply(previousAttemptCommand); - }).orElseGet(() -> commandCreator.create(source.getServerDescription(), connection.getDescription())); - // attach `maxWireVersion`, `retryableCommandFlag` ASAP because they are used to check whether we should retry - retryState.attach(AttachmentKeys.maxWireVersion(), maxWireVersion, true) - .attach(AttachmentKeys.retryableCommandFlag(), isRetryWritesEnabled(command), true) - .attach(AttachmentKeys.commandDescriptionSupplier(), command::getFirstKey, false) - .attach(AttachmentKeys.command(), command, false); - } catch (Throwable t) { - addingRetryableLabelCallback.onResult(null, t); - return; - } - connection.commandAsync(database, command, fieldNameValidator, readPreference, commandResultDecoder, binding, - transformingWriteCallback(transformer, connection, addingRetryableLabelCallback)); - }); - }).whenComplete(binding::release); - asyncWrite.get(exceptionTransformingCallback(errorHandlingCallback(callback, LOGGER))); - } - - private static SingleResultCallback addingRetryableLabelCallback(final SingleResultCallback callback, - final int maxWireVersion) { - return (result, t) -> { - if (t != null) { - if (t instanceof MongoException) { - addRetryableWriteErrorLabel((MongoException) t, maxWireVersion); - } - callback.onResult(null, t); - } else { - callback.onResult(result, null); - } - }; - } - private static final List RETRYABLE_ERROR_CODES = asList(6, 7, 89, 91, 189, 262, 9001, 13436, 13435, 11602, 11600, 10107); static boolean isRetryableException(final Throwable t) { if (!(t instanceof MongoException)) { @@ -544,7 +122,7 @@ static boolean isNamespaceError(final Throwable t) { } } - private static boolean shouldAttemptToRetryRead(final RetryState retryState, final Throwable attemptFailure) { + static boolean shouldAttemptToRetryRead(final RetryState retryState, final Throwable attemptFailure) { assertFalse(attemptFailure instanceof ResourceSupplierInternalException); boolean decision = isRetryableException(attemptFailure) || (attemptFailure instanceof MongoSecurityException @@ -579,7 +157,7 @@ static boolean shouldAttemptToRetryWrite(final RetryState retryState, final Thro return decision; } - private static boolean isRetryWritesEnabled(@Nullable final BsonDocument command) { + static boolean isRetryWritesEnabled(@Nullable final BsonDocument command) { return (command != null && (command.containsKey("txnNumber") || command.getFirstKey().equals("commitTransaction") || command.getFirstKey().equals("abortTransaction"))); } @@ -641,20 +219,6 @@ static MongoException transformWriteException(final MongoException exception) { return exception; } - static SingleResultCallback exceptionTransformingCallback(final SingleResultCallback callback) { - return (result, t) -> { - if (t != null) { - if (t instanceof MongoException) { - callback.onResult(null, transformWriteException((MongoException) t)); - } else { - callback.onResult(null, t); - } - } else { - callback.onResult(result, null); - } - }; - } - private CommandOperationHelper() { } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java index 53adf1aa7f0..47b807f91ec 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java @@ -23,9 +23,9 @@ import org.bson.codecs.Decoder; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync; import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableReadAsync; +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; /** * An operation that executes an arbitrary command that reads from the server. diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java index 28e25aabe80..92779bc61ae 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java @@ -28,7 +28,6 @@ import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncWriteBinding; import com.mongodb.internal.binding.WriteBinding; -import com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; import org.bson.BsonInt32; @@ -40,6 +39,7 @@ import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL; import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; import static com.mongodb.internal.operation.CommandOperationHelper.RETRYABLE_WRITE_ERROR_LABEL; import static java.util.Arrays.asList; import static java.util.concurrent.TimeUnit.MILLISECONDS; diff --git a/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java index d14e5e6cbcd..43298bae4bf 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java @@ -22,8 +22,6 @@ import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncReadBinding; import com.mongodb.internal.binding.ReadBinding; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformer; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformerAsync; import com.mongodb.internal.session.SessionContext; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -35,12 +33,14 @@ import java.util.concurrent.TimeUnit; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync; import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableReadAsync; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero; import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer; +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; /** *

This class is not part of the public API and may be removed or changed at any time

diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java index feb296d6558..c78fee6838e 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java @@ -45,18 +45,18 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommand; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommandAsync; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorTransformer; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorWriteTransformer; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync; import static com.mongodb.internal.operation.DocumentHelper.putIfFalse; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero; import static com.mongodb.internal.operation.OperationHelper.LOGGER; -import static com.mongodb.internal.operation.OperationHelper.releasingCallback; -import static com.mongodb.internal.operation.OperationHelper.withAsyncConnection; -import static com.mongodb.internal.operation.OperationHelper.withConnection; import static com.mongodb.internal.operation.ServerVersionHelper.serverIsLessThanVersionSevenDotZero; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer; import static com.mongodb.internal.operation.WriteConcernHelper.appendWriteConcernToCommand; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -240,8 +240,8 @@ public Void execute(final WriteBinding binding) { return withConnection(binding, connection -> { checkEncryptedFieldsSupported(connection.getDescription()); getCommandFunctions().forEach(commandCreator -> - executeCommand(binding, databaseName, commandCreator.get(), connection, - writeConcernErrorTransformer()) + executeCommand(binding, databaseName, commandCreator.get(), connection, + writeConcernErrorTransformer()) ); return null; }); @@ -425,7 +425,7 @@ public void onResult(@Nullable final Void result, @Nullable final Throwable t) { finalCallback.onResult(null, null); } else { executeCommandAsync(binding, databaseName, nextCommandFunction.get(), - connection, writeConcernErrorWriteTransformer(), this); + connection, writeConcernErrorTransformerAsync(), this); } } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java index a176861ba10..b47b45a5eee 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java @@ -47,17 +47,17 @@ import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommand; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommandAsync; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorTransformer; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorWriteTransformer; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync; import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero; import static com.mongodb.internal.operation.IndexHelper.generateIndexName; import static com.mongodb.internal.operation.OperationHelper.LOGGER; -import static com.mongodb.internal.operation.OperationHelper.releasingCallback; -import static com.mongodb.internal.operation.OperationHelper.withAsyncConnection; -import static com.mongodb.internal.operation.OperationHelper.withConnection; import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionFourDotFour; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer; import static com.mongodb.internal.operation.WriteConcernHelper.appendWriteConcernToCommand; /** @@ -147,7 +147,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall SingleResultCallback wrappedCallback = releasingCallback(errHandlingCallback, connection); try { executeCommandAsync(binding, namespace.getDatabaseName(), - getCommand(connection.getDescription()), connection, writeConcernErrorWriteTransformer(), + getCommand(connection.getDescription()), connection, writeConcernErrorTransformerAsync(), (result, t12) -> wrappedCallback.onResult(null, translateException(t12))); } catch (Throwable t1) { wrappedCallback.onResult(null, t1); diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java index 12da1e2a49d..8d1e98de6b8 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java @@ -31,14 +31,14 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommand; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommandAsync; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorTransformer; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorWriteTransformer; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync; import static com.mongodb.internal.operation.OperationHelper.LOGGER; -import static com.mongodb.internal.operation.OperationHelper.releasingCallback; -import static com.mongodb.internal.operation.OperationHelper.withAsyncConnection; -import static com.mongodb.internal.operation.OperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer; import static com.mongodb.internal.operation.WriteConcernHelper.appendWriteConcernToCommand; /** @@ -138,10 +138,9 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall SingleResultCallback errHandlingCallback = errorHandlingCallback(callback, LOGGER); if (t != null) { errHandlingCallback.onResult(null, t); - } - else { + } else { SingleResultCallback wrappedCallback = releasingCallback(errHandlingCallback, connection); - executeCommandAsync(binding, databaseName, getCommand(), connection, writeConcernErrorWriteTransformer(), + executeCommandAsync(binding, databaseName, getCommand(), connection, writeConcernErrorTransformerAsync(), wrappedCallback); } }); diff --git a/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java index 350154c8383..a64c4cbfadd 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java @@ -24,8 +24,6 @@ import com.mongodb.internal.binding.AsyncReadBinding; import com.mongodb.internal.binding.ReadBinding; import com.mongodb.internal.connection.QueryResult; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformer; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformerAsync; import com.mongodb.internal.session.SessionContext; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -38,13 +36,15 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync; import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableReadAsync; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero; import static com.mongodb.internal.operation.OperationHelper.LOGGER; import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer; +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; /** * Finds the distinct values for a specified field across a single collection. diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java index 6f5cadf430b..6ddc087bdee 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java @@ -38,16 +38,16 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommand; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync; import static com.mongodb.internal.operation.CommandOperationHelper.isNamespaceError; import static com.mongodb.internal.operation.CommandOperationHelper.rethrowIfNotNamespaceError; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorTransformer; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorWriteTransformer; import static com.mongodb.internal.operation.OperationHelper.LOGGER; -import static com.mongodb.internal.operation.OperationHelper.releasingCallback; -import static com.mongodb.internal.operation.OperationHelper.withAsyncConnection; -import static com.mongodb.internal.operation.OperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer; import static com.mongodb.internal.operation.WriteConcernHelper.appendWriteConcernToCommand; import static java.util.Arrays.asList; import static java.util.Collections.singletonList; @@ -252,7 +252,7 @@ public void onResult(@Nullable final Void result, @Nullable final Throwable t) { finalCallback.onResult(null, null); } else { executeCommandAsync(binding, namespace.getDatabaseName(), nextCommandFunction.get(), - connection, writeConcernErrorWriteTransformer(), this); + connection, writeConcernErrorTransformerAsync(), this); } } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java index 1f9c0fb5f83..2dad7dda177 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java @@ -26,14 +26,14 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommand; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommandAsync; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorTransformer; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorWriteTransformer; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync; import static com.mongodb.internal.operation.OperationHelper.LOGGER; -import static com.mongodb.internal.operation.OperationHelper.releasingCallback; -import static com.mongodb.internal.operation.OperationHelper.withAsyncConnection; -import static com.mongodb.internal.operation.OperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer; import static com.mongodb.internal.operation.WriteConcernHelper.appendWriteConcernToCommand; /** @@ -75,7 +75,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall errHandlingCallback.onResult(null, t); } else { executeCommandAsync(binding, databaseName, getCommand(), connection, - writeConcernErrorWriteTransformer(), releasingCallback(errHandlingCallback, connection)); + writeConcernErrorTransformerAsync(), releasingCallback(errHandlingCallback, connection)); } }); diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java index 08a47d85a12..66bb8f408fb 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java @@ -31,17 +31,17 @@ import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommand; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync; import static com.mongodb.internal.operation.CommandOperationHelper.isNamespaceError; import static com.mongodb.internal.operation.CommandOperationHelper.rethrowIfNotNamespaceError; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorTransformer; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorWriteTransformer; import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero; import static com.mongodb.internal.operation.OperationHelper.LOGGER; -import static com.mongodb.internal.operation.OperationHelper.releasingCallback; -import static com.mongodb.internal.operation.OperationHelper.withAsyncConnection; -import static com.mongodb.internal.operation.OperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer; import static com.mongodb.internal.operation.WriteConcernHelper.appendWriteConcernToCommand; /** @@ -116,7 +116,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall } else { SingleResultCallback releasingCallback = releasingCallback(errHandlingCallback, connection); executeCommandAsync(binding, namespace.getDatabaseName(), getCommand(), - connection, writeConcernErrorWriteTransformer(), (result, t1) -> { + connection, writeConcernErrorTransformerAsync(), (result, t1) -> { if (t1 != null && !isNamespaceError(t1)) { releasingCallback.onResult(null, t1); } else { diff --git a/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java b/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java index 8417138cad8..571de884582 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java @@ -22,8 +22,6 @@ import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncReadBinding; import com.mongodb.internal.binding.ReadBinding; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformer; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformerAsync; import com.mongodb.internal.session.SessionContext; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -36,13 +34,15 @@ import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync; import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableReadAsync; import static com.mongodb.internal.operation.CommandOperationHelper.isNamespaceError; import static com.mongodb.internal.operation.CommandOperationHelper.rethrowIfNotNamespaceError; import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero; import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer; +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; import static java.util.Collections.singletonList; /** diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindAndModifyHelper.java b/driver-core/src/main/com/mongodb/internal/operation/FindAndModifyHelper.java index 8152ded83db..8358ccf2a7a 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/FindAndModifyHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/FindAndModifyHelper.java @@ -19,14 +19,14 @@ import com.mongodb.MongoWriteConcernException; import com.mongodb.ServerAddress; import com.mongodb.WriteConcernResult; -import com.mongodb.internal.operation.CommandOperationHelper.CommandWriteTransformer; -import com.mongodb.internal.operation.CommandOperationHelper.CommandWriteTransformerAsync; import com.mongodb.lang.Nullable; import org.bson.BsonArray; import org.bson.BsonBoolean; import org.bson.BsonDocument; import org.bson.BsonInt32; +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandWriteTransformerAsync; +import static com.mongodb.internal.operation.SyncOperationHelper.CommandWriteTransformer; import static com.mongodb.internal.operation.WriteConcernHelper.createWriteConcernError; import static com.mongodb.internal.operation.WriteConcernHelper.hasWriteConcernError; diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java index 2b71b9c7549..dcb94211fcf 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java @@ -46,12 +46,11 @@ import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.createReadCommandAndExecuteAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.decorateReadWithRetriesAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncSourceAndConnection; import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; -import static com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformer; -import static com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformerAsync; -import static com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute; -import static com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecuteAsync; -import static com.mongodb.internal.operation.CommandOperationHelper.decorateReadWithRetries; import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNullOrEmpty; @@ -59,10 +58,12 @@ import static com.mongodb.internal.operation.OperationHelper.LOGGER; import static com.mongodb.internal.operation.OperationHelper.canRetryRead; import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult; -import static com.mongodb.internal.operation.OperationHelper.withAsyncSourceAndConnection; -import static com.mongodb.internal.operation.OperationHelper.withSourceAndConnection; import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand; import static com.mongodb.internal.operation.ServerVersionHelper.MIN_WIRE_VERSION; +import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer; +import static com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute; +import static com.mongodb.internal.operation.SyncOperationHelper.decorateReadWithRetries; +import static com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection; /** * An operation that queries a collection using the provided criteria. @@ -337,8 +338,8 @@ public BatchCursor execute(final ReadBinding binding) { public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) { RetryState retryState = initialRetryState(retryReads); binding.retain(); - AsyncCallbackSupplier> asyncRead = CommandOperationHelper.>decorateReadWithRetries( - retryState, binding.getOperationContext(), funcCallback -> + AsyncCallbackSupplier> asyncRead = decorateReadWithRetriesAsync( + retryState, binding.getOperationContext(), (AsyncCallbackSupplier>) funcCallback -> withAsyncSourceAndConnection(binding::getReadConnectionSource, false, funcCallback, (source, connection, releasingCallback) -> { if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), @@ -470,7 +471,7 @@ private boolean isAwaitData() { return cursorType == CursorType.TailableAwait; } - private CommandReadTransformer> transformer() { + private CommandReadTransformer> transformer() { return (result, source, connection) -> { QueryResult queryResult = cursorDocumentToQueryResult(result.getDocument("cursor"), connection.getDescription().getServerAddress()); diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java index 505e2f2d7fc..fa2a5dcd995 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java @@ -25,8 +25,6 @@ import com.mongodb.internal.binding.AsyncConnectionSource; import com.mongodb.internal.binding.AsyncReadBinding; import com.mongodb.internal.binding.ReadBinding; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformer; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformerAsync; import com.mongodb.lang.Nullable; import org.bson.BsonBoolean; import org.bson.BsonDocument; @@ -41,10 +39,12 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; -import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; -import static com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute; -import static com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecuteAsync; -import static com.mongodb.internal.operation.CommandOperationHelper.decorateReadWithRetries; +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.createEmptyAsyncBatchCursor; +import static com.mongodb.internal.operation.AsyncOperationHelper.createReadCommandAndExecuteAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.cursorDocumentToAsyncBatchCursor; +import static com.mongodb.internal.operation.AsyncOperationHelper.decorateReadWithRetriesAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncSourceAndConnection; import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState; import static com.mongodb.internal.operation.CommandOperationHelper.isNamespaceError; import static com.mongodb.internal.operation.CommandOperationHelper.rethrowIfNotNamespaceError; @@ -52,12 +52,12 @@ import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; import static com.mongodb.internal.operation.OperationHelper.LOGGER; import static com.mongodb.internal.operation.OperationHelper.canRetryRead; -import static com.mongodb.internal.operation.OperationHelper.createEmptyAsyncBatchCursor; import static com.mongodb.internal.operation.OperationHelper.createEmptyBatchCursor; -import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToAsyncBatchCursor; -import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToBatchCursor; -import static com.mongodb.internal.operation.OperationHelper.withAsyncSourceAndConnection; -import static com.mongodb.internal.operation.OperationHelper.withSourceAndConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer; +import static com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute; +import static com.mongodb.internal.operation.SyncOperationHelper.cursorDocumentToBatchCursor; +import static com.mongodb.internal.operation.SyncOperationHelper.decorateReadWithRetries; +import static com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection; /** * An operation that provides a cursor allowing iteration through the metadata of all the collections in a database. This operation @@ -160,8 +160,8 @@ public BatchCursor execute(final ReadBinding binding) { public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) { RetryState retryState = initialRetryState(retryReads); binding.retain(); - AsyncCallbackSupplier> asyncRead = CommandOperationHelper.>decorateReadWithRetries( - retryState, binding.getOperationContext(), funcCallback -> + AsyncCallbackSupplier> asyncRead = decorateReadWithRetriesAsync( + retryState, binding.getOperationContext(), (AsyncCallbackSupplier>) funcCallback -> withAsyncSourceAndConnection(binding::getReadConnectionSource, false, funcCallback, (source, connection, releasingCallback) -> { if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), @@ -197,7 +197,7 @@ private CommandReadTransformer> commandTransformer( return (result, source, connection) -> cursorDocumentToBatchCursor(result.getDocument("cursor"), decoder, comment, source, connection, batchSize); } - private CommandCreator getCommandCreator() { + private CommandOperationHelper.CommandCreator getCommandCreator() { return (serverDescription, connectionDescription) -> getCommand(); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java index f0aad9bdd3c..bacf64601c9 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java @@ -22,8 +22,6 @@ import com.mongodb.internal.binding.AsyncReadBinding; import com.mongodb.internal.binding.ReadBinding; import com.mongodb.internal.connection.QueryResult; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformer; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformerAsync; import com.mongodb.lang.Nullable; import org.bson.BsonBoolean; import org.bson.BsonDocument; @@ -36,11 +34,13 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync; import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableReadAsync; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; import static com.mongodb.internal.operation.OperationHelper.LOGGER; +import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer; +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; /** diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java index a5567f97d58..62ecdc953bd 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java @@ -25,8 +25,6 @@ import com.mongodb.internal.binding.AsyncConnectionSource; import com.mongodb.internal.binding.AsyncReadBinding; import com.mongodb.internal.binding.ReadBinding; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformer; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformerAsync; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; import org.bson.BsonInt64; @@ -40,10 +38,13 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.createEmptyAsyncBatchCursor; +import static com.mongodb.internal.operation.AsyncOperationHelper.createReadCommandAndExecuteAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.cursorDocumentToAsyncBatchCursor; +import static com.mongodb.internal.operation.AsyncOperationHelper.decorateReadWithRetriesAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncSourceAndConnection; import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; -import static com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecute; -import static com.mongodb.internal.operation.CommandOperationHelper.createReadCommandAndExecuteAsync; -import static com.mongodb.internal.operation.CommandOperationHelper.decorateReadWithRetries; import static com.mongodb.internal.operation.CommandOperationHelper.initialRetryState; import static com.mongodb.internal.operation.CommandOperationHelper.isNamespaceError; import static com.mongodb.internal.operation.CommandOperationHelper.rethrowIfNotNamespaceError; @@ -51,12 +52,12 @@ import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; import static com.mongodb.internal.operation.OperationHelper.LOGGER; import static com.mongodb.internal.operation.OperationHelper.canRetryRead; -import static com.mongodb.internal.operation.OperationHelper.createEmptyAsyncBatchCursor; import static com.mongodb.internal.operation.OperationHelper.createEmptyBatchCursor; -import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToAsyncBatchCursor; -import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToBatchCursor; -import static com.mongodb.internal.operation.OperationHelper.withAsyncSourceAndConnection; -import static com.mongodb.internal.operation.OperationHelper.withSourceAndConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer; +import static com.mongodb.internal.operation.SyncOperationHelper.createReadCommandAndExecute; +import static com.mongodb.internal.operation.SyncOperationHelper.cursorDocumentToBatchCursor; +import static com.mongodb.internal.operation.SyncOperationHelper.decorateReadWithRetries; +import static com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection; /** * An operation that lists the indexes that have been created on a collection. For flexibility, the type of each document returned is @@ -138,8 +139,8 @@ public BatchCursor execute(final ReadBinding binding) { public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) { RetryState retryState = initialRetryState(retryReads); binding.retain(); - AsyncCallbackSupplier> asyncRead = CommandOperationHelper.>decorateReadWithRetries( - retryState, binding.getOperationContext(), funcCallback -> + AsyncCallbackSupplier> asyncRead = decorateReadWithRetriesAsync( + retryState, binding.getOperationContext(), (AsyncCallbackSupplier>) funcCallback -> withAsyncSourceAndConnection(binding::getReadConnectionSource, false, funcCallback, (source, connection, releasingCallback) -> { if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), diff --git a/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java index 7209b348a7f..482b4261d10 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java @@ -24,8 +24,6 @@ import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncWriteBinding; import com.mongodb.internal.binding.WriteBinding; -import com.mongodb.internal.operation.CommandOperationHelper.CommandWriteTransformer; -import com.mongodb.internal.operation.CommandOperationHelper.CommandWriteTransformerAsync; import com.mongodb.lang.Nullable; import org.bson.BsonBoolean; import org.bson.BsonDocument; @@ -40,16 +38,18 @@ import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommand; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandWriteTransformerAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero; import static com.mongodb.internal.operation.DocumentHelper.putIfTrue; import static com.mongodb.internal.operation.OperationHelper.LOGGER; -import static com.mongodb.internal.operation.OperationHelper.releasingCallback; -import static com.mongodb.internal.operation.OperationHelper.withAsyncConnection; -import static com.mongodb.internal.operation.OperationHelper.withConnection; import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionFourDotFour; +import static com.mongodb.internal.operation.SyncOperationHelper.CommandWriteTransformer; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; import static com.mongodb.internal.operation.WriteConcernHelper.appendWriteConcernToCommand; import static com.mongodb.internal.operation.WriteConcernHelper.throwOnWriteConcernError; import static java.util.Arrays.asList; diff --git a/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java index e7c5c56192c..131591dd6e2 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/MapReduceWithInlineResultsOperation.java @@ -25,8 +25,6 @@ import com.mongodb.internal.binding.ReadBinding; import com.mongodb.internal.connection.NoOpSessionContext; import com.mongodb.internal.connection.QueryResult; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformer; -import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformerAsync; import com.mongodb.internal.session.SessionContext; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -40,9 +38,9 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync; import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableReadAsync; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero; import static com.mongodb.internal.operation.DocumentHelper.putIfTrue; @@ -50,6 +48,8 @@ import static com.mongodb.internal.operation.OperationHelper.LOGGER; import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand; import static com.mongodb.internal.operation.ServerVersionHelper.MIN_WIRE_VERSION; +import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer; +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead; import static java.util.concurrent.TimeUnit.MILLISECONDS; /** diff --git a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java index a30abf0cfa3..fb54fb33994 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java @@ -58,16 +58,16 @@ import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.exceptionTransformingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncSourceAndConnection; import static com.mongodb.internal.operation.CommandOperationHelper.addRetryableWriteErrorLabel; -import static com.mongodb.internal.operation.CommandOperationHelper.exceptionTransformingCallback; import static com.mongodb.internal.operation.CommandOperationHelper.logRetryExecute; import static com.mongodb.internal.operation.CommandOperationHelper.transformWriteException; import static com.mongodb.internal.operation.OperationHelper.LOGGER; import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite; import static com.mongodb.internal.operation.OperationHelper.validateWriteRequests; import static com.mongodb.internal.operation.OperationHelper.validateWriteRequestsAndCompleteIfInvalid; -import static com.mongodb.internal.operation.OperationHelper.withAsyncSourceAndConnection; -import static com.mongodb.internal.operation.OperationHelper.withSourceAndConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.withSourceAndConnection; /** * An operation to execute a series of write operations in bulk. diff --git a/driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java index 9ba182ebb58..387bb2f5da6 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/OperationHelper.java @@ -24,23 +24,12 @@ import com.mongodb.connection.ConnectionDescription; import com.mongodb.connection.ServerDescription; import com.mongodb.connection.ServerType; -import com.mongodb.internal.async.AsyncBatchCursor; import com.mongodb.internal.async.SingleResultCallback; -import com.mongodb.internal.async.function.AsyncCallbackBiFunction; import com.mongodb.internal.async.function.AsyncCallbackFunction; import com.mongodb.internal.async.function.AsyncCallbackSupplier; -import com.mongodb.internal.binding.AsyncConnectionSource; -import com.mongodb.internal.binding.AsyncReadBinding; -import com.mongodb.internal.binding.AsyncWriteBinding; -import com.mongodb.internal.binding.ConnectionSource; -import com.mongodb.internal.binding.ReadBinding; -import com.mongodb.internal.binding.ReferenceCounted; -import com.mongodb.internal.binding.WriteBinding; import com.mongodb.internal.bulk.DeleteRequest; import com.mongodb.internal.bulk.UpdateRequest; import com.mongodb.internal.bulk.WriteRequest; -import com.mongodb.internal.connection.AsyncConnection; -import com.mongodb.internal.connection.Connection; import com.mongodb.internal.connection.QueryResult; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; @@ -49,47 +38,22 @@ import com.mongodb.lang.Nullable; import org.bson.BsonDocument; import org.bson.BsonInt64; -import org.bson.BsonValue; import org.bson.codecs.Decoder; import org.bson.conversions.Bson; import java.util.Collections; import java.util.List; -import java.util.function.BiFunction; import java.util.function.Function; import java.util.function.Supplier; import static com.mongodb.assertions.Assertions.assertNotNull; -import static com.mongodb.assertions.Assertions.notNull; -import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; import static com.mongodb.internal.operation.ServerVersionHelper.serverIsLessThanVersionFourDotFour; import static com.mongodb.internal.operation.ServerVersionHelper.serverIsLessThanVersionFourDotTwo; import static java.lang.String.format; -import static java.util.Collections.singletonList; final class OperationHelper { public static final Logger LOGGER = Loggers.getLogger("operation"); - interface CallableWithConnection { - T call(Connection connection); - } - - interface CallableWithSource { - T call(ConnectionSource source); - } - - interface AsyncCallableWithConnection { - void call(@Nullable AsyncConnection connection, @Nullable Throwable t); - } - - interface AsyncCallableWithSource { - void call(@Nullable AsyncConnectionSource source, @Nullable Throwable t); - } - - interface AsyncCallableWithConnectionAndSource { - void call(@Nullable AsyncConnectionSource source, @Nullable AsyncConnection connection, @Nullable Throwable t); - } - static void validateCollationAndWriteConcern(@Nullable final Collation collation, final WriteConcern writeConcern) { if (collation != null && !writeConcern.isAcknowledged()) { throw new MongoClientException("Specifying collation with an unacknowledged WriteConcern is not supported"); @@ -127,7 +91,7 @@ static void validateHintForFindAndModify(final ConnectionDescription connectionD } } - static void validateWriteRequestCollations(final List requests, final WriteConcern writeConcern) { + private static void validateWriteRequestCollations(final List requests, final WriteConcern writeConcern) { Collation collation = null; for (WriteRequest request : requests) { if (request instanceof UpdateRequest) { @@ -142,7 +106,7 @@ static void validateWriteRequestCollations(final List re validateCollationAndWriteConcern(collation, writeConcern); } - static void validateUpdateRequestArrayFilters(final List requests, final WriteConcern writeConcern) { + private static void validateUpdateRequestArrayFilters(final List requests, final WriteConcern writeConcern) { for (WriteRequest request : requests) { List arrayFilters = null; if (request instanceof UpdateRequest) { @@ -155,7 +119,8 @@ static void validateUpdateRequestArrayFilters(final List } } - static void validateWriteRequestHints(final ConnectionDescription connectionDescription, final List requests, + private static void validateWriteRequestHints(final ConnectionDescription connectionDescription, + final List requests, final WriteConcern writeConcern) { for (WriteRequest request : requests) { Bson hint = null; @@ -194,7 +159,7 @@ static boolean validateWriteRequestsAndCompleteIfInvalid(final ConnectionDes } } - static void checkBypassDocumentValidationIsSupported(@Nullable final Boolean bypassDocumentValidation, + private static void checkBypassDocumentValidationIsSupported(@Nullable final Boolean bypassDocumentValidation, final WriteConcern writeConcern) { if (bypassDocumentValidation != null && !writeConcern.isAcknowledged()) { throw new MongoClientException("Specifying bypassDocumentValidation with an unacknowledged WriteConcern is not supported"); @@ -242,277 +207,36 @@ static QueryBatchCursor createEmptyBatchCursor(final MongoNamespace names 0, batchSize, decoder); } - static AsyncBatchCursor createEmptyAsyncBatchCursor(final MongoNamespace namespace, final ServerAddress serverAddress) { - return new AsyncSingleBatchQueryCursor<>(new QueryResult<>(namespace, Collections.emptyList(), 0L, serverAddress)); - } - - static BatchCursor cursorDocumentToBatchCursor(final BsonDocument cursorDocument, final Decoder decoder, - final BsonValue comment, final ConnectionSource source, final Connection connection, final int batchSize) { - return new QueryBatchCursor<>(OperationHelper.cursorDocumentToQueryResult(cursorDocument, - source.getServerDescription().getAddress()), - 0, batchSize, 0, decoder, comment, source, connection); - } - - static AsyncBatchCursor cursorDocumentToAsyncBatchCursor(final BsonDocument cursorDocument, final Decoder decoder, - final BsonValue comment, final AsyncConnectionSource source, final AsyncConnection connection, final int batchSize) { - return new AsyncQueryBatchCursor<>(OperationHelper.cursorDocumentToQueryResult(cursorDocument, - source.getServerDescription().getAddress()), - 0, batchSize, 0, decoder, comment, source, connection, cursorDocument); - } - - static QueryResult cursorDocumentToQueryResult(final BsonDocument cursorDocument, final ServerAddress serverAddress) { return cursorDocumentToQueryResult(cursorDocument, serverAddress, "firstBatch"); } - static QueryResult getMoreCursorDocumentToQueryResult(final BsonDocument cursorDocument, final ServerAddress serverAddress) { - return cursorDocumentToQueryResult(cursorDocument, serverAddress, "nextBatch"); - } - - private static QueryResult cursorDocumentToQueryResult(final BsonDocument cursorDocument, final ServerAddress serverAddress, - final String fieldNameContainingBatch) { + static QueryResult cursorDocumentToQueryResult(final BsonDocument cursorDocument, final ServerAddress serverAddress, + final String fieldNameContainingBatch) { long cursorId = ((BsonInt64) cursorDocument.get("id")).getValue(); MongoNamespace queryResultNamespace = new MongoNamespace(cursorDocument.getString("ns").getValue()); return new QueryResult<>(queryResultNamespace, BsonDocumentWrapperHelper.toList(cursorDocument, fieldNameContainingBatch), cursorId, serverAddress); } - static SingleResultCallback releasingCallback(final SingleResultCallback wrapped, final AsyncConnection connection) { - return new ReferenceCountedReleasingWrappedCallback<>(wrapped, singletonList(connection)); - } - - private static class ReferenceCountedReleasingWrappedCallback implements SingleResultCallback { - private final SingleResultCallback wrapped; - private final List referenceCounted; - - ReferenceCountedReleasingWrappedCallback(final SingleResultCallback wrapped, - final List referenceCounted) { - this.wrapped = wrapped; - this.referenceCounted = notNull("referenceCounted", referenceCounted); - } - - @Override - public void onResult(@Nullable final T result, @Nullable final Throwable t) { - for (ReferenceCounted cur : referenceCounted) { - if (cur != null) { - cur.release(); - } - } - wrapped.onResult(result, t); - } - } - - static T withReadConnectionSource(final ReadBinding binding, final CallableWithSource callable) { - ConnectionSource source = binding.getReadConnectionSource(); - try { - return callable.call(source); - } finally { - source.release(); - } - } - - static T withConnection(final WriteBinding binding, final CallableWithConnection callable) { - ConnectionSource source = binding.getWriteConnectionSource(); - try { - return withConnectionSource(source, callable); - } finally { - source.release(); - } - } - - static T withConnectionSource(final ConnectionSource source, final CallableWithConnection callable) { - Connection connection = source.getConnection(); - try { - return callable.call(connection); - } finally { - connection.release(); - } - } - - /** - * Gets a {@link ConnectionSource} and a {@link Connection} from the {@code sourceSupplier} and executes the {@code function} with them. - * Guarantees to {@linkplain ReferenceCounted#release() release} the source and the connection after completion of the {@code function}. - * - * @param wrapSourceConnectionException See {@link #withSuppliedResource(Supplier, boolean, Function)}. - * @see #withSuppliedResource(Supplier, boolean, Function) - * @see #withAsyncSourceAndConnection(AsyncCallbackSupplier, boolean, SingleResultCallback, AsyncCallbackBiFunction) - */ - static R withSourceAndConnection(final Supplier sourceSupplier, - final boolean wrapSourceConnectionException, - final BiFunction function) throws ResourceSupplierInternalException { - return withSuppliedResource(sourceSupplier, wrapSourceConnectionException, source -> - withSuppliedResource(source::getConnection, wrapSourceConnectionException, connection -> - function.apply(source, connection))); - } - - /** - * Gets a {@link ReferenceCounted} resource from the {@code resourceSupplier} and applies the {@code function} to it. - * Guarantees to {@linkplain ReferenceCounted#release() release} the resource after completion of the {@code function}. - * - * @param wrapSupplierException If {@code true} and {@code resourceSupplier} completes abruptly, then the exception is wrapped - * into {@link ResourceSupplierInternalException}, such that it can be accessed - * via {@link ResourceSupplierInternalException#getCause()}. - * @see #withAsyncSuppliedResource(AsyncCallbackSupplier, boolean, SingleResultCallback, AsyncCallbackFunction) - */ - static R withSuppliedResource(final Supplier resourceSupplier, - final boolean wrapSupplierException, final Function function) throws ResourceSupplierInternalException { - T resource = null; - try { - try { - resource = resourceSupplier.get(); - } catch (Exception supplierException) { - if (wrapSupplierException) { - throw new ResourceSupplierInternalException(supplierException); - } else { - throw supplierException; - } - } - return function.apply(resource); - } finally { - if (resource != null) { - resource.release(); - } - } - } - - static void withAsyncConnection(final AsyncWriteBinding binding, final AsyncCallableWithConnection callable) { - binding.getWriteConnectionSource(errorHandlingCallback(new AsyncCallableWithConnectionCallback(callable), LOGGER)); - } - - static void withAsyncConnection(final AsyncWriteBinding binding, final AsyncCallableWithConnectionAndSource callable) { - binding.getWriteConnectionSource(errorHandlingCallback(new AsyncCallableWithConnectionAndSourceCallback(callable), LOGGER)); - } - - static void withAsyncReadConnection(final AsyncReadBinding binding, final AsyncCallableWithSource callable) { - binding.getReadConnectionSource(errorHandlingCallback(new AsyncCallableWithSourceCallback(callable), LOGGER)); - } - - /** - * @see #withAsyncSuppliedResource(AsyncCallbackSupplier, boolean, SingleResultCallback, AsyncCallbackFunction) - * @see #withSourceAndConnection(Supplier, boolean, BiFunction) - */ - static void withAsyncSourceAndConnection(final AsyncCallbackSupplier sourceAsyncSupplier, - final boolean wrapSourceConnectionException, - final SingleResultCallback callback, - final AsyncCallbackBiFunction asyncFunction) - throws ResourceSupplierInternalException { - SingleResultCallback errorHandlingCallback = errorHandlingCallback(callback, LOGGER); - withAsyncSuppliedResource(sourceAsyncSupplier, wrapSourceConnectionException, errorHandlingCallback, - (source, sourceReleasingCallback) -> - withAsyncSuppliedResource(source::getConnection, wrapSourceConnectionException, sourceReleasingCallback, - (connection, connectionAndSourceReleasingCallback) -> - asyncFunction.apply(source, connection, connectionAndSourceReleasingCallback))); - } - - /** - * @see #withSuppliedResource(Supplier, boolean, Function) - */ - static void withAsyncSuppliedResource(final AsyncCallbackSupplier resourceSupplier, - final boolean wrapSourceConnectionException, final SingleResultCallback callback, - final AsyncCallbackFunction function) throws ResourceSupplierInternalException { - SingleResultCallback errorHandlingCallback = errorHandlingCallback(callback, LOGGER); - resourceSupplier.get((resource, supplierException) -> { - if (supplierException != null) { - if (wrapSourceConnectionException) { - supplierException = new ResourceSupplierInternalException(supplierException); - } - errorHandlingCallback.onResult(null, supplierException); - } else { - assertNotNull(resource); - AsyncCallbackSupplier curriedFunction = clbk -> function.apply(resource, clbk); - curriedFunction.whenComplete(resource::release) - .get(errorHandlingCallback); - } - }); - } - - private static class AsyncCallableWithConnectionCallback implements SingleResultCallback { - private final AsyncCallableWithConnection callable; - AsyncCallableWithConnectionCallback(final AsyncCallableWithConnection callable) { - this.callable = callable; - } - @Override - public void onResult(@Nullable final AsyncConnectionSource source, @Nullable final Throwable t) { - if (t != null) { - callable.call(null, t); - } else { - withAsyncConnectionSourceCallableConnection(assertNotNull(source), callable); - } - } - } - - private static class AsyncCallableWithSourceCallback implements SingleResultCallback { - private final AsyncCallableWithSource callable; - AsyncCallableWithSourceCallback(final AsyncCallableWithSource callable) { - this.callable = callable; - } - @Override - public void onResult(@Nullable final AsyncConnectionSource source, @Nullable final Throwable t) { - if (t != null) { - callable.call(null, t); - } else { - withAsyncConnectionSource(assertNotNull(source), callable); - } - } - } - - private static void withAsyncConnectionSourceCallableConnection(final AsyncConnectionSource source, - final AsyncCallableWithConnection callable) { - source.getConnection((connection, t) -> { - source.release(); - if (t != null) { - callable.call(null, t); - } else { - callable.call(connection, null); - } - }); - } - - private static void withAsyncConnectionSource(final AsyncConnectionSource source, final AsyncCallableWithSource callable) { - callable.call(source, null); - } - - private static void withAsyncConnectionSource(final AsyncConnectionSource source, final AsyncCallableWithConnectionAndSource callable) { - source.getConnection((result, t) -> callable.call(source, result, t)); - } - - private static class AsyncCallableWithConnectionAndSourceCallback implements SingleResultCallback { - private final AsyncCallableWithConnectionAndSource callable; - - AsyncCallableWithConnectionAndSourceCallback(final AsyncCallableWithConnectionAndSource callable) { - this.callable = callable; - } - - @Override - public void onResult(@Nullable final AsyncConnectionSource source, @Nullable final Throwable t) { - if (t != null) { - callable.call(null, null, t); - } else { - withAsyncConnectionSource(assertNotNull(source), callable); - } - } - } - - private OperationHelper() { - } /** * This internal exception is used to *
    - *
  • on one hand allow propagating exceptions from {@link #withSuppliedResource(Supplier, boolean, Function)} / - * {@link #withAsyncSuppliedResource(AsyncCallbackSupplier, boolean, SingleResultCallback, AsyncCallbackFunction)} and similar - * methods so that they can be properly retried, which is useful, e.g., + *
  • on one hand allow propagating exceptions from {@link SyncOperationHelper#withSuppliedResource(Supplier, boolean, Function)} / + * {@link AsyncOperationHelper#withAsyncSuppliedResource(AsyncCallbackSupplier, boolean, SingleResultCallback, AsyncCallbackFunction)} + * and similar methods so that they can be properly retried, which is useful, e.g., * for {@link com.mongodb.MongoConnectionPoolClearedException};
  • *
  • on the other hand to prevent them from propagation once the retry decision is made.
  • *
* - * @see #withSuppliedResource(Supplier, boolean, Function) - * @see #withAsyncSuppliedResource(AsyncCallbackSupplier, boolean, SingleResultCallback, AsyncCallbackFunction) + * @see SyncOperationHelper#withSuppliedResource(Supplier, boolean, Function) + * @see AsyncOperationHelper#withAsyncSuppliedResource(AsyncCallbackSupplier, boolean, SingleResultCallback, AsyncCallbackFunction) */ - static final class ResourceSupplierInternalException extends RuntimeException { + public static final class ResourceSupplierInternalException extends RuntimeException { private static final long serialVersionUID = 0; - private ResourceSupplierInternalException(final Throwable cause) { + ResourceSupplierInternalException(final Throwable cause) { super(assertNotNull(cause)); } @@ -522,4 +246,7 @@ public Throwable getCause() { return assertNotNull(super.getCause()); } } + + private OperationHelper() { + } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java index a59955bac0e..139c3e6fd27 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java +++ b/driver-core/src/main/com/mongodb/internal/operation/QueryBatchCursor.java @@ -59,7 +59,7 @@ import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.operation.CursorHelper.getNumberToReturn; import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull; -import static com.mongodb.internal.operation.OperationHelper.getMoreCursorDocumentToQueryResult; +import static com.mongodb.internal.operation.SyncOperationHelper.getMoreCursorDocumentToQueryResult; import static com.mongodb.internal.operation.QueryHelper.translateCommandException; import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionFourDotFour; import static java.lang.String.format; diff --git a/driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java index abec482ad6c..d6f7ee897ae 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/RenameCollectionOperation.java @@ -29,14 +29,14 @@ import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommand; -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommandAsync; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorTransformer; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorWriteTransformer; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.releasingCallback; +import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncConnection; +import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync; import static com.mongodb.internal.operation.OperationHelper.LOGGER; -import static com.mongodb.internal.operation.OperationHelper.releasingCallback; -import static com.mongodb.internal.operation.OperationHelper.withAsyncConnection; -import static com.mongodb.internal.operation.OperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand; +import static com.mongodb.internal.operation.SyncOperationHelper.withConnection; +import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer; import static com.mongodb.internal.operation.WriteConcernHelper.appendWriteConcernToCommand; /** @@ -90,7 +90,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall errHandlingCallback.onResult(null, t); } else { executeCommandAsync(binding, "admin", getCommand(), assertNotNull(connection), - writeConcernErrorWriteTransformer(), releasingCallback(errHandlingCallback, connection)); + writeConcernErrorTransformerAsync(), releasingCallback(errHandlingCallback, connection)); } }); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java new file mode 100644 index 00000000000..67d5acf9c37 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/operation/SyncOperationHelper.java @@ -0,0 +1,318 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.MongoException; +import com.mongodb.ReadPreference; +import com.mongodb.ServerAddress; +import com.mongodb.internal.VisibleForTesting; +import com.mongodb.internal.async.SingleResultCallback; +import com.mongodb.internal.async.function.AsyncCallbackBiFunction; +import com.mongodb.internal.async.function.AsyncCallbackFunction; +import com.mongodb.internal.async.function.AsyncCallbackSupplier; +import com.mongodb.internal.async.function.RetryState; +import com.mongodb.internal.async.function.RetryingSyncSupplier; +import com.mongodb.internal.binding.ConnectionSource; +import com.mongodb.internal.binding.ReadBinding; +import com.mongodb.internal.binding.ReferenceCounted; +import com.mongodb.internal.binding.WriteBinding; +import com.mongodb.internal.connection.Connection; +import com.mongodb.internal.connection.OperationContext; +import com.mongodb.internal.connection.QueryResult; +import com.mongodb.internal.operation.retry.AttachmentKeys; +import com.mongodb.internal.validator.NoOpFieldNameValidator; +import com.mongodb.lang.Nullable; +import org.bson.BsonDocument; +import org.bson.BsonValue; +import org.bson.FieldNameValidator; +import org.bson.codecs.BsonDocumentCodec; +import org.bson.codecs.Decoder; + +import java.util.function.BiFunction; +import java.util.function.Function; +import java.util.function.Supplier; + +import static com.mongodb.ReadPreference.primary; +import static com.mongodb.assertions.Assertions.assertFalse; +import static com.mongodb.assertions.Assertions.assertNotNull; +import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; +import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; +import static com.mongodb.internal.operation.CommandOperationHelper.logRetryExecute; +import static com.mongodb.internal.operation.OperationHelper.ResourceSupplierInternalException; +import static com.mongodb.internal.operation.OperationHelper.canRetryRead; +import static com.mongodb.internal.operation.OperationHelper.canRetryWrite; +import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult; +import static com.mongodb.internal.operation.WriteConcernHelper.throwOnWriteConcernError; + +final class SyncOperationHelper { + + interface CallableWithConnection { + T call(Connection connection); + } + + interface CallableWithSource { + T call(ConnectionSource source); + } + + interface CommandReadTransformer { + + /** + * Yield an appropriate result object for the input object. + * + * @param t the input object + * @return the function result + */ + @Nullable + R apply(T t, ConnectionSource source, Connection connection); + } + + interface CommandWriteTransformer { + + /** + * Yield an appropriate result object for the input object. + * + * @param t the input object + * @return the function result + */ + @Nullable + R apply(T t, Connection connection); + } + + static T withReadConnectionSource(final ReadBinding binding, final CallableWithSource callable) { + ConnectionSource source = binding.getReadConnectionSource(); + try { + return callable.call(source); + } finally { + source.release(); + } + } + + static T withConnection(final WriteBinding binding, final CallableWithConnection callable) { + ConnectionSource source = binding.getWriteConnectionSource(); + try { + return withConnectionSource(source, callable); + } finally { + source.release(); + } + } + + /** + * Gets a {@link ConnectionSource} and a {@link Connection} from the {@code sourceSupplier} and executes the {@code function} with them. + * Guarantees to {@linkplain ReferenceCounted#release() release} the source and the connection after completion of the {@code function}. + * + * @param wrapConnectionSourceException See {@link #withSuppliedResource(Supplier, boolean, Function)}. + * @see #withSuppliedResource(Supplier, boolean, Function) + * @see AsyncOperationHelper#withAsyncSourceAndConnection(AsyncCallbackSupplier, boolean, SingleResultCallback, AsyncCallbackBiFunction) + */ + static R withSourceAndConnection(final Supplier sourceSupplier, + final boolean wrapConnectionSourceException, + final BiFunction function) throws ResourceSupplierInternalException { + return withSuppliedResource(sourceSupplier, wrapConnectionSourceException, source -> + withSuppliedResource(source::getConnection, wrapConnectionSourceException, connection -> + function.apply(source, connection))); + } + + /** + * Gets a {@link ReferenceCounted} resource from the {@code resourceSupplier} and applies the {@code function} to it. + * Guarantees to {@linkplain ReferenceCounted#release() release} the resource after completion of the {@code function}. + * + * @param wrapSupplierException If {@code true} and {@code resourceSupplier} completes abruptly, then the exception is wrapped + * into {@link OperationHelper.ResourceSupplierInternalException}, such that it can be accessed + * via {@link OperationHelper.ResourceSupplierInternalException#getCause()}. + * @see AsyncOperationHelper#withAsyncSuppliedResource(AsyncCallbackSupplier, boolean, SingleResultCallback, AsyncCallbackFunction) + */ + static R withSuppliedResource(final Supplier resourceSupplier, + final boolean wrapSupplierException, final Function function) throws OperationHelper.ResourceSupplierInternalException { + T resource = null; + try { + try { + resource = resourceSupplier.get(); + } catch (Exception supplierException) { + if (wrapSupplierException) { + throw new ResourceSupplierInternalException(supplierException); + } else { + throw supplierException; + } + } + return function.apply(resource); + } finally { + if (resource != null) { + resource.release(); + } + } + } + + private static T withConnectionSource(final ConnectionSource source, final CallableWithConnection callable) { + Connection connection = source.getConnection(); + try { + return callable.call(connection); + } finally { + connection.release(); + } + } + + static T executeRetryableRead( + final ReadBinding binding, + final String database, + final CommandCreator commandCreator, + final Decoder decoder, + final CommandReadTransformer transformer, + final boolean retryReads) { + return executeRetryableRead(binding, binding::getReadConnectionSource, database, commandCreator, decoder, transformer, retryReads); + } + + static T executeRetryableRead( + final ReadBinding binding, + final Supplier readConnectionSourceSupplier, + final String database, + final CommandCreator commandCreator, + final Decoder decoder, + final CommandReadTransformer transformer, + final boolean retryReads) { + RetryState retryState = CommandOperationHelper.initialRetryState(retryReads); + Supplier read = decorateReadWithRetries(retryState, binding.getOperationContext(), () -> + withSourceAndConnection(readConnectionSourceSupplier, false, (source, connection) -> { + retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getSessionContext())); + return createReadCommandAndExecute(retryState, binding, source, database, commandCreator, decoder, transformer, connection); + }) + ); + return read.get(); + } + + @VisibleForTesting(otherwise = PRIVATE) + static T executeCommand(final WriteBinding binding, final String database, final BsonDocument command, + final Decoder decoder, final CommandWriteTransformer transformer) { + return withSourceAndConnection(binding::getWriteConnectionSource, false, (source, connection) -> + transformer.apply(assertNotNull( + connection.command(database, command, new NoOpFieldNameValidator(), primary(), decoder, binding)), connection)); + } + + @Nullable + static T executeCommand(final WriteBinding binding, final String database, final BsonDocument command, + final Connection connection, final CommandWriteTransformer transformer) { + notNull("binding", binding); + return transformer.apply(assertNotNull( + connection.command(database, command, new NoOpFieldNameValidator(), primary(), new BsonDocumentCodec(), binding)), + connection); + } + + static R executeRetryableWrite( + final WriteBinding binding, + final String database, + @Nullable final ReadPreference readPreference, + final FieldNameValidator fieldNameValidator, + final Decoder commandResultDecoder, + final CommandCreator commandCreator, + final CommandWriteTransformer transformer, + final com.mongodb.Function retryCommandModifier) { + RetryState retryState = CommandOperationHelper.initialRetryState(true); + Supplier retryingWrite = decorateWriteWithRetries(retryState, binding.getOperationContext(), () -> { + boolean firstAttempt = retryState.isFirstAttempt(); + if (!firstAttempt && binding.getSessionContext().hasActiveTransaction()) { + binding.getSessionContext().clearTransactionContext(); + } + return withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection) -> { + int maxWireVersion = connection.getDescription().getMaxWireVersion(); + try { + retryState.breakAndThrowIfRetryAnd(() -> !canRetryWrite(connection.getDescription(), binding.getSessionContext())); + BsonDocument command = retryState.attachment(AttachmentKeys.command()) + .map(previousAttemptCommand -> { + assertFalse(firstAttempt); + return retryCommandModifier.apply(previousAttemptCommand); + }).orElseGet(() -> commandCreator.create(source.getServerDescription(), connection.getDescription())); + // attach `maxWireVersion`, `retryableCommandFlag` ASAP because they are used to check whether we should retry + retryState.attach(AttachmentKeys.maxWireVersion(), maxWireVersion, true) + .attach(AttachmentKeys.retryableCommandFlag(), CommandOperationHelper.isRetryWritesEnabled(command), true) + .attach(AttachmentKeys.commandDescriptionSupplier(), command::getFirstKey, false) + .attach(AttachmentKeys.command(), command, false); + return transformer.apply(assertNotNull(connection.command(database, command, fieldNameValidator, readPreference, + commandResultDecoder, binding)), + connection); + } catch (MongoException e) { + if (!firstAttempt) { + CommandOperationHelper.addRetryableWriteErrorLabel(e, maxWireVersion); + } + throw e; + } + }); + }); + try { + return retryingWrite.get(); + } catch (MongoException e) { + throw CommandOperationHelper.transformWriteException(e); + } + } + + @Nullable + static T createReadCommandAndExecute( + final RetryState retryState, + final ReadBinding binding, + final ConnectionSource source, + final String database, + final CommandCreator commandCreator, + final Decoder decoder, + final CommandReadTransformer transformer, + final Connection connection) { + BsonDocument command = commandCreator.create(source.getServerDescription(), connection.getDescription()); + retryState.attach(AttachmentKeys.commandDescriptionSupplier(), command::getFirstKey, false); + return transformer.apply(assertNotNull(connection.command(database, command, new NoOpFieldNameValidator(), + source.getReadPreference(), decoder, binding)), source, connection); + } + + + static Supplier decorateWriteWithRetries(final RetryState retryState, + final OperationContext operationContext, final Supplier writeFunction) { + return new RetryingSyncSupplier<>(retryState, CommandOperationHelper::chooseRetryableWriteException, + CommandOperationHelper::shouldAttemptToRetryWrite, () -> { + logRetryExecute(retryState, operationContext); + return writeFunction.get(); + }); + } + + static Supplier decorateReadWithRetries(final RetryState retryState, final OperationContext operationContext, + final Supplier readFunction) { + return new RetryingSyncSupplier<>(retryState, CommandOperationHelper::chooseRetryableReadException, + CommandOperationHelper::shouldAttemptToRetryRead, () -> { + logRetryExecute(retryState, operationContext); + return readFunction.get(); + }); + } + + + static CommandWriteTransformer writeConcernErrorTransformer() { + return (result, connection) -> { + assertNotNull(result); + throwOnWriteConcernError(result, connection.getDescription().getServerAddress(), + connection.getDescription().getMaxWireVersion()); + return null; + }; + } + + static BatchCursor cursorDocumentToBatchCursor(final BsonDocument cursorDocument, final Decoder decoder, + final BsonValue comment, final ConnectionSource source, final Connection connection, final int batchSize) { + return new QueryBatchCursor<>(cursorDocumentToQueryResult(cursorDocument, source.getServerDescription().getAddress()), + 0, batchSize, 0, decoder, comment, source, connection); + } + + static QueryResult getMoreCursorDocumentToQueryResult(final BsonDocument cursorDocument, final ServerAddress serverAddress) { + return cursorDocumentToQueryResult(cursorDocument, serverAddress, "nextBatch"); + } + + private SyncOperationHelper() { + } +} diff --git a/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java index 3f3d680ddd2..499623ebcce 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java @@ -21,7 +21,6 @@ import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncWriteBinding; import com.mongodb.internal.binding.WriteBinding; -import com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; import com.mongodb.internal.validator.NoOpFieldNameValidator; import org.bson.BsonDocument; import org.bson.BsonInt32; @@ -30,11 +29,12 @@ import static com.mongodb.assertions.Assertions.isTrue; import static com.mongodb.assertions.Assertions.notNull; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableWrite; -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableWriteAsync; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorTransformer; -import static com.mongodb.internal.operation.CommandOperationHelper.writeConcernErrorTransformerAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableWriteAsync; +import static com.mongodb.internal.operation.AsyncOperationHelper.writeConcernErrorTransformerAsync; +import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator; import static com.mongodb.internal.operation.OperationHelper.LOGGER; +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableWrite; +import static com.mongodb.internal.operation.SyncOperationHelper.writeConcernErrorTransformer; /** * A base class for transaction-related operations diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncOperationHelperSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncOperationHelperSpecification.groovy new file mode 100644 index 00000000000..f897413e12d --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/AsyncOperationHelperSpecification.groovy @@ -0,0 +1,160 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation + + +import com.mongodb.MongoWriteConcernException +import com.mongodb.ReadConcern +import com.mongodb.ReadPreference +import com.mongodb.connection.ConnectionDescription +import com.mongodb.connection.ServerDescription +import com.mongodb.connection.ServerType +import com.mongodb.internal.async.SingleResultCallback +import com.mongodb.internal.binding.AsyncConnectionSource +import com.mongodb.internal.binding.AsyncReadBinding +import com.mongodb.internal.binding.AsyncWriteBinding +import com.mongodb.internal.connection.AsyncConnection +import com.mongodb.internal.session.SessionContext +import com.mongodb.internal.validator.NoOpFieldNameValidator +import org.bson.BsonDocument +import org.bson.BsonNull +import org.bson.codecs.BsonDocumentCodec +import org.bson.codecs.Decoder +import spock.lang.Specification + +import static com.mongodb.ReadPreference.primary +import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync +import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync +import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableWriteAsync +import static com.mongodb.internal.operation.OperationUnitSpecification.getMaxWireVersionForServerVersion + +class AsyncOperationHelperSpecification extends Specification { + + def 'should retry with retryable exception async'() { + given: + def dbName = 'db' + def command = BsonDocument.parse('''{findAndModify: "coll", query: {a: 1}, new: false, update: {$inc: {a :1}}, txnNumber: 1}''') + def serverDescription = Stub(ServerDescription) + def connectionDescription = Stub(ConnectionDescription) { + getMaxWireVersion() >> getMaxWireVersionForServerVersion([4, 0, 0]) + getServerType() >> ServerType.REPLICA_SET_PRIMARY + } + def commandCreator = { serverDesc, connectionDesc -> command } + def callback = new SingleResultCallback() { + def result + def throwable + @Override + void onResult(final Object result, final Throwable t) { + this.result = result + this.throwable = t + } + } + def decoder = new BsonDocumentCodec() + def results = [ + BsonDocument.parse('{ok: 1.0, writeConcernError: {code: 91, errmsg: "Replication is being shut down"}}'), + BsonDocument.parse('{ok: 1.0, writeConcernError: {code: -1, errmsg: "UnknownError"}}')] as Queue + + def connection = Mock(AsyncConnection) { + _ * getDescription() >> connectionDescription + } + + def connectionSource = Stub(AsyncConnectionSource) { + getServerApi() >> null + getConnection(_) >> { it[0].onResult(connection, null) } + _ * getServerDescription() >> serverDescription + } + def asyncWriteBinding = Stub(AsyncWriteBinding) { + getServerApi() >> null + getWriteConnectionSource(_) >> { it[0].onResult(connectionSource, null) } + getSessionContext() >> Stub(SessionContext) { + hasSession() >> true + hasActiveTransaction() >> false + getReadConcern() >> ReadConcern.DEFAULT + } + } + + when: + executeRetryableWriteAsync(asyncWriteBinding, dbName, primary(), new NoOpFieldNameValidator(), decoder, + commandCreator, FindAndModifyHelper.asyncTransformer(), { cmd -> cmd }, callback) + + then: + 2 * connection.commandAsync(dbName, command, _, primary(), decoder, *_) >> { it.last().onResult(results.poll(), null) } + + then: + callback.throwable instanceof MongoWriteConcernException + callback.throwable.writeConcernError.code == -1 + } + + def 'should set read preference to primary when using AsyncWriteBinding'() { + given: + def dbName = 'db' + def command = new BsonDocument() + def callback = Stub(SingleResultCallback) + def connection = Mock(AsyncConnection) + def connectionSource = Stub(AsyncConnectionSource) { + getServerApi() >> null + getConnection(_) >> { it[0].onResult(connection, null) } + } + def asyncWriteBinding = Stub(AsyncWriteBinding) { + getServerApi() >> null + getWriteConnectionSource(_) >> { it[0].onResult(connectionSource, null) } + } + def connectionDescription = Stub(ConnectionDescription) + + when: + executeCommandAsync(asyncWriteBinding, dbName, command, connection, { t, conn -> t }, callback) + + then: + _ * connection.getDescription() >> connectionDescription + 1 * connection.commandAsync(dbName, command, _, primary(), *_) >> { it.last().onResult(1, null) } +// 1 * connection.release() + } + + def 'should use the AsyncConnectionSource readPreference'() { + given: + def dbName = 'db' + def command = new BsonDocument('fakeCommandName', BsonNull.VALUE) + def commandCreator = { serverDescription, connectionDescription -> command } + def decoder = Stub(Decoder) + def callback = Stub(SingleResultCallback) + def function = Stub(CommandReadTransformerAsync) + def connection = Mock(AsyncConnection) + def connectionSource = Stub(AsyncConnectionSource) { + getServerApi() >> null + getConnection(_) >> { it[0].onResult(connection, null) } + getReadPreference() >> readPreference + } + def asyncReadBinding = Stub(AsyncReadBinding) { + getServerApi() >> null + getReadConnectionSource(_) >> { it[0].onResult(connectionSource, null) } + } + def connectionDescription = Stub(ConnectionDescription) + + when: + executeRetryableReadAsync(asyncReadBinding, dbName, commandCreator, decoder, function, false, callback) + + then: + _ * connection.getDescription() >> connectionDescription + 1 * connection.commandAsync(dbName, command, _, readPreference, decoder, *_) >> { it.last().onResult(1, null) } + 1 * connection.release() + + where: + readPreference << [primary(), ReadPreference.secondary()] + } + +} diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/CommandOperationHelperSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/operation/CommandOperationHelperSpecification.groovy index e43f3587f86..38b3ad48f25 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/operation/CommandOperationHelperSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/CommandOperationHelperSpecification.groovy @@ -17,43 +17,15 @@ package com.mongodb.internal.operation import com.mongodb.MongoCommandException -import com.mongodb.MongoWriteConcernException -import com.mongodb.ReadConcern -import com.mongodb.ReadPreference import com.mongodb.ServerAddress -import com.mongodb.connection.ConnectionDescription -import com.mongodb.connection.ServerDescription -import com.mongodb.connection.ServerType -import com.mongodb.internal.async.SingleResultCallback -import com.mongodb.internal.binding.AsyncConnectionSource -import com.mongodb.internal.binding.AsyncReadBinding -import com.mongodb.internal.binding.AsyncWriteBinding -import com.mongodb.internal.binding.ConnectionSource -import com.mongodb.internal.binding.ReadBinding -import com.mongodb.internal.binding.WriteBinding -import com.mongodb.internal.connection.AsyncConnection -import com.mongodb.internal.connection.Connection -import com.mongodb.internal.session.SessionContext -import com.mongodb.internal.validator.NoOpFieldNameValidator import org.bson.BsonBoolean import org.bson.BsonDocument import org.bson.BsonInt32 -import org.bson.BsonNull import org.bson.BsonString -import org.bson.codecs.BsonDocumentCodec -import org.bson.codecs.Decoder import spock.lang.Specification -import static com.mongodb.ReadPreference.primary -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommand -import static com.mongodb.internal.operation.CommandOperationHelper.executeCommandAsync -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableReadAsync -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableWrite -import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableWriteAsync import static com.mongodb.internal.operation.CommandOperationHelper.isNamespaceError import static com.mongodb.internal.operation.CommandOperationHelper.rethrowIfNotNamespaceError -import static com.mongodb.internal.operation.OperationUnitSpecification.getMaxWireVersionForServerVersion class CommandOperationHelperSpecification extends Specification { @@ -118,216 +90,4 @@ class CommandOperationHelperSpecification extends Specification { new ServerAddress()), 'some value') == 'some value' } - def 'should set read preference to primary to false when using WriteBinding'() { - given: - def dbName = 'db' - def command = new BsonDocument() - def decoder = Stub(Decoder) - def connection = Mock(Connection) - def function = Stub(CommandOperationHelper.CommandWriteTransformer) - def connectionSource = Stub(ConnectionSource) { - getServerApi() >> null - getConnection() >> connection - } - def writeBinding = Stub(WriteBinding) { - getServerApi() >> null - getWriteConnectionSource() >> connectionSource - } - def connectionDescription = Stub(ConnectionDescription) - - when: - executeCommand(writeBinding, dbName, command, decoder, function) - - then: - _ * connection.getDescription() >> connectionDescription - 1 * connection.command(dbName, command, _, primary(), decoder, writeBinding) >> new BsonDocument() - 1 * connection.release() - } - - def 'should retry with retryable exception'() { - given: - def dbName = 'db' - def command = BsonDocument.parse('''{findAndModify: "coll", query: {a: 1}, new: false, update: {$inc: {a :1}}, txnNumber: 1}''') - def commandCreator = { serverDescription, connectionDescription -> command } - def decoder = new BsonDocumentCodec() - def results = [ - BsonDocument.parse('{ok: 1.0, writeConcernError: {code: 91, errmsg: "Replication is being shut down"}}'), - BsonDocument.parse('{ok: 1.0, writeConcernError: {code: -1, errmsg: "UnknownError"}}')] as Queue - def connection = Mock(Connection) { - _ * release() - _ * getDescription() >> Stub(ConnectionDescription) { - getMaxWireVersion() >> getMaxWireVersionForServerVersion([4, 0, 0]) - getServerType() >> ServerType.REPLICA_SET_PRIMARY - } - } - def connectionSource = Stub(ConnectionSource) { - _ * getConnection() >> connection - _ * getServerDescription() >> Stub(ServerDescription) { - getLogicalSessionTimeoutMinutes() >> 1 - } - } - def writeBinding = Stub(WriteBinding) { - getWriteConnectionSource() >> connectionSource - getServerApi() >> null - getSessionContext() >> Stub(SessionContext) { - hasSession() >> true - hasActiveTransaction() >> false - getReadConcern() >> ReadConcern.DEFAULT - } - } - - when: - executeRetryableWrite(writeBinding, dbName, primary(), new NoOpFieldNameValidator(), decoder, commandCreator, - FindAndModifyHelper.transformer()) { cmd -> cmd } - - then: - 2 * connection.command(dbName, command, _, primary(), decoder, writeBinding) >> { results.poll() } - - then: - def ex = thrown(MongoWriteConcernException) - ex.writeConcernError.code == -1 - } - - def 'should retry with retryable exception async'() { - given: - def dbName = 'db' - def command = BsonDocument.parse('''{findAndModify: "coll", query: {a: 1}, new: false, update: {$inc: {a :1}}, txnNumber: 1}''') - def serverDescription = Stub(ServerDescription) - def connectionDescription = Stub(ConnectionDescription) { - getMaxWireVersion() >> getMaxWireVersionForServerVersion([4, 0, 0]) - getServerType() >> ServerType.REPLICA_SET_PRIMARY - } - def commandCreator = { serverDesc, connectionDesc -> command } - def callback = new SingleResultCallback() { - def result - def throwable - @Override - void onResult(final Object result, final Throwable t) { - this.result = result - this.throwable = t - } - } - def decoder = new BsonDocumentCodec() - def results = [ - BsonDocument.parse('{ok: 1.0, writeConcernError: {code: 91, errmsg: "Replication is being shut down"}}'), - BsonDocument.parse('{ok: 1.0, writeConcernError: {code: -1, errmsg: "UnknownError"}}')] as Queue - - def connection = Mock(AsyncConnection) { - _ * getDescription() >> connectionDescription - } - - def connectionSource = Stub(AsyncConnectionSource) { - getServerApi() >> null - getConnection(_) >> { it[0].onResult(connection, null) } - _ * getServerDescription() >> serverDescription - } - def asyncWriteBinding = Stub(AsyncWriteBinding) { - getServerApi() >> null - getWriteConnectionSource(_) >> { it[0].onResult(connectionSource, null) } - getSessionContext() >> Stub(SessionContext) { - hasSession() >> true - hasActiveTransaction() >> false - getReadConcern() >> ReadConcern.DEFAULT - } - } - - when: - executeRetryableWriteAsync(asyncWriteBinding, dbName, primary(), new NoOpFieldNameValidator(), decoder, - commandCreator, FindAndModifyHelper.asyncTransformer(), { cmd -> cmd }, callback) - - then: - 2 * connection.commandAsync(dbName, command, _, primary(), decoder, *_) >> { it.last().onResult(results.poll(), null) } - - then: - callback.throwable instanceof MongoWriteConcernException - callback.throwable.writeConcernError.code == -1 - } - - def 'should use the ConnectionSource readPreference'() { - given: - def dbName = 'db' - def command = new BsonDocument('fakeCommandName', BsonNull.VALUE) - def commandCreator = { serverDescription, connectionDescription -> command } - def decoder = Stub(Decoder) - def function = Stub(CommandOperationHelper.CommandReadTransformer) - def connection = Mock(Connection) - def connectionSource = Stub(ConnectionSource) { - getConnection() >> connection - getReadPreference() >> readPreference - } - def readBinding = Stub(ReadBinding) { - getReadConnectionSource() >> connectionSource - getServerApi() >> null - } - def connectionDescription = Stub(ConnectionDescription) - - when: - executeRetryableRead(readBinding, dbName, commandCreator, decoder, function, false) - - then: - _ * connection.getDescription() >> connectionDescription - 1 * connection.command(dbName, command, _, readPreference, decoder, readBinding) >> new BsonDocument() - 1 * connection.release() - - where: - readPreference << [primary(), ReadPreference.secondary()] - } - - def 'should set read preference to primary when using AsyncWriteBinding'() { - given: - def dbName = 'db' - def command = new BsonDocument() - def callback = Stub(SingleResultCallback) - def connection = Mock(AsyncConnection) - def connectionSource = Stub(AsyncConnectionSource) { - getServerApi() >> null - getConnection(_) >> { it[0].onResult(connection, null) } - } - def asyncWriteBinding = Stub(AsyncWriteBinding) { - getServerApi() >> null - getWriteConnectionSource(_) >> { it[0].onResult(connectionSource, null) } - } - def connectionDescription = Stub(ConnectionDescription) - - when: - executeCommandAsync(asyncWriteBinding, dbName, command, connection, { t, conn -> t }, callback) - - then: - _ * connection.getDescription() >> connectionDescription - 1 * connection.commandAsync(dbName, command, _, primary(), *_) >> { it.last().onResult(1, null) } -// 1 * connection.release() - } - - def 'should use the AsyncConnectionSource readPreference'() { - given: - def dbName = 'db' - def command = new BsonDocument('fakeCommandName', BsonNull.VALUE) - def commandCreator = { serverDescription, connectionDescription -> command } - def decoder = Stub(Decoder) - def callback = Stub(SingleResultCallback) - def function = Stub(CommandOperationHelper.CommandReadTransformerAsync) - def connection = Mock(AsyncConnection) - def connectionSource = Stub(AsyncConnectionSource) { - getServerApi() >> null - getConnection(_) >> { it[0].onResult(connection, null) } - getReadPreference() >> readPreference - } - def asyncReadBinding = Stub(AsyncReadBinding) { - getServerApi() >> null - getReadConnectionSource(_) >> { it[0].onResult(connectionSource, null) } - } - def connectionDescription = Stub(ConnectionDescription) - - when: - executeRetryableReadAsync(asyncReadBinding, dbName, commandCreator, decoder, function, false, callback) - - then: - _ * connection.getDescription() >> connectionDescription - 1 * connection.commandAsync(dbName, command, _, readPreference, decoder, *_) >> { it.last().onResult(1, null) } - 1 * connection.release() - - where: - readPreference << [primary(), ReadPreference.secondary()] - } - } diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/SyncOperationHelperSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/operation/SyncOperationHelperSpecification.groovy new file mode 100644 index 00000000000..a18148911bf --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/SyncOperationHelperSpecification.groovy @@ -0,0 +1,148 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation + + +import com.mongodb.MongoWriteConcernException +import com.mongodb.ReadConcern +import com.mongodb.ReadPreference +import com.mongodb.connection.ConnectionDescription +import com.mongodb.connection.ServerDescription +import com.mongodb.connection.ServerType +import com.mongodb.internal.binding.ConnectionSource +import com.mongodb.internal.binding.ReadBinding +import com.mongodb.internal.binding.WriteBinding +import com.mongodb.internal.connection.Connection +import com.mongodb.internal.session.SessionContext +import com.mongodb.internal.validator.NoOpFieldNameValidator +import org.bson.BsonDocument +import org.bson.BsonNull +import org.bson.codecs.BsonDocumentCodec +import org.bson.codecs.Decoder +import spock.lang.Specification + +import static com.mongodb.ReadPreference.primary +import static com.mongodb.internal.operation.OperationUnitSpecification.getMaxWireVersionForServerVersion +import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer +import static com.mongodb.internal.operation.SyncOperationHelper.CommandWriteTransformer +import static com.mongodb.internal.operation.SyncOperationHelper.executeCommand +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead +import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableWrite + +class SyncOperationHelperSpecification extends Specification { + + def 'should set read preference to primary to false when using WriteBinding'() { + given: + def dbName = 'db' + def command = new BsonDocument() + def decoder = Stub(Decoder) + def connection = Mock(Connection) + def function = Stub(CommandWriteTransformer) + def connectionSource = Stub(ConnectionSource) { + getServerApi() >> null + getConnection() >> connection + } + def writeBinding = Stub(WriteBinding) { + getServerApi() >> null + getWriteConnectionSource() >> connectionSource + } + def connectionDescription = Stub(ConnectionDescription) + + when: + executeCommand(writeBinding, dbName, command, decoder, function) + + then: + _ * connection.getDescription() >> connectionDescription + 1 * connection.command(dbName, command, _, primary(), decoder, writeBinding) >> new BsonDocument() + 1 * connection.release() + } + + def 'should retry with retryable exception'() { + given: + def dbName = 'db' + def command = BsonDocument.parse('''{findAndModify: "coll", query: {a: 1}, new: false, update: {$inc: {a :1}}, txnNumber: 1}''') + def commandCreator = { serverDescription, connectionDescription -> command } + def decoder = new BsonDocumentCodec() + def results = [ + BsonDocument.parse('{ok: 1.0, writeConcernError: {code: 91, errmsg: "Replication is being shut down"}}'), + BsonDocument.parse('{ok: 1.0, writeConcernError: {code: -1, errmsg: "UnknownError"}}')] as Queue + def connection = Mock(Connection) { + _ * release() + _ * getDescription() >> Stub(ConnectionDescription) { + getMaxWireVersion() >> getMaxWireVersionForServerVersion([4, 0, 0]) + getServerType() >> ServerType.REPLICA_SET_PRIMARY + } + } + def connectionSource = Stub(ConnectionSource) { + _ * getConnection() >> connection + _ * getServerDescription() >> Stub(ServerDescription) { + getLogicalSessionTimeoutMinutes() >> 1 + } + } + def writeBinding = Stub(WriteBinding) { + getWriteConnectionSource() >> connectionSource + getServerApi() >> null + getSessionContext() >> Stub(SessionContext) { + hasSession() >> true + hasActiveTransaction() >> false + getReadConcern() >> ReadConcern.DEFAULT + } + } + + when: + executeRetryableWrite(writeBinding, dbName, primary(), new NoOpFieldNameValidator(), decoder, commandCreator, + FindAndModifyHelper.transformer()) { cmd -> cmd } + + then: + 2 * connection.command(dbName, command, _, primary(), decoder, writeBinding) >> { results.poll() } + + then: + def ex = thrown(MongoWriteConcernException) + ex.writeConcernError.code == -1 + } + + def 'should use the ConnectionSource readPreference'() { + given: + def dbName = 'db' + def command = new BsonDocument('fakeCommandName', BsonNull.VALUE) + def commandCreator = { serverDescription, connectionDescription -> command } + def decoder = Stub(Decoder) + def function = Stub(CommandReadTransformer) + def connection = Mock(Connection) + def connectionSource = Stub(ConnectionSource) { + getConnection() >> connection + getReadPreference() >> readPreference + } + def readBinding = Stub(ReadBinding) { + getReadConnectionSource() >> connectionSource + getServerApi() >> null + } + def connectionDescription = Stub(ConnectionDescription) + + when: + executeRetryableRead(readBinding, dbName, commandCreator, decoder, function, false) + + then: + _ * connection.getDescription() >> connectionDescription + 1 * connection.command(dbName, command, _, readPreference, decoder, readBinding) >> new BsonDocument() + 1 * connection.release() + + where: + readPreference << [primary(), ReadPreference.secondary()] + } + +}