Skip to content

Commit

Permalink
Reorganized Async & Sync OperationHelpers (#1169)
Browse files Browse the repository at this point in the history
Normalized some naming conventions (mainly async oddities)
Removed any unused code

JAVA-5087
  • Loading branch information
rozza authored Aug 7, 2023
1 parent e4f39e1 commit 869610d
Show file tree
Hide file tree
Showing 37 changed files with 1,262 additions and 1,132 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;

import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;

/**
* An operation that aborts a transaction.
*
Expand All @@ -44,8 +46,8 @@ protected String getCommandName() {
}

@Override
CommandOperationHelper.CommandCreator getCommandCreator() {
CommandOperationHelper.CommandCreator creator = super.getCommandCreator();
CommandCreator getCommandCreator() {
CommandCreator creator = super.getCommandCreator();
if (recoveryToken != null) {
return (serverDescription, connectionDescription) -> creator.create(serverDescription, connectionDescription).append("recoveryToken", recoveryToken);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,6 @@
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.client.model.AggregationLevel;
import com.mongodb.internal.connection.QueryResult;
import com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformer;
import com.mongodb.internal.operation.CommandOperationHelper.CommandReadTransformerAsync;
import com.mongodb.internal.session.SessionContext;
import com.mongodb.lang.Nullable;
import org.bson.BsonArray;
Expand All @@ -43,14 +40,18 @@
import java.util.List;
import java.util.concurrent.TimeUnit;

import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead;
import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.cursorDocumentToQueryResult;
import static com.mongodb.internal.operation.OperationReadConcernHelper.appendReadConcernToCommand;
import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;

class AggregateOperationImpl<T> implements AsyncReadOperation<AsyncBatchCursor<T>>, ReadOperation<BatchCursor<T>> {
private static final String RESULT = "result";
Expand Down Expand Up @@ -196,8 +197,9 @@ public BatchCursor<T> execute(final ReadBinding binding) {
@Override
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback<AsyncBatchCursor<T>> callback) {
SingleResultCallback<AsyncBatchCursor<T>> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
executeRetryableReadAsync(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()),
CommandResultDocumentCodec.create(decoder, FIELD_NAMES_WITH_RESULT), asyncTransformer(), retryReads, errHandlingCallback);
executeRetryableReadAsync(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()),
CommandResultDocumentCodec.create(this.decoder, FIELD_NAMES_WITH_RESULT), asyncTransformer(), retryReads,
errHandlingCallback);
}

private CommandCreator getCommandCreator(final SessionContext sessionContext) {
Expand Down Expand Up @@ -238,10 +240,11 @@ BsonDocument getCommand(final SessionContext sessionContext, final int maxWireVe
}

private QueryResult<T> createQueryResult(final BsonDocument result, final ConnectionDescription description) {
assertNotNull(result);
return cursorDocumentToQueryResult(result.getDocument(CURSOR), description.getServerAddress());
}

private CommandReadTransformer<BsonDocument, AggregateResponseBatchCursor<T>> transformer() {
private CommandReadTransformer<BsonDocument, QueryBatchCursor<T>> transformer() {
return (result, source, connection) -> {
QueryResult<T> queryResult = createQueryResult(result, connection.getDescription());
return new QueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder, comment,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,9 @@

import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableRead;
import static com.mongodb.internal.operation.CommandOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.ServerVersionHelper.FIVE_DOT_ZERO_WIRE_VERSION;
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;
import static com.mongodb.internal.operation.WriteConcernHelper.appendWriteConcernToCommand;
import static com.mongodb.internal.operation.WriteConcernHelper.throwOnWriteConcernError;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.assertNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncReadConnectionSource;
import static com.mongodb.internal.operation.ChangeStreamBatchCursor.convertAndProduceLastId;
import static com.mongodb.internal.operation.ChangeStreamBatchCursorHelper.isResumableError;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.withAsyncReadConnection;
import static java.lang.String.format;

final class AsyncChangeStreamBatchCursor<T> implements AsyncAggregateResponseBatchCursor<T> {
Expand Down Expand Up @@ -211,7 +211,7 @@ private void resumeableOperation(final AsyncBlock asyncBlock, final SingleResult

private void retryOperation(final AsyncBlock asyncBlock, final SingleResultCallback<List<T>> callback,
final boolean tryNext) {
withAsyncReadConnection(binding, (source, t) -> {
withAsyncReadConnectionSource(binding, (source, t) -> {
if (t != null) {
callback.onResult(null, t);
} else {
Expand Down
Loading

0 comments on commit 869610d

Please sign in to comment.